This 30 min presentation, by Kognitio CEO Roger Gaskell, explains a feature of Kognitio called “Query Streaming”. You can also read about query streaming in this blog.
Hello. Today I’d like to talk to you about Kognitio query streaming.
This is a very important feature of the Kognitio product which allows us to run queries in memory even when memory is limited and constrained.
Firstly, I’d like to talk about what I think the problem is with in-memory, and that’s that there is never enough memory.
Let me qualify this slightly, there is never enough memory in our field, which is big data analytics. While the rapid decrease in the cost and increase in density of computer memory over the last few years has made it possible to build systems with vast amounts of memory, data volumes have increased at an even greater rate.
When we (Kognitio) first started selling our in-memory solutions (I will explain what our technology is in a minute), we measured total memory in Mbytes, 100s of MBs, but still MBs. Now we talk about systems with memory capacities in 10s and 100s of TB, but in all that time I have never heard a client say “oh we have plenty of memory”. It never happens!
So memory is still a precious commodity, and as a provider of in-memory technology it is incumbent on us to use what memory is available as efficiently as possible. That is where the query streaming feature that I am going to talk about today comes in.
But first I would like to explain who Kognitio are and what our technology does. I do hope this does not sound like a product pitch, but I think it is important to understand a little of the product and its background to appreciate why we developed the query streaming feature.
Kognitio (then called White Cross) was founded in 1988 with the goal of developing a database specifically for what was then called decision support (what we would now call analytics). At that time most databases were designed for transaction processing (accessing and updating single records) and were very poor at data analysis.
MPP or Massively Parallel Processing was just emerging as a technique for developing faster solutions. Very briefly, MPP divides a problem into a number of tasks that can be executed at the same time and then combines the results. A very simple, commonly used illustration of MPP is how to manually add up 1M numbers. It would take one accountant an awfully long time, but if I had 10,000 accountants and gave them 100 numbers each to add up, and then took the 10,000 results and gave 100 each to 100 of the accountants… Well, I think you get the picture.
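As a toy sketch of that accountants illustration (just the divide-and-combine idea in Python, nothing to do with Kognitio's actual code):

```python
# Toy illustration of MPP divide-and-combine: split the numbers into chunks,
# sum each chunk in parallel, then combine the partial totals.
from multiprocessing import Pool

def parallel_sum(numbers, workers=10):
    chunk = (len(numbers) + workers - 1) // workers
    parts = [numbers[i:i + chunk] for i in range(0, len(numbers), chunk)]
    with Pool(workers) as pool:
        partials = pool.map(sum, parts)  # each "accountant" adds up its share
    return sum(partials)                 # combine the partial results

# parallel_sum(list(range(1_000_000)))  ->  499999500000
```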
The problem we had with adopting MPP in the data analytics space was that data was conventionally held on slow mechanical spinning disks. The speed at which we could access that data meant that we needed very little parallelism before we saturated the disk IO bandwidth and our multiple parallel processes were sitting around waiting for data.
So we took a very bold, some would say stupid, decision to do everything in fast computer memory. We went “in-memory”.
Now in 1988 memory was very expensive, but we were young (hard to believe now) and naïve and thought memory prices would quickly drop to the point where holding large quantities of data in memory would be realistic. We were right, it just took 20 years longer than we thought.
Kognitio is a software product that can run on any x86 based platform running Linux.
It provides ultra-fast SQL access to big data. Alongside the SQL we also support the embedding of Non-SQL programs or scripts that allow for the parallel execution of complex advanced analytics algorithms. The SQL is fully functional and supports all the relevant parts of the SQL standard.
An important differentiator for us is the ability to support high concurrency, mixed workloads. One customer of ours, a major credit card provider, has 10,000 Tableau users connected to a Kognitio instance and the platform is capable of supporting a peak load of 2500 Tableau queries a second.
Architecturally we are still using the design concepts we adopted in 1988, shared nothing MPP and in-memory, which means we have been continually developing this type of technology for 30 years. I mention this simply because exposing the product to the real world over many years has not only cost me my hair but drives the development of features such as the query streaming that I am presenting here. It’s a result of a product maturing.
Today Kognitio can run on a dedicated compute cluster with just a Linux OS installed, on an existing Hadoop cluster under the YARN resource manager, or in the cloud.
Kognitio sits between where the data is stored and the data visualization / analysis / reporting application. The storage layer could be an existing data warehouse, for example Teradata, a general purpose database like Oracle or data stored in a Hadoop file system.
We also have our own file system that can be used for persistent data storage. This is normally used when Kognitio is installed on its own dedicated cluster and uses local storage attached to each node to build a large distributed file system. This effectively turns Kognitio into a high performance data warehouse and many customers use us for exactly that, e.g. Bet365, the UK on-line betting company.
Tools and applications connect directly to the Kognitio layer, using the ODBC or JDBC industry standard APIs and then submit SQL queries, which Kognitio answers, returning the results directly to the app. Kognitio just looks like another data source to the tool.
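As a minimal sketch, a tool-style client connection might look like this in Python over ODBC (the DSN name and credentials here are hypothetical placeholders, not a documented configuration):

```python
import pyodbc

# Connect through the industry standard ODBC API; "kognitio" is a hypothetical
# pre-configured data source name.
conn = pyodbc.connect("DSN=kognitio;UID=analyst;PWD=secret")
cur = conn.cursor()
cur.execute("SELECT region, COUNT(*) FROM customers GROUP BY region")
for row in cur.fetchall():
    print(row)  # Kognitio looks like any other SQL data source to the client
conn.close()
```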
Kognitio is generally used when conventional technologies are struggling to cope or to enable new applications that were previously thought too difficult or impossible to implement.
Data volumes are generally large. Although “large” means different things to different people, we tend to work with active data volumes of between ½ a Terabyte and 100s of Terabytes. Unfortunately petabytes in-memory is still unrealistic, but when we started out a TB in memory was just a dream and now it’s the norm. By active data I mean the size of the data-set that is required for a particular piece of analysis. The total stored data could be much, much bigger than this.
Kognitio applications also generally have a need for speed. This could be to support interactive, train-of-thought analysis. To be effective and to avoid user frustration, interactive queries need answers in a couple of seconds at most, regardless of data volume or workload.
Workload is often the biggest cause of performance problems. Modern visualization tools such as Tableau, Qlik, MicroStrategy, PowerBI etc have brought self-service data analytics directly into the hands of business users rather than just a few data analysts / data scientists, allowing them to make better informed and more insightful business decisions. With potentially hundreds and sometimes thousands of people able to directly and interactively query the data, the potential workload for the platform answering the questions is huge and performance becomes essential.
Speed is also essential for data-as-a-service applications. There are many companies whose business model is to sell access to data. They have brought together some interesting data sets, coupled with an innovative application, and the more queries they can answer in a given time frame the more money they make. Higher query throughput = more money.
So back to “Never enough memory”
We have a fixed amount of memory and, for the reasons I explained earlier (disk IO bandwidth limitations) we have the data of interest loaded into a portion of it.
Efficient use of this space is important and we are constantly striving to get more into less using techniques like compression, columnization with compression, tokenization etc. But this is not what I want to talk about today.
The bit I want to talk about is this, the often forgotten “workspace”. I have spoken with hundreds of prospects over the years and I cannot remember one that has asked when sizing a system, “how much workspace do I need?” Their assumption is that if they have 5TB of active data, they will need 5TB of memory. They are usually surprised and shocked when we tell them that they will need 7.5 to 10TB, depending on their workload requirements.
So why do we need so much workspace?
Here we have a query which joins together customer and order tables, groups the results by customer region and computes a couple of aggregations before returning the total number of orders and total spend for each region. This is the sort of thing that goes on all the time on an analytical platform and even the big, complex queries tend to contain operations like this. We’ll say that both tables are big and that the user has loaded them both into memory, hashed on their respective primary keys, meaning that each row has an in-memory copy and the row’s primary key dictates which node in the cluster each row will be stored on.
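A plausible rendering of that query in SQL (the table and column names are assumptions for illustration, not taken from the talk) would be:

```sql
SELECT c.region,
       COUNT(*)           AS total_orders,
       SUM(o.order_value) AS total_spend
FROM   customers c
JOIN   orders    o ON o.customer_id = c.customer_id
GROUP  BY c.region;
```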
This is a typical static query plan for this query. Each step happens in sequence and at two points in this query plan we need to create temporary intermediate result sets TT1 and TT2. In an in-memory platform these are stored in the workspace.
Obviously we have a problem if TT1 and TT2 are too big but the problem becomes much more severe if we consider that the platform may be running other queries at the same time. The more concurrent sessions running the more the workspace becomes fragmented and the space available for each query becomes smaller and smaller. Queries start to return out of memory errors and get retried, further increasing the workload. Eventually nothing works and the system becomes unusable!
How much of a problem is this in reality? Well, I can only speak from experience. Before Kognitio introduced the query streaming feature, this was by far the most common complaint I heard from customers. They understood that it could be fixed by buying a bigger system but they could not justify sizing a system based on peak loads. They said that they would accept slower answers when the system was overloaded if it could just still return an answer. In reality this is never true. Users get upset the moment something goes slower, whatever the reason! But not as upset as they get when queries sometimes return errors, in what appears to them to be a random pattern.
Another thing that we learnt over the years is that users / DBAs hate reserving memory for workspace. However the system was originally sized, once they experienced the performance benefit of in-memory, they want to bring more and more data into memory, pushing up the data occupancy and reducing workspace. Originally we simply made recommendations about keeping 30-40% free for workspace, but it never took long before we were getting support calls from customers who had 95% occupancy. To counter this, we introduced into the software a configurable maximum occupancy level, which by default we set to 80%. Now we get support calls asking how to change this config option!
We could use disk for the workspace. Horrible! This is an in-memory system; we really don’t want to be writing temporary data to slow mechanical disks. OK, maybe we could just do it when we are short of workspace, but then our query performance would fall off a cliff. Imagine doing this for tens or hundreds of concurrent users: the disk heads would be thrashing about and bandwidth would be terrible. Solid state disks would be better, but they still have relatively slow write speeds and overall IO bandwidth is still limited.
Also, when do you decide you need to start writing temporary results to disk? When memory is running out? It might be too late by then. Or do you write all the time in case memory becomes low? Now you have slowed query speeds even when resources are not constrained.
In reality I don’t know of any in-memory analytical platform / database that uses disk swapping.
I do know of products that use our second option: statically divided workspace. This is a very simplistic way of stopping the workspace becoming too fragmented. The workspace is divided into a fixed number of chunks and each session is allocated a chunk. To allow the chunks to be large enough to be useful, the total number of concurrent sessions allowed is low, for example 8. Besides the obvious issue with the low number of concurrent sessions, this is a highly inefficient use of the workspace, as one session might have exhausted its workspace and be returning errors whilst the others are all unused.
Query Streaming is our solution to this problem. Kognitio streaming uses dynamic allocation of workspace and allows workspace that is in use to be dynamically resized as load increases and decreases. The assumption we make is that in an MPP, in-memory system, re-computing intermediate results is much, much faster than writing and reading them from disk. Rather than treating intermediate results as something precious that must be preserved, we take the view that if demand for resource means that we have to give up some space, so be it; we can simply recalculate them later if needed.
Our aim is to never return an out-of-memory error during a query’s execution. I am not saying we always achieve this, but if we do return an error, it’s a bug rather than a limitation of in-memory data analytics.
Let’s take the same query we looked at earlier.
Here is its conventional, static query plan with its intermediate temporary tables.
Remember this is a parallel system with the data distributed across many servers or nodes, each executing the query in parallel.
In this example the customer table was hash distributed on customer id when the data was put into memory, but the orders table was not, so we need to redistribute it to ensure that rows we wish to join are co-located. This creates the temporary result set TT1. When we do this as part of the query plan we do not redistribute columns we do not need for the query in question.
The next step, Step 2, does the actual join sending the resulting rows to another temporary result set TT2, distributing it on region name because the next step wants to group on this. The final step, Step3, simply aggregates the results on each node and returns a result set.
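To make the steps concrete, here is the same plan sketched as explicit temporary tables (the syntax and names are illustrative only, not Kognitio’s internal plan representation):

```sql
-- Step 1: redistribute the orders rows, hashed on customer_id, keeping only the
-- columns the query needs, so that rows to be joined are co-located (TT1)
CREATE TEMPORARY TABLE TT1 AS
  SELECT customer_id, order_value FROM orders;

-- Step 2: perform the join, distributing the result on region ready for grouping (TT2)
CREATE TEMPORARY TABLE TT2 AS
  SELECT c.region, o.order_value
  FROM   customers c JOIN TT1 o ON o.customer_id = c.customer_id;

-- Step 3: aggregate on each node and return the result
SELECT region, COUNT(*) AS total_orders, SUM(order_value) AS total_spend
FROM   TT2
GROUP  BY region;
```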
This static query plan is a good way to represent how nearly all queries run in Kognitio but it suggests that each step is run one at a time and requires possibly large intermediate result sets (TT1 and TT2) to be stored in workspace memory.
We have already talked about the workspace issue, but running each step one at a time is also undesirable as we lose the opportunity to do other work while a step is blocked waiting for IO or network latency. Also, no result rows are returned until each step has been completed.
Kognitio query streaming takes the static query plan generated by the SQL optimiser and at runtime automatically transforms it into a dynamic tree of operations in which each step can run concurrently, making the query plan for this simple example look like this.
Temporary tables are transformed into query streams and used to pipe the data between the steps.
In this example TT1 has been turned into a stream. Both of the steps which use it will be running at the same time. The first step will produce rows from the orders table and send them to the appropriate node where they will sit in the stream’s incoming row buffer. The second step will then pull rows out of the stream as they arrive, join them to the appropriate customer rows using a hash lookup and then discard them. So we’ve saved the memory cost of storing the whole of TT1 and we can still run the query just as quickly as before. In fact it might run faster now because previously we may have had CPU idle time while waiting for the TT1 rows to transfer over the network, but now we can run the join and aggregate steps on the data we have received while we wait for the remaining rows to arrive.
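A minimal Python sketch of that pass-through behaviour (an illustration under assumptions, not Kognitio’s implementation) might look like this:

```python
from collections import defaultdict

def join_passthrough(customers_in_memory, order_stream):
    """customers_in_memory: (customer_id, region) rows already resident on this node.
       order_stream: generator yielding (customer_id, order_value) rows as they
       arrive over the network from step 1."""
    # Build a hash lookup on the join key from the in-memory customer rows.
    lookup = defaultdict(list)
    for customer_id, region in customers_in_memory:
        lookup[customer_id].append(region)
    # Pull rows out of the stream as they arrive, probe, emit, then discard.
    for customer_id, order_value in order_stream:
        for region in lookup.get(customer_id, ()):
            yield (region, order_value)  # flows straight on to the aggregate step
        # the order row is now dead; nothing like a full TT1 is ever materialised
```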
So now let’s make things more complicated by saying that the customer table is not distributed on customer_id, so we can’t just send the order rows to the nodes where the matching customer rows are. This is a common scenario: data can only be initially distributed into memory one way. DBAs / users will usually try to choose a distribution that will support the majority of the workload without data redistributions, but in an ad-hoc, mixed workload environment there will always be queries that the default distribution does not support.
This scenario also happens when the queries become more complicated and the joins are between temporary result sets coming from other joins, aggregations or rollups etc.
We now have two choices for redistributing the data. Either replicate one of the tables (send each row to every node) or move both tables, resulting in two temporary tables distributed on the join column.
Replication is a costly operation and can only be used for a table that is under a certain size. For this example we will say that the table is too large to be replicated, so we will choose the second option, giving a query plan like this:
Here we have added an extra step (the redistribution of the customer table) and a new temporary table.
This extra temporary table seems like a small addition but it has a big implication for the streaming transformation.
The last row to come out of one of the streams might need to join to the first row that came out of the other, so we can’t just simply throw away rows after we have first looked at them.
We could leave one side of the join as a temporary table but that would suffer from the limitations of the static plan.
Instead we use a different kind of data stream – a buffered stream. The streams in the first example were pass-through streams, as data “passes through” and is then discarded. A buffered stream does not discard the rows after looking at them but puts them in a “history buffer” in case they are needed again. The join algorithm can restart the buffered stream and use the history buffer to replay the rows back through the stream again.
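Sticking with the Python sketches, a buffered stream might be modelled like this (again illustrative, not the real implementation; the discard path here anticipates the discardable-history behaviour described a little later):

```python
class BufferedStream:
    """Sketch of a buffered stream: rows pass through as usual but are also
    copied into a history buffer so the stream can be reset and replayed.
    `rerun_upstream` is a hypothetical callable that re-executes the step that
    produced the rows (e.g. re-filter and re-redistribute the raw data)."""

    def __init__(self, rerun_upstream):
        self.rerun_upstream = rerun_upstream
        self.history = []           # the (discardable) history buffer
        self.history_valid = False  # becomes True once a full pass is captured

    def scan(self):
        """One complete pass over the stream's rows."""
        if self.history_valid:
            yield from self.history        # cheap replay from memory
            return
        self.history = []
        for row in self.rerun_upstream():  # expensive: recompute the rows
            self.history.append(row)
            yield row
        self.history_valid = True          # a full pass is now captured

    def discard_history(self):
        """Called by the memory manager when workspace runs low; the stream then
        behaves like a pass-through and later scans must rerun the upstream step."""
        self.history = []
        self.history_valid = False
```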
The dynamic query plan for this query looks like this.
TT2 transforms into a buffered stream while TT1 transforms into a pass-through.
The join algorithm pulls sets of records from step 1 into a buffer and then attempts to match them with all of the rows coming from step 2 using a hash lookup. The rows from step 1 get thrown away once the scan is complete, but the rows from step 2 are kept in a history buffer for subsequent passes. The step 2 stream is then reset so that it can be replayed from the history buffer and joined to the next batch coming from step 1. The process is then repeated until all of the rows from step 1 have been joined.
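Continuing the sketch, the batch-wise join just described might look roughly like this (simplified: in the real engine a new history buffer fills as the old one empties, and all of this happens independently on every node):

```python
def block_hash_join(step1_rows, step2_stream, join_buffer_size):
    """step1_rows: iterable of (key, payload) rows arriving from step 1.
       step2_stream: a BufferedStream (from the sketch above) of (key, other) rows."""
    join_buffer = []
    for row in step1_rows:
        join_buffer.append(row)
        if len(join_buffer) == join_buffer_size:
            yield from _join_batch(join_buffer, step2_stream)
            join_buffer = []              # step 1 rows are discarded after their pass
    if join_buffer:
        yield from _join_batch(join_buffer, step2_stream)

def _join_batch(join_buffer, step2_stream):
    lookup = {}
    for key, payload in join_buffer:      # hash the current join buffer
        lookup.setdefault(key, []).append(payload)
    # first call streams step 2 off the network; later calls replay from history
    for key, other in step2_stream.scan():
        for payload in lookup.get(key, ()):
            yield (key, payload, other)   # joined rows flow on to aggregation
```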
Because both sides of the join are streaming, rows can be joined and then aggregated while rows are still flowing from steps 1 and 2. This allows for very efficient use of CPU and network bandwidth throughout the query’s execution.
Now the sharp-eyed amongst you will be sitting there thinking: but surely the history buffer is effectively storing the entire temporary table, so how does this help when workspace is constrained?
Well, that’s where the point I made earlier comes in: in an MPP in-memory system, re-computation is a relatively cheap operation.
In Kognitio Query Streaming the stream history buffers are discardable. Buffered streams keep a history buffer when there is enough workspace available, but once memory gets low the system has the option to throw away one or more history buffers and use the memory for something else. When its history buffer has been thrown away, a buffered stream effectively becomes a pass-through stream, discarding rows after they have been used.
With no history buffer the join operation is unable to replay rows. The way it deals with this is to restart the process that generated the rows in the first place, i.e. step 2. If enough memory has now become available it will build a new history buffer; if not, it will keep restarting the process whenever it needs to see a row again. Obviously in these circumstances queries start to take considerably longer, but this is far more acceptable than simply returning an error and then stopping.
So how does this look in practice?
The query would start by streaming from both source tables.
Step 1 starts streaming rows and starts to fill up a buffer we call the join buffer. Step 2 is also streaming rows into an input buffer.
I should probably say at this point that step 1 and step 2 are not just redistributing rows. They are also applying vertical and horizontal filtering to minimise the number of rows and columns that we are redistributing. We only redistribute the minimum amount of data needed to satisfy the query.
Once we have enough rows in the join buffer we stream all of the data from step 2 through the join algorithm, joining as matches occur. As rows are joined we send them to step 3 for subsequent aggregation. Step 1 meanwhile continues to stream data into a new join buffer.
As we pass the rows through the join we also copy them to the history buffer as discussed earlier.
Once we have streamed all of the step 2 data past the first join buffer, we start on the next join buffer, but this time we use the history buffer as the data source for the step 2 stream. This saves us having to run all the filtering and redistribution code again, as well as avoiding sending rows between nodes again. We also create a new history buffer, which fills up as the old history buffer empties.
We repeat this process until all the rows from step 1 have been processed.
One thing I should point out here is that although this sounds like we are moving a lot of data about, in reality we are just swapping buffers around using zero memory copies.
As discussed earlier, we have to be prepared for the system deciding to throw away our history buffer at any point in time. If this happens then we restart step 2 to filter and redistribute from the raw data again. Again we attempt to create a history buffer in case memory has become free, but if not we will continue to stream directly from step 2.
So what we are trying to do here is optimise for the situation where there is enough workspace, by effectively creating a fully instantiated temporary table for one side of the join, while being prepared to encounter a low workspace situation at any time and still be able to complete the query.
To support streaming, all of Kognitio’s data processing algorithms have been architected to access data in a sequential manner (with restarts when necessary), so we are able to transform the vast majority of temporary table operations into streaming operations, fulfilling the original design goal of being able to answer more or less any query the user throws at it, given enough time, regardless of intermediate result size and how much workspace is available.
However, once workspace becomes tight the system has a lot of difficult decisions to make about how much memory to use for each stream’s buffers and when to keep or discard history. In this particular example, for instance, we could decide to make the join buffer for step 1 bigger, reducing the number of times we need to replay the history buffer and hence the step 2 restarts, but then we would have less memory for the history buffer, meaning the likelihood of it getting dropped increases. One of the strengths of this approach is that we are using a lot of information and intelligence to make these difficult memory management decisions, as opposed to the crude mechanisms used by a cache, which has very little information to go on when deciding when to drop blocks.
These problems are compounded by the fact that every step is happening independently on all nodes and each node is making its own memory management decision. This diagram abstracts a lot of this away. In our join example what is really happening looks a bit more like this:
Each node is having to keep history buffers for each step as well as manage input queues and flow control for every sender for every stream, while sizing internal buffers for join lookups, aggregation results, and various other things. Now also bear in mind that busy Kognitio systems often have hundreds of plan steps running concurrently and you’ll start to get an idea of the challenges the system faces.
In an asynchronous system like this with lots of independent processes streaming data between them the opportunity for deadlocking is huge. This was a key problem we had to solve.
Early releases of the streaming feature exhibited a performance ‘cliff edge’ because of this. Everything worked well when there was enough memory available but when memory became tight performance of everything would degrade very quickly as buffers got smaller, operations made extra passes and accesses started repeating.
However, the streaming feature has now been in production for well over a decade and has been iterated on many times since then, based on feedback gained from running in real production environments.
This led to us adding the pass-through stream type and a lot of heuristics which are used to set buffer sizes and decide which history buffers to drop or keep.
Modern versions of Kognitio are now very good at making this sort of decision and are usually able to choose discards that have fairly low impact while allowing the important history buffers to remain. The engine uses a scoring mechanism to make decisions which takes multiple factors into account, including the cost of rebuilding an image, the number of times it has been rescanned, decisions made on other nodes, and the stream’s position within a hierarchy of streams feeding into each other.
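Purely as a speculative sketch of the shape such a score might take (the factors are named above, but the weights, names and formula here are invented for illustration):

```python
def keep_score(buf):
    # Expensive-to-rebuild, frequently-rescanned buffers are the ones to keep.
    score = buf.rebuild_cost * (1 + buf.rescan_count)
    if buf.discarded_on_other_nodes:
        score *= 0.5            # follow discard decisions made elsewhere in the cluster
    score *= 1 + buf.depth      # streams feeding other streams are costlier to refill
    return score                # discard the lowest-scoring history buffers first
```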
What we are doing here is, when resource gets tight, compromising raw performance for the ability to continue to service a high concurrency, mixed workload environment. We can’t work magic here; the resource is generally fixed in size. Although in the cloud we are working on being able to provide dynamic systems that increase in size as workload increases, even then I am sure the financial people will impose budget restrictions that limit the size. Our experience is that this compromise is essential to allow us to be considered an enterprise class solution.
So those are the basic principles behind Kognitio streaming. We do regard this capability as one of our major USPs in the in-memory data analytics space, and this is the first time we have ever talked about how it works.
The more technical amongst you will have noticed that we have missed out some important bits. But the devil is in the detail here and those details are really hard to get right and have become our secret sauce.
We know of other companies in our space that have tried to run queries in this way and have not been able to make it reliable, the most common problem being the deadlocks I mentioned on the last slide.
So even if we had the time we are not willing to reveal everything just yet.
Finally I wanted to briefly describe an example use.
Inmar is a good example of an application that relies on the query streaming I have just described.
Inmar are a $2 billion US company that acts as the middleman between retailers and manufacturers, and between pharmacies and insurance companies. In the retail case, they take the Point Of Sale data from the retailers and work out how much money the retailers are owed by the manufacturers for honouring the manufacturers’ discount coupons. For those who don’t know, coupons are huge in the US, much more so than in Europe, and tend to be used instead of loyalty cards. A side effect of this business is that they get masses of retail transaction data. This data contains really valuable customer behaviour information and Inmar wanted to somehow monetise it.
The data is vast and so they decided to store it in a data lake on a Hadoop cluster. Hopefully most of you will be familiar with Hadoop.
This is relatively straightforward; it is what Hadoop is particularly good at. The problem came when they tried to provide access to this data. They wanted to be able to offer potential clients a web-based portal through which they could perform interactive, ad-hoc data analysis and visualisation. Unfortunately Hadoop is very bad at this. Most data visualisation tools use SQL to access the data, and none of the standard SQL-on-Hadoop technologies can provide enough performance to support good, frustration-free interactive analysis. More importantly, they certainly would not allow lots of independent users to concurrently access the data, and Inmar’s monetisation strategy was to have lots of customers working with a common data set.
Once users have access to an interactive portal it is difficult to constrain what they can do, and retail analytics is a use case that often creates lots of large intermediate result sets. Basket analysis in particular (where we try to work out what customers are buying with what) is particularly hard on the analytics platform.
They solved the problem by adopting Kognitio on Hadoop as their SQL-on-Hadoop layer. Kognitio proved to be 300 times faster than Hive on individual queries but was also able to support high concurrency usage.
To ensure a good ROI, Inmar also needed to right-size the system around average workload, not peak. The query streaming feature was therefore essential to ensure that their customers did not start to get errors rather than answers when the system was busy.