Cassandra input and output - enhancements, suggestions etc



ronans
03-26-2012, 06:08 PM
Hi all. Based on some feedback, it was suggested that I post an overview of some proposed Cassandra enhancements here for people to comment on and review.


My scenario is that I am using Cassandra to read and write medium to large numbers of documents and document fragments as part of some data integration processes involving multiple map/reduce jobs. The number of rows written or read in each run reaches 6 million or more, and the jobs may read from or write to multiple column families per run (some for auditing purposes, some for base data I/O).

From examining the source code of the Cassandra input and output plugins, I note a couple of potential areas for improvement, both in terms of general capabilities and performance.

In general, the Cassandra output writer writes via CQL inserts and the Cassandra input reader reads via CQL select statements. For large reads and writes, it is also necessary to alter the timeout for the socket connections.
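
For reference, a CQL read over the Thrift interface looks roughly like this (a rough sketch only - the keyspace, column family and connection details are placeholders, not what the steps actually use internally):

import java.nio.ByteBuffer;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;

public class CqlOverThriftExample {
    public static void main(String[] args) throws Exception {
        // open a framed Thrift connection to a single node (placeholder host/port)
        TFramedTransport transport = new TFramedTransport(new TSocket("localhost", 9160));
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();
        client.set_keyspace("MyKeyspace");

        // a CQL select executed through the Thrift client, conceptually what the input step issues
        String query = "SELECT * FROM MyColumnFamily LIMIT 100";
        CqlResult result = client.execute_cql_query(
                ByteBuffer.wrap(query.getBytes("UTF-8")), Compression.NONE);
        System.out.println("rows returned: " + result.getRows().size());

        transport.close();
    }
}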

I have experimented with using direct Thrift reads and writes in conjunction with the user defined Java class and found that the Thrift-based APIs provide far better performance and availability (at least with the pre-release version I am using). As the user defined Java class does not support generics, I have been using a wrapper class that I created to abstract out the basic services for batch mutate without exposing generics in the class interface.
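
To illustrate the shape of such a wrapper (a sketch only, not my actual library - the class and method names are made up):

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Mutation;

// Hypothetical wrapper: exposes a generics-free API that Janino-compiled user defined
// Java classes can call, while building the generic mutation map batch_mutate expects.
public class BatchMutateWrapper {
    private final Cassandra.Client client;
    private final String columnFamily;
    private final Map<ByteBuffer, Map<String, List<Mutation>>> mutations =
            new HashMap<ByteBuffer, Map<String, List<Mutation>>>();

    public BatchMutateWrapper(Cassandra.Client client, String columnFamily) {
        this.client = client;
        this.columnFamily = columnFamily;
    }

    // non-generic entry point for UTF-8 string columns
    public void add(String rowKey, String columnName, String value)
            throws UnsupportedEncodingException {
        Column column = new Column();
        column.setName(ByteBuffer.wrap(columnName.getBytes("UTF-8")));
        column.setValue(ByteBuffer.wrap(value.getBytes("UTF-8")));
        column.setTimestamp(System.currentTimeMillis() * 1000); // microseconds

        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(column);
        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(cosc);

        ByteBuffer key = ByteBuffer.wrap(rowKey.getBytes("UTF-8"));
        Map<String, List<Mutation>> byColumnFamily = mutations.get(key);
        if (byColumnFamily == null) {
            byColumnFamily = new HashMap<String, List<Mutation>>();
            byColumnFamily.put(columnFamily, new ArrayList<Mutation>());
            mutations.put(key, byColumnFamily);
        }
        byColumnFamily.get(columnFamily).add(mutation);
    }

    // send the accumulated batch in one batch_mutate call and reset
    public void flush() throws Exception {
        if (!mutations.isEmpty()) {
            client.batch_mutate(mutations, ConsistencyLevel.ONE);
            mutations.clear();
        }
    }
}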

My proposal is that the Cassandra input and output steps could be modified to use the Thrift APIs directly, without the CQL engine, when a certain option (i.e. a checkbox such as "Use Thrift I/O") is selected in the UI. For the reader step, there would also need to be a way to specify which columns to retrieve - this could still be via CQL (i.e. use CQL for getting metadata, use Thrift for I/O) or via a simple column list.

In addition, I would propose adding a timeout option that would allow overriding the connection timeouts without having to reconfigure the cassandra.yaml files. I have implementations of this for UTF-8 string fields using the user defined Java class and some custom utility libraries. Using the Thrift APIs or other low-level APIs would also allow for adding support for range slice queries.
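
For the timeout piece, the client-side socket timeout can be set when opening the Thrift connection - roughly like this (host, port and the 60 second value are placeholders):

import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class TimeoutConnectionExample {
    // the third TSocket argument is the client-side socket timeout in milliseconds,
    // which is what large batch reads and writes tend to run into
    public static Cassandra.Client open(String host, int port, int timeoutMs) throws Exception {
        TTransport transport = new TFramedTransport(new TSocket(host, port, timeoutMs));
        transport.open();
        return new Cassandra.Client(new TBinaryProtocol(transport));
    }

    public static void main(String[] args) throws Exception {
        Cassandra.Client client = open("localhost", 9160, 60000); // 60 second timeout
        client.set_keyspace("MyKeyspace");
        // ... reads and writes ...
    }
}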

Here are some other alternatives that would also address some of the issues:

1) For the Cassandra reader, don't execute the CQL query only on the first row fetch.
2) A timeout option could be added independently of Thrift support.
3) Support column input and output formats for Hadoop jobs without requiring custom code.

Comments, suggestions, anyone?

satjo
03-26-2012, 10:56 PM
Thanks for the suggestions and sharing your experience. These are very helpful.

I am wondering if you were able to read all 6 million rows in a single operation. Did you have to break them into smaller ranges while using the latest Thrift-based APIs?

ronans
03-27-2012, 03:27 PM
I am still working on verifying the full read side - the write side used batch mutate with a batch size of 300 and wrote several million rows over a two-hour period, running on a 7-machine Hadoop/Cassandra cluster. The content is documents ranging in size from 4 KB to about 10 MB for the largest.

As the Cassandra data size is on the order of 50 GB+, I do not intend to read everything in one operation or anything close to that. I also did not want to be too version-specific, so I am using APIs available since Cassandra 0.8.

One approach to processing key ranges and slice predicates with unknown or very large result sets is to iterate over the rows or columns in batches, using empty (zero-length) start and end points in the key range or predicate and a relatively small batch size.

When you reach the end of a batch, you then use the last column name or row key as the starting point for the next batch of columns or rows - it's pretty similar to how you might use a cursor in a relational system.

This is then wrapped up in a wrapper class and used from the user defined Java step - the wrapper class isolates the Java generics dependencies from Janino, which Pentaho uses to compile user defined Java classes and which does not support Java features such as generics.

For example, with row data (initialization and error handling code omitted):

// set up the predicate and key range; NullByteBuffer is assumed to be an empty ByteBuffer
// from the custom utility code, which together with the zero-length keys leaves the range open ended
predicate.setSlice_range(new SliceRange(NullByteBuffer, NullByteBuffer, false, batchSize));

KeyRange keyRange = new KeyRange(batchSize);
keyRange.setStart_key(new byte[0]);
keyRange.setEnd_key(new byte[0]);

// other code omitted ...
do {
    keySlices = connection.getClient().get_range_slices(
            columnParent, predicate, keyRange, ConsistencyLevel.ONE);

    if ((keySlices == null) || (keySlices.size() == 0)) {
        keysReturned = 0;
    } else {
        keysReturned = keySlices.size();

        // process the batch of rows here

        // use the last key of this batch as the start key for the next fetch
        keyRange.setStart_key(keySlices.get(keySlices.size() - 1).getKey());
    }
} while (keysReturned > 1); // stop once a fetch returns no new rows beyond the previous start key

ronans
03-28-2012, 11:02 PM
As a follow-on to my earlier post: for Thrift-based reading in a user defined Java class, I get approximately 4,500-5,000 rows/second when reading from a single remote Cassandra node in batches of 100 or so, where the Thrift query constrains the columns via the SlicePredicate so that only a relatively small amount of data is returned (ranging from 100 to 400 bytes per row). This is used with an open-ended key range query (i.e. start and end keys are zero-length byte arrays).

When reading more complex data such as entire documents of up to 2 MB, the throughput drops to approximately 500 rows per second, and when performing exact key lookups with no batching, throughput averages out at 125 rows per second (but as these key-specific operations are I/O bound, they are good candidates for Kettle partitioning with one partition per Cassandra node).

So to get good throughput on Thrift reads from a Cassandra-based store with large rows, a good pattern might be to read small amounts of metadata for each row, then perform any row selection or removal, and finally re-read the remaining keys from the Cassandra store to get the larger data columns. See the attached diagram.
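
As a rough sketch of the second pass in that pattern - re-reading only the large document column for the keys kept after filtering (the column family and column name here are made up):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;

public class RereadLargeColumns {
    // keys: the row keys that survived filtering on the small metadata columns
    public static void rereadDocuments(Cassandra.Client client, List<ByteBuffer> keys)
            throws Exception {
        ColumnParent parent = new ColumnParent("Documents"); // hypothetical column family

        // only ask for the large column on this second pass
        SlicePredicate predicate = new SlicePredicate();
        List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
        columnNames.add(ByteBuffer.wrap("document_body".getBytes("UTF-8")));
        predicate.setColumn_names(columnNames);

        // one round trip fetches the large column for the whole batch of keys
        Map<ByteBuffer, List<ColumnOrSuperColumn>> rows =
                client.multiget_slice(keys, parent, predicate, ConsistencyLevel.ONE);

        for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> row : rows.entrySet()) {
            for (ColumnOrSuperColumn cosc : row.getValue()) {
                byte[] document = cosc.getColumn().getValue();
                // hand the document off to the rest of the transformation here
            }
        }
    }
}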

Obviously the numbers will vary according to server load, type of machine, etc., but the scenarios above are based on running the Pentaho transformation on a laptop, with the Cassandra data being read from a single remote server node initially and only the key lookups running against multiple server nodes.

These numbers reflect tests run on 50,000, 150,000 and 300,000 reads from the initial Cassandra node.


satjo
03-30-2012, 03:52 PM
Thanks for the useful information. It is nice to know that you could get 4,500-5,000 rows/sec for read operations while performing the remaining steps in Kettle. Is it possible for you to share a simple example of the custom Thrift-based user defined Java class?
This would be a good example of how to read a large number of rows.

ronans
04-20-2012, 07:47 PM
satjo, sorry for the delay - I still need to sort out licensing issues around releasing the code directly.
However, here is an outline of how it works if you want to try it directly.
1 - define a slice predicate for the range of columns to return. This can be based on a list of column names or on a column name range.
For example :

===
predicate = new SlicePredicate();
predicate.setSlice_range(
        new SliceRange(NullByteBuffer, NullByteBuffer, false, batchSize));
===

If you know the exact columns you want, you can use something like:

===
List<ByteBuffer> columns = new ArrayList<ByteBuffer>();

for (String s : columnNames)
{
    columns.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}

predicate.setColumn_names(columns);
===

2 - define a key range.
for example:

queryRange = new KeyRange(rowBatchSize);
queryRange.setStart_key(new byte[0]);
queryRange.setEnd_key(new byte[0]);

3 - repeatedly get key slices for the key range.
Each fetch will return up to the number of key slices specified above as rowBatchSize.

Iterate through the batch of results, processing them.
If you have many columns per row, you will need to do some form of iterative processing over the columns.
When you process the last row of the batch, use its row key as the starting point for the next get.
for example:

queryRange.setStart_key(keySlices.get(keySlices.size()-1).getKey());

followed by:

myClient.get_range_slices(columnParent, predicate, queryRange, ConsistencyLevel.ONE);
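
To round out the outline, processing a returned batch of UTF-8 string columns might look roughly like this (it reuses myClient, columnParent, predicate and queryRange from the steps above and assumes regular columns rather than super columns):

List<KeySlice> keySlices =
        myClient.get_range_slices(columnParent, predicate, queryRange, ConsistencyLevel.ONE);

for (KeySlice slice : keySlices) {
    byte[] rowKey = slice.getKey();

    // each row carries only the columns selected by the slice predicate
    for (ColumnOrSuperColumn cosc : slice.getColumns()) {
        Column column = cosc.getColumn();
        String name = new String(column.getName(), "UTF-8");
        String value = new String(column.getValue(), "UTF-8");
        // emit rowKey / name / value to the step's output row here
    }
}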

satjo
04-29-2012, 08:07 PM
Thanks ronans! This is really helpful.