Skip to main content

Moving from Cassandra to ScyllaDB via the Apache Spark Migrator

ScyllaDB and Spark

 

This post introduces the ScyllaDB Migrator project – a Spark-based application that will easily and efficiently migrate existing Cassandra tables into ScyllaDB.

Over the last few years, ScyllaDB has helped many customers migrate from existing Cassandra installations to a ScyllaDB deployment. The migration approach is detailed in this document. Briefly, the process is comprised of several phases:

  1. Create an identical schema in ScyllaDB to hold the data;
  2. Configure the application to perform dual writes;
  3. Snapshot the historical data from Cassandra and load it into ScyllaDB;
  4. Configure the application to perform dual reads and verify data loaded from ScyllaDB;
  5. Decommission Cassandra.

The ScyllaDB Migrator project is meant to considerably speed up step 3; let’s see how it works!

1. An overview of the Migrator

The ScyllaDB Migrator project is a Spark-based application that does one simple task: it reads data from a table on a live Cassandra instance and writes it to ScyllaDB. It does so by running a full scan on the source table in parallel; the source table is divided to partitions, and each task in the Spark stage (refer back to Part 1 of the series to review these terms) copies a partition to the destination table.

By using Spark to copy the data, we gain the ability to distribute the data transfer between several processes on different machines, leading to much improved performance. The migrator also includes several other handy features:

  • it is highly resilient to failures, and will retry reads and writes throughout the job;
  • it will continuously write savepoint files that describe which token ranges have already been transferred: should the Spark job be stopped, these savepoint files can be used to resume the transfer from the point at which it stopped;
  • it can be configured to preserve the WRITETIME and TTL attributes of the fields that are copied;
  • it can handle simple column renames as part of the transfer (and can be extended to handle more complex transformations – stay tuned!).

Hopefully, these features will convince you to use the migrator to load data into your next ScyllaDB deployment. Let’s see how you’d set it up.

There’s one downside to using the migrator: since it reads the data directly from the Cassandra database, it will compete for resources with other workloads running on Cassandra. Our benchmarks, however, show that since there’s much more work to be performed on the Spark and ScyllaDB side while copying the data, Cassandra remains relatively undisturbed.

2. Using the Migrator

First off, since the migrator runs on Spark, you need a Spark deployment.

2.1 Deploying Spark

Luckily, Spark is pretty simple to set up. For a basic setup on a cloud provider, you can set up a few instances, unpack the Spark distribution on them and use these commands to start the Spark components:

Note that this configuration starts 8 Spark workers on each slave node, where each worker can execute 2 tasks in parallel. We’ll discuss these configuration options in further detail in the “Benchmarking the Migrator” section.

There are a few other options available for deploying Spark – you could use a managed solution, like Amazon EMR or Google Cloud Dataproc. You could also deploy Spark and the migrator directly onto a Kubernetes cluster. These are beyond the scope of this article, but if you’d like to contribute instructions for running the Migrator in these setups, please feel free to send a pull request.

The capacity required for the cluster depends on the size of the ScyllaDB cluster. We can offer the following rules of thumb:

  • allocate 1 Spark CPU core for every 2 ScyllaDB cores;
  • allocate 2GB of RAM for every Spark CPU core.

So, for example, if your ScyllaDB cluster has a total of 48 CPU cores, you’d need a total of 24 CPU cores and 48GB of RAM for Spark. You could use 3 c5.2xlarge instances for that cluster on AWS.

IMPORTANT: verify that the Spark nodes are added to a security group that can access port 9042 on Cassandra and ScyllaDB.

2.2 Running the Migrator

Now, once you have your Spark nodes up and running, ssh into the Spark master node and clone the Migrator repositoryInstall sbt (Scala’s build tool) on the machine, and run the build.sh script in the Migrator’s repository.

Next, duplicate the config.yaml.example file to configure your migration. The file is heavily commented, and we’ll discuss how to tune the parameters shortly, but for now, change the hostkeyspace and table settings under source and target to match your environment.

NOTE: the table and keyspace you’re copying into must already exist on ScyllaDB before the migration starts, with the same schema.

With the config file created, you can now start the migrator. But wait! Don’t just start it in the SSH session; if you do that, and your session gets disconnected, your job will stop. We recommend that you install tmux (and read this excellent tutorial) and run the migrator inside it. This way, you can detach and reattach the shell running the migrator job.

Once inside tmux, launch the job:

2.3 Keeping track of the transfer process

By default, Spark is pretty chatty in its logs, so you’ll need to skip past some of the lines that start scrolling through. The migrator will print out some diagnostics when it starts up; for example:

