Technology

Cassandra performance tuning

From 8k to 29k writes per second

We took the IP2Location DB11 database. It holds a few million IPv4 ranges, which unwrap into over 2 billion individual addresses. Such a number of entries is not a big deal for a PostgreSQL RDBMS or an Apache Cassandra cluster. There is, however, the issue of ingestion speed. The question is how quickly we can programmatically compute the IP addresses for each range and insert them into persistent storage.
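
As a rough sketch of that unwrapping step (IP2Location stores each range as integer boundaries; the function and field names below are illustrative, not taken from the actual ingestion program):

import ipaddress

# Yield every IPv4 address in an inclusive integer range,
# the way DB11 rows delimit them (ip_from, ip_to).
def unwrap_range(ip_from, ip_to):
    for n in range(ip_from, ip_to + 1):
        yield str(ipaddress.IPv4Address(n))

# 167772160..167772163 corresponds to 10.0.0.0 – 10.0.0.3
print(list(unwrap_range(167772160, 167772163)))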

PostgreSQL can easily hold around 10 TB of data on a single node, and even more when the data is divided into separate partitions/tables or spread over multiple tablespaces on separate drives. Apache Cassandra is also said to handle over 10 TB per node. So let's check it out!

Cassandra keyspace and table definition

We start by defining a basic, non-redundant keyspace. Non-redundant because we want to focus on the data structure layout first.

create keyspace ip2location
with replication = {
  'class': 'SimpleStrategy',
  'replication_factor': 1
};

Table definition with only a single partition key.

create table hosts_by_country_and_ip
(
  country varchar,
  ip varchar,
  region varchar,
  city varchar,
  lati varchar,
  longi varchar,
  izp varchar,
  timezone varchar,
  primary key ((country), ip)
);
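
For context, a minimal ingestion sketch against this table with the Python driver – the contact point and example values are placeholders, not the actual production setup:

from cassandra.cluster import Cluster

cluster = Cluster(['0.0.0.0'])
session = cluster.connect('ip2location')

# Prepared statement, so only the bound values travel with each request
insert = session.prepare(
    "INSERT INTO hosts_by_country_and_ip "
    "(country, ip, region, city, lati, longi, izp, timezone) "
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?)")

session.execute(insert, ['US', '8.8.8.8', 'California', 'Mountain View',
                         '37.40', '-122.07', '94035', '-07:00'])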

With a single partition key (country) and 6 Cassandra nodes (with default settings from the Docker container) we get 8 – 9k inserts per second. That is well below the 20 – 40k inserts expected on modern hardware. Then again, the test setup does not run on modern hardware – it runs on an HP Z800.

create table hosts_by_country_and_ip
(
  country varchar,
  ip varchar,
  region varchar,
  city varchar,
  lati varchar,
  longi varchar,
  izp varchar,
  timezone varchar,
  primary key ((country, region, city), ip)
);

Now for a different table structure approach. Instead of a single partition key column, the partition key is now composed of three columns (country, region, city). Cassandra maps every partition key to a numeric token using its token function. By default each Cassandra node owns 16 tokens (token value ranges, to be precise). On every insert the token value is calculated and the coordinator routes the write to the node owning that token range – the node holding the data for that token. With such a structure change, we get 16k inserts per second.
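
The mapping can be inspected from CQL: the token() function takes all partition key columns and returns the Murmur3 token that decides which node owns the row. A small sketch with the Python driver, using made-up key values:

from cassandra.cluster import Cluster

cluster = Cluster(['0.0.0.0'])
session = cluster.connect('ip2location')

# token() must receive every partition key column, in order;
# assumes at least one matching row has already been inserted
row = session.execute(
    "SELECT token(country, region, city) AS t "
    "FROM hosts_by_country_and_ip "
    "WHERE country='US' AND region='California' AND city='Mountain View' "
    "LIMIT 1").one()
print(row.t)  # a 64-bit token; the owning node is the one whose range covers it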

Why is that, you may ask? It depends on the data being inserted. Our ingestion Python program reads IPv4 ranges from PostgreSQL and generates the list of IP addresses to insert into Cassandra. We did not add any kind of ORDER clause, so the data distribution may or may not be “random”. Effectively, with a single partition key the Cassandra write coordinator has to resolve too many conflicts and blockings, because multiple runners keep inserting data for the same partition key. If we spread the data out, we minimize the time spent resolving those conflicts.
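
A purely illustrative way to do that spreading on the client side is to shuffle each runner's batch of generated rows before inserting, so that consecutive requests rarely hit the same (country, region, city) partition:

import random

# Made-up rows for two partitions; in the real ingestion these come from
# the PostgreSQL ranges unwrapped into individual addresses.
rows = [('US', '203.0.113.%d' % i, 'California', 'Mountain View') for i in range(256)] + \
       [('PL', '198.51.100.%d' % i, 'Mazowieckie', 'Warsaw') for i in range(256)]

random.shuffle(rows)  # interleave partitions instead of writing one after another
# each runner would then feed its shuffled slice to the prepared INSERT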

Cassandra node configuration

At first we adjusted the table structure to increase write performance. The next thing to try is different hardware and modified runtime parameters. So instead of the HP Z800 (with SATA SSD drives) we ran a single Cassandra node on a machine with a single NVMe drive, created the same keyspace and the same table – with no change. Or maybe there was a change, since the single node outperformed 6 nodes, but we expected write throughput to increase even more. No matter whether 6, 12 or 24 Python ingestion instances were running, we got up to 16k writes per second.

However, we additionally set the following configuration values in cassandra.yaml:

concurrent_reads: 64
concurrent_writes: 128
concurrent_counter_writes: 64

And increased MAX_HEAP_SIZE from 4 GB to 8 GB. We did not follow the documentation too closely, but rather common sense: increase write concurrency by an arbitrary amount. With 24 Python ingestion instances running we finally got better results – 24k writes per second.
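
For reference, the concurrency values above live in cassandra.yaml, while the heap is set in cassandra-env.sh (or via environment variables for the Docker image). The script typically expects both heap values to be set together; the HEAP_NEWSIZE value below is illustrative, only MAX_HEAP_SIZE was changed above:

MAX_HEAP_SIZE="8G"
HEAP_NEWSIZE="800M"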

Measuring throughput

How can write throughput even be measured easily? Cassandra does not offer any analytics at that level out of the box. You need to maintain counters manually or use Spark to calculate counts over the token list. We chose the first approach: external counters updated by every Python runner. Using simple boolean flags in the same structures, we can also select which data each runner should process concurrently.
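
The exact counter structure is not shown here, so the sketch below is just one possible shape: a small Cassandra counter table that every runner bumps after each insert, and that a separate process samples over time to derive writes per second.

from cassandra.cluster import Cluster

cluster = Cluster(['0.0.0.0'])
session = cluster.connect('ip2location')

# Hypothetical counter table, not the exact structure used here
session.execute("""
    CREATE TABLE IF NOT EXISTS ingest_progress (
        runner_id int PRIMARY KEY,
        rows_written counter)""")

# Each runner increments its own row after every successful insert
session.execute(
    "UPDATE ingest_progress SET rows_written = rows_written + 1 WHERE runner_id = %s",
    [7])

# Sampling this total at intervals gives writes per second
total = sum(r.rows_written for r in
            session.execute("SELECT rows_written FROM ingest_progress"))
print(total)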

Client-limited performance

Running only 12 Python ingestion instances gives over 18k writes per second, so there is a link between server capabilities and the number of tasks inserting data. Increasing to 48 runner instances gives 28k writes per second. With 64 instances we got 29 – 30k writes per second, and that seems to be the maximum for this hardware and configuration at this time.

Reads vs Writes

There is one more thing to consider regarding performance. During intensive writes (e.g. 12 ingestion instances) CPU utilization oscillates around 200 – 300% (on a 4-core, 8-thread CPU). During intensive reads, however, with let's say 32 parallel processes running, CPU utilization sits between 400 and 700% – a level never seen during writes.

...

from cassandra.cluster import Cluster
from joblib import Parallel, delayed

# Function processing a single partition
def process_single_partition(r):
    # Sessions are not picklable, so each worker opens its own connection
    cluster = Cluster(['0.0.0.0'])
    session = cluster.connect()
    session.execute("USE ip2location")
    stmt = session.prepare(
        "SELECT ip FROM hosts_by_country_and_ip "
        "WHERE country=? AND region=? AND city=?")
    res = session.execute(stmt, [r[0], r[1], r[2]])
    counter = 0
    for row in res:
        counter += 1
    print(r, counter)

# Multiprocessing invocation: one job per (country, region, city) tuple in records
Parallel(n_jobs=32)(delayed(process_single_partition)(obj) for obj in records)

As both PostgreSQL and Cassandra sessions are not picklable by Parallel, we need to either use a connection pool or simply create a connection within each run. For this simple demo, the latter is enough.

Due to the distributed nature of Cassandra, we cannot simply run a COUNT query, even when using all the required partition columns – in this example country, region and city. Even with all of them specified we get timeouts running COUNT, and that is with no inserts running during the counting procedure. Most probably, lengthening particular timeout parameters would get us further. COUNT will still return some results, but it is not a reasonable way to actually count our records. The proper way is to build structures holding data counters or to use Cassandra diagnostics.
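
If you do want to push a per-partition COUNT further, the client-side timeout can be raised in the Python driver (and the corresponding server-side read timeouts in cassandra.yaml, e.g. read_request_timeout_in_ms, would likely need raising too). A sketch with illustrative values:

from cassandra.cluster import Cluster

cluster = Cluster(['0.0.0.0'])
session = cluster.connect('ip2location')
session.default_timeout = 120  # seconds; the driver default of 10 is often too short here

# still restricted to a single partition key
row = session.execute(
    "SELECT count(*) AS c FROM hosts_by_country_and_ip "
    "WHERE country='US' AND region='California' AND city='Mountain View'").one()
print(row.c)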

Notice: other possibilities to count are DSE Analytics and nodetool tablestats. The DataStax Bulk Loader also seems interesting.

More nodes and more drives

It is said that Cassandra scales easily just by adding more nodes, preferably with additional drives. Generally speaking this is true, but you should also consider parameters like server heap size, available memory and CPU performance. Even better performance can be achieved when the storage backend is built from multiple drives providing more IOPS. It is also important to remember that, contrary to an RDBMS, in Apache Cassandra (the Dynamo concept) we need to design structures around query usage, not around data organization.