Posted By : Deborah Martin
Tags : monitoring data, pnp4nagios
Categories : Blog, Data Science

So, we are now in an era where “Big Data” matters. We have oodles of choice when it comes to storing this data and a myriad of languages and tools to help us extract and report on that data.

But what about monitoring? Do you know whether you’ll run out of disk storage in the next month, or the next six? Do you know if your database is performing as well as it should? Do you need more RAM to satisfy your query requirements?

These are the questions invariably asked only once there is already a problem. When a database is first commissioned and it is all new and shiny, the focus is usually on getting up and running. Who cares what might happen in six months’ time, just tell me what my customers are doing!

And then disaster strikes. Six months down the road, space is getting tight, the data feeds are growing, and you’ve committed to keeping three years’ worth of data on the database at any one time.
It’s a perfect storm…

I can think of numerous times in the last 20 years when the above scenario has happened and, dare I say it, could have been avoided if monitoring had been in place. And it doesn’t have to be confined to databases: any resource you rely on can and will eventually run out of steam.

For KAP, we monitor various aspects of a database to ensure it’s running smoothly. When it isn’t, the monitoring system tells us, so that we can proactively do something about it.

So, if there is a predictive disk failure, we can, 99% of the time, fix it before it becomes an issue by hot-swapping out the failed disks. No downtime. No bother. Happy client.

Typically, we also monitor query queues, disk states, KAP backups, the JDBC-ODBC bridge, slab management, hardware health for NICs, port status and VPNs, among others.

We can also make our plugins available to clients. These plugins are compatible with Nagios Core, which is widely used and open source.

There are many other monitoring systems out there that will also allow our plugins to be easily adapted if required.
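Nagios Core plugins follow a simple contract: print a one-line status message and exit with 0 (OK), 1 (WARNING), 2 (CRITICAL) or 3 (UNKNOWN). As an illustration only (not one of our actual plugins, and with made-up thresholds), a minimal disk-usage check might look like:

```python
#!/usr/bin/env python3
# Minimal sketch of a disk-usage check in the Nagios plugin style.
# The thresholds and messages are illustrative, not production code.
import shutil

# Nagios Core plugin exit codes
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

def check_disk(path, warn_pct=80.0, crit_pct=90.0):
    """Return a (status, message) pair for the given mount point."""
    usage = shutil.disk_usage(path)
    used_pct = 100.0 * usage.used / usage.total
    if used_pct >= crit_pct:
        return CRITICAL, "DISK CRITICAL - %.1f%% used on %s" % (used_pct, path)
    if used_pct >= warn_pct:
        return WARNING, "DISK WARNING - %.1f%% used on %s" % (used_pct, path)
    return OK, "DISK OK - %.1f%% used on %s" % (used_pct, path)

# A real plugin would print the message and call sys.exit(status);
# Nagios reads the first output line and maps the exit code to a state.
status, message = check_disk("/")
print(message)
```

Nagios schedules the check on an interval and raises notifications when the state changes, which is what turns a quiet disk-filling problem into an alert long before the perfect storm.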

We can also produce reports from the monitoring data, using addons, which give us an insight into the usage of any given database. For example, disk storage requirements are often underestimated, whether because the data grew faster than expected or because it was retained for longer than expected. By monitoring disk usage, we can see just how quickly the disk storage is being consumed and look to increase that storage with minimal impact to the client. We can do the same for RAM.
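A capacity report of this kind essentially fits a trend to usage samples and extrapolates. A minimal sketch, with invented figures (in practice pnp4nagios graphs do this job visually):

```python
# Rough sketch of trend-based capacity forecasting, of the kind a
# monitoring report might produce. The sample figures are made up.
def days_until_full(samples, capacity_gb):
    """Estimate days until the disk fills, from (day, used_gb) samples,
    using a least-squares linear fit."""
    n = len(samples)
    sum_x = sum(d for d, _ in samples)
    sum_y = sum(u for _, u in samples)
    sum_xy = sum(d * u for d, u in samples)
    sum_xx = sum(d * d for d, _ in samples)
    slope = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)
    intercept = (sum_y - slope * sum_x) / n
    if slope <= 0:
        return None  # usage is flat or shrinking
    return (capacity_gb - intercept) / slope

# 30 GB/month growth against a 1000 GB disk starting at 400 GB used
usage = [(0, 400), (30, 430), (60, 460), (90, 490)]
print(days_until_full(usage, 1000))  # 600.0 days
```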

For graphing, we use pnp4nagios, an open source tool that utilizes the data generated by Nagios Core.


Playing Around With Kognitio On Hadoop: Hashes Cannot Always Anonymise Data


Posted By : Chris Barratt
Tags : kognitio data scripts how to

In this brave new world of big data, one has to be careful about loss of privacy when releasing large data sets publicly. Recently I reread “On Taxis and Rainbows”, which presents an account of how the author came to de-anonymise a data set made available by New York City’s Taxi and Limousine Commission. This data set included logs of each journey made: locations and times of pickups and drop-offs along with supposedly anonymised medallion numbers and other data. The author identified the medallion number data as the MD5 check-sums of the medallion numbers. They were then able to compute a table mapping the 22 million possible medallion numbers to their MD5 check-sums, and reported that this took less than 2 minutes. This table allowed them to start ‘de-anonymising’ the data.

The article identified that this was possible because of the nature of the data: the set of all possible medallion numbers was known, and small enough for a table mapping each value to its check-sum to be generated in a feasible amount of time.

Computing a large number of check-sums should be easy for a Kognitio system to handle. As several check-sum functions are implemented as SQL functions in the Kognitio system, I wondered how quickly Kognitio could generate a table mapping some potentially personally identifying information to check-sums using the functions.

As every UK address has a postcode, I thought postcodes would make a good candidate piece of personally identifying information. They are roughly equivalent to US ZIP codes. It seems plausible to me that someone might attempt to anonymise postcodes using check-sums. So I thought I would try using Kognitio to compute tables of postcodes and their check-sums, to see how quickly such an anonymisation strategy might be broken.
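The attack is worth spelling out, because it is so cheap: if the keyspace is small, the entire hash-to-value table fits in memory. The tiny candidate list below stands in for the full postcode list:

```python
# Sketch of why check-summing a small keyspace is not anonymisation:
# precompute a hash -> value lookup over every possible value.
# The candidate postcodes are a made-up sample, not the real
# Code-Point Open list.
import hashlib

candidates = ["AB1 2CD", "AB1 2CE", "AB1 2CF"]

# Build the rainbow-style lookup table once...
lookup = {hashlib.md5(pc.encode()).hexdigest(): pc for pc in candidates}

# ...then any "anonymised" MD5 value is reversed with a dictionary lookup.
anonymised = hashlib.md5("AB1 2CE".encode()).hexdigest()
print(lookup[anonymised])  # AB1 2CE
```

Scaling the dictionary from three entries to 1.7 million postcodes changes nothing about the method, only the build time, which is precisely what the Kognitio timings below measure.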

I found an almost complete list of valid postcodes in the “Code-Point Open” data set. Handily, this is freely available from the Ordnance Survey under the Open Government Licence. The data set contains about 1.7 million records, each with a postcode field and other fields of related geographical data. The field titles are:

  • Postcode
  • Positional_quality_indicator
  • Eastings
  • Northings
  • Country_code
  • NHS_regional_HA_code
  • NHS_HA_code
  • Admin_county_code
  • Admin_district_code
  • Admin_ward_code
The data set is provided as a zip archive containing 120 CSV files with a total uncompressed size of 154MiB.

Getting The Kognitio System Check-sum Functions Timed

For the check-summing task I created a new Kognitio on Hadoop system (the ‘Kognitio system’ from here on) to run on a small developmental Hadoop cluster that I have access to. I copied the CSV files into a new HDFS directory on the cluster running the Kognitio system.

For all the queries/SQL commands I used the ‘sys’ superuser of the Kognitio system. I was the only one who was going to use the Kognitio system so there was no need to protect data from anyone but myself and I could avoid setting up unneeded users and privileges for anything in the system.

I created an external table to give the Kognitio system access to the CSV data on HDFS. This kind of table can’t be written to but can be queried like any other ‘regular’ table on the Kognitio system. Then I created a view on the external table to project only the postcodes. I followed this by creating a view image on the view to pin the postcode only rows of the view in memory. It is faster to access rows in memory than anywhere else.

Then I created several ram-only tables to store postcodes with their check-sums. Ram-only tables, as the name suggests, do not store row data on disk, so using them avoids any slow disk operation overheads.

Finally I ran and timed a series of insert-select queries that computed the check-sums for our postcodes and inserted them into the ram-only tables.

Some Background On External Tables, Connectors and Plugin Modules

Kognitio’s external tables present data from external sources as a set of records of fields using a connector. The list of fields and the connector are specified by the create external table statement.

The connector in turn specifies the data provider to use along with any connector specific configuration parameter settings required. The data provider implements the interface to the external data source for the internals of the Kognitio system. Data providers are also known as connectors but I wanted to avoid potential confusion from shared names.

The data provider that I used to access the HDFS data is implemented as a plugin connector provided by a plugin module. Plugin modules are how the Kognitio system implements some features that are not in the core code. Loading and activating a plugin module is required to make its features available to the Kognitio system. Plugin modules can also present parameters used for any module-specific configuration required.

Creating The External Table: Making the Code-Point Open Data Accessible to the Kognitio System

The external table that I needed to access the data on the HDFS system required a connector that used the HDFS data provider. This is implemented as a plugin connector from the Hadoop plugin module. So I loaded the Hadoop plugin module, made it active and set the libhdfs and hadoop_client plugin module parameters.

create module hadoop;
alter module hadoop set mode active;
alter module hadoop set parameter libhdfs to '/opt/cloudera/parcels/CDH/lib64/';
alter module hadoop set parameter hadoop_client to '/opt/cloudera/parcels/CDH/bin/hadoop';

The parameters indicate to the HDFS data provider where the libhdfs library and hadoop executable are on my Hadoop cluster.

Then I could create the connector that was needed:

create connector hdfs_con source hdfs target ' namenode hdfs://, bitness 64 ';

The create connector statement specifies where the namenode is and to use 64 bit mode, which is best.

Then I could create the external table:

create external table OSCodePoints (
PostCode char(8),
Positional_quality_indicator varchar(32000),
Eastings varchar(32000),
Northings varchar(32000),
Country_code varchar(32000),
NHS_regional_HA_code varchar(32000),
NHS_HA_code varchar(32000),
Admin_county_code varchar(32000),
Admin_district_code varchar(32000),
Admin_ward_code varchar(32000)
)
from hdfs_con target
'file "/user/kodoopdev1/OSCodePoint2016Nov/*.csv"';

The ‘file’ parameter specifies which HDFS files contain the data I want to access.

I created a view with a view image:

create view PostCodes as select PostCode from OSCodePoints;
create view image PostCodes;

The view projects just the postcode column out. Creating the view image pins the results from the view into RAM. All queries on the view read data from the view image and run faster as a result.

I generated check-sums using the md5_checksum, sha1_checksum, sha224_checksum, sha256_checksum, sha384_checksum and sha512_checksum functions. These are all implemented as plugin functions provided by the hashes plugin module. This was loaded and activated:

create module hashes;
alter module hashes set mode active;

I ran the check-sum functions in seven different insert-select queries to populate a set of lookup tables mapping postcodes to their check-sums. Six of the queries each generated check-sums for one of the check-sum functions. The 7th query used all six check-sum functions to generate a single lookup table of all six check-sums simultaneously. This was done to get some data indicating the overhead associated with retrieving and creating rows. The seven lookup tables were created as ram-only tables, which, as the name implies, hold the row data in RAM rather than on disk, removing the overhead of writing rows to disk.

The ram-only table create statements were:

create ram only table PostCodeMD5Sums (
PostCode char(8),
md5_checksum binary(16)
);

create ram only table PostCodeSHA1Sums (
PostCode char(8),
sha1_checksum binary(20)
);

create ram only table PostCodeSHA224Sums (
PostCode char(8),
sha224_checksum binary(28)
);

create ram only table PostCodeSHA256Sums (
PostCode char(8),
sha256_checksum binary(32)
);

create ram only table PostCodeSHA384Sums (
PostCode char(8),
sha384_checksum binary(48)
);

create ram only table PostCodeSHA512Sums (
PostCode char(8),
sha512_checksum binary(64)
);

create ram only table PostCodeCheckSums (
PostCode char(8) CHARACTER SET LATIN1,
md5_checksum binary(16),
sha1_checksum binary(20),
sha224_checksum binary(28),
sha256_checksum binary(32),
sha384_checksum binary(48),
sha512_checksum binary(64)
);

The insert-select queries were:

insert into PostCodeMD5Sums
select PostCode, md5_checksum(PostCode) from PostCodes;

insert into PostCodeSHA1Sums
select PostCode, sha1_checksum(PostCode) from PostCodes;

insert into PostCodeSHA224Sums
select PostCode, sha224_checksum(PostCode) from PostCodes;

insert into PostCodeSHA256Sums
select PostCode, sha256_checksum(PostCode) from PostCodes;

insert into PostCodeSHA384Sums
select PostCode, sha384_checksum(PostCode) from PostCodes;

insert into PostCodeSHA512Sums
select PostCode, sha512_checksum(PostCode) from PostCodes;

insert into PostCodeCheckSums
select PostCode,
       md5_checksum(PostCode),
       sha1_checksum(PostCode),
       sha224_checksum(PostCode),
       sha256_checksum(PostCode),
       sha384_checksum(PostCode),
       sha512_checksum(PostCode)
from PostCodes;

Results And Conclusions

For the 1.7 million rows (1691721 to be exact) the times for the insert-select queries were:

md5_checksum: 0.62s
sha1_checksum: 0.53s
sha224_checksum: 0.63s
sha256_checksum: 0.67s
sha384_checksum: 0.83s
sha512_checksum: 0.86s
all 6 checksums: 2.87s

The results show that each check-sum is being calculated in around 500ns or less. At that rate this system could calculate about 170 billion check-sums a day. The Kognitio system was running on a node with two E5506s, giving 8 cores at 2.13GHz; the node is several years old. It seems obvious that using check-sums and/or hashes alone isn’t going to hide data from this system, and even less so with faster, up-to-date hardware.

Comparing the “all 6 check-sums” insert-select query run time with those of the others showed an appreciable overhead in retrieving and writing the row data: the total time taken to populate six lookup tables of one check-sum each was 1.44 times the time taken to populate the single lookup table of all six check-sums.
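The quoted figures follow directly from the timings; a quick check of the arithmetic:

```python
# Reproduce the arithmetic behind the quoted figures.
rows = 1691721
times = {"md5": 0.62, "sha1": 0.53, "sha224": 0.63,
         "sha256": 0.67, "sha384": 0.83, "sha512": 0.86}
all_six = 2.87

# Worst-case per-row cost, in nanoseconds
print(round(times["sha512"] / rows * 1e9))  # ~508 ns per check-sum

# Daily throughput at that worst-case rate, in billions
print(round(rows / times["sha512"] * 86400 / 1e9))  # ~170 billion/day

# Six separate single-checksum queries versus the one combined query
print(round(sum(times.values()) / all_six, 2))  # 1.44
```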

Because I used the Code-Point Open data, the following attributions apply:
Contains OS data © Crown copyright and database right (2016)

Contains Royal Mail data © Royal Mail copyright and Database right (2016)

Contains National Statistics data © Crown copyright and database right (2016)

Getting the most from life


Posted By : Ben Cohen
Tags : getting the most from life
Categories : Blog, Data Science

I am going to use Conway’s Life to show off some of Kognitio’s features, including some which are new in version 8.1.50.

Life is a well-known toy problem so I won’t explain it in great detail beyond defining it. The rules are simple yet they lead to surprising emergent properties including periodic sequences and even a Turing machine:

  • At each of a sequence of time steps starting from zero, each cell in an infinite square lattice is either “alive” or “dead”.
  • At step zero there is a chosen starting configuration of dead or alive cells.
  • A dead cell will become alive in the next step if it has exactly 3 neighbours in the current step.
  • A living cell will remain alive in the next step if it has 2 or 3 neighbours in the current step.
  • All other cells will remain or become dead in the next step.

For example:

first step for a glider
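Before turning to SQL, the rules can be sanity-checked with a few lines of Python over a sparse set of live cells, the same representation the LIFE table will use (this sketch is independent of Kognitio):

```python
from collections import Counter

# The eight neighbour offsets, the same "mask" idea used in SQL below.
MASK = [(-1, 1), (0, 1), (1, 1),
        (-1, 0),         (1, 0),
        (-1, -1), (0, -1), (1, -1)]

def life_step(alive):
    """One step of Life over a sparse set of live (x, y) cells."""
    # Count live neighbours for every position adjacent to a live cell.
    counts = Counter((x + dx, y + dy) for (x, y) in alive for (dx, dy) in MASK)
    # Born with exactly 3 live neighbours; survives with 2 or 3.
    return {cell for cell, n in counts.items()
            if n == 3 or (n == 2 and cell in alive)}

glider = {(0, 0), (0, 1), (0, 2), (1, 2), (2, 1)}
cells = glider
for _ in range(4):
    cells = life_step(cells)
# After four steps the glider reappears, shifted one cell diagonally.
print(cells == {(x - 1, y + 1) for (x, y) in glider})  # True
```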

We start by modelling the lattice. In principle a two-dimensional boolean array would work, but a table listing the cells that are alive in the current step – a sparse array – is more practical and will benefit from
Kognitio’s parallelism.

-- New syntax to drop a table but not give an error if it doesn't exist
drop table if exists LIFE cascade;

create table LIFE (X int not null, Y int not null, unique (X, Y));

The glider, pictured above, is a simple pattern with interesting behaviour so I will use that as an initial position.

-- Insert a glider
insert into LIFE values (0, 0), (0, 1), (0, 2), (1, 2), (2, 1);

We need a nice way to display the current state of LIFE, rather than trying to visualise a table of coordinates. This could be achieved by exporting the contents of LIFE and rendering it in an external program, but a simpler way is to use Kognitio’s external scripting feature, which allows the user to define functions on tables. We can write a Python script that converts a CSV list of X and Y coordinates into an ASCII-art graph, one output row per line of the grid.

-- New syntax to drop a script environment but not error if it doesn't exist
drop script environment if exists PYTHON;

create script environment PYTHON command '/usr/bin/python';

-- New syntax to drop an external script but not error if it doesn't exist
drop external script if exists SHOW_LIFE;

create external script SHOW_LIFE environment PYTHON
receives(X int, Y int)
-- Need to preserve leading spaces because of legacy behaviour
sends (Y int, CELLS varchar(32000)) output 'fmt_preserve_leading_spaces 1'
-- Use limit 1 threads so that everything goes to the same node, otherwise
-- we end up with wrong output such as  Y|CELLS  instead of  Y|CELLS
--                                      2|**                 2|***
--                                      2|*                  1|*
--                                      1|*                  0| *
--                                      0| *
-- We could use a partition and then relax that, which would allow parallelism,
-- but we need to synchronise the min_x and min_y values between the scripts.
-- That might be the subject of a future blog post.
limit 1 threads
script LS'python(
    import csv, sys

    min_x = min_y = sys.maxint
    max_x = max_y = -sys.maxint - 1
    cells = {}

    csvreader = csv.reader(sys.stdin)
    for (x, y) in csvreader:
        x = int(x)
        y = int(y)
        cells[(x, y)] = ()
        if x < min_x:
            min_x = x
        if x > max_x:
            max_x = x
        if y < min_y:
            min_y = y
        if y > max_y:
            max_y = y

    # Allow a margin around the cells
    min_x -= 1
    min_y -= 1
    max_x += 1
    max_y += 1
    if len(cells) > 0:
        csvwriter = csv.writer(sys.stdout)
        for y in reversed(range(min_y, max_y + 1)):
            row = ""
            for x in range(min_x, max_x + 1):
                if (x, y) in cells:
                    row += '*'
                else:
                    row += ' '
            csvwriter.writerow([y, row])
        csvwriter.writerow(["", "Top left is at (%s, %s)"%(min_x, max_y)])
)python';

select * from (external script SHOW_LIFE from (select * from LIFE))
order by Y desc;

We can also write a script to populate the table from a .LIF file rather than having to write an INSERT statement. (This could alternatively be defined as an external table.)

drop external script if exists GET_LIFE;

create external script GET_LIFE environment PYTHON
sends (X int, Y int)
-- Use limit 1 threads to avoid duplicates
limit 1 threads
script LS'python(
        import os, csv, sys, urllib2

        # Download the file from the given URL
        life_file = os.environ["WX2_LIFE_URL"]
        req = urllib2.Request(life_file)
        response = urllib2.urlopen(req)
        result = response.readlines()

        csvwriter = csv.writer(sys.stdout)
        if result[0].strip() == "#Life 1.06":
                # Life 1.06: parse space-separated values
                for l in result[1:]:
                        (x, y) = l.split()
                        csvwriter.writerow([x, y])
        elif result[0].strip() == "#Life 1.05":
                # Life 1.05: parse text pattern
                x = 0
                y = 0
                for l in result[1:]:
                        l = l.strip()
                        if len(l) == 0:
                                continue
                        if l[0] == '#':
                                if l[1] == 'P':
                                        (p, x, y) = l.split()
                                        x = int(x)
                                        y = int(y)
                        else:
                                cx = x
                                for c in l:
                                        if c == '.':
                                                pass    # cell is dead
                                        elif c == '*':
                                                # cell is alive
                                                csvwriter.writerow([cx, y])
                                        else:
                                                print "Unexpected character"
                                        cx += 1
                                y += 1
        else:
                print "Unknown LIFE file format"
)python';

insert into LIFE
    external script GET_LIFE
    -- Use the script parameter LIFE_URL to give the URL of a .LIF file
    -- Or use a URL like 'file://localhost/path/to/file.lif' for a local file
    parameters LIFE_URL='';

Define a table NEIGHBOUR_MASK of the pairs (X, Y) that are neighbours of the origin, to provide a 3-by-3 “mask” for eight neighbouring cells.

drop table if exists NEIGHBOUR_MASK cascade;

create table NEIGHBOUR_MASK (DX int not null, DY int not null);

insert into NEIGHBOUR_MASK values (-1,  1), ( 0,  1), ( 1,  1),
                                  (-1,  0),           ( 1,  0),
                                  (-1, -1), ( 0, -1), ( 1, -1);

Define a view NEIGHBOUR_COUNT that joins the mask to LIFE, returning a count of the number of neighbours that are alive for each (X, Y) position that has any living neighbours (even if the position itself is dead).

-- New syntax to drop a view but not give an error if it doesn't exist
drop view if exists NEIGHBOUR_COUNT;

create view NEIGHBOUR_COUNT as
     select X + DX as X, Y + DY as Y, count(*) as NEIGHBOURS
     from LIFE, NEIGHBOUR_MASK
     group by 1, 2;

To calculate the position in the next step we need an INSERT to add cells to the current position in LIFE that become alive, and a DELETE to remove cells from the current position in LIFE that die. Kognitio provides MERGE allowing us to do the delete and insert atomically, which is essential because each makes changes that will affect the other. Each time the MERGE is run, LIFE will be updated to the next step. (Alternatives to MERGE would be to add a step number column to LIFE or use an additional working table.)

-- Update LIFE to the next step
merge into LIFE LIFE
using NEIGHBOUR_COUNT N
on LIFE.X = N.X and LIFE.Y = N.Y
-- Use DELETE to remove cells that die
when matched and N.NEIGHBOURS not in (2, 3) then delete
-- Use INSERT to add cells that become alive
when not matched and N.NEIGHBOURS = 3 then insert values (N.X, N.Y);

Here is the output for the glider for the first few steps. After the fourth step the glider has been replaced by a copy of the original moved one position diagonally.

     <null>|Top left is at (-1, 3)
          2| **
          1| * *
          0| *

     <null>|Top left is at (-2, 3)
          2|  **
          1| **
          0|   *

     <null>|Top left is at (-2, 3)
          2| ***
          1| *
          0|  *

     <null>|Top left is at (-2, 4)
          3|  *
          2| **
          1| * *

     <null>|Top left is at (-2, 4)
          3| **
          2| * *
          1| *

Finally, the Kognitio extension EXEC can be used to have the database repeat the merge an arbitrary number of times rather than having to repeatedly run it manually.

-- Execute the MERGE four times
exec select
       'merge into LIFE LIFE
        using NEIGHBOUR_COUNT N
        on LIFE.X = N.X and LIFE.Y = N.Y
        when matched and N.NEIGHBOURS not in (2, 3) then delete
        when not matched and N.NEIGHBOURS = 3 then insert values (N.X, N.Y)'
     from values between 1 and 4;

Disk Space Connector


Posted By : Mark Marsh
Tags : disk space connector
Categories : Blog, Data Science

The Kognitio external table connector scripts are a powerful way of accessing data from a wide variety of sources. Any data that can be presented as a stream of CSV information can be brought into Kognitio in a massively parallel operation and presented as a table. There is no restriction on the programming language – as long as the code can run on Linux, read stdin and write stdout and stderr it can be used. Typically BASH, Python or Perl will be used but for more advanced connectors, we have used Java and C / C++.
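As a hedged illustration of how little is involved, here is a hypothetical Python connector that emits CSV on stdout, equivalent to the BASH df connector developed below (WX2_METHOD is the environment variable the BASH version tests):

```python
#!/usr/bin/env python3
# Hypothetical Python version of a df connector: emit hostname-prefixed
# CSV rows on stdout when the LOAD method is invoked.
import csv
import os
import socket
import subprocess
import sys

def load(out=sys.stdout):
    hostname = socket.gethostname()
    df = subprocess.check_output(["df", "-P"]).decode()
    writer = csv.writer(out)
    for line in df.splitlines()[1:]:   # skip the header row
        fields = line.split()
        # filesystem, 1024-blocks, used, available, mounted-on
        writer.writerow([hostname] + fields[:4] + [fields[5]])

if os.environ.get("WX2_METHOD") == "LOAD":
    load()
```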

This blog post presents a very basic connector that loads data from the Linux df command (which displays disk usage figures). It runs a single connector on each node to avoid retrieving duplicate information and prepends the hostname so we can see which node the figures relate to.

We are using a BASH script in this example, which simply transforms the output of the df command into CSV, as shown.

Standard df

$ df -P

Filesystem 1024-blocks Used Available Capacity Mounted on
/dev/sda1 138840528 42271252 93702660 32% /
tmpfs 24705916 0 24705916 0% /dev/shm
cm_processes 24705916 187624 24518292 1% /var/run/cloudera-scm-agent/process


Processed into CSV
$ df -P | grep -v Filesystem | tr -s ' ' ',' | cut -d, -f1-4,6

To create the connector we need to wrap this in some code that runs the df when the connector’s LOAD method is called.

#!/bin/bash -l

if [ "$WX2_METHOD" = "LOAD" ] ; then
  hn=`hostname`
  df=`df -P | grep -v Filesystem | tr -s ' ' ',' | cut -d, -f1-4,6`
  for o in $df ; do
    echo "${hn},${o}"
  done
fi

This code is placed on the AP of a standalone instance of Kognitio or the edge node of a Hadoop instance and the following SQL is used to create the connector (you will need “CREATE CONNECTOR” privileges).

create connector Disk_Space_Connector
  command '/vol1/services/wxadmin/Disk_Space_Connector'
  target 'max_connectors_per_node 1';

We can then build a table to access the connector. The table creation statement defines the column names and types and the database does the necessary conversions from CSV.

create external table disk_space (
hostname varchar,
filesystem varchar,
size_kb bigint,
used_kb bigint,
available_kb bigint,
mounted_on varchar
) from Disk_Space_Connector;

And finally select from the table.

select * from disk_space order by hostname, filesystem;

Which produces.

hadoop02-rack3-enc2-10 /dev/sda1 30465020 11693060 18142960 /
hadoop02-rack3-enc2-10 /dev/sdb1 220248772 67738168 141322540 /hdfs
hadoop02-rack3-enc2-10 cm_processes 24705916 183780 24522136 /var/run/process
hadoop02-rack3-enc2-10 tmpfs 24705916 12935056 11770860 /dev/shm
hadoop02-rack3-enc2-11 /dev/sda1 30465020 12214992 17621028 /
hadoop02-rack3-enc2-11 /dev/sdb1 220248772 65651144 143409564 /hdfs
hadoop02-rack3-enc2-11 cm_processes 24705916 183196 24522720 /var/run/process
hadoop02-rack3-enc2-11 tmpfs 24705916 12935056 11770860 /dev/shm
hadoop02-rack3-enc2-12 /dev/sda1 30465020 11474152 18361868 /
hadoop02-rack3-enc2-12 /dev/sdb1 220248772 68018648 141042060 /hdfs
hadoop02-rack3-enc2-12 cm_processes 24705916 183216 24522700 /var/run/process
hadoop02-rack3-enc2-12 tmpfs 24705916 12935056 11770860 /dev/shm

If the table definition does not match the data, an error will be returned – we have removed the hostname column to show this.

create external table disk_space_err (
filesystem varchar,
size_kb bigint,
used_kb bigint,
available_kb bigint,
mounted_on varchar
) from Disk_Space_Connector;

select * from disk_space_err order by filesystem;

HY000[Kognitio][WX2 Driver][hdev:6550] ET010F: Invalid external data record, see SYS.IPE_CONV_ERROR for details (tno 6074)

To get details of the error, query the SYS.IPE_CONV_ERROR table as directed in the error message (remember to substitute your tno into the query).

select column_no, record_char_pos, message, cast(record as varchar(10000)) as record from SYS.IPE_CONV_ERROR where tno = 6074;

Which returns (turned sideways for clarity).

COLUMN_NO        2
RECORD_CHAR_POS  23
MESSAGE          IE112B: Invalid field value for column type or incorrect field terminator
RECORD           hadoop02-rack3-enc2-11,/dev/sda1,30465020,12218452,17617568,/

This shows that the problem is with column 2 at position 23 in the record. Looking at the table definition, we are expecting a bigint in the second column and we have the value “/dev/sda1” which can’t be converted, hence the error.

If the data source is not comma separated, there are many options that can be used to transform the data – see the quick reference sheet “Target String Format Attributes.pdf” and chapter 8 of “kognitio-Guide-v80100.pdf” for more information (all of the version 8 documentation is available on the forum).

This blog entry has only scratched the surface of what can be done with an external table connector. As well as the LOAD method we utilised above there are methods called when the connector is created and methods to allow a subset of columns to be returned which can improve performance when only a small number of columns are required from a wide table.

Of particular note is a method called LOADCO which can be used to coordinate the loading of multiple files or other objects by sending messages to the LOAD methods. I recently used it to create a connector that fetches a list of files via SSH from a server and then distributes the file names across multiple LOAD processes to load them in parallel. This can give very good performance – our testing settled on 12 concurrent load processes (6 each on 2 nodes) as being the most the file server could handle but we tried 100 and the only problem was with the file server struggling to serve the files.
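Stripped of the connector machinery, the LOADCO pattern is a work-list partitioning problem. A rough sketch (the function and file names are hypothetical; real LOADCO coordination happens via messages sent to the LOAD methods):

```python
# Rough sketch of the LOADCO idea: distribute a list of file names
# across a fixed number of parallel LOAD workers, round-robin.
def distribute(files, n_workers):
    """Assign each file to a worker round-robin; returns one list per worker."""
    batches = [[] for _ in range(n_workers)]
    for i, f in enumerate(files):
        batches[i % n_workers].append(f)
    return batches

files = ["data_%02d.csv" % i for i in range(7)]
for w, batch in enumerate(distribute(files, 3)):
    print("worker %d: %s" % (w, batch))
```

In the real connector each worker is a LOAD process fetching its batch over SSH, so the degree of parallelism (12 in our testing) is bounded by what the file server can sustain rather than by Kognitio.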

The BASH script and SQL used above are available on the forum, in the Disk Space Connector script post.

If you have any questions or ideas for connectors please post them on the forum or in the blog comments.

Mark Marsh – Senior Technical Consultant

Avoiding algorithm stagnation… A KACE STOR-i in full


Posted By : Paul Groom
Categories : Data Science

Chak Leung – KACE data scientist

There is a plethora of mathematical techniques openly available to us, with open source code easily available and packaged, ready for us to exploit. However, as data scientists, we often have favoured algorithms, and it is easy for us to become stagnant. Additionally, common approaches to data parallelisation have their drawbacks; if our choice of mathematical technique doesn’t mesh well with them, what can we do about it?

At Kognitio we believe it is essential to incubate new ideas, and we like to reach out to academia, where new techniques are being developed every day but may be underutilised due to insufficient data or infrastructure, something we have plenty of.

Venue and partner

In late August the KACE team went to the STOR-i centre for doctoral training at Lancaster University for a one-day workshop at which we presented some of these problems.

STOR-i (Statistics and Operational Research) is a joint venture between the departments of Mathematics and Statistics and Management Science offering four year programmes with industry partner involvement.

The workshop was an excellent introduction to practical “big data” problems for the attendees, who included undergraduates, PhD students and post-doc researchers, and it gave Kognitio an opportunity to get fresh input into existing clustering problems, including initialising clusters and amalgamating cluster models produced in parallel.

When is data parallelisation not enough?

Why is parallelising algorithms a problem? Take, for example, the K-means clustering algorithm where we might need to ask ourselves:

  1. Can I fit all the data within the memory of one single node?
  2. Would the extended run time be significant?
  3. Could the initial clustering start points be better placed for fewer iterations reducing the run times?
  4. Is there a way to combine all the parallel models into one at the end to combat the extended run time?

Data parallelisation answers the first two questions: we can duplicate the algorithm across threads and bypass the requirement of fitting all the data into memory on one node by fanning the data out, and with parallel platforms such as Kognitio this also massively reduces execution times.

But this causes problems: how can we implement synchronised smarter start points and how do we combine the results from all the threads?


In the example above, notice that if we simply split the data up and create a model in each thread, we generate three models, but what we are really after is one model. Here the K-means problem becomes much more complicated, as the movements of clusters (from their initial start points) in one thread are not known to the other threads. This means the final cluster results can end up very different in each thread of our parallel algorithm.

There are existing methods of model aggregation, such as bootstrap aggregation, or even taking smaller random samples to avoid parallelism altogether, but these have their own issues: they are based on estimations with no certain guarantees.

Thus, in search of a new and better solution, the challenges that we proposed to the attendees at STOR-i were:

  • Can we define some metrics or aggregation procedures so that the parallelism can be kept intact and results combined?
  • Could the steps of the clustering algorithm be parallelised so that every node can be simultaneously exploited to contribute to the results of each stage?


The main focus points were initialisation (better starting points than merely random ones) and the aggregation of cluster models to produce one large one.


Every group recognised that initialisation of the cluster starting points was important, leading to many efforts in this area. Their ideas included:

  • Initialising clusters based on variations from global means uniformly distributed on a hypersphere. This involved setting the initial clusters closer to the global mean of the data and projecting to areas where they would fit best. It was acknowledged distributing clusters would be difficult at higher dimensions than the presented 2-dimensions.
  • Genetic algorithm – split up the data initially where each centroid candidate is compared with the current selection, removing the lower quality ones in favour of potentially higher quality centroids. The problem here is that this might be computationally expensive for an initialisation step as with many heuristics.
  • Produce more clusters than needed – overfitting. Split the data into N threads, trying to put closer data together (i.e. in the same thread), then overfit the data on each thread. Later iterations or aggregation could then bring similar clusters together.
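As a concrete illustration of distance-based seeding, here is a deliberately simplified, one-dimensional sketch of a greedy farthest-point scheme. It is related in spirit to the ideas above, but it is not any group's actual proposal:

```python
def farthest_point_init(points, k):
    """Greedy distance-based seeding: start from the first point, then
    repeatedly add the point farthest from all chosen centres."""
    centres = [points[0]]
    while len(centres) < k:
        centres.append(max(points,
                           key=lambda p: min((p - c) ** 2 for c in centres)))
    return centres

# Two well-separated 1-D blobs: the second centre lands in the far blob,
# so each blob gets a seed instead of both seeds landing in one blob.
pts = [0.0, 0.1, 0.2, 10.0, 10.1, 10.2]
print(farthest_point_init(pts, 2))  # [0.0, 10.2]
```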

Cluster aggregation

There were some interesting attempts to define characteristics of parallelised clusters, such as size, variability, and inter- and inner-cluster distances. One group defined a notion of cluster density for comparing clusters, expressed in terms of the distances between each observation x and its centroid c.

From here the problem is reduced by re-clustering any data that does not fit a defined density criterion. This is repeated to produce the final model. This is an interesting metric which may be useful in comparing the characteristics of clusters produced in parallel by different node threads, to determine whether they are suitable to be joined together.

A MapReduce approach was also suggested, whereby the data is initially split and broadcast to all mappers, with distances being computed in parallel. This allows the construction of global variant centres and assigns each data observation to a cluster. Finally, in the reduce/combine stage, the sums of observations assigned to each cluster are collected to give the distribution across all clusters. This implementation works very well with Kognitio’s MPP platform and we have already tested an implementation of it.
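The map/combine arithmetic works because per-cluster sums and counts are associative, so partials from different mappers can be merged exactly. A one-dimensional sketch with invented data:

```python
def kmeans_map(points, centres):
    """Map stage: assign each point to its nearest centre and emit
    per-cluster (sum, count) partials."""
    partials = {}
    for p in points:
        i = min(range(len(centres)), key=lambda j: (p - centres[j]) ** 2)
        s, n = partials.get(i, (0.0, 0))
        partials[i] = (s + p, n + 1)
    return partials

def kmeans_reduce(all_partials, centres):
    """Combine stage: merge the partials from every mapper and
    recompute each centre as a global mean."""
    totals = {}
    for partials in all_partials:
        for i, (s, n) in partials.items():
            ts, tn = totals.get(i, (0.0, 0))
            totals[i] = (ts + s, tn + n)
    return [totals[i][0] / totals[i][1] if i in totals else centres[i]
            for i in range(len(centres))]

# Two mappers hold different slices of the data; merging their partial
# sums gives exactly the centres a single-node pass would produce.
centres = [0.0, 10.0]
partials = [kmeans_map([0.0, 0.2, 9.8], centres),
            kmeans_map([0.4, 10.0, 10.2], centres)]
print([round(c, 6) for c in kmeans_reduce(partials, centres)])  # [0.2, 10.0]
```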

It was acknowledged that re-joining the clusters after parallel iterations is not a trivial process. A super K-means process where the clusters from the parallel processes are themselves clustered was suggested by a few groups and this would be interesting if the overfitting approach was used in the initial parallel processes.

Another suggestion that was interesting was to carry out secondary passes where similar clusters are forced into the same processing thread to see if the clusters are then brought together.

A final suggestion was made for a different approach to the clustering algorithm itself – expectation maximisation (EM), an unsupervised learning algorithm which lifts the limits of spherical clusters for more diversity using maximum likelihood estimates.

Future Work

The KACE team are looking to implement some of these ideas in our existing k-means clustering, and to investigate using the EM algorithm too. Our top priority will be robust amalgamation of the results from each thread, before attempting to initialise better starting points.