Hitachi Vantara Pentaho Community Forums

Thread: Data(Row) flow question

  1. #1

    Data(Row) flow question

    Hi,

    I have a very basic question - I don't think I have seen it explained anywhere. Apologies in advance if it has already been covered.

    How are the rows processed in PDI?

    Background on the question...

    I ran a transformation which reads products from a CSV file and upserts the categories they belong to. I have a sub-transformation that inserts a category if it doesn't exist. I ran the wrapper transformation with 10 rows where all the products belonged to the same category. What I expected to see in the log was 1 insert and 9 passes, but what I see instead is 10 inserts, yet only 1 category actually inserted in the db, which is what I want in the end.
    Hence the question: how are the rows processed? The only logical way I can explain what I see (and I am not sure this is correct) is that all the rows are processed in parallel in different threads, all of them think they are inserting, and in the end some merging of transactions results in only 1 insert in the db. Is that correct? Or maybe there is an error somewhere in the transformation which causes my variables/fields not to get updated between rows?

    And if the rows are indeed processed in parallel, then steps that involve sorting are really "blocking steps", since they have to wait for all the rows to come in, sort them, and then pass them on to the next step. Is this a correct assumption?

    On the other hand, I know I have read that the steps are definitely executed in parallel, which is also a bit of a mystery given the serial "nature" of the transformation and job flowchart/diagram.

    Overall I am still not very clear on the relation between field order/position, row order, and the PDI transformation/job execution flow (steps like Select values, Merge, Sort, and Split to rows make things a bit more complicated to understand).

    I would appreciate some insight.

  2. #2
    Join Date: Nov 1999
    Posts: 9,729


    How are the rows processed in PDI?
    One by one. Each step represents a thread.

    And if the rows are indeed processed in parallel, then steps that involve sorting are really "blocking steps", since they have to wait for all the rows to come in, sort them, and then pass them on to the next step. Is this a correct assumption?
    Yes.

    On the other hand, I know I have read that the steps are definitely executed in parallel, which is also a bit of a mystery given the serial "nature" of the transformation and job flowchart/diagram.
    Transformation steps are executed in parallel.
    Job entries are executed in a serial fashion.

    Overall I am still not very clear on the relation between field order/position, row order, and the PDI transformation/job execution flow (steps like Select values, Merge, Sort, and Split to rows make things a bit more complicated to understand).
    Yawn.

  3. #3
    DEinspanjer Guest


    Your example is a little complicated since you mention both an upsert step and a sub-transformation to insert new categories. You would need to attach the transformations here so someone could take a look at them to tell for sure.

    If you have five CSV steps (or one CSV step running in 5 copies) then those five threads are all executing in parallel. If they are all feeding into one insert/update step, then each CSV thread will process as many rows as it can in the time allocated to it by the CPU to execute. Those processed rows will be sitting in a bucket waiting for the next step to pick them up. That bucket is shown as the output buffer for each of the CSV copies, and as part of the input buffer on the upsert step. Whenever the CPU schedules the upsert step to execute, it will look in the next input buffer and process as many rows from it as it can in the time allowed. In this scenario, you should see only one output on the upsert (this is the insert of the one category you have) and no updates (since nothing changed on the category).
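    To make the buffer picture concrete, here is a rough analogy in plain Java (this is not Kettle code; the buffer size and the row strings are invented for the example): each step is its own thread, and the "bucket" between two steps behaves like a bounded blocking queue that the upstream thread fills and the downstream thread drains one row at a time.
    Code:
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Analogy only: a "CSV input" thread produces rows into a bounded buffer,
    // and an "insert/update" thread consumes them whenever it gets CPU time.
    public class RowBufferAnalogy {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> buffer = new ArrayBlockingQueue<>(10000); // output/input buffer between two steps

            Thread csvStep = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        buffer.put("product-" + i + ",books"); // producer: pushes each parsed row
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });

            Thread upsertStep = new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        String row = buffer.take();            // consumer: picks up one row at a time
                        System.out.println("upserting category for " + row);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            csvStep.start();
            upsertStep.start();
            csvStep.join();
            upsertStep.join();
        }
    }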

    If you try to run the upsert step in multiple copies then things get a little hairier. You have to deal with the fact that each of those upsert threads would be trying to get a lock on the table to perform their inserts or updates. In order for that to work properly, you'd have to partition the data by category such that no two upserts would ever be trying to insert or update the same category.
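    For illustration only, the partitioning idea amounts to routing every row with the same category to the same upsert copy. A hypothetical sketch of that routing rule (in PDI the real thing is configured on the step, not hand-coded):
    Code:
    // Hypothetical example of category-based routing; not the actual Kettle partitioner.
    public class CategoryPartitioner {
        // Rows with the same category always land on the same step copy.
        public static int copyForCategory(String category, int numberOfCopies) {
            return (category.hashCode() & 0x7fffffff) % numberOfCopies;
        }

        public static void main(String[] args) {
            String[] categories = {"books", "music", "books", "video"};
            for (String category : categories) {
                System.out.println(category + " -> upsert copy " + copyForCategory(category, 3));
            }
        }
    }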

  4. #4


    Thanks DEinspanjer for the response.

    Actually, apologies for the confusion - I don't have both an upsert and an insert - I just have an insert, and it's a sub-transformation. That was a typo; I only run an insert sub-transformation which could do an upsert, but for now I am ignoring updates.

    So from what you describe about how rows are processed, it seems like the CSV file input step was really fast and put all the rows in its output buffer, while the insert categories step was relatively slow, and before it could finish completely it was called in parallel for all the rows that were ready in its input buffer. So all the rows thought they were getting inserted, but eventually only one got inserted, since I only see 1 category in the db, which is the correct behavior in the end. That suggests there is some magic going on at the end - maybe in the Mapping output step, which combines all the db transactions and does an "aggregate" operation of inserting the category only once. Is my explanation correct?

    Please find attached both the wrapper/calling transformation and the sub-transformation, and also the output screenshot.




  5. #5
    DEinspanjer Guest


    Not exactly. I imagine the reason Matt didn't go into this level of detail is that it can be quite confusing to someone who hasn't been poking around in the code and debugging live transformations for a while.

    If you have a step that is running only one copy (the default), that step runs in a single thread. Every time that thread is scheduled for execution, it will pick up rows out of its input buffer and feed them one by one into the "work method" of the step (processRow()).
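    For reference, and assuming the Kettle 3.x step API (this is a sketch, not code from the attached transformations), the work method of a step that extends org.pentaho.di.trans.step.BaseStep looks roughly like this:
    Code:
    // Hedged sketch of a step's work method; assumes a class extending
    // org.pentaho.di.trans.step.BaseStep, so getRow()/putRow()/setOutputDone()
    // come from the Kettle engine (org.pentaho.di.trans.step.*,
    // org.pentaho.di.core.exception.KettleException).
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] row = getRow();            // take the next row from the input buffer
        if (row == null) {                  // null means all upstream steps are finished
            setOutputDone();                // tell downstream steps nothing more is coming
            return false;                   // this step's thread can stop
        }
        // ... do the actual work on this single row here ...
        putRow(getInputRowMeta(), row);     // hand the row to the output buffer
        return true;                        // ask the engine to call processRow() again
    }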

    There isn't really any magic at the end to tell the step whether to do an insert or not. I'll download your transformations now and take a look and see if I can shed some light on what's going on.

  6. #6


    Daniel,

    If I put additional back-to-back calls to the Insert category sub-transformation for the 3 categories I have in each row from the CSV input, I see even weirder behavior. I get a "Primary Key already exists" error, which definitely points to something wrong with the "local" additional fields created in the sub-transformation. They seem like they are not getting reassigned new values each time the sub-transformation is called. I also should see m x n rows in the output file (corresponding to the Write info to output.txt step in the sub-transformation): for my test input of 8 products with 3 categories per product row, I should see 24 lines in the output file, but I sometimes see 7, 8, sometimes even 16!

    I think there is some "field sharing" happening between threads.
    Last edited by ritesht; 11-07-2008 at 04:02 PM.

  7. #7


    Hi Daniel,

    Just checking whether you have had a chance to look into this further.

    In the meantime, I stripped down the complexity even further by normalizing the rows.

    Note that the database I am working with does not have auto increment, and the PK fields are strings, not integers. That forces me to create my own primary keys by using another table to generate "sequences". Since the rows seem to be getting processed in parallel, the same sequence value is read from the table for all rows currently being processed, causing an insert error. This is a classic transaction issue; I am not sure if there is a step to create/commit a transaction or to lock the table.
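    For what it's worth, the "classic transaction" fix described above, written as plain JDBC against a hypothetical key_sequence table (the table and column names here are placeholders, not the ones in the attached transformations), would look roughly like this: read and bump the sequence row inside one transaction so two parallel readers can never see the same value (SELECT ... FOR UPDATE needs InnoDB in MySQL).
    Code:
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // Sketch only: fetch the next string primary key from a sequence table,
    // holding a row lock for the duration of the transaction.
    public class SequenceFetch {
        public static String nextCategoryKey(Connection conn) throws Exception {
            conn.setAutoCommit(false);
            try (PreparedStatement select = conn.prepareStatement(
                     "SELECT next_val FROM key_sequence WHERE seq_name = 'CATEGORY' FOR UPDATE");
                 ResultSet rs = select.executeQuery()) {
                if (!rs.next()) {
                    throw new IllegalStateException("sequence row missing");
                }
                long next = rs.getLong(1);
                try (PreparedStatement update = conn.prepareStatement(
                         "UPDATE key_sequence SET next_val = next_val + 1 WHERE seq_name = 'CATEGORY'")) {
                    update.executeUpdate();
                }
                conn.commit();               // releases the row lock
                return "CAT" + next;         // string PK, as described above
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
    }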

    Thanks in advance.


  8. #8
    Join Date: Nov 1999
    Posts: 9,729


    "unique connections" option in the transformation settings dialog (Misc)

  9. #9


    Matt,

    Thanks for the quick response - I really appreciate it. I had tried that before posting the question, in both the calling transformation and the sub-transformation. It did not help.

    If anyone is interested in helping, I can provide my modified transformations.

    Also, as a side note - it seems like the Abort step in the sub-transformation is not closing all the open db connections from both the calling transformation and the sub-transformation. It seems to close the connection for its own transformation but not the parent's. I see a lingering connection which holds the lock, and I have to explicitly kill the process in MySQL (SHOW PROCESSLIST and KILL <process id>).



  10. #10

    Solved

    I was able to get past all the issues by using normalization, and by generating sequences with a combination of the in-memory Add sequence step and the sequence table from the DB, as I mentioned in previous posts.
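    In case it helps, what that combination boils down to (the field names below are stand-ins, not the ones from the actual transformation) is: read one starting value from the DB sequence table per run, and let the in-memory Add sequence counter supply the per-row offset, so no two rows in the same run can compute the same key.
    Code:
    // Illustration of combining a DB-sourced start value with an in-memory per-row counter.
    public class CombinedKey {
        public static String buildKey(long startValue, long rowSequence) {
            return "CAT" + (startValue + rowSequence);
        }

        public static void main(String[] args) {
            long startValue = 100;            // fetched once from the sequence table
            for (long rowSequence = 1; rowSequence <= 3; rowSequence++) {
                System.out.println(buildKey(startValue, rowSequence));  // CAT101, CAT102, CAT103
            }
        }
    }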

    If you are in the same boat and need specific details, let me know.

    Thanks to all who helped or tried to help.
