Hitachi Vantara Pentaho Community Forums

Thread: Bulk Load with named pipes example

  1. #1
    Join Date
    Jan 2007
    Posts
    29

    Default Bulk Load with named pipes example

    With some great help from Vertica's Patrick Toole, who suggested this route and helped troubleshoot, I worked out a way to get Kettle to send data through a named pipe and into a bulk loader. Speeds are really nice. Feedback appreciated.

    (Using Kettle 3.2M1, 3.1)
    Kettle's TextFileOutput step lets you pipe data to a command instead of a file, so you can write a simple shell script like the one below that accepts the data and sends it along a named pipe into your bulk loader of choice. The script below was written for Vertica, but just edit the commands to target another bulk loader. The nice thing about this method is that the bulk loader only needs to accept files, which means it could work for MySQL too.

    In the TextFileOutput step:
    1. Paste the path to the script below into the filename box, followed by the parameters to pass (see the script's usage message and the example command line after this list). Note that, at this time, quoted params containing spaces will be split and sent as separate params.
    2. Check the 'Run this as a command?' box.
    3. Check the 'Don’t Create file at Startup?' box.
    4. Make sure no extension is set for the file.
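
    For example, the filename box might end up containing something like this (the script path is just a placeholder for wherever you saved it; the single quotes are part of the arguments and get spliced straight into the COPY statement by the script):

    /home/dbadmin/etl/stdinToNamedPipeVerticaCopy.sh star.d_time '|' '\\N'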


    #!/bin/bash
    # Reads rows from stdin (the Kettle TextFileOutput "run as command" option),
    # forwards them into a temporary named pipe, and has Vertica's vsql COPY
    # read from that pipe in a background process.

    if [ $# -lt 3 ] ; then
        echo "Usage: stdinToNamedPipeVerticaCopy.sh {schema.tablename} {delimiter in single quotes} {null value in single quotes} [{rejected-rows filename, defaults to {table name}.rej in the step_logs dir}]"
        echo "  Example: stdinToNamedPipeVerticaCopy.sh star.d_time '\t' '\\N' d_time"
        echo "  (To have it fail on any rejected data, pass in 'fail' as the 4th param)"
        exit 127
    fi

    table_name=$1
    delimiter=$2      # spliced verbatim into the COPY statement, so include the single quotes, e.g. '|'
    nullValue=$3      # likewise, e.g. '\\N'
    rejectFileName=$4
    rejectFilePath="/home/dbadmin/etl/step_logs/"

    # One pipe per invocation, named after this shell's PID.
    pipepath="/tmp/nm_pipe.$$"

    mkfifo "${pipepath}"

    # Work out where rejected rows go: an explicit 4th param wins, otherwise
    # (and also when the 4th param is 'fail') default to {table name}.rej.
    if [ -z "$rejectFileName" ] || [ "$rejectFileName" == "fail" ]; then
        rejectFilePath="${rejectFilePath}${table_name#*.}.rej"
    else
        rejectFilePath="${rejectFilePath}${rejectFileName}.rej"
    fi

    # Kick off the COPY in the background so it sits there reading the pipe
    # while we feed it from stdin.
    if [ "$rejectFileName" == "fail" ]; then
        /opt/vertica/bin/vsql -U dbadmin -d warehouse -c "copy ${table_name} from '${pipepath}' delimiter ${delimiter} null ${nullValue} REJECTED DATA '${rejectFilePath}' ABORT ON ERROR direct;" &
    else
        /opt/vertica/bin/vsql -U dbadmin -d warehouse -c "copy ${table_name} from '${pipepath}' delimiter ${delimiter} null ${nullValue} REJECTED DATA '${rejectFilePath}' direct;" &
    fi

    # Forward stdin into the named pipe, then wait for the backgrounded COPY to finish.
    cat - > "${pipepath}"

    wait

    rm -f "${pipepath}"
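
    To sanity-check the script outside of Kettle, you can feed it a delimited sample by hand. A minimal smoke test might look like the line below (the sample file path is hypothetical, and the extra quoting keeps the single quotes in the delimiter and null arguments so they survive into the COPY statement):

    head -1000 /tmp/d_time_sample.psv | /home/dbadmin/etl/stdinToNamedPipeVerticaCopy.sh star.d_time "'|'" "'\\\\N'"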
    Last edited by dmccaffrey; 03-07-2009 at 05:17 AM.

  2. #2
    Join Date
    Nov 1999
    Posts
    9,729

    Default

    OK, just be careful, because the last time I got a script like that in my hands, I created a bulk loader ;-)

    On a more serious note: you can take the LucidDB bulk loader step source code and turn it into a Vertica bulk loader in probably less than a day... if you had a Vertica instance to test with.

    Take care,
    Matt

  3. #3
    Join Date
    Jan 2007
    Posts
    29

    Default

    Thanks Matt. That LucidDB page was actually the key to getting this right. I had some issues initially, but that page showed the proper ordering of steps.

    I may just do that, then, and create the plugin. Their COPY command takes a lot of other flags beyond what I exposed here. I know Daniel E. has done some work on the plugin end already. The benefit of running the SQL COPY command directly through the JDBC connection, instead of the command-line vsql, is probably significant enough to chase.

  4. #4
    DEinspanjer Guest

    Default

    While we could make a bulk loader step using FIFO pipes, it would be best to take full advantage of the Vertica JDBC API for reading from a Java InputStream. I've started some preliminary work on that this weekend; we'll see how far I get. It would be nice to get an experimental step for it into 3.2.

  5. #5
    Join Date
    Jan 2007
    Posts
    29

    Default

    Dang. It works nicely on small row counts, but I tested a large table and it looks like Kettle is still trying to send the last batch of rows after the pipe has closed. Hoping to solve this. Not sure yet why the bulk loader command seems to finish reading the pipe early.


    INFO 08-03 03:58:14,360 - TestGet.0 - linenr 8900000
    INFO 08-03 03:58:14,508 - s_yv_users .0 - linenr 8900000
    INFO 08-03 03:58:18,898 - (stdout) - Rows Loaded
    INFO 08-03 03:58:18,899 - (stdout) - -------------
    INFO 08-03 03:58:18,899 - (stdout) - 8896030
    INFO 08-03 03:58:18,899 - (stdout) - (1 row)
    INFO 08-03 03:58:18,899 - (stdout) -
    ERROR 08-03 03:58:18,899 - s_yv_users .0 - Unexpected error :
    ERROR 08-03 03:58:18,900 - s_yv_users .0 - org.pentaho.di.core.exception.KettleStepException:
    Error writing line

    Error writing field content to file
    Broken pipe


    at org.pentaho.di.trans.steps.textfileoutput.TextFileOutput.writeRowToFile(TextFileOutput.java:249)
    at org.pentaho.di.trans.steps.textfileoutput.TextFileOutput.processRow(TextFileOutput.java:165)
    at org.pentaho.di.trans.step.BaseStep.runStepThread(BaseStep.java:2678)
    at org.pentaho.di.trans.steps.textfileoutput.TextFileOutput.run(TextFileOutput.java:849)
    Caused by: org.pentaho.di.core.exception.KettleStepException:
    Error writing field content to file
    Broken pipe

    at org.pentaho.di.trans.steps.textfileoutput.TextFileOutput.writeField(TextFileOutput.java:420)
    at org.pentaho.di.trans.steps.textfileoutput.TextFileOutput.writeRowToFile(TextFileOutput.java:237)
    ... 3 more
    Caused by: java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:260)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:80)
    at org.pentaho.di.trans.steps.textfileoutput.TextFileOutput.writeField(TextFileOutput.java:393)
    ... 4 more

    INFO 08-03 03:58:18,900 - s_yv_users .0 - Finished processing (I=0, O=8903642, R=8903643, W=8903642, U=8903642, E=1)
    INFO 08-03 03:58:18,900 - testPipeToCopy2 - Transformation detected one or more steps with errors.
    INFO 08-03 03:58:18,901 - testPipeToCopy2 - Transformation is killing the other steps!
    ERROR 08-03 03:58:18,901 - testPipeToCopy2 - Errors detected!
    ERROR 08-03 03:58:18,901 - testPipeToCopy2 - Errors detected!
    Last edited by dmccaffrey; 03-08-2009 at 12:23 AM.

  6. #6
    DEinspanjer Guest

    Default

    What does the vertica.log file tell you about the transaction?

  7. #7
    Join Date
    Jan 2007
    Posts
    29

    Default

    Hi Daniel. To Vertica it appears to have been successful. I also tested my stdin script with the larger table, and now it completes with no broken pipe, but not all the rows are there.

    What is interesting with this named pipe script is that it bombs at the same place every time with this table. I wonder if it's a size issue related to the named pipe and when Vertica is clearing/reading from it.


    Vertica log:
    2009-03-08 05:29:27.741 nameless:0x4abef940 [EE] <INFO> COPY: Loaded 8896030 rows and Rejected 6426 rows from inputfile: nm_pipe.30882 Txn: a000000000023a
    2009-03-08 05:29:32.103 Init Session:0x2aaab40124a0 [Txn] <INFO> Starting Commit: Txn: a000000000023a 'copy staging.s_yv_users from '/tmp/nm_pipe.30882' delimiter '|' null '\\N' REJECTED DATA '/home/dbadmin/etl/step_logs/s_yv_users.rej' direct;'
    2009-03-08 05:29:32.104 Init Session:0x2aaab40124a0 [Txn] <INFO> Commit Complete: Txn: a000000000023a 'copy staging.s_yv_users from '/tmp/nm_pipe.30882' delimiter '|' null '\\N' REJECTED DATA '/home/dbadmin/etl/step_logs/s_yv_users.rej' direct;', at epoch 0x9
    2009-03-08 05:29:32.105 Init Session:0x2aaab40124a0 [Command] <INFO> [Bulkload] Direct copy to s_yv_users from pipe committed at epoch 9.
    Last edited by dmccaffrey; 03-08-2009 at 01:39 AM.

  8. #8
    DEinspanjer Guest

    Default

    There isn't any chance that the rejected rows are causing a problem? Or maybe that you have the "end of stream" \. (backslash-dot) sequence somewhere in your input stream? I had a case where I was getting an ugly termination because I ran across that sequence. My reject logs had a suspicious message that led me to that solution, though: something about the line ending not matching the previous something or other.
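
    If you want to check whether that sequence is actually present, a quick fixed-string grep against a text export of the rows (hypothetical file path) would tell you:

    grep -cF '\.' /tmp/s_yv_users_export.psv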

  9. #9
    Join Date
    Jan 2007
    Posts
    29

    Default

    I was thinking that; it's a possibility. I tried a 6 million row table and it completed. The problem table has user input. Probably hacker types, but every frustrating combination of delimiter and escape chars turns up in user data. Love the email fields that end with \. I'll let you know what I find. Thanks for the help.


    Edit: I blanked out the potentially problematic fields like name, email, etc., and all rows loaded. Maybe as I pipe the data I'll try a sed pass to strip the bad data and avoid any extra transformation steps. I can't remove the final end of stream, though. Thanks
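
    A rough sketch of that sed idea, assuming the only thing being defused is the backslash-dot sequence, would be to swap the 'cat - > ${pipepath}' line in the script for

    sed 's/\\\./_./g' > ${pipepath}

    which rewrites any literal backslash-dot into underscore-dot before it reaches the COPY. It would also mangle legitimate backslash-dot data, so it's a workaround rather than a fix.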
    Last edited by dmccaffrey; 03-08-2009 at 02:25 AM.

  10. #10
    Join Date
    Jan 2007
    Posts
    29

    Default

    Sure enough, it appears to be bad data. I added a (slow) JavaScript step that appears to clean it, and all the data loaded.

    // Clean the email field before it goes to the bulk loader: replace any
    // delimiter, newline, tab, backslash or double quote with '_', and
    // truncate over-long values.
    if (email) {
        email2 = email.replace(/(\||\n|\t|\\|")/g, '_');
        if (email2.length > 100) {
            email2 = email2.substr(0, 99);
        }
    }


    I have some other java socket stream closing issues, but I think that's related to keeping the server too busy.