This output will help you make sure that the schema is being retrieved correctly from Cassandra and being processed correctly for writing to ScyllaDB. When the actual transfers start, you’ll see output similar to this:

You can keep track of the progress using the Spark application UI, conveniently located at http://<spark master IP>:4040. The progress of the job in the UI reflects the actual progress of the transfer.

We also recommend monitoring ScyllaDB closely during the transfer using the ScyllaDB Monitoring dashboards. The Requests Served per Shard metric should stay constant; if it fluctuates, this may indicate that ScyllaDB is not being saturated and more parallelism is required (see the tuning section below). The Write Timeouts per Second per Shard metric should be mostly zero. If it starts to rise, it may mean that you’re using too much parallelism for the migrator.

2.4 Stopping and resuming the job

As the migrator works, you’ll see it periodically log lines similar to these:

Every 5 minutes, the migrator will save a snapshot of the config file to the directory specified by the savepoints.path setting. Apart from your settings, the config file will also contain a list of token ranges that have already been transferred.

If you need to temporarily stop the job, or in the unlikely event that the migrator crashes, you could use these savepoint files to resume the migrator from where it stopped. This is done by copying one of these savepoint files and using it as the config for the job:

2.5 Timestamp preservation

By default, the Migrator will preserve the WRITETIME and TTL attributes for migrated rows. This is great for creating exact copies of your data if you rely on these timestamps in your application. There’s one caveat: this option is incompatible with tables containing compound data types (lists, maps, sets); so you’ll need to disable it with preserveTimestamps: false in the config file in that case.

This restriction might be lifted in the future.

2.6 Renaming fields

As mentioned, you could also rename columns while transferring the data. For example, if you’d like to copy the column called orig_field into the column dest_field, you can use this setting in the config file:

You could of course add more such renames by adding more items to the list:

 

 

Note, again, that the table on ScyllaDB must already be defined with the new column names.

2.7 Tuning the migrator

Lastly, here are a few tips for tuning the migrator’s performance.

We recommend starting 2 Spark workers on each node, but to start them with SPARK_WORKER_CORES=8. This will allow Spark to run 8 transfers in parallel on each worker, which will improve the throughput of the migration.

To take advantage of the above, it is extremely important to use a high source.splitCount value. This value determines how the source table is divided between the Spark tasks. If there are less splits than worker cores, some cores will be idle. On the other hand, if there are too many splits such that each one contains less than 1MB of data, performance will suffer due to the scheduling overhead.

The default setting of source.splitCount: 256 should be sufficient for migrations of tables larger than 10GB. However, for larger transfers and larger Spark cluster sizes, you may need to increase it.

And finally, if the ScyllaDB table you’re copying into is not in use, it is possible to gain a nice speed up (about 25%) by disabling compaction on it. You should of course re-enable it when the migration is finished. Do note that disabling compaction may cause ScyllaDB to use a larger amount of disk space should any write retries occur, so keep an eye on the free disk space.

Disabling and re-enabling compaction is done using the following CQL statements:

3. Benchmarking the Migrator

So let’s see some numbers – how does the migrator fare against the sstableloader on an identical setup? We’ll start with the migrator first. We used the following infrastructure for the benchmarks:

  • 6 Cassandra 3.11 nodes, running on i3.4xlarge instances (a total of 96 cores and 732GB of RAM);
  • 3 ScyllaDB 3.0-RC3 nodes, running on i3.4xlarge instances (a total of 48 cores and 366GB of RAM);
  • 3 Spark 2.4 nodes, running on r4.4xlarge instances (a total of 48 cores and 366GB of RAM).

The test data used consisted of 1TB of random data, generated as 1Kb rows with a replication factor of 3.

The first run used 23 Spark workers, started with SPARK_WORKER_CORES=2, for a total of 46 parallel transfers. This configuration achieved an average transfer rate of 2GB/minute. We can see, in the screenshots from the ScyllaDB Monitoring dashboards, that ScyllaDB is not being saturated:

2GB/minute

Restarting the workers with SPARK_WORKER_CORES=8, for a total of 184 parallel transfers, achieved a transfer rate of 3GB/minute – a 50% increase! We can see from the same graphs that ScyllaDB is now being saturated much more consistently:

3GB/minute

Disabling compaction also helped squeeze out some more performance, for a transfer rate of 3.73GB/minute:

3.73GB/minute

Increasing the concurrency to keep the load steadier did not help, unfortunately, and only resulted in write timeouts.

4. Summary

In this post, we’ve seen how you could deploy and configure Spark and the ScyllaDB Migrator and use them to easily transfer data from Cassandra to a ScyllaDB deployment. We encourage you to kick the tires on this project, use it to perform your migrations and report back if you hit any problems.