Using the Rust Driver to Interact with a ScyllaDB Cluster
The ScyllaDB team has been working hard to develop the scylla-rust-driver, an open-source ScyllaDB (and Apache Cassandra) driver for Rust. It’s written in pure Rust with a fully async API using Tokio. You can also read about its benchmark results and about how our developers solved a performance regression.
We recently added a new ScyllaDB University lesson on using the new driver for interacting with a ScyllaDB cluster. In this post, I’ll cover the essential parts of the lesson in which you’ll build a simple Rust application that will connect to a ScyllaDB cluster and perform basic queries. The lesson on ScyllaDB University also includes the code used, which you can run yourself.
Why Rust?
Before we dive into the new Rust lessons, let’s address the obvious question: why Rust?
Rust is a modern, performant language that is gaining popularity and becoming more widely used. Although it is a systems programming language, you can use it to build almost anything. It is designed to be both fast and safe: the compiler enforces ownership and borrowing rules, and operations such as array indexing are bounds-checked at run time, which prevents most memory-safety bugs and the crashes they cause. Safe Rust code is also free of data races.
Moreover, Rust implements a distinctive async model. Rust’s futures are lazy: a future represents a computation that makes progress only when it is polled, and it is up to the program, usually through an async runtime such as Tokio, to drive it forward. Because the compiler turns each async function into a state machine whose size is known at compile time, async programs can run very efficiently, with few heap allocations.
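To make the "lazy futures" point concrete, here is a small std-only sketch (no Tokio involved): calling an async function only builds a future, and its body runs when something polls it. The block_on function is a deliberately minimal executor written for illustration; it is an assumption of this sketch, not how Tokio works internally, and record_call is a hypothetical example function.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that resumes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal illustrative executor: polls the future on the current thread
/// until it completes, parking the thread whenever it returns Pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // A real runtime would run other tasks here instead of parking.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async function with a visible side effect.
async fn record_call(calls: &Cell<u32>) -> u32 {
    calls.set(calls.get() + 1);
    calls.get()
}
```

Calling record_call(&calls) leaves the counter at 0; only when block_on polls the future does the body run and the counter become 1. In the application below, the #[tokio::main] attribute plays the role of block_on, using Tokio’s runtime to drive the async main function.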
Now, onto the new Rust lesson…
Creating the Data Schema
The sample Rust application for our lesson will be able to store and query temperature time-series data. Each measurement will contain the following information:
- The sensor ID for the sensor that measured the temperature
- The time the temperature was measured
- The temperature value
First, create a keyspace called tutorial:
CREATE KEYSPACE IF NOT EXISTS tutorial
    WITH REPLICATION = {
        'class': 'SimpleStrategy',
        'replication_factor': 1
    };
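The query you want to support is "the temperatures reported by a specific device over a given time interval", so create the following table. In PRIMARY KEY(device, time), device is the partition key and time is the clustering key, so all measurements of one device are stored together, sorted by time: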
CREATE TABLE IF NOT EXISTS tutorial.temperature (
    device UUID,
    time timestamp,
    temperature smallint,
    PRIMARY KEY(device, time)
);
The application you’re building will be able to query all temperatures measured by a given device within a selected time frame. That’s why you will use the following SELECT query:
SELECT * FROM tutorial.temperature
WHERE device = ?
AND time > ?
AND time < ?;
Here, each ? will be replaced with an actual value: the device ID, time-from, and time-to, respectively.
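As a rough mental model of how this query uses the table (a simplification, not how ScyllaDB stores data on disk), the std-only sketch below keeps one sorted map of rows per device. The names TemperatureTable, insert, and select_range are hypothetical; a u128 stands in for the UUID and an i64 count of milliseconds for the timestamp. The range read uses exclusive bounds, matching time > ? AND time < ?.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::Excluded;

/// Hypothetical in-memory model of tutorial.temperature:
/// partition key (device) -> clustering key (time in ms) -> temperature.
#[derive(Default)]
struct TemperatureTable {
    partitions: BTreeMap<u128, BTreeMap<i64, i16>>,
}

impl TemperatureTable {
    /// Like a CQL INSERT: a row with the same (device, time) is overwritten.
    fn insert(&mut self, device: u128, time_ms: i64, temperature: i16) {
        self.partitions
            .entry(device)
            .or_default()
            .insert(time_ms, temperature);
    }

    /// Like `WHERE device = ? AND time > ? AND time < ?`: reads a single
    /// partition and returns its rows in clustering (time) order.
    fn select_range(&self, device: u128, from_ms: i64, to_ms: i64) -> Vec<(i64, i16)> {
        match self.partitions.get(&device) {
            // BTreeMap::range panics on an empty or inverted exclusive range,
            // so only call it when from_ms < to_ms.
            Some(rows) if from_ms < to_ms => rows
                .range((Excluded(from_ms), Excluded(to_ms)))
                .map(|(&time, &temp)| (time, temp))
                .collect(),
            _ => Vec::new(),
        }
    }
}
```

Because both bounds are exclusive, a measurement taken exactly at time-from or time-to is not returned; use >= or <= in the CQL query if inclusive bounds are needed.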
Connecting to the Database with Rust
The application is named temperature, and its dependencies are declared in the [dependencies] section of the Cargo.toml file:
[dependencies]
uuid = {version = "0.8", features = ["v4"]}
tokio = {version = "1.1.0", features = ["full"]}
scylla = "0.3.1"
futures = "0.3.6"
chrono = "0.4.0"
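Where:
- uuid – Generates and parses UUIDs; the v4 feature enables random (version 4) UUIDs.
- tokio – Provides the async runtime in which the database queries are executed.
- scylla – The Rust ScyllaDB/Cassandra driver.
- futures – Utilities for working with futures and streams.
- chrono – Library for working with dates and times.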
The main function is asynchronous: the #[tokio::main] attribute starts the Tokio runtime and runs main on it. main returns a Result, so errors can be propagated with ?:
#[tokio::main]
async fn main() -> Result<()> {
    // ...
}
The file /src/db.rs will hold the logic for working with the ScyllaDB instance. The first step is to establish a database session:
use scylla::{Session, SessionBuilder};
use crate::Result;
pub async fn create_session(uri: &str) -> Result<Session> {
    SessionBuilder::new()
        .known_node(uri)
        .build()
        .await
        .map_err(From::from)
}
To initialize the session:
#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    // Use SCYLLA_URI if it is set, otherwise fall back to a local node.
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    todo!()
}
Notice the .await after create_session. That’s because async functions return a Future. A future can be awaited inside another async function to get its actual value, which in this case is a Result<Session>. Lastly, the ? after .await makes sure that if create_session returns an error instead of a session, the error is propagated up; because main itself returns a Result, the application then terminates and prints the error.
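This post does not show the lesson’s Result type, imported as crate::Result and defined in /src/result.rs. A common way to define such a generic error is a boxed trait object; the std-only sketch below assumes that definition (the real file may differ) and shows what .map_err(From::from) and ? do with it, using std parsing errors in place of driver errors. The helpers parse_port_explicit and parse_port are hypothetical.

```rust
use std::error::Error;

/// Assumed shape of the crate-wide result type: any error implementing
/// std::error::Error (plus Send + Sync) can be boxed into it.
pub type Result<T, E = Box<dyn Error + Send + Sync + 'static>> = std::result::Result<T, E>;

/// Explicit conversion, the same pattern as `.map_err(From::from)` in db.rs:
/// the concrete ParseIntError is turned into the boxed generic error.
fn parse_port_explicit(s: &str) -> Result<u16> {
    s.parse::<u16>().map_err(From::from)
}

/// Implicit conversion: on an error, `?` calls `From::from` and returns early.
fn parse_port(uri: &str) -> Result<u16> {
    let (_host, port) = uri.rsplit_once(':').ok_or("missing port")?;
    let port: u16 = port.parse()?;
    Ok(port)
}
```

When main returns such an Err, Rust prints the error’s Debug representation and exits with a non-zero status, which is the "terminate and print the error" behavior described above.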
Next, the file /src/db.rs defines functions for creating the keyspace and the table that stores temperature measurements, using the same queries as above:
use scylla::{IntoTypedRows, Session, SessionBuilder};
use uuid::Uuid;
use crate::{Duration, Result, TemperatureMeasurement};
static CREATE_KEYSPACE_QUERY: &str = r#"
    CREATE KEYSPACE IF NOT EXISTS tutorial
    WITH REPLICATION = {
        'class': 'SimpleStrategy',
        'replication_factor': 1
    };
"#;
static CREATE_TEMPERATURE_TABLE_QUERY: &str = r#"
    CREATE TABLE IF NOT EXISTS tutorial.temperature (
        device UUID,
        time timestamp,
        temperature smallint,
        PRIMARY KEY(device, time)
    );
"#;
// The keyspace must exist before the table can be created in it.
pub async fn initialize(session: &Session) -> Result<()> {
    create_keyspace(session).await?;
    create_temperature_table(session).await?;
    Ok(())
}
async fn create_keyspace(session: &Session) -> Result<()> {
    session
        .query(CREATE_KEYSPACE_QUERY, ()) // no bound values
        .await
        .map(|_| ()) // discard the query result; only success matters
        .map_err(From::from)
}
async fn create_temperature_table(session: &Session) -> Result<()> {
    session
        .query(CREATE_TEMPERATURE_TABLE_QUERY, ())
        .await
        .map(|_| ())
        .map_err(From::from)
}
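Because initialize awaits create_keyspace before create_temperature_table and applies ? after each call, the table statement runs only after the keyspace statement succeeds, and the first failure ends the sequence. The synchronous, std-only sketch below illustrates that control flow with a hypothetical MockSession that records statements and can be told to fail on one; it stands in for the driver’s Session and is not part of the scylla crate.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a database session: records each statement
/// and fails on any statement containing `fail_on`, if set.
struct MockSession {
    executed: RefCell<Vec<String>>,
    fail_on: Option<&'static str>,
}

impl MockSession {
    fn query(&self, statement: &str) -> Result<(), String> {
        if let Some(pattern) = self.fail_on {
            if statement.contains(pattern) {
                return Err(format!("failed: {pattern}"));
            }
        }
        self.executed.borrow_mut().push(statement.to_string());
        Ok(())
    }
}

// Abbreviated versions of the statements defined in db.rs.
const CREATE_KEYSPACE: &str = "CREATE KEYSPACE IF NOT EXISTS tutorial";
const CREATE_TABLE: &str = "CREATE TABLE IF NOT EXISTS tutorial.temperature";

/// Same shape as db::initialize: keyspace first, then table; `?` returns
/// the first error, so the table statement never runs without a keyspace.
fn initialize(session: &MockSession) -> Result<(), String> {
    session.query(CREATE_KEYSPACE)?;
    session.query(CREATE_TABLE)?;
    Ok(())
}
```

Since both real statements use IF NOT EXISTS, calling initialize every time the application starts is safe.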
The file /src/db.rs also defines the insert query. The values of the measurement are bound, in order, to the ? placeholders:
static ADD_MEASUREMENT_QUERY: &str = r#"
    INSERT INTO tutorial.temperature (device, time, temperature)
    VALUES (?, ?, ?);
"#;
pub async fn add_measurement(session: &Session, measurement: TemperatureMeasurement) -> Result<()> {
    session
        .query(ADD_MEASUREMENT_QUERY, measurement)
        .await
        .map(|_| ())
        .map_err(From::from)
}
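Positional binding means the first value goes to device, the second to time, and the third to temperature, so the order of the measurement’s values must match the order of the columns in the INSERT. The std-only sketch below illustrates this with a hypothetical CqlValue enum and a bind_values method; these names are illustrative and are not the scylla crate’s API. A u128 stands in for the UUID, and the timestamp is an i64 count of milliseconds since the Unix epoch, which is how CQL represents timestamp values.

```rust
/// Hypothetical, simplified CQL values (not the driver's types).
#[derive(Debug, PartialEq)]
enum CqlValue {
    Uuid(u128),
    Timestamp(i64), // milliseconds since the Unix epoch
    SmallInt(i16),  // CQL smallint is a 16-bit signed integer
}

/// Simplified measurement; u128 stands in for uuid::Uuid.
struct Measurement {
    device: u128,
    time_ms: i64,
    temperature: i16,
}

impl Measurement {
    /// Values in the same order as the `?` markers in
    /// INSERT INTO tutorial.temperature (device, time, temperature) VALUES (?, ?, ?)
    fn bind_values(&self) -> Vec<CqlValue> {
        vec![
            CqlValue::Uuid(self.device),
            CqlValue::Timestamp(self.time_ms),
            CqlValue::SmallInt(self.temperature),
        ]
    }
}

/// Checks that a statement has as many `?` markers as bound values.
/// Naive: it would also count `?` characters inside string literals.
fn placeholder_count_matches(statement: &str, values: &[CqlValue]) -> bool {
    statement.matches('?').count() == values.len()
}
```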
Reading Measurements
Next, the select-query logic is defined in the /src/db.rs module:
static SELECT_MEASUREMENTS_QUERY: &str = r#"
    SELECT * FROM tutorial.temperature
    WHERE device = ?
        AND time > ?
        AND time < ?;
"#;
pub async fn select_measurements(
    session: &Session,
    device: Uuid,
    time_from: Duration,
    time_to: Duration,
) -> Result<Vec<TemperatureMeasurement>> {
    session
        .query(SELECT_MEASUREMENTS_QUERY, (device, time_from, time_to))
        .await?
        .rows
        .unwrap_or_default()
        .into_typed::<TemperatureMeasurement>()
        .map(|v| v.map_err(From::from))
        .collect()
}
The important steps are:
- Make a select query with the specified parameters (device ID, start time, and end time).
- Await the response and take its rows.
- The rows field is an Option, so there might be no rows. unwrap_or_default ensures that you get an empty Vec in that case.
- Once the rows are obtained, convert each row by using into_typed::<TemperatureMeasurement>(), which relies on the FromRow derive macro.
- into_typed yields a Result for each row, because converting a row might fail. With .map(|v| v.map_err(From::from)) you convert each row’s error into the generic error defined in /src/result.rs.
- Finally, collect gathers the converted rows into a Result<Vec<TemperatureMeasurement>>: a vector of all measurements if every conversion succeeded, or the first error otherwise.
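The same pipeline can be tried without a database. In this std-only sketch, rows is an Option<Vec<RawRow>> like the driver’s result, parse_row is a hypothetical stand-in for into_typed, RowError stands in for the driver’s conversion error, and GenericError is an assumed boxed error in place of the one in /src/result.rs. Collecting an iterator of Result values into Result<Vec<_>, _> stops at the first error.

```rust
use std::convert::TryFrom;
use std::error::Error;
use std::fmt;

/// Assumed generic error type (stand-in for the one in /src/result.rs).
type GenericError = Box<dyn Error + Send + Sync>;

/// Hypothetical untyped row: (time in ms, temperature as a wider integer).
type RawRow = (i64, i32);

#[derive(Debug, PartialEq)]
struct Measurement {
    time_ms: i64,
    temperature: i16,
}

/// Hypothetical per-row conversion error.
#[derive(Debug)]
struct RowError(String);

impl fmt::Display for RowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "row conversion failed: {}", self.0)
    }
}

impl Error for RowError {}

/// Stand-in for into_typed: fails if the value does not fit a smallint (i16).
fn parse_row((time_ms, raw_temp): RawRow) -> Result<Measurement, RowError> {
    let temperature = i16::try_from(raw_temp)
        .map_err(|_| RowError(format!("{raw_temp} does not fit in smallint")))?;
    Ok(Measurement { time_ms, temperature })
}

fn select(rows: Option<Vec<RawRow>>) -> Result<Vec<Measurement>, GenericError> {
    rows.unwrap_or_default() // None becomes an empty Vec
        .into_iter()
        .map(parse_row) // one Result<Measurement, RowError> per row
        .map(|v| v.map_err(From::from)) // RowError -> GenericError
        .collect() // Ok(all rows), or the first Err encountered
}
```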
Now, back in /src/main.rs, you can see the rest of the main function, along with the imports and module declarations:
use uuid::Uuid;
use crate::duration::Duration;
use crate::result::Result;
use crate::temperature_measurement::TemperatureMeasurement;
mod db;
mod duration;
mod result;
mod temperature_measurement;
#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    // Create the keyspace and the table if they do not exist yet.
    db::initialize(&session).await?;
    // All sample measurements come from the same device.
    let device = Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?;
    println!("Adding measurements");
    let measurement = TemperatureMeasurement {
        device,
        time: Duration::seconds(1000000000001),
        temperature: 40,
    };
    db::add_measurement(&session, measurement).await?;
    let measurement = TemperatureMeasurement {
        device,
        time: Duration::seconds(1000000000003),
        temperature: 60,
    };
    db::add_measurement(&session, measurement).await?;
    println!("Selecting measurements");
    // Both bounds are exclusive: time > time_from AND time < time_to.
    let measurements = db::select_measurements(
        &session,
        device,
        Duration::seconds(1000000000000),
        Duration::seconds(10000000000009),
    )
    .await?;
    println!(" >> Measurements: {:?}", measurements);
    Ok(())
}
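The values passed to Duration::seconds are best read as arbitrary test values: 1000000000001 seconds is more than 30,000 years after 1970. The lesson’s /src/duration.rs is not shown in this post; assuming its Duration represents a span since the Unix epoch and is sent as a CQL timestamp (which counts milliseconds), this std-only sketch shows the conversion with overflow checking and checks that both inserted measurements fall strictly inside the queried range. seconds_to_cql_millis and in_query_range are hypothetical helpers.

```rust
/// Hypothetical helper: seconds since the Unix epoch -> CQL timestamp
/// (milliseconds since the Unix epoch), or None if it overflows i64.
fn seconds_to_cql_millis(secs: i64) -> Option<i64> {
    secs.checked_mul(1_000)
}

/// Mirrors `time > ? AND time < ?` from SELECT_MEASUREMENTS_QUERY.
fn in_query_range(time_ms: i64, from_ms: i64, to_ms: i64) -> bool {
    from_ms < time_ms && time_ms < to_ms
}
```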
Additional Rust Learning Opportunities
- Check out the full Rust lesson on ScyllaDB University to see the complete code and run the example yourself.