Hi,

I've been hanging around in the #pentaho IRC channel for a few weeks now, but I feel this question is better suited to the forum.


To simplify the presentation of the problem I'm encountering, I'll use the "Sale" analogy.

I have a data warehouse with the following tables:

DIM_DATE (standard date dimension)
--------
date_key int
date_value date
...

DIM_STORE
---------
store_key int
store_name string
hasAnomaly boolean *****
...

FCT_SALES
date_key
store_key
amount


I know how to use Kettle jobs and transformations to populate all these tables. The only problem is the DIM_STORE.hasAnomaly column. The goal of this column is to flag stores with anomalous sales. The user may then print reports showing only those stores.

To decide whether a store has anomalous sales, I have a complex algorithm involving multiple rules as well as time series analysis. This algorithm has been coded in Java, and I have the following API:

import java.util.Date;

interface Sale {
    Date getDate();
    double getAmount();
    int getStoreId();
}

interface ComplexSaleAnomalyDetector {
    boolean areSalesAnomalous(Iterable<Sale> sales);
}

This algorithm takes the last 3 years of sales for a given store and returns whether those sales are anomalous.


My problem is that I don't know how I should go about calling my algorithm from Kettle.


My idea was to do the following:

- Use Kettle jobs/transformations to populate the fact table and update the slowly changing dimensions. The hasAnomaly column stays empty for now.

- Now that all the tables are populated, use a Kettle step that reads the FCT_SALES table, ordered by store and filtered to the last 3 years

- I now have a stream of all the interesting sales records, ordered by store

- ???? this is the part I'm not sure about

- Call the java method once for each store (grouping the store's sales on the store_key field and using some kind of adapter KettleRecord --> Sale interface )

- ??? Find a way to retrieve the result, and update the DIM_STORE table. Ideally, by now, I would have a stream of records of the form [STORE_KEY, HAS_ANOMALY], stripped of all the sales data (and I would have fewer records than before)
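To illustrate the adapter I mention above, here is a minimal sketch in plain Java. It does not use any Kettle API: it only assumes that a row reaches my code as an Object[]. The class name RowSaleAdapter and the column positions are made up for the example. I am also assuming that the stream has been joined with DIM_DATE so that each row carries date_value, and that store_key can serve as the store id.

```java
import java.util.Date;

// Same interface as in my API, repeated so the sketch compiles on its own.
interface Sale {
    Date getDate();
    double getAmount();
    int getStoreId();
}

// Hypothetical adapter: wraps one row (an Object[]) and exposes it as a Sale.
// Assumed layout (not taken from any real transformation):
//   [0] store_key as a Number, [1] date_value as a Date, [2] amount as a Number
final class RowSaleAdapter implements Sale {
    static final int STORE_KEY = 0;
    static final int DATE_VALUE = 1;
    static final int AMOUNT = 2;

    private final Object[] row;

    RowSaleAdapter(Object[] row) {
        this.row = row;
    }

    public Date getDate() {
        return (Date) row[DATE_VALUE];
    }

    public double getAmount() {
        // Cast through Number so Long, Integer or Double values all work.
        return ((Number) row[AMOUNT]).doubleValue();
    }

    public int getStoreId() {
        return ((Number) row[STORE_KEY]).intValue();
    }
}
```

In a real step the column positions would presumably be looked up from the row metadata rather than hard-coded.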



I am not sure of the best way of doing this. I thought about writing a Kettle plugin that would store all the records until it detects a break in the store_key field, then call my areSalesAnomalous() method with these records, then repeat until the record stream is empty...

But I'm not sure if that's the right way of solving this. I feel like trying to "artificially group" records in the stream goes against Kettle's design and the idea that records are handled as a stream, and might cause problems.
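To make the "store until break" idea concrete, here is a rough sketch of the grouping logic in plain Java, independent of Kettle. StoreBreakGrouper, accept(), finish() and getResults() are names I made up for the example. It assumes the input is already sorted by store, and it only buffers one store's sales at a time.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Interfaces from my API, repeated so the sketch compiles on its own.
interface Sale {
    Date getDate();
    double getAmount();
    int getStoreId();
}

interface ComplexSaleAnomalyDetector {
    boolean areSalesAnomalous(Iterable<Sale> sales);
}

// Hypothetical helper: buffers sales until the store id changes, then emits
// one {storeId, hasAnomaly} result for the group that just ended.
final class StoreBreakGrouper {
    private final ComplexSaleAnomalyDetector detector;
    private final List<Sale> buffer = new ArrayList<Sale>();
    private final List<Object[]> results = new ArrayList<Object[]>();

    StoreBreakGrouper(ComplexSaleAnomalyDetector detector) {
        this.detector = detector;
    }

    // Call once per incoming record. The input must be sorted by store.
    void accept(Sale sale) {
        if (!buffer.isEmpty() && buffer.get(0).getStoreId() != sale.getStoreId()) {
            flush(); // break detected: the previous store's group is complete
        }
        buffer.add(sale);
    }

    // Call once after the last record so the final group is not lost.
    void finish() {
        flush();
    }

    List<Object[]> getResults() {
        return results;
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        int storeId = buffer.get(0).getStoreId();
        // Pass a copy so the detector never sees the buffer being cleared.
        boolean anomalous = detector.areSalesAnomalous(new ArrayList<Sale>(buffer));
        results.add(new Object[] { Integer.valueOf(storeId), Boolean.valueOf(anomalous) });
        buffer.clear();
    }
}
```

If I understand the step lifecycle correctly, accept() would be called for each incoming row and finish() once the input is exhausted, with each result written to the output stream instead of being collected in a list. Whether this belongs in a custom plugin or in a User Defined Java Class step is exactly what I am unsure about.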

Are there better / simpler ways of doing this? Is there a way to group records and pass them to a Java method? Could the new User Defined Java Class step help with this?
If creating a custom plugin is the only way, do you have examples of plugins doing similar things (grouping input rows, then returning a result for each group)?


I'd appreciate any advice / suggestion.

Thanks in advance,

Later in the forums and in #pentaho


PS: I'm French, so sorry if there are any spelling mistakes...