
View Full Version : Cassandra Kettle 4.3.0 Preview - Read large number of rows



satjo
03-24-2012, 02:09 PM
I am testing Kettle 4.3 with the big data plug-in (i.e., the Cassandra 1.0 jar files).
I need to read a large number of records, say a million rows, and I am wondering what the good options are when using the Kettle CassandraInput step. The performance with CassandraInput would not be good (compared with a similar operation against a file or database) as it requires all the rows to be loaded into memory first. That could be a constraint of the Cassandra 1.0 implementation used in this plug-in: it does not have any streaming operation and it requires holding the rows in memory.
I am wondering if the feature of reading from a stream, i.e., reading the rows one after another instead of fetching all the rows at once, is available in the latest versions of Cassandra, i.e., 1.1.0.

satjo
03-26-2012, 10:09 AM

Looking at the Cassandra 1.1.0 release notes (https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12310865&version=12317615), there are a lot of improvements regarding streaming. I am wondering if these improvements would allow the data from the CassandraInput step to be read as a stream, as opposed to being processed only after all the rows are loaded into memory.

dmoran
03-26-2012, 11:25 AM
satjo,

Thanks for the info. I will add a task for the developers to look into the Cassandra 1.1.0 beta and check out the performance. Have you tried 1.1, and does it seem stable? Would you be able to swap out the JARs in Kettle and see if it "just works" with Kettle?

Thanks for your help,
Doug

satjo
03-26-2012, 03:55 PM
Doug,

I think you are referring to the libs from 1.1.0 (http://archive.apache.org/dist/cassandra/). I tried them and I get the following exception.
I think CassandraInput in Kettle needs to be modified, as the APIs might have changed.


[Attachment 8512: exception]

ronans
03-26-2012, 06:01 PM
Hi Satjo,
I have been experimenting with similar scenarios (multi-million row reads and writes, where each row may contain from 4 KB to 1 MB+ of data) and so far have been using the Thrift APIs in conjunction with the User Defined Java Class step to perform high-volume Cassandra reads and writes.
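
To give a rough idea of what I mean by the Thrift route, here is a minimal sketch of a batch_mutate write in plain Java (the keyspace, column family, host and values are made up for illustration - in practice this logic sits inside the User Defined Java Class and is driven by the incoming Kettle rows):

import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.thrift.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class ThriftBatchWriteSketch {
  public static void main(String[] args) throws Exception {
    TTransport transport = new TFramedTransport(new TSocket("localhost", 9160));
    Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
    transport.open();
    client.set_keyspace("MyKeyspace");                       // hypothetical keyspace

    long timestamp = System.currentTimeMillis() * 1000;      // microseconds
    Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap =
        new HashMap<ByteBuffer, Map<String, List<Mutation>>>();

    // Build one column per row key; in the UDJC step this loop would be
    // replaced by the incoming Kettle rows.
    for (int i = 0; i < 100; i++) {
      Column col = new Column();
      col.setName(ByteBuffer.wrap("payload".getBytes("UTF-8")));
      col.setValue(ByteBuffer.wrap(("value-" + i).getBytes("UTF-8")));
      col.setTimestamp(timestamp);

      ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
      cosc.setColumn(col);
      Mutation m = new Mutation();
      m.setColumn_or_supercolumn(cosc);

      ByteBuffer rowKey = ByteBuffer.wrap(("row-" + i).getBytes("UTF-8"));
      Map<String, List<Mutation>> cfMutations = new HashMap<String, List<Mutation>>();
      cfMutations.put("MyColumnFamily", Arrays.asList(m));   // hypothetical column family
      mutationMap.put(rowKey, cfMutations);
    }

    // One round trip for the whole batch; values stay binary throughout.
    client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
    transport.close();
  }
}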

The current big data plugins for Cassandra use CQL inserts for output and CQL selects for input. In the Cassandra Input step, the CQL is executed on the first row, and the subsequent rows only advance the iterator returned from that execution. I am not sure how much the original execute buffers, but it does seem that it won't handle queries with very large limits (i.e., 5 million rows etc.).
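
To make the memory point concrete, this is roughly what a CQL select looks like at the Thrift level (a simplified sketch rather than the step's actual code; the column family and LIMIT are invented). The whole result set comes back in a single CqlResult, so a very large LIMIT is held client-side before the first row is ever emitted:

import java.nio.ByteBuffer;
import org.apache.cassandra.thrift.*;

// Assumes an already-opened, keyspace-bound Cassandra.Client
// as in the write sketch above.
public class CqlSelectSketch {
  static void readAll(Cassandra.Client client) throws Exception {
    String cql = "SELECT * FROM MyColumnFamily LIMIT 5000000";   // hypothetical query
    CqlResult result = client.execute_cql_query(
        ByteBuffer.wrap(cql.getBytes("UTF-8")), Compression.NONE);

    // Every CqlRow for the whole LIMIT is already in this list; iterating
    // it afterwards is cheap, but the initial call pays the full memory cost.
    for (CqlRow row : result.getRows()) {
      // process row.getKey() / row.getColumns() here
    }
  }
}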

If you are using Hadoop, you may be able to use the ColumnFamilyInputFormat for large-volume data input.

I was about to post a separate thread on Cassandra issues and suggestions - feel free to add to that.

Mark
03-27-2012, 03:18 AM
As far as I understand it, there is no streaming in Cassandra yet - the Thrift interface (which I believe the CQL interface sits on top of) does not support it. There is some talk of moving to custom transports and such:

http://wiki.apache.org/cassandra/CassandraLimitations
https://issues.apache.org/jira/browse/CASSANDRA-2478

The DataStax/Cassandra guys were really pushing everyone towards the CQL interface when I talked to them last year, before starting work on the CassandraInput/Output steps. I assumed that - even if the CQL interface was immature - this would be the interface that would persist regardless of any underlying changes to the low-level protocol and transport.

I would have thought that the CQL interface would have similar behavior to using thrift directly in terms of how many rows are shifted over to the client in a batch. I have no idea if any paging goes on behind the scenes and, if so, whether the client can control the page size etc. in order to manage memory for large scans. If anyone has any info on this I'd love to hear it.
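
The only way I know of to keep memory bounded for a full scan is to page manually at the Thrift level with get_range_slices, carrying the last key of each page forward as the next start key. A rough sketch (the column family name and page size are invented, and note the duplicate boundary row that has to be skipped):

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.*;

// Assumes an already-opened, keyspace-bound Cassandra.Client.
public class RangePagingSketch {
  static void scan(Cassandra.Client client) throws Exception {
    ColumnParent parent = new ColumnParent("MyColumnFamily");   // hypothetical column family
    SlicePredicate predicate = new SlicePredicate();
    predicate.setSlice_range(new SliceRange(
        ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, 100));

    int pageSize = 1000;
    ByteBuffer startKey = ByteBuffer.wrap(new byte[0]);
    boolean firstPage = true;

    while (true) {
      KeyRange range = new KeyRange(pageSize);
      range.setStart_key(startKey);
      range.setEnd_key(ByteBuffer.wrap(new byte[0]));

      List<KeySlice> page = client.get_range_slices(
          parent, predicate, range, ConsistencyLevel.ONE);
      if (page.isEmpty()) break;

      for (int i = 0; i < page.size(); i++) {
        // The first row of every page after the first repeats the previous
        // page's last row, so skip it.
        if (!firstPage && i == 0) continue;
        KeySlice slice = page.get(i);
        // process slice.key / slice.getColumns() here
      }

      if (page.size() < pageSize) break;          // last page reached
      startKey = page.get(page.size() - 1).key;   // resume from the last key seen
      firstPage = false;
    }
  }
}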

Cheers,
Mark.

ronans
03-27-2012, 06:06 PM
Just to provide some numbers for the write-based scenario: when using the CQL-based Cassandra Output step, I am seeing up to 10x the time to make similar writes compared to Thrift-based batch_mutate writes. A run that takes 2 minutes with the Thrift-based APIs takes up to 20 minutes - this is with row output sizes ranging from 4 KB to 10 MB (average 71 KB). In addition, the CQL-based output produces more unavailable errors and, in some cases, heap errors.

If we look at the implementation of the Cassandra Output step, we can find some reasons for this. Each row write is turned into a CQL insert statement and added to a batch buffer. As part of the generation of the insert statement, the values are turned into string-serialized forms. So with rows averaging 71 KB of data, we take the performance overhead of stringifying the values, along with the increased memory consumption of potentially multiple copies of 71 KB+ of data and the hit for managing large string buffers when using batch sizes of 100 or more.

For the read scenario, the Cassandra Input step reads all of the data during processing of the first row and simply uses the returned iterator during processing of the subsequent rows. It is possible that the CQL driver uses the incremental KeyRange fetch APIs behind the scenes, but read times for the first row grow in proportion to the limit size - so it would appear that the incremental key range fetch is not used by default. I presume there may be ways to optimize this while continuing to use CQL, and I am sure that the efficiency of the CQL drivers will improve over time.
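
To illustrate where the string overhead comes from, the CQL path has to turn every value into an encoded string inside a growing statement buffer - something along these lines (my own simplified illustration, not the actual step code; the column family and column names are invented):

// Simplified sketch of how a CQL-style insert string gets built for one row -
// an illustration only, not the Cassandra Output step's implementation.
public class CqlInsertStringSketch {
  // A 71 KB binary value becomes a 142 KB hex string here, and is copied
  // again when appended to the shared batch buffer.
  static void appendInsert(StringBuilder batch, String rowKey, byte[] value) {
    StringBuilder hex = new StringBuilder(value.length * 2);
    for (byte b : value) {
      hex.append(String.format("%02x", b));
    }
    batch.append("INSERT INTO MyColumnFamily (KEY, payload) VALUES ('")   // hypothetical CF/column
         .append(rowKey)
         .append("', '")
         .append(hex)
         .append("');\n");
  }
}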

Mark
03-27-2012, 09:40 PM
Looking at the CqlResult class, I can't see anything that could be used to control how many rows are buffered - there is just a list of CqlRow objects that, as you say, probably get pulled over en masse when the CQL query is executed. It's probably the case that the CQL interface isn't really designed for your use cases, which are basically bulk imports and exports. I have a community contribution (currently waiting to be processed) that provides an SSTableOutput step and an SSTableUpload step based on the SSTableLoader/BulkLoader code. Have you looked at bulk loading your data into Cassandra?

http://www.datastax.com/dev/blog/bulk-loading
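
For anyone who hasn't seen it, the approach in that post is to write SSTables offline with SSTableSimpleUnsortedWriter and then stream them in with sstableloader - roughly like the sketch below (based on that post, so treat it as illustrative only: the keyspace/column family names are invented and the exact constructor arguments differ between Cassandra versions):

import java.io.File;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
import org.apache.cassandra.utils.ByteBufferUtil;

public class BulkWriteSketch {
  public static void main(String[] args) throws Exception {
    // Output directory for the generated SSTable files (sstableloader has
    // expectations about the keyspace/column family layout that vary by version).
    File dir = new File("/tmp/bulk/MyKeyspace/MyColumnFamily");
    dir.mkdirs();

    long timestamp = System.currentTimeMillis() * 1000;   // microseconds
    // Buffers rows in memory and flushes an SSTable roughly every 64 MB.
    SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(
        dir, "MyKeyspace", "MyColumnFamily",
        UTF8Type.instance, null, 64);

    for (int i = 0; i < 1000000; i++) {
      writer.newRow(ByteBufferUtil.bytes("row-" + i));
      writer.addColumn(ByteBufferUtil.bytes("payload"),
                       ByteBufferUtil.bytes("value-" + i), timestamp);
    }
    writer.close();

    // The generated SSTables are then streamed into the cluster with the
    // sstableloader tool pointed at the output directory (exact invocation
    // depends on the Cassandra version).
  }
}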

Cheers,
Mark.

ronans
03-28-2012, 11:50 AM
Not sure about others, but in my case the bulk upload scenario does not fit directly (that is not to say that the SSTable support won't be a useful addition).

The data I am dealing with is not pre-existing; it only exists as the result of some analytics and source data manipulation. The purpose of using Cassandra was as an alternative to writing millions of files to HDFS, or having to deal with archive/HAR issues.

In any case, the Thrift-based solution I have, combined with the User Defined Java Class, works for my scenario.

satjo
03-28-2012, 12:22 PM
Mark,

I did not previously use SSTableLoader/BulkLoader, but I can use it if it is available as SSTableOutput/SSTableUpload steps. I do have some use cases for this type of scenario.

But I am still interested in reading a large number of rows after loading them into Cassandra. Maybe we can use some Kettle features to read the data in smaller volumes.

Also, as 'ronans' has suggested, a Thrift-based solution with a custom Java class would be another option. That option could be integrated with Kettle so that other operations can still be used with CQL.