Hitachi Vantara Pentaho Community Forums

Thread: libext/spring upgrade

  1. #1
    rpm2 Guest

    libext/spring upgrade

    Minor question...

    When I developed my CobolInput step I designed it with dependency
    injection using the Spring framework 2.5.6. I think Kettle is on an
    older version. I want to know whether it's wrong to upgrade to 2.5.6
    and include both spring-core.jar and spring.jar. So far I'm not
    seeing any effects from this, but then I haven't used everything in
    Kettle yet.

    Here is how I'm using Spring.

    public final void configure() throws Exception {

        beanFactory = new GenericApplicationContext();
        XmlBeanDefinitionReader xmlReader = new XmlBeanDefinitionReader(beanFactory);

        /*
         * The underlying configuration, application-context.xml, is
         * loaded as a class path resource.
         */
        xmlReader.loadBeanDefinitions(new ClassPathResource(APPLICATION_CONTEXT));

        /*
         * Obtain the beans for the instantiated application manager
         * and the run parameters.
         */
        this.applicationManager = (ApplicationManager) beanFactory.getBean(APPLICATION_MANAGER_BEAN);
        // getBean(String) returns Object in Spring 2.5, hence the cast
        this.runParameters = (CobolInputRunParameters) beanFactory.getBean(RUN_PARAMETERS_BEAN);
        applicationManager.setBeanFactory(beanFactory);
        applicationManager.configure();
    }

  2. #2
    Matt Casters Guest

    Re: libext/spring upgrade

    The only reason Spring is there is that some developer at Pentaho ignored my wish NOT to use Spring.
    Now that this developer no longer works for the company, I had planned to get rid of it like a bad disease.
    We only use it to read a couple of XML configuration files, and let's be honest here: that's way over the top. That's especially the case since you can't really see from the code what it's doing.
    The real workings are buried four layers deep in interfaces, factories and Spring XML. In essence we're getting all the bad parts of Spring without seeing any of the benefits.

    Now to your question... Extending the Spring dependency for the sake of a single step seems rather excessive as well.
    The code you quoted is meaningless to me: what does it do?

    Take care,

    Matt
    ____________________________________________
    Matt Casters
    Chief Data Integration - Kettle founder
    Pentaho, Open Source Business Intelligence
    http://www.pentaho.org -- mcasters (AT) pentaho (DOT) org
    Tel. +32 (0) 486 97 29 37




  3. #3
    rpm2 Guest

    Re: libext/spring upgrade

    Matt,
    I agree I was unconventional and maybe went overboard with my
    plug-in coding, but I had reasons for taking some of the approaches
    I did. It wasn't without some thought.

    As for how I'm using Spring specifically: to instantiate a bunch of
    processes and define how those processes work together. The
    documentation is in my APPLICATION-CONTEXT.XML file.

    On the fly I can reconfigure other libraries, wire in different
    classes, or redefine how my processes work together, meaning
    multi-thread them.

    One example: I use a library called CB2XML, which parses a COBOL
    copybook. I wire it into the application so that if I decide to
    change the utility later on, it's done with a configuration change.
    At the time I was also thinking of designing a Cobol Output step,
    and the utilities I built would be reusable for that new plug-in.

    I'll start by removing the Spring dependency with a replacement
    class file. It's not a big deal.
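
    Something along these lines should cover it (a rough, untested
    sketch: the CopybookParser interface is just the seam I'd keep, and
    CB2XML's actual entry point may differ from what I show here):

    import java.io.File;
    import org.w3c.dom.Document;

    // The seam I want to keep: swapping CB2XML for another copybook
    // parser later becomes a one-line change in the factory method
    // below, instead of an edit to application-context.xml.
    interface CopybookParser {
        Document parse(File copybook) throws Exception;
    }

    final class ApplicationWiring {

        static CopybookParser newCopybookParser() {
            return new CopybookParser() {
                public Document parse(File copybook) throws Exception {
                    // Assumed CB2XML entry point; check the library's actual API
                    return net.sf.cb2xml.Cb2Xml.convertToXMLDOM(copybook);
                }
            };
        }
    }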

    Thanks for your feedback.










  4. #4
    rpm2 Guest

    Re: libext/spring upgrade

    Okay, I'm no longer using the Spring framework. It wasn't a big deal
    to take out.

    Can I raise another potential issue? I'm hoping you don't disagree
    with my use of threads in my plug-in step. I broke the work out into
    threads with single purposes, each writing its result to a
    concurrent blocking queue, so they're thread safe.

    The next thread picks up the result and does its specific
    processing. The result is first a byte[], then a batch of records
    decoded from the native encoding, and finally a converted Object[].
    The last thread is responsible for passing the Object[] record to be
    consumed by the running transformation or preview.
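
    Simplified, the consuming end in my CobolInput.processRow() does
    something like this (a sketch, not the exact code: the poll timeout,
    error handling and field names here are illustrative):

    import java.util.concurrent.TimeUnit;

    // Sketch of the consuming side (lives in the CobolInput step class).
    // Kettle calls processRow() repeatedly until it returns false.
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        try {
            // Wait briefly for the next converted record from the pipeline
            Object row = applicationManager.getOutputQueue().poll(100, TimeUnit.MILLISECONDS);
            if (row == null) {
                if (applicationManager.isComplete() || applicationManager.isInterrupted()) {
                    setOutputDone();   // no more rows: end the step
                    return false;
                }
                return true;           // nothing buffered yet, try again next call
            }
            putRow(data.outputRowMeta, (Object[]) row);  // hand the record downstream
            applicationManager.addLinesConsumed(1);      // releases read-ahead permits
            return true;
        } catch (Exception e) {
            throw new KettleException(e);
        }
    }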

    I'm getting fantastic results reading large datasets because I'm
    reading way ahead of Kettle. When Kettle is ready for my record, I
    have a whole bunch of them ready to be consumed. One example was a
    20,000-record file with one integer field in it: I read it in less
    than a second, which I think is fast.

    And yes, I'm killing my threads; I've tested previewing a
    transformation, debugging it, and stopping the execution. I'm
    detecting caught exceptions and reporting them, so I'm not concerned
    yet.

    So this is how the application is configured. I can pass you the
    Process class and the subclasses that do the work.
    This code is hot off the press; it hasn't been debugged.


    import java.math.BigDecimal;
    import java.math.MathContext;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicLong;

    public class CobolApplicationManager {

        // Lock object protecting the instance variables that several threads access
        private Object appMgrLock = new Object();

        // Creates the threads that each contain a Process
        private ThreadFactory defaultThreadFactory;

        // The first trapped exception kills all the threads
        private Exception exception;

        // Thread-safe count of lines consumed
        private AtomicLong linesConsumed = new AtomicLong();

        // Blocking queue used by the Kettle CobolInput implementation;
        // processRow reads this queue for lines
        private BlockingQueue<Object> outputQueue;

        // The thread pool of processes
        private Thread[] threads;

        // The distinct processes, ordered by how the data is passed and processed
        private Process[] processes;

        // Maximum depth of each output queue, to protect the limited
        // memory on my processor
        private int[] queueDepth;

        // Containers holding the input and output queue of each process
        private ProcessQueue[] processqueues;

        // Per-process statuses indicating whether each is done with its work
        private boolean[] completed;

        // Count of total lines processed, set by processRow in CobolInput
        private AtomicLong totallinesProcessed = new AtomicLong();

        // Limits how far ahead data is read and processed, in case
        // processRow is too slow asking for lines
        private Semaphore work;

        // Permits of work given to the file handler. When the limit is
        // reached the producer slows down until processRow starts
        // consuming lines again
        private int workAvailableDefault;

        // There is always one driver among the processes: the first one
        private Process driver;

        // How many processes make up the application
        private final static int processCount = 3;

        // Local statuses of the CobolApplicationManager
        public boolean complete;
        public boolean interrupted;
        public boolean stopped;

        // Repository of instance variables and reference objects used by
        // all the processes
        private CobolInputRunParameters cobolRunParameters = new CobolInputRunParameters();

        // The first process, which reads the dataset using a file channel
        private Process fileReader = new EbcdicByteReader();

        // A middle process that decodes the native encoding into bytes,
        // in batches of records
        private Process byteConverter = new EbcdicByteConverter();

        // The heavy-lifter process that converts bytes into EBCDIC alpha
        // Strings and unpacks integers and packed decimals
        private Process dataConverter = new DataTypeToStringConverter();

        // Input and output queues for all the processes
        private ProcessQueue qFileReader = new ProcessQueue();
        private ProcessQueue qByteConverter = new ProcessQueue();
        private ProcessQueue qDataConverter = new ProcessQueue();

        // The resource used by CobolInput to control opening and closing
        // a file resource
        private FileInputHandlerInterface resource;

        // Default constructor: creates the process objects, threads and
        // other run parameters
        public CobolApplicationManager() {
            complete = false;
            stopped = false;
            interrupted = false;
            cobolRunParameters.setFieldBuilderFactory(new CobolFieldFactory());
            cobolRunParameters.setAlphacomputationConverter(new InternalType2String());
            processes = new Process[] {fileReader, byteConverter, dataConverter};
            processqueues = new ProcessQueue[] {qFileReader, qByteConverter, qDataConverter};
            threads = new Thread[processCount];
            completed = new boolean[] {false, false, false};
            resource = (FileInputHandlerInterface) fileReader;
            driver = fileReader;
            outputQueue = qDataConverter.getOutputQueue();
            queueDepth = new int[] {1000, 1000, 5000};
            // Permits the producer may use up before it has to wait for the consumer
            this.setWorkAvailableDefault(1000);
        }

        public final long addLinesConsumed(long line) throws Exception {
            requestMoreRows();
            return this.linesConsumed.addAndGet(line);
        }

        /*
         * Acquire work from the semaphore
         */
        public final void acquireWork() throws InterruptedException {
            work.acquire();
        }

        /*
         * This is where the processes are wired together so they interact
         * properly. At this point the Cobol Process threads are not active.
         * Queues, connections and processes are configured here.
         */
        public final void configure() throws Exception {
            work = new Semaphore(workAvailableDefault, true);

            for (int i = 0; i < processes.length; i++) {
                Process process = processes[i];

                if (i == 0) {
                    // The producer is the file handler and always has a null input queue
                    process.setInputQueue(null);
                    process.setOutputQueue(processqueues[i].getOutputQueue());
                    driver.setProcessType(Process.PRODUCER);
                } else {
                    // A consumer always takes the output queue of the previous process
                    process.setInputQueue(processqueues[i - 1].getOutputQueue());
                    process.setOutputQueue(processqueues[i].getOutputQueue());
                    process.setProcessType(Process.CONSUMER);
                }
                process.setApplicationManager(this);
                process.setOutputQueueCapacity(queueDepth[i]);
                process.setRunParameters(cobolRunParameters);
                process.setProcessNbr(i);
                defaultThreadFactory = Executors.defaultThreadFactory();
                threads[i] = defaultThreadFactory.newThread(process);
            }
        }

        /*
         * All the processes defined before this one must have stopped
         * before the current process may stop.
         */
        public final boolean canStop(int processNbr) {
            int index = processNbr - 1;

            while (index >= 0) {
                if (!completed[index]) return false;
                index--;
            }
            synchronized (appMgrLock) {
                completed[processNbr] = true;
            }
            return true;
        }

        /*
         * As the client consumes rows this opens the producer to read
         * ahead. If the client isn't consuming rows the producer stays
         * paused. Right now at least half of the produced lines must be
         * consumed before the producer is topped up again.
         */
        public final void requestMoreRows() throws InterruptedException {
            BigDecimal producedLines = new BigDecimal(this.totallinesProcessed.get());
            BigDecimal consumedLines = new BigDecimal(this.getLinesConsumed());
            MathContext mc = MathContext.DECIMAL32;
            this.totallinesProcessed.set(resource.getTotalProcessed());
            if (producedLines.doubleValue() > 0) {
                double consumed = consumedLines.divide(producedLines, mc).doubleValue();
                if (consumed > 0.5) { // read ahead once half the produced lines are consumed
                    this.refillWork();
                }
            }
        }

        /*
         * Kill all underlying processes and interrupt the application,
         * because we're done
         */
        public final void end() {
            stopAllThreads();
            this.interrupted = true;
        }

        /*
         * Return any trapped exception
         */
        public final Exception getException() {
            return exception;
        }

        /*
         * Retrieve the lines consumed so far
         */
        public final long getLinesConsumed() {
            return this.linesConsumed.get();
        }

        /*
         * Retrieve the results queue that lines are processed to
         */
        public final BlockingQueue<Object> getOutputQueue() {
            return outputQueue;
        }

        /**
         * @return the processes
         */
        public final Process[] getProcesses() {
            return processes;
        }

        /*
         * Retrieve the threads
         */
        public final Thread[] getThreads() {
            return threads;
        }

        /*
         * Retrieve the total lines processed so far
         */
        public final long getTotallinesProcessed() {
            return this.totallinesProcessed.get();
        }

        /*
         * Return the producer working semaphore
         */
        public final Semaphore getWork() {
            return work;
        }

        /*
         * Retrieve the semaphore default depth
         */
        public final int getWorkAvailable() {
            return workAvailableDefault;
        }

        /*
         * Retrieve the semaphore default depth value
         */
        public final int getWorkAvailableDefault() {
            return workAvailableDefault;
        }

        /**
         * Returns whether the underlying application is being interrupted
         */
        public final boolean isInterrupted() {
            return interrupted;
        }

        /*
         * Has the producer process communicated that it's complete?
         */
        public final boolean isComplete() {
            return complete;
        }

        /*
         * Return whether permits are available
         */
        public final boolean availableWork() {
            return this.work.availablePermits() > 0;
        }

        /*
         * Refill the semaphore so the producer process can do more work
         */
        public final void refillWork() throws InterruptedException {
            int refill = this.getWorkAvailable() - work.availablePermits();
            work.release(refill); // top off the work
        }

        /*
         * Begin executing the application
         */
        public final void execute() throws Exception {
            for (int i = 0; i < threads.length; i++) {
                try {
                    threads[i].start();
                } catch (RuntimeException e) {
                    continue;
                }
            }
        }

        /*
         * Trap exceptions happening in the processes and report them
         * back to the client
         */
        public final void applicationException() throws Exception {
            for (int i = 0; i < processes.length; i++) {
                if (processes[i].getException() != null) {
                    this.exception = processes[i].getException();
                    this.stopAllThreads();
                    if (this.exception.toString().equals("java.lang.InterruptedException")) {
                        throw new Exception(this.exception);
                    }
                }
            }
        }

        /*
         * Set the application exception to be picked up by the client
         */
        public final void setException(Exception exception) {
            this.exception = exception;
        }

        /*
         * Mark an application process complete
         */
        public final void setComplete(int processNbr) {
            synchronized (appMgrLock) {
                this.completed[processNbr] = true;
                this.complete = true;
            }
        }

        /*
         * Set the work-available semaphore default value
         */
        public final void setWorkAvailableDefault(int workAvailableDefault) {
            this.workAvailableDefault = workAvailableDefault;
        }

        /*
         * Stop all underlying process threads
         */
        public final void stopAllThreads() {
            this.setInterrupted();

            for (int i = 0; i < threads.length; i++) {
                threads[i].interrupt();
            }
            System.gc();
        }

        /*
         * Interrupt the application
         */
        public final void setInterrupted() {
            synchronized (appMgrLock) {
                this.interrupted = true;
            }
        }

        public FileInputHandlerInterface getResource() {
            return resource;
        }

        public CobolInputRunParameters getCobolRunParameters() {
            return cobolRunParameters;
        }
    }
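
    And each Process run() follows the same basic shape (again a
    simplified sketch; the real subclasses add the batching and
    conversion work, and the producer variant reads the file instead of
    an input queue):

    // Simplified shape of a consumer stage's run loop. Each stage takes
    // from its input queue, transforms, and puts onto its output queue;
    // put() blocks when the queue is at capacity, and interrupts end the
    // loop so stopAllThreads() in the manager works.
    public void run() {
        try {
            while (!applicationManager.isInterrupted()) {
                Object batch = inputQueue.take();   // blocks until work arrives
                Object result = transform(batch);   // stage-specific conversion
                outputQueue.put(result);            // blocks when the queue is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // interrupted by stopAllThreads()
        } catch (Exception e) {
            applicationManager.setException(e);     // first trapped exception...
            applicationManager.stopAllThreads();    // ...kills the other stages
        }
    }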




  5. #5
    Nicholas Goodman Guest

    Re: libext/spring upgrade

    You can also increase the number of rows that Kettle will consume
    and buffer ahead of other steps ("Nr of Rows in Rowset"), which
    should, in theory, accomplish much the same thing. With your own
    multithreaded buffer you have to manage the throttling in your own
    code (you can't outpace the rest of the steps by too much or you'll
    blow out memory). Kettle has already sorted out this issue, but in
    your own buffer you'd have to write it yourself (max out and stop
    reading after some configurable N, say 10,000, rows, or something of
    the like).
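
    For illustration, a bounded queue gives you that cap for free
    (generic Java, nothing Kettle-specific; the 10,000 capacity plays
    the role of the configurable N):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // A bounded buffer blocks the producer as soon as it gets N rows
    // ahead, so it can't outpace the consumer and blow out memory.
    public class BoundedReadAhead {
        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<Object[]> buffer = new ArrayBlockingQueue<Object[]>(10000);

            Thread reader = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int i = 0; i < 100000; i++) {
                            // put() blocks whenever 10,000 rows are already buffered
                            buffer.put(new Object[] { Integer.valueOf(i) });
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            reader.start();

            // The consuming side: every take() frees a slot and lets the
            // reader run ahead again.
            for (int i = 0; i < 100000; i++) {
                buffer.take();
            }
            reader.join();
        }
    }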

    Whether you keep the rows in memory in your own buffer ("a whole
    bunch of them ready to be consumed") or Kettle keeps a bigger buffer
    ("Nr of Rows in Rowset"), your transformation will only be as fast
    as the downstream steps that process them.

    I *personally* would stay away from .run()ing my own threads, but
    assuming you've got the starting and stopping down I don't see why
    it can't work. As always in BI: do what works and works well. If
    you have something that works well, then by all means, kudos.

    Nick

    On Feb 5, 2009, at 8:36 PM, rpm2 wrote:

    > I'm getting fantastic results reading large datasets because I'm
    > reading way ahead of Kettle. When Kettle is ready for my record, I
    > have a whole bunch of them ready to be consumed. One example was a
    > 20,000-record file with one integer field in it: I read it in less
    > than a second, which I think is fast.



