Query streaming in Kognitio

In this article I’m going to explain the workings of Kognitio’s query streaming feature. This is a core part of the Kognitio server’s architecture which we use to run a wide range of queries regardless of available workspace, adapting its behaviour based on system load and resource availability.

Query streaming takes query plans we get from compiling SQL and transforms them into dynamic data pipelines, with many plan steps running at once and data flowing between them. This is useful because it reduces the memory overhead of queries, increases the efficiency of execution (by allowing multiple parts to run at once, reducing the impact of IO and network delays) and allows queries to adapt at runtime to varying concurrency and workspace memory availability.

Static query plans

Let’s start with a simple example:

select c.region_name, count(*), sum(o.price)
    from customers c, orders o
    where c.id = o.customer_id
    group by c.region_name

Here we have a query which joins together customer and order tables, groups the results by customer region and computes a couple of aggregations before returning the answer, producing a total number of orders and total spend for each region. This is the sort of thing that goes on all the time on an analytical database server and even the big, complex queries tend to contain operations like this. We’ll say that both tables are big and that the user has loaded them both into memory, hashed on their respective primary keys, meaning that each row has an in-memory copy and the row’s primary key dictates which node in the cluster each row will be stored on.

SQL compilation could generate a query plan along these lines (Kognitio has various optimisations I’m ignoring here because they aren’t relevant to streaming):

query streaming diagram 1

This is a static query plan: it is laid out as a series of data transformations which happen one at a time, each one outputting into a RAM-based temporary table to be consumed by subsequent steps. In this example we first have to rearrange the two tables so that rows which need to join to one another are on the same node as each other. The ID column in customers is the primary key so, as we said above, we know which node each customer row will sit on based on this value. All we need to do before joining is move the data from orders to arrange the rows across nodes based on their customer_id column and the rows will be ready to join.

This is what the first step does, and TT1 will have the relevant order rows arranged so they’re on the same node as the customer rows they join to (with unused columns removed). The next step does a join and sends the resulting rows to TT2, using region_name to choose a target node for each join result row because the next step wants to group on that. Now the final step can just aggregate the results together because all the rows for any particular result group reside on a single node.
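The three static steps can be imitated in a few lines of Python. This is only a sketch under stated assumptions: the three-node cluster, the `node_for` and `redistribute` helpers and the sample rows are all invented for illustration, and the real server runs each step in parallel on every node rather than in a loop.

```python
from collections import defaultdict

NUM_NODES = 3  # assumed cluster size for this sketch


def node_for(key):
    """Pick a node for a key; a stand-in for the server's real hash distribution."""
    return hash(key) % NUM_NODES


def redistribute(rows, key_fn):
    """Materialise a temporary table: each row is stored on the node chosen by its key."""
    table = defaultdict(list)
    for row in rows:
        table[node_for(key_fn(row))].append(row)
    return table


# customers: (id, region_name), already distributed on id (hypothetical sample data)
customers = redistribute([(1, "North"), (2, "South"), (3, "North")], lambda c: c[0])
# orders: (order_id, customer_id, price)
orders_src = [(10, 1, 5.0), (11, 2, 7.5), (12, 1, 2.5), (13, 3, 4.0)]

# Step 1: move orders to the node holding the matching customer, dropping unused columns.
tt1 = redistribute([(cust_id, price) for _, cust_id, price in orders_src], lambda o: o[0])

# Step 2: join locally on each node, then redistribute the results on region_name.
joined = []
for node in range(NUM_NODES):
    lookup = dict(customers[node])  # id -> region_name for this node's customers
    joined.extend((lookup[cust_id], price) for cust_id, price in tt1[node] if cust_id in lookup)
tt2 = redistribute(joined, lambda r: r[0])

# Step 3: aggregate locally; every row for a given region is on one node.
result = {}
for node in range(NUM_NODES):
    for region, price in tt2[node]:
        count, total = result.get(region, (0, 0.0))
        result[region] = (count + 1, total + price)

print(sorted(result.items()))  # [('North', (3, 11.5)), ('South', (1, 7.5))]
```

Note that `tt1` and `tt2` are fully built before the next step looks at them, which is exactly the behaviour streaming sets out to avoid.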

This is the way nearly all queries work in Kognitio. Each step is fully parallel and can run on every node in the system allowing the full power of the cluster to be brought to bear on each individual step.

Enter streaming

The static plan is a good way to represent the data transformations needed to run the query but it also forces steps to be run one at a time and the temporary tables which hold the results for each step can often take up a lot of memory. Both of these things are a problem – large temporary tables place an obvious burden on resources and by running only one step at once we lose opportunities to do other work while a particular step is blocked waiting for IO or network messages.

So we introduced query streaming to solve these problems. Query streaming happens at runtime and automatically transforms static query plans into dynamic trees of operations in which multiple query plan steps can run concurrently. Temporary tables are transformed into query streams which are used to pipe data between the steps. So the query plan from our simple example would end up running like this:

query streaming diagram 2

In this example TT1 has been turned into a stream. Both of the steps which use it will be running at the same time. The first step will produce rows from the orders table and send them to the appropriate node where they will sit in the stream’s incoming row buffer. The second step will then pull rows out of the stream as they arrive, join them to the appropriate customer rows using a hash lookup and then discard them. So we’ve saved the memory cost of storing the whole of TT1 and we can still run the query just as quickly as before. In fact it might run faster now because previously we may have had CPU idle time while waiting for the TT1 rows to transfer over the network, but now we can run the join and aggregate steps on the data we have received while we wait for the remaining rows to arrive.
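Python generators give a rough feel for a passthrough stream. The sketch below is an analogy, not Kognitio’s implementation: the function names and sample rows are invented, everything runs on a single thread, and there is no network or flow control. What it does show is each order row being pulled through the join and then dropped, so nothing like TT1 is ever stored in full.

```python
def scan_orders(orders):
    """Step 1: emit (customer_id, price) rows one at a time instead of filling TT1."""
    for _order_id, customer_id, price in orders:
        yield customer_id, price


def join_customers(order_stream, region_by_customer):
    """Step 2: pull each row from the stream, probe the hash table, pass on the result."""
    for customer_id, price in order_stream:
        region = region_by_customer.get(customer_id)
        if region is not None:
            yield region, price  # the incoming order row is not kept after this point


def aggregate(joined_stream):
    """Step 3: count(*) and sum(price) per region."""
    totals = {}
    for region, price in joined_stream:
        count, total = totals.get(region, (0, 0.0))
        totals[region] = (count + 1, total + price)
    return totals


# Hypothetical sample data
region_by_customer = {1: "North", 2: "South", 3: "North"}
orders = [(10, 1, 5.0), (11, 2, 7.5), (12, 1, 2.5), (13, 3, 4.0)]

streamed_totals = aggregate(join_customers(scan_orders(orders), region_by_customer))
print(streamed_totals)  # {'North': (3, 11.5), 'South': (1, 7.5)}
```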

Buffered Streams

So now let’s make our example more complicated by saying that the customers table isn’t hashed on the ID column, so we can’t just send the order rows to where the matching customer ones are. This is quite a common scenario: memory images can only be distributed in one way and users don’t always want to join using the obvious primary key to foreign key relationships. This scenario also happens when the queries are more complex and the joins are between temporary tables coming from other joins, aggregations, rollups, etc.

So now we have two choices for re-arranging the data. Either ‘replicate’ one of the tables (send each row to every node) or move the rows from both tables, resulting in two temp tables distributed based on the join column. Replication would allow the join to run in a similar way to the above example but it’s costly and is typically only used for tables under a certain size. For this second example we’ll say our customer table is too big for replication so we’ll choose the second option, giving a static query plan like this:

query streaming diagram 3

This is like the one above but with an extra step and a new temporary table so we can move the rows from the customer table to the right nodes. The extra temporary table seems like a small addition but it has big implications for the streaming transformation. The last row to come out of one of the streams might join to the first row to come out of the other one, meaning that throwing away rows from both streams after we first look at them won’t work. We could simply leave one side as a temporary table and everything would work, but that would suffer from the limitations of the static plans so we really want to be able to stream both sides of the join.

Kognitio solves this problem with a different kind of data stream. The stream in the first example is called a passthrough stream, because data passes through it and is then discarded. To solve the join problem we also need to use a buffered stream. The buffered stream works like a passthrough stream except that rows are not discarded after being looked at. Instead they are put into a ‘history buffer’ in case they are needed again. If it wants to, the join algorithm can tell the buffered stream to restart and it will use the history buffer to replay all the rows back through the stream a second time.
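To make the idea concrete, here is a minimal sketch of a buffered stream in Python. The class name `BufferedStream` and its `restart` method are hypothetical names chosen for this illustration; the article does not describe the server’s actual interfaces.

```python
class BufferedStream:
    """Hypothetical buffered stream: rows are kept in a history buffer as they are read."""

    def __init__(self, source):
        self._source = iter(source)  # rows arriving from the producing step
        self._history = []           # every row handed out so far
        self._pos = 0                # read position within the history

    def __iter__(self):
        return self

    def __next__(self):
        if self._pos < len(self._history):
            row = self._history[self._pos]  # replaying a row we have seen before
        else:
            row = next(self._source)        # new row; StopIteration ends the pass
            self._history.append(row)
        self._pos += 1
        return row

    def restart(self):
        """Rewind so that the next pass replays rows from the history buffer."""
        self._pos = 0


stream = BufferedStream([("c1", "North"), ("c2", "South")])
first_pass = list(stream)   # rows come from the producer and are remembered
stream.restart()
second_pass = list(stream)  # rows come from the history buffer
print(first_pass == second_pass)  # True
```

A passthrough stream would be the same thing without `_history`: once a row has been returned it is gone.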

For the query plan above, the resulting dynamic query tree looks like this:

query streaming diagram 4

In our example here, TT2 transforms into a buffered stream while TT1 transforms into a passthrough stream. The join algorithm will pull sets of records from step 1 into a memory buffer and then attempt to match these to all of the rows from TT2 using a hash lookup. The rows from step 1 get thrown away once the scan is complete but the rows from step 2 are kept in the history buffer for subsequent passes. Then the step 2 stream is reset so it can be replayed from the history buffer to join its rows against the next batch from step 1, and the process is repeated. Because both sides of the join are streaming, the server can be joining records and aggregating the results while some of the rows for TT1 and/or TT2 are still being transmitted over the network, making efficient use of the CPUs and network bandwidth throughout the query.
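The batch-and-replay loop described above might be sketched like this. It is an illustration only: `batched_join`, the `replay_customers` callable (standing in for "reset the stream and replay its history") and the sample rows are assumptions, and the real server does this per node with rows still arriving over the network.

```python
from itertools import islice


def batched_join(order_stream, replay_customers, batch_size):
    """Join a passthrough stream of (customer_id, price) rows against a replayable
    stream of (customer_id, region_name) rows, one batch of orders at a time."""
    order_stream = iter(order_stream)
    while True:
        batch = list(islice(order_stream, batch_size))  # fill the join buffer
        if not batch:
            return
        # Hash the batch on the join column.
        by_customer = {}
        for customer_id, price in batch:
            by_customer.setdefault(customer_id, []).append(price)
        # One full pass over the buffered side for this batch.
        for customer_id, region in replay_customers():
            for price in by_customer.get(customer_id, ()):
                yield region, price
        # The batch is dropped here; the buffered side keeps its history.


# Hypothetical sample data
customer_history = [(1, "North"), (2, "South"), (3, "North")]
replays = []  # records each pass over the buffered side


def replay_customers():
    replays.append(1)
    return iter(customer_history)


orders = [(1, 5.0), (2, 7.5), (1, 2.5), (3, 4.0)]
joined = sorted(batched_join(orders, replay_customers, batch_size=2))
print(joined)        # [('North', 2.5), ('North', 4.0), ('North', 5.0), ('South', 7.5)]
print(len(replays))  # 2 passes: one per batch of orders
```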

Discarding history and re-computation

After the first scan through the data, the history buffer in a buffered stream is effectively storing the entire table, which has the same memory overhead that using a temporary table would have. There are various optimisations we can do here to reduce the impact of this (we can pick the smallest table to be the buffered stream, for example), but this is still not ideal.

The way Kognitio deals with this is to make stream history buffers discardable. All buffered streams keep a history buffer while the server has enough memory for this, but once memory starts to get low the server has the option to throw away one or more history buffers and use the memory for something else. When a history buffer for a stream has been thrown away it effectively becomes a passthrough stream, throwing away incoming rows after they have been used.

This means the scanning operation is unable to replay history to see the rows again. If the scanning operation needs to see the rows again, it must instead restart the operation which generated the streaming data. At that point the stream will receive the data again and will have a new opportunity to build a history buffer if enough memory has become available since the previous history was discarded.
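Discard and recompute can be sketched as follows. `DiscardableStream`, `discard_history` and `run_producer` are hypothetical names for this illustration, and the sketch simply assumes memory is available again whenever the producer is restarted; the real server makes that decision at runtime.

```python
class DiscardableStream:
    """Hypothetical stream whose history buffer can be dropped under memory pressure."""

    def __init__(self, run_producer):
        self._run_producer = run_producer  # callable that re-runs the producing step
        self._history = []                 # None once the history has been discarded
        self._complete = False             # True when the history holds every row
        self.producer_runs = 0

    def discard_history(self):
        """Free the buffer; from now on the stream behaves like a passthrough stream."""
        self._history = None
        self._complete = False

    def scan(self):
        """Yield every row once, replaying history if we have it, else recomputing."""
        if self._complete:
            yield from self._history
            return
        self.producer_runs += 1
        self._history = []  # assumption: enough memory to try building a history again
        for row in self._run_producer():
            if self._history is not None:  # history may be discarded mid-scan
                self._history.append(row)
            yield row
        self._complete = self._history is not None


stream = DiscardableStream(lambda: iter([(1, "North"), (2, "South")]))
first = list(stream.scan())   # runs the producer and builds the history
second = list(stream.scan())  # replayed from history, producer not re-run
stream.discard_history()      # simulated memory pressure
third = list(stream.scan())   # the producing step has to be restarted
print(stream.producer_runs)   # 2
```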

So, taking our example again, the query will start streaming from both source tables and things would start like this:

query streaming diagram 5

And then when we decide we have enough rows from step 1:

query streaming diagram 6

And then when we’ve been all the way through the step 2 results:

query streaming diagram 7

Then, during the operation, if the history buffer is discarded we would need to repopulate it:

query streaming diagram 8

While repeating whole steps in this way seems wasteful at first glance, the alternatives when under memory pressure are spilling data to disk or failing to execute a query. Failing to execute a query is not really an option in production environments and Kognitio is designed primarily for memory-resident data. When the source data is resident in memory, recomputing a result is usually faster than writing that result to disk and reading it back again. The server aims to discard and recompute as little as possible, but this mechanism allows the server to carry on and keep running queries during periods of unusually heavy load.

Optimisation and Memory Management

All of Kognitio’s data processing algorithms have been architected to access data in a sequential manner (with restarts used when necessary) to be compatible with streaming. This means we are able to transform the vast majority of temporary tables in Kognitio’s query plans into streaming operations, fulfilling the original design goal that the server can run more or less any query the user throws at it, given enough time, regardless of intermediate result size, concurrency levels and available workspace memory.

Once memory gets tight, though, the server has a lot of difficult decisions to make about how much memory to give to each memory buffer and which history buffers to keep or discard, and these are not always obvious choices. For example, in the join we just looked at, the server could choose to make the join buffer on the passthrough table bigger and that would reduce the number of times the history buffer is replayed, potentially reducing the number of times step 2 restarts. But making that buffer bigger reduces the memory available for history buffers, increasing the likelihood that the step 2 history buffer will get dropped before the query ends.
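A little arithmetic shows the tension. In the sketch below memory is counted in rows for simplicity, and every number and helper name is made up for illustration: a bigger join buffer means fewer replay passes, but past a certain size the history buffer no longer fits.

```python
import math


def replay_passes(passthrough_rows, join_buffer_rows):
    """Passes over the buffered side: one per batch of passthrough rows."""
    return math.ceil(passthrough_rows / join_buffer_rows)


def history_fits(workspace_rows, join_buffer_rows, buffered_rows):
    """Does the whole history buffer fit in what the join buffer leaves free?"""
    return workspace_rows - join_buffer_rows >= buffered_rows


# Hypothetical sizes: 20,000 passthrough rows, 5,000 buffered rows, room for 10,000 rows.
tradeoff = []
for join_buffer_rows in (1_000, 4_000, 8_000):
    passes = replay_passes(20_000, join_buffer_rows)
    fits = history_fits(10_000, join_buffer_rows, 5_000)
    tradeoff.append((join_buffer_rows, passes, fits))
    print(join_buffer_rows, passes, fits)
# 1000 20 True
# 4000 5 True
# 8000 3 False
```

With the largest buffer there are only three passes, but each one may have to re-run step 2 instead of replaying its history.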

These problems are compounded by the fact that every step is happening on all the nodes and each node is making its own decisions about memory management independently of the others. The diagrams above have abstracted a lot of this away. In the join example, what’s really going on looks a lot more like this:

query streaming diagram 9

Each node is having to keep history buffers for each step as well as manage input queues and flow control for every sender for every stream while sizing internal buffers for join lookups, aggregation results, and various other things. Now also bear in mind that busy Kognitio systems often have hundreds of plan steps running concurrently and you’ll start to get an idea of the challenges the server faces.

Early releases of the streaming feature exhibited a performance ‘cliff edge’ because of this. Everything worked well when there was enough memory available but when memory became tight performance of everything would degrade very quickly as buffers got smaller, operations made extra passes and accesses started repeating. However, the streaming feature has now been in production for well over a decade and has been iterated on many times since then, based on feedback gained from running in real production environments.

This led to us adding the passthrough stream type and a lot of heuristics which are used to set buffer sizes and decide which history buffers to drop or keep. Modern versions of Kognitio are now very good at making this sort of decision and are usually able to choose discards that have fairly low impact while allowing the important history buffers to remain. The engine uses a scoring mechanism to make decisions which takes multiple factors into account, including the cost of rebuilding an image, the number of times it has been rescanned, decisions made on other nodes and the stream’s position within a hierarchy of streams feeding into each other. This level of product maturity is one of the things that sets Kognitio apart from the other SQL-on-Hadoop offerings.
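The article does not give the scoring formula, so the following is purely a toy to show the shape of such a decision. The `HistoryBuffer` fields, the `keep_score` formula and the sample numbers are all invented, and factors such as decisions made on other nodes are left out entirely.

```python
from dataclasses import dataclass


@dataclass
class HistoryBuffer:
    name: str
    size_rows: int       # memory held by the history, counted in rows for simplicity
    rebuild_cost: float  # estimated cost of re-running the steps that feed the stream
    rescans: int         # how many times the history has been replayed so far
    depth: int           # invented stand-in for position in the stream hierarchy


def keep_score(buf):
    """Higher means more worth keeping. The formula is invented for illustration."""
    value = buf.rebuild_cost * (1 + buf.rescans) * (1 + buf.depth)
    return value / buf.size_rows


def choose_discards(buffers, rows_needed):
    """Drop the lowest-scoring histories until enough memory has been freed."""
    freed, victims = 0, []
    for buf in sorted(buffers, key=keep_score):
        if freed >= rows_needed:
            break
        victims.append(buf.name)
        freed += buf.size_rows
    return victims


buffers = [
    HistoryBuffer("A", size_rows=100, rebuild_cost=10.0, rescans=0, depth=0),
    HistoryBuffer("B", size_rows=100, rebuild_cost=50.0, rescans=3, depth=1),
    HistoryBuffer("C", size_rows=400, rebuild_cost=10.0, rescans=1, depth=0),
]
print(choose_discards(buffers, rows_needed=450))  # ['C', 'A']
```

Here B survives because it is expensive to rebuild and has already been replayed several times, while the large, cheap-to-rebuild C goes first.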

To be continued

And that wraps up the streaming discussion for now. You may have noticed that I’ve concentrated on joins a lot here and not really said very much about the aggregation part of the operation. A future article will talk through other types of data processing in Kognitio query plans (aggregations, rollups, multiple distinctions, analytical functions with sliding windows, etc) and explain how we’ve designed these to work well with streaming.
