Using the Rust Driver to Interact with a ScyllaDB Cluster

 

The ScyllaDB team has been working hard to develop the scylla-rust-driver, an open-source ScyllaDB (and Apache Cassandra) driver for Rust. It’s written in pure Rust with a fully async API using Tokio. You can read more about its benchmark results and about how our developers solved a performance regression.

TAKE THE RUST LESSON IN SCYLLA UNIVERSITY

We recently added a new ScyllaDB University lesson on using the new driver for interacting with a ScyllaDB cluster. In this post, I’ll cover the essential parts of the lesson in which you’ll build a simple Rust application that will connect to a ScyllaDB cluster and perform basic queries. The lesson on ScyllaDB University also includes the code used, which you can run yourself.

Why Rust?

Before we dive into the new Rust lessons, let’s address the obvious question: why Rust?

Rust is a modern, performant language that is gaining popularity and becoming more widely used. It’s a systems programming language, but you can develop pretty much anything with it. It’s designed to be both fast and safe: memory accesses are checked, which prevents many kinds of crashes, and safe Rust rules out data races at compile time.

Moreover, Rust implements a unique and interesting async model. Rust’s futures represent computations that do nothing on their own; the responsibility for moving them forward belongs to the programmer, usually by awaiting them or by handing them to a runtime such as Tokio. This makes async programs very efficient and minimizes the need for allocations, since the state machine generated for each async function is known at compile time.
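To make the "futures must be driven" point concrete, here is a minimal sketch that uses only the standard library. The block_on helper, the NoopWake type, and the measure function are hypothetical names written for this illustration; in the lesson itself, Tokio plays the executor role.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for a busy-polling toy executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical toy executor: polls the future in a loop until it is ready.
/// A real runtime such as Tokio sleeps until woken instead of spinning.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Records that its body ran, then returns a value.
pub async fn measure(ran: &AtomicBool) -> i16 {
    ran.store(true, Ordering::SeqCst);
    40
}

/// Returns (body ran before driving, body ran after driving, value).
pub fn lazy_future_demo() -> (bool, bool, i16) {
    let ran = AtomicBool::new(false);
    let fut = measure(&ran); // creating the future executes nothing
    let before = ran.load(Ordering::SeqCst);
    let value = block_on(fut); // driving the future runs its body
    let after = ran.load(Ordering::SeqCst);
    (before, after, value)
}
```

Calling measure only builds the state machine; the body runs once something polls it, which is what .await and the Tokio runtime do in the rest of this post.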

Now, onto the new Rust lesson…

Creating the Data Schema

The sample Rust application for our lesson will be able to store and query temperature time-series data. Each measurement will contain the following information:

  • The sensor ID for the sensor that measured the temperature
  • The time the temperature was measured
  • The temperature value

First, create a keyspace called tutorial:

CREATE KEYSPACE IF NOT EXISTS tutorial
  WITH REPLICATION = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
};

The main query will fetch the temperatures reported by a specific device over a given time interval. To support it, create the following table, with device as the partition key and time as the clustering key:

CREATE TABLE IF NOT EXISTS tutorial.temperature (
  device UUID,
  time timestamp,
  temperature smallint,
  PRIMARY KEY(device, time)
);

LEARN MORE ABOUT BASIC DATA MODELING IN SCYLLADB

The application you’re building will be able to query all temperatures measured by a given device within a selected time frame. That’s why you will use the following SELECT query:

SELECT * FROM tutorial.temperature
WHERE device = ?
AND time > ?
AND time < ?;

Each ? is a bind marker that will be replaced with an actual value: the device ID, the start of the time range, and the end of the time range, respectively.
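One way to picture why this table suits the query: because the primary key is (device, time), the rows of one device are kept together and ordered by time, so a time-range lookup within a device is a contiguous slice. The sketch below models that layout in memory with standard-library maps. The Store type, the u128 device ID, and the integer timestamps are illustrative assumptions and not part of the lesson's code.

```rust
use std::collections::{BTreeMap, HashMap};
use std::ops::Bound::Excluded;

/// In-memory model of tutorial.temperature:
/// partition key (device) -> rows ordered by clustering key (time).
#[derive(Default)]
pub struct Store {
    partitions: HashMap<u128, BTreeMap<i64, i16>>,
}

impl Store {
    /// Mirrors INSERT: writing the same (device, time) again overwrites the row.
    pub fn insert(&mut self, device: u128, time: i64, temperature: i16) {
        self.partitions
            .entry(device)
            .or_default()
            .insert(time, temperature);
    }

    /// Mirrors `WHERE device = ? AND time > ? AND time < ?` (both bounds exclusive).
    pub fn select(&self, device: u128, from: i64, to: i64) -> Vec<(i64, i16)> {
        // An empty or inverted interval matches nothing; BTreeMap::range
        // would panic on it, so it is handled up front.
        if from >= to {
            return Vec::new();
        }
        match self.partitions.get(&device) {
            Some(rows) => rows
                .range((Excluded(from), Excluded(to)))
                .map(|(time, temperature)| (*time, *temperature))
                .collect(),
            None => Vec::new(),
        }
    }
}
```

Note that both comparisons in the query are strict, so a measurement taken exactly at either end of the range is not returned.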

Connecting to the Database with Rust

The application name is temperature, and the required dependencies are defined in the Cargo.toml file:

[dependencies]
uuid = {version = "0.8", features = ["v4"]}
tokio = {version = "1.1.0", features = ["full"]}
scylla = "0.3.1"
futures = "0.3.6"
chrono = "0.4.0"

Where:

  • uuid – Package that provides UUID.
  • tokio – Provides the async runtime to execute database queries in.
  • scylla – Rust ScyllaDB/Cassandra driver.
  • futures – Utilities for working with futures.
  • chrono – Package for working with time.

The main function runs asynchronously on the Tokio runtime, which the #[tokio::main] attribute sets up. It returns a Result so that errors can be propagated to the caller:

#[tokio::main]
async fn main() -> Result<()> {
...
}

The file /src/db.rs will hold the logic for working with the ScyllaDB instance. The first step is to establish a database session.

use scylla::{Session, SessionBuilder};

use crate::Result;

pub async fn create_session(uri: &str) -> Result<Session> {
  SessionBuilder::new()
    .known_node(uri)
    .build()
    .await
    .map_err(From::from)
}

To initialize the session:

#[tokio::main]
async fn main() -> Result<()> {
  println!("connecting to db");
  let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
  let session = db::create_session(&uri).await?;
  todo!()
}

Notice the .await after create_session. That’s because async functions return a Future. Futures can be await-ed inside other async functions to get their actual value, which in this case is Result<Session, Error>. And lastly, with the ? after await we are making sure that if we get back an error instead of a session from create_session, the error will be propagated up, and the application will terminate, printing the error.
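The error propagation performed by ? can be tried out without a database. The following sketch uses only the standard library. AppResult is a hypothetical stand-in for the crate's own Result alias (the lesson's /src/result.rs may define it differently), and parse_uri and uri_or_default are illustrative helpers, not part of the lesson.

```rust
use std::error::Error;

/// Hypothetical stand-in for the Result alias defined in /src/result.rs.
pub type AppResult<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

/// Splits "host:port" into its parts, propagating any failure with `?`.
pub fn parse_uri(uri: &str) -> AppResult<(String, u16)> {
    // `?` converts the &str error into the boxed error type via From.
    let (host, port) = uri.rsplit_once(':').ok_or("expected host:port")?;
    // `?` converts ParseIntError into the boxed error type via From.
    let port: u16 = port.parse()?;
    Ok((host.to_string(), port))
}

/// Same fallback pattern as in main: use the configured value or a default.
pub fn uri_or_default(configured: Option<String>) -> String {
    configured.unwrap_or_else(|| "127.0.0.1:9042".to_string())
}
```

Each ? returns early with the converted error, which is the same mechanism that lets main stop as soon as create_session fails.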

Next, the file /src/db.rs defines functions for creating the keyspace and the table that stores temperature measurements. Each function runs one of the CQL statements shown earlier:

use scylla::{IntoTypedRows, Session, SessionBuilder};
use uuid::Uuid;

use crate::{Duration, Result, TemperatureMeasurement};

static CREATE_KEYSPACE_QUERY: &str = r#"
  CREATE KEYSPACE IF NOT EXISTS tutorial
  WITH REPLICATION = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
  };
"#;

static CREATE_TEMPERATURE_TABLE_QUERY: &str = r#"
  CREATE TABLE IF NOT EXISTS tutorial.temperature (
    device UUID,
    time timestamp,
    temperature smallint,
    PRIMARY KEY(device, time)
  );
"#;

pub async fn initialize(session: &Session) -> Result<()> {
  create_keyspace(session).await?;
  create_temperature_table(session).await?;
  Ok(())
}

async fn create_keyspace(session: &Session) -> Result<()> {
  session
    .query(CREATE_KEYSPACE_QUERY, ())
    .await
    .map(|_| ())
    .map_err(From::from)
}

async fn create_temperature_table(session: &Session) -> Result<()> {
  session
    .query(CREATE_TEMPERATURE_TABLE_QUERY, ())
    .await
    .map(|_| ())
    .map_err(From::from)
}

The file /src/db.rs also defines the insert query. ScyllaDB binds the values of the measurement to the ? markers in order:

static ADD_MEASUREMENT_QUERY: &str = r#"
INSERT INTO tutorial.temperature (device, time, temperature)
VALUES (?, ?, ?);
"#;

pub async fn add_measurement(session: &Session, measurement: TemperatureMeasurement) -> Result<()> {
  session
    .query(ADD_MEASUREMENT_QUERY, measurement)
    .await
    .map(|_| ())
    .map_err(From::from)
}
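Because the markers are positional, the values must be supplied in the same order as the column list. Below is a small standard-library sketch of that ordering, together with a sanity check one might put in a unit test. The Measurement struct, its field types, and both helpers are assumptions made for illustration; the lesson's TemperatureMeasurement is handed to the driver directly.

```rust
/// Same statement text as ADD_MEASUREMENT_QUERY above.
pub const ADD_MEASUREMENT_QUERY: &str = r#"
INSERT INTO tutorial.temperature (device, time, temperature)
VALUES (?, ?, ?);
"#;

/// Hypothetical plain-data stand-in for TemperatureMeasurement.
#[derive(Debug, Clone, PartialEq)]
pub struct Measurement {
    pub device: u128,     // stand-in for a UUID
    pub time_ms: i64,     // CQL timestamp: milliseconds since the Unix epoch
    pub temperature: i16, // CQL smallint: 16-bit signed integer
}

impl Measurement {
    /// Values in the same order as the column list (device, time, temperature).
    pub fn bind_values(&self) -> (u128, i64, i16) {
        (self.device, self.time_ms, self.temperature)
    }
}

/// Counts the `?` bind markers in a statement.
/// Naive on purpose: assumes no `?` appears inside a string literal.
pub fn bind_marker_count(query: &str) -> usize {
    query.matches('?').count()
}
```

A test can then assert that bind_marker_count(ADD_MEASUREMENT_QUERY) equals the number of fields, which catches a column added to the statement but not to the struct.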

Reading Measurements

Next, the select-query logic is defined in the /src/db.rs module:

static SELECT_MEASUREMENTS_QUERY: &str = r#"
  SELECT * FROM tutorial.temperature
    WHERE device = ?
      AND time > ?
      AND time < ?;
"#;

pub async fn select_measurements(
  session: &Session,
  device: Uuid,
  time_from: Duration,
  time_to: Duration,
) -> Result<Vec<TemperatureMeasurement>> {
  session
    .query(SELECT_MEASUREMENTS_QUERY, (device, time_from, time_to))
    .await?
    .rows
    .unwrap_or_default()
    .into_typed::<TemperatureMeasurement>()
    .map(|v| v.map_err(From::from))
    .collect()
}

The important steps are:

  • Make a select query with the specified parameters (device ID, start time, and end time).
  • Await the response and take its rows.
  • The response might contain no rows. unwrap_or_default ensures that you get an empty Vec in that case.
  • Once the rows are obtained, convert each one with into_typed::<TemperatureMeasurement>(), which relies on the FromRow implementation derived for TemperatureMeasurement.
  • into_typed yields a Result for each row, because converting a row might fail. With .map(|v| v.map_err(From::from)) you ensure that each row’s error is converted to the generic error defined in /src/result.rs.
  • Finally, collect gathers the converted rows into a vector, or returns the first error it encounters.
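The last three steps follow a general Rust pattern that can be tried without the driver. In this sketch, string parsing stands in for row conversion; AppError, parse_row, and collect_rows are hypothetical names, and the real error type lives in /src/result.rs.

```rust
use std::num::ParseIntError;

/// Hypothetical error type standing in for the generic error in /src/result.rs.
#[derive(Debug, PartialEq)]
pub struct AppError(pub String);

impl From<ParseIntError> for AppError {
    fn from(e: ParseIntError) -> Self {
        AppError(e.to_string())
    }
}

/// Stand-in for typed row conversion: each raw row may fail to convert.
fn parse_row(raw: &str) -> Result<i16, ParseIntError> {
    raw.trim().parse::<i16>()
}

/// Same shape as select_measurements: optional rows -> typed values -> one Result.
pub fn collect_rows(rows: Option<Vec<&str>>) -> Result<Vec<i16>, AppError> {
    rows.unwrap_or_default() // no rows at all becomes an empty Vec
        .into_iter()
        .map(parse_row) // per-row Result
        .map(|v| v.map_err(From::from)) // per-row error -> AppError
        .collect() // Ok(Vec) if every row converted, else the first Err
}
```

Collecting an iterator of Result values into a Result<Vec<_>, _> stops at the first failing row, so callers get either every measurement or a single error.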

Now, back in /src/main.rs you can see the rest of the main function, imports, and modules:

use uuid::Uuid;

use crate::duration::Duration;
use crate::result::Result;
use crate::temperature_measurement::TemperatureMeasurement;

mod db;
mod duration;
mod result;
mod temperature_measurement;

#[tokio::main]
async fn main() -> Result<()> {
  println!("connecting to db");
  let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
  let session = db::create_session(&uri).await?;
  db::initialize(&session).await?;

  println!("Adding measurements");
  let measurement = TemperatureMeasurement {
    device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
    time: Duration::seconds(1000000000001),
    temperature: 40,
  };
  db::add_measurement(&session, measurement).await?;

  let measurement = TemperatureMeasurement {
    device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
    time: Duration::seconds(1000000000003),
    temperature: 60,
  };
  db::add_measurement(&session, measurement).await?;

  println!("Selecting measurements");
  let measurements = db::select_measurements(
    &session,
    Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
    Duration::seconds(1000000000000),
    Duration::seconds(10000000000009),
  )
  .await?;
  println!("     >> Measurements: {:?}", measurements);

  Ok(())
}

Additional Rust Learning Opportunities

  • Check out the full Rust lesson on ScyllaDB University to see the full code, and run the example.