How does reduce transformation work

05-29-2012, 03:22 AM
Hi everyone:
I have a file that needs to be joined 15 times. The file is 300 MB and has only parent and child fields. But when I execute the first join, it fails. This is my logic.

Hadoop tells me the error is "failed to report status for 600 seconds. Killing!", so I guess Kettle may need to download the file to the local machine first (I tested this: directly downloading the file with Kettle takes more than 2 hours). Then I cut the records down to 10 and ran the job again (changing only the Hadoop File Input file, not the map input file; the map input file is still the 300 MB file). That was successful.

After that I change my logic to this.

I just changed the Hadoop File Input to a MapReduce Input (the map input file is also the 300 MB file). The MapReduce job runs, but the output file is empty. (The same logic with Hadoop File Input and Hadoop File Output gives the correct result, 8811.)
I also saw that the Hadoop combine output records counter is 0.

My questions are:
1. Does Kettle execute the Hadoop File Input/Output steps on the local machine?
2. Can one Map/Reduce transformation join two MapReduce Inputs? (Or is only one MapReduce Input/Output allowed per Map/Reduce transformation?)
3. If I want to implement this logic, what should I change, or what can I do?

05-30-2012, 01:24 PM
Hi lyllbl,

1) Kettle will execute all transformations used in a Pentaho MapReduce step as the mapper, combiner, or reducer within Hadoop. We provide a thin wrapper around the Kettle execution engine and translate data types to/from Hadoop Writables into PDI data types.

2) You may join as much data as you'd like; however, you can only have a single MapReduce Input step and a single MapReduce Output step. These are the steps we use to pass data between Kettle and the Hadoop MapReduce APIs.

3) You can increase the task timeout property for your job to account for the large file you're reading from HDFS by setting mapred.task.timeout to something larger than 600000 (in milliseconds). An alternative is to register the file in HDFS with Hadoop's DistributedCache so the file is copied down to the working directory of every node before execution. To do this, create the user defined property "mapred.cache.files" with the value "hdfs://namenode:port/lookup.file". The "lookup.file" will be copied into the working directory of the task JVM. To reference it from a Hadoop File Input (or any other file input step, e.g. Text File Input), use the relative short name, "lookup.file".
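As a sketch, the User Defined properties on the Pentaho MapReduce job entry could look like the fragment below (the timeout value is an illustrative choice, not a recommendation from this thread; the HDFS URI is the placeholder from above):

```
# User Defined properties on the Pentaho MapReduce job entry
mapred.task.timeout = 1200000                           # 20 minutes, in ms (default is 600000)
mapred.cache.files  = hdfs://namenode:port/lookup.file  # copied into each task's working directory
```

With the file registered in the DistributedCache, a Hadoop File Input step in the mapper or reducer transformation can then open it by the relative name "lookup.file".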

Hope this helps. Chances are the mapred.cache.files approach will be the best option. Looking forward to hearing how it works out!

- Jordan

05-31-2012, 04:34 AM
Hi jganoff:
Thanks for your kind help. I chose to register the file in HDFS with Hadoop's DistributedCache and changed my MapReduce job accordingly.


First, I use a Hadoop Copy Files step to copy the file into the HDFS directory referenced by 'mapred.cache.files'.


Then, in my Reduce transformation, I edit the file path of the 'Hadoop File Input' step to use the relative file name.


After that, I overwrite the value of 'mapred.cache.files' under User Defined in the MapReduce job.


But unfortunately, the MapReduce job does not run successfully. In the Reduce phase the error 'failed to report status for 600 seconds. Killing!' appears.
Is there anything I can do to resolve it?

05-31-2012, 10:55 AM
Sounds like you need to increase mapred.task.timeout, since your tasks are taking longer than 600 seconds to execute. Try setting it to a higher value.

05-31-2012, 01:05 PM
Another thought - do you really want a cartesian product of your two input streams? If what you want is to join data, you should use a Stream Lookup step, which lets you choose which fields are used to look up a row in the lookup stream and copy (join) values onto the data stream. Try that before increasing mapred.task.timeout. Chances are that with this change you won't need to increase the timeout at all.
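To see why this matters, here is a minimal sketch in plain Python (not PDI; the function names and sample parent/child rows are illustrative) of what a cartesian Join Rows produces versus a keyed Stream Lookup:

```python
def cartesian_join(data_stream, lookup_stream):
    """Join Rows with no key condition: every data row pairs with every lookup row."""
    return [{**row, **lk} for row in data_stream for lk in lookup_stream]

def stream_lookup(data_stream, lookup_stream, key):
    """Stream Lookup behavior: index the lookup stream by key once, probe per data row."""
    index = {lk[key]: lk for lk in lookup_stream}   # lookup stream held in memory
    out = []
    for row in data_stream:
        match = index.get(row[key], {})             # unmatched keys contribute nothing
        joined = {**row, **{k: v for k, v in match.items() if k != key}}
        out.append(joined)
    return out

data = [{"child": "a", "parent": "p1"}, {"child": "b", "parent": "p2"}]
lookup = [{"parent": "p1", "grandparent": "g1"}, {"parent": "p2", "grandparent": "g2"}]

print(len(cartesian_join(data, lookup)))            # 4 rows: 2 x 2, grows multiplicatively
print(len(stream_lookup(data, lookup, "parent")))   # 2 rows: one per data row
```

The cartesian version produces rows(data) x rows(lookup) output rows, which is why a large input can blow past the task timeout; the keyed lookup stays linear in the data stream.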