
Rewriting filters for partition columns

In a recent blog post, Graeme showed how to create external tables in Kognitio that use tables in Hive, and how to speed up queries from Kognitio simply by partitioning the Hive tables on the columns used for filtering. This post demonstrates a new feature in Kognitio’s upcoming 8.2.3 release: for a query that filters on a “natural” dimension column, Kognitio can automatically rewrite the filter to apply to a partition column and eliminate partitions, provided the partition column value is derived from the dimension column.

While Hadoop and Hive are used as examples in this post, the techniques discussed apply equally to other platforms and connectors. For example, you could apply this on AWS by using S3 instead of HDFS and Glue instead of Hive.

Introduction

Suppose you have a dimension represented by a timestamp column. Partitioning on that column is unlikely to improve query performance, because a large number of distinct values produces a large number of tiny partitions: more than 31 million for a single year’s worth of seconds, for example. (Depending on your data, this can also happen with other datatypes, including real and date.) This is a problem because the advantage of being able to read only a subset of the data is eventually outweighed by the cost of handling a large number of partitions, both in the underlying storage and when compiling a query. Without getting into the details of Hive internals, a reasonable rule of thumb is that you don’t want to exceed about 10,000 partitions.

Instead, the standard practice is to add a partition column with a different datatype and set it to a value derived from the dimension column; for example, you might choose an int holding the number of days since an arbitrary date (or “epoch”), or you could extract the date portion of the timestamp. Then you apply your filter to the partition column as well as the dimension column.
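
As a rough sketch, with a hypothetical table EVENTS, dimension column EVENT_TS and derived partition column EVENT_DAYS_SINCE_2000 (counting days since the start of 2000), the pattern users end up writing looks like this:

-- Hypothetical example: the restriction on the dimension column has to be
-- repeated, rewritten by hand, against the partition column.
select *
from EVENTS
where EVENT_TS between timestamp'2018-06-01 00:00:00' and timestamp'2018-06-02 00:00:00' and
EVENT_DAYS_SINCE_2000 between 6726 and 6727; -- must be kept in sync with the line above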

Finally, you train your users to add these partition predicates to their queries; if they don’t, they will end up doing a lot of full table scans. They might even get incorrect results if they change the predicate on the dimension column but don’t rewrite the partition predicate to match.

Wouldn’t it be nice if you could write queries filtering on the dimension column and have your database automatically rewrite the filter to apply to the partition column as well?

Kognitio can do this! You just need to tell it the relationship between the dimension column and the partition column.

Example data: Bitcoin transactions

An example data set is Bitcoin trade history for the Coinbase exchange in US dollars, coinbaseUSD.csv.gz, which you can download from Bitcoincharts. This is a CSV file where each row describes a trade with the following columns.

  1. Timestamp: Seconds since the start of 1970 (“Unix epoch”)
  2. Trade price: US Dollars to 1 BTC
  3. Trade volume: BTC
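
A row in the file looks something like this (1513514413 is the Unix timestamp for 2017-12-17 12:40:13 UTC, the trade queried later in this post, and 19891.99 is its price; the volume here is invented for illustration):

1513514413,19891.99,0.01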

To load this into Hadoop:

wget http://api.bitcoincharts.com/v1/csv/coinbaseUSD.csv.gz
gunzip coinbaseUSD.csv.gz
hadoop fs -mkdir /user/kodoop/bitcoin
hadoop fs -put coinbaseUSD.csv /user/kodoop/bitcoin

(When I downloaded this file on 2018-11-27 it had 54053050 rows and the greatest timestamp was 1543234300.  The timings below were obtained on an HDP cluster of two r4.xlarge nodes in AWS EC2 running Hive and Kognitio.)

Setup in Hive

To load the data into Hive, run:

set hive.exec.dynamic.partition.mode=nonstrict; -- Allow all partition values to be determined dynamically from the data
drop table if exists ext_bitcoin_coinbase_usd;
drop table if exists bitcoin_coinbase_usd_partitioned;

Create a Hive external table over the CSV data:

create external table EXT_BITCOIN_COINBASE_USD (
TRADE_UNIX_TIMESTAMP int,
TRADE_PRICE decimal(18,12),
TRADE_VOLUME decimal(18,12)
)
row format delimited
fields terminated by ','
stored as textfile
location 'hdfs:///user/kodoop/bitcoin/';
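
If you want a quick sanity check that Hive can read the file, a simple aggregate over the external table should reproduce the figures quoted above for the 2018-11-27 download:

-- Optional check: expect 54053050 rows and a maximum timestamp of 1543234300
select count(*), max(TRADE_UNIX_TIMESTAMP) from EXT_BITCOIN_COINBASE_USD;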

Now we want to create a partitioned Hive table. Partitioning on trade_timestamp would be a bad idea because almost every one of the 54M rows has a distinct value (and when I tried this the insert-select failed with the error “java.lang.OutOfMemoryError: unable to create new native thread”). Instead we can partition on the number of days since the start of the year 2000.

create table BITCOIN_COINBASE_USD_PARTITIONED (
TRADE_TIMESTAMP timestamp,
TRADE_PRICE decimal(18,12),
TRADE_VOLUME decimal(18,12)
)
partitioned by (TRADE_DAYS_SINCE_2000 int);

-- 1423 partitions
insert into BITCOIN_COINBASE_USD_PARTITIONED
partition (TRADE_DAYS_SINCE_2000)
select from_unixtime(TRADE_UNIX_TIMESTAMP) as TRADE_TIMESTAMP,
TRADE_PRICE,
TRADE_VOLUME,
cast((TRADE_UNIX_TIMESTAMP - unix_timestamp(timestamp'2000-01-01 00:00:00')) / 86400 as int) as TRADE_DAYS_SINCE_2000
from EXT_BITCOIN_COINBASE_USD;
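
To see how the data was split, you can ask Hive to list the partitions it created; the number returned should match the count noted in the comment above:

-- Optional check: returns one row per partition (1423 for this data set)
show partitions BITCOIN_COINBASE_USD_PARTITIONED;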

Setup in Kognitio

Then in Kognitio (version 8.2.3 or later), run:

-- This should already be created by default
create connector HIVE source HIVE;

create schema HIVE;

create external table HIVE.BITCOIN_COINBASE_USD_PARTITIONED
from hive target 'table BITCOIN_COINBASE_USD_PARTITIONED';

To tell Kognitio about the relationship between the columns, create a view of that table with a predicate expressing the partition column TRADE_DAYS_SINCE_2000 in terms of the dimension column TRADE_TIMESTAMP.

create view HIVE.V_BITCOIN_COINBASE_USD_PARTITIONED as
    select * from HIVE.BITCOIN_COINBASE_USD_PARTITIONED
    where TRADE_DAYS_SINCE_2000 = cast((TRADE_TIMESTAMP - timestamp'2000-01-01 00:00:00') day(6) as int);

(You don’t need to project the partition column from this view. If you have more than one such dimension/partition column pair, just list all the relationships as filters ANDed together, as in the sketch below.)
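
For example, a view covering two hypothetical dimension/partition pairs (the ORDERS_PARTITIONED table and its columns are invented for illustration) might look like this:

create view HIVE.V_ORDERS_PARTITIONED as
    select * from HIVE.ORDERS_PARTITIONED
    where ORDER_DAYS_SINCE_2000 = cast((ORDER_TIMESTAMP - timestamp'2000-01-01 00:00:00') day(6) as int)
    and SHIP_DAYS_SINCE_2000 = cast((SHIP_TIMESTAMP - timestamp'2000-01-01 00:00:00') day(6) as int);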

Automatic partition elimination for equality predicates

Suppose we want to run a query on the table using a filter with the dimension column equal to a constant.

-- Execution time: 37.2 s
select * from HIVE.BITCOIN_COINBASE_USD_PARTITIONED
where TRADE_PRICE = 19891.99 and
TRADE_TIMESTAMP = timestamp'2017-12-17 12:40:13';

That was slow because it had to scan the whole table, but if we apply the same filter rewritten in terms of the partition column we can speed up the query significantly.

-- Execution time: 1.5 s
select * from HIVE.BITCOIN_COINBASE_USD_PARTITIONED
where TRADE_PRICE = 19891.99 and
TRADE_TIMESTAMP = timestamp'2017-12-17 12:40:13' and
TRADE_DAYS_SINCE_2000 = 6560;

But we had to specify a predicate on the partition column as well. To do this automatically, simply select from the view using a filter with the dimension column equal to a constant, as below.

Kognitio will recognise that it can substitute that constant value of TRADE_TIMESTAMP into the predicate from the view’s filter, yielding a constant value for TRADE_DAYS_SINCE_2000. This is then used to perform partition elimination as desired. Kognitio still applies your original filter, which is necessary here because the mapping is many-to-one: many timestamps fall within the same day.

-- Execution time: 1.5 s
select * from HIVE.V_BITCOIN_COINBASE_USD_PARTITIONED
where TRADE_PRICE = 19891.99 and
TRADE_TIMESTAMP = timestamp'2017-12-17 12:40:13';

Automatic partition elimination for BETWEEN predicates

Kognitio can also perform partition elimination if you specify filters across a range of values with inequalities or BETWEEN using constants. This requires that the relationship between the dimension column and partition column is suitable, as described below.

Here we find the maximum trade price over a 48-hour period using the base table.

-- Execution time: 37.8 s
select max(TRADE_PRICE) from HIVE.BITCOIN_COINBASE_USD_PARTITIONED
where TRADE_TIMESTAMP between timestamp'2017-12-16 12:00:00' and timestamp'2017-12-18 12:00:00';

If you run the same query on the view, Kognitio will automatically apply the filter to TRADE_DAYS_SINCE_2000 and perform partition elimination.

-- Execution time: 1.7 s
select max(TRADE_PRICE) from HIVE.V_BITCOIN_COINBASE_USD_PARTITIONED
where TRADE_TIMESTAMP between timestamp'2017-12-16 12:00:00' and timestamp'2017-12-18 12:00:00';
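
Conceptually, because the mapping from TRADE_TIMESTAMP to TRADE_DAYS_SINCE_2000 never decreases, the endpoints of the range can be pushed through the view predicate. The effect is roughly as if the query had been written with an explicit partition filter, as in the illustration below (the day numbers are just the days-since-2000 values of the two endpoints; this is not Kognitio’s literal internal rewrite):

-- Illustrative manual equivalent of the query above
select max(TRADE_PRICE) from HIVE.BITCOIN_COINBASE_USD_PARTITIONED
where TRADE_TIMESTAMP between timestamp'2017-12-16 12:00:00' and timestamp'2017-12-18 12:00:00' and
TRADE_DAYS_SINCE_2000 between 6559 and 6561;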

For this to apply to BETWEEN predicates, the relationship between the dimension column and the partition column must preserve ordering. More precisely, the function mapping the dimension column to the partition column must be monotonic (non-decreasing or non-increasing). The relationship must also be simple enough for Kognitio to verify that the function is suitable.

Examples of suitable functions include:

  • 4 - INT_COL
  • INT_COL + INT_COL
  • cast((TIMESTAMP_COL - timestamp'2000-01-01 00:00:00') day(5) as int) — Number of days since the start of 2000

Examples of functions that are not monotonic and therefore not suitable include:

  • extract(dow from DATE_COL) (a range of dates can wrap around the week, so it does not map to a single contiguous range of day-of-week values)
  • cast(TIMESTAMP_COL as time)
  • INT_COL * INT_COL

That’s not all…

I have just described the motivation for implementing this feature, but it is more general than demonstrated above. Firstly, a view with a predicate describing the relationship between a dimension column and another column can be used in other cases where you want to filter on the dimension column but an optimisation depends on the other column: examples include indexes in externally stored database tables and in Kognitio partitioned tables, predicated view/table unions, and compressed data maps. Secondly, this rewriting is applied to all queries, not just those involving views over external tables, which may allow optimisations such as applying filters earlier or eliminating query components entirely.
