Hitachi Vantara Pentaho Community Forums

Thread: Batch/Group rows for streaming data from Amazon Kinesis

  1. #1
    Join Date
    May 2014
    Posts
    4

    Default Batch/Group rows for streaming data from Amazon Kinesis

    Tried on PDI 5.0.1 CE and 5.1 CE.

    I have streaming data coming from an Amazon Kinesis stream that I'd like to batch up and load into Redshift. I figured the Job Executor would be good for this since it has grouping options. I've been testing with both row- and time-based grouping, but I can't seem to get it working. The rows get passed to the Job Executor, but it doesn't seem to do anything with them. I was hoping that the rows would get passed on to the transformation (which simply writes the rows out using Text File Output). However, the step just hangs after reading the initial batch of rows and continues to block while data from the Kinesis stream keeps coming in. No error is given. The job itself contains only one transformation, which has a Mapping input specification and a Mapping output specification step.

    Will what I'm trying to do work? I've also tried the Transformation Executor, with the same result. I'm pulling in the Kinesis data using a Kettle plugin I've developed, which seems to work well. I've also taken that plugin out and used a Data Grid to generate rows, with the same result. Am I using these steps correctly? I've used the Job Executor before with great success, but I was only passing parameters rather than rows. I figured the Trans Executor would be able to consume incoming rows.

    Is there another way of doing what I want to do? I also tried the Single Threader, but that seemed to always write to the same file (even though I include the date and time in the filename). I thought it would re-initialize the step with every new group.

    Hopefully someone has done this before and can help out. Thanks in advance!

    David

  2. #2
    Join Date
    Jul 2013
    Posts
    6

    Default

    Hi David,

    We are also working on a project to process data pulled from a Kinesis stream. My first attempt was to use the User Defined Java Class step, but I'm stuck at the RecordProcessorFactory, since it requires a separate thread to execute. So it seems like I'll need to go the full plugin route instead. Is there a reason you chose to build a job plugin instead of a transformation plugin? I'm leaning towards the latter in the hope that I can simply let the transformation run continuously, knowing that the checkpointer should let me restart if necessary. We are using Vertica for our data warehouse, so it has no problems with trickle loading (something Redshift didn't seem to handle when we tested it at the beginning of the year), hence my preference for a transformation plugin.
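
    For context, this is roughly the shape the KCL 1.x forces on you (a minimal sketch; the KCL types are real, but the names and setup around them are illustrative). The Worker drives the record processors on threads it manages itself, which is exactly what clashes with a UDJC's single processRow() loop:

        import java.util.List;

        import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
        import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
        import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
        import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
        import com.amazonaws.services.kinesis.model.Record;

        public class KinesisWorkerSketch {

            // The KCL creates one processor per shard and drives each on its own thread.
            static class ShardProcessor implements IRecordProcessor {
                public void initialize(String shardId) { /* per-shard setup */ }

                public void processRecords(List<Record> records,
                                           IRecordProcessorCheckpointer checkpointer) {
                    // Runs on a KCL-managed thread, not the transformation's own
                    // thread -- the rows would have to be handed across somehow.
                }

                public void shutdown(IRecordProcessorCheckpointer checkpointer,
                                     ShutdownReason reason) { /* lease lost or shard ended */ }
            }

            public static void main(String[] args) {
                KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                        "my-app",       // application name, backs the DynamoDB lease table (illustrative)
                        "my-stream",    // Kinesis stream name (illustrative)
                        new DefaultAWSCredentialsProviderChain(),
                        "worker-1");    // worker id (illustrative)

                IRecordProcessorFactory factory = new IRecordProcessorFactory() {
                    public IRecordProcessor createProcessor() {
                        return new ShardProcessor();
                    }
                };

                new Worker(factory, config).run();  // blocks; spawns and manages its own threads
            }
        }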

    Would you be willing to share your plugin? I would like to take a stab at converting it to a transformation.

    Thanks,

    Greg

  3. #3
    njain111 Guest

    Default

    I have not dealt with Kinesis or Redshift before, but I have worked with both the Job and Transformation Executor.
    They are a bit awkward when it comes to logging.
    How are you verifying that the transformation is hanging? Do you have any logging steps that show any activity?

    Logging inside the Trans Executor is not written out by default; one would expect it to work, but it does not.
    Connect the 'Trans Executor' to a 'Write to log' step and pick 'this output will contain execution result'.
    This might give you some insight into what is happening under the hood.

  4. #4
    Join Date
    Dec 2005
    Posts
    531

    Default

    Hi Greg,

    We are currently working on a similar feature, too, and got stuck at the same point.

    Have you found a solution for the UDJC or did you end up going down the plugin route?

    Regards,
    Ingo

  5. #5
    Join Date
    Apr 2014
    Posts
    14

    Default

    Hi All,

    I am also trying to implement the same thing in various ways. In my case, I am using the Twitter Streaming API to collect a real-time stream, but I'm getting some errors.
    I already explained my scenario here: How to Implement Twitter "Streaming" API.
    If somebody has found any resolution or clue, please reply and share.
    Any suggestions or ideas are appreciated.

    Thanks and Regards,
    Rahul Trivedi

  6. #6
    Join Date
    Jul 2013
    Posts
    6

    Default

    Hi Ingo,

    I actually abandoned my attempts to access Amazon Kinesis through Kettle and instead followed the AWS examples on GitHub to build a standalone test application. There is something about the way the AWS Kinesis Client manages threads behind the scenes that just didn't play nicely with the Kettle environment. Hopefully they will make adjustments for that. Basically, what I found was that there is no clean way to shut the application down: you can't interrupt the RecordProcessors and call the checkpoint function. So even if I could get the application to run inside Kettle, it would end up processing duplicate rows. The only way I've found to handle that so far is to have the application checkpoint after every batch is processed, which could end up requiring a lot of back and forth with DynamoDB.
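
    For anyone hitting the same wall, the checkpoint-after-every-batch workaround looks roughly like this against the KCL 1.x interfaces (a sketch; everything outside the KCL types is illustrative). Every checkpoint() call is a write to the DynamoDB lease table, which is where the back and forth comes from:

        import java.util.List;

        import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
        import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
        import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
        import com.amazonaws.services.kinesis.model.Record;

        public class CheckpointEveryBatch implements IRecordProcessor {

            public void initialize(String shardId) { }

            public void processRecords(List<Record> records,
                                       IRecordProcessorCheckpointer checkpointer) {
                for (Record record : records) {
                    // hand the record off to whatever does the real work
                }
                try {
                    // Checkpoint after every batch so an unclean shutdown can replay
                    // at most one batch -- at the cost of a DynamoDB write per call.
                    checkpointer.checkpoint();
                } catch (KinesisClientLibException e) {
                    // lease lost, throttled, or shutting down; the next owner of the
                    // shard resumes from the last successful checkpoint
                }
            }

            public void shutdown(IRecordProcessorCheckpointer checkpointer,
                                 ShutdownReason reason) {
                if (reason == ShutdownReason.TERMINATE) {
                    try {
                        checkpointer.checkpoint();  // shard fully consumed; record the end
                    } catch (KinesisClientLibException e) { }
                }
            }
        }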

    I haven't spent much time on this since my last post but hopefully I can get back to it in the next couple of weeks. I'll follow up with any progress.

    Thanks,

    Greg

  7. #7
    Join Date
    May 2014
    Posts
    4

    Default

    Sorry I haven't responded to this thread; I didn't realize anyone had replied to it. My Kinesis plugin works fine. I'm using the KCL with async checkpoints. One of the biggest problems I had was making sure my plugin/job was fast enough to keep up with the flood of data from Kinesis. If your job falls behind, it causes a lot of stopping and restarting of record processors, which makes things really slow. We have 32 shards, so a lot of data is flowing through. I had to set maxRecords and failoverTime to reasonable values so that none of the steps got backed up with too much data. In order to do micro-batching, I ended up creating another plugin based on the Text File Output step that batches the rows into files for me and sends the filenames to its output. I then had another job that processes these files as they become ready.
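
    For anyone tuning the same thing: maxRecords and failoverTime map to fluent setters on KinesisClientLibConfiguration in the KCL 1.x (a sketch; the names and values here are illustrative and have to be sized to your shard count and row throughput):

        import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

        public class WorkerTuningSketch {
            public static void main(String[] args) {
                KinesisClientLibConfiguration config =
                        new KinesisClientLibConfiguration(
                                "my-app", "my-stream",                     // illustrative names
                                new DefaultAWSCredentialsProviderChain(),
                                "worker-1")
                        // Cap the records handed over per GetRecords call so the
                        // downstream steps never get more than they can absorb.
                        .withMaxRecords(1000)
                        // Give a busy worker longer to renew its shard leases before
                        // another worker takes them and restarts the record processors.
                        .withFailoverTimeMillis(30000L);
            }
        }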

    As far as sharing the plugin goes, I do want to open source it, but I can't right now. Gotta clear it with management, and that has been pending for a while.
