
How Zillow Guarantees In-Order Delivery of Messages from Multiple Sources

Consuming messages from multiple sources in a scalable service sharing the same ScyllaDB cluster can be problematic, especially when each of those sources delivers data from two queues and cannot guarantee in-order delivery. At Zillow, the team supplies the ScyllaDB write timestamp themselves and employs a couple of other tricks to provide correct, consistent data to consuming services while avoiding transactions. Their data model is a simple compound key pointing to a large binary Avro document.

***

In the above video, Dan Podhola, Principal Software Engineer at Zillow, speaks on Optimistic Concurrency with Write-Time Timestamps.

Dan began by describing his team’s role at Zillow. They are responsible for processing property and listing records — what is for sale or rent — mapping those to common Zillow property IDs, and then translating different message types into a common interchange format so their teams can talk to each other using the same type of data.

They are also responsible for deciding what’s best to display. He showed a high-level diagram of what happens when they receive a message from one of their data providers. It needs to be translated into a common output format.

“We fetch other data that we know about that property that’s also in that same format. We bundle that data together and choose a winner — I use the term ‘winner’ lightly here — and we send that bundled data out to our consumers.”

The Problem: Out-of-Order Data

The biggest problem Zillow faced, and the reason for Dan’s talk, is that they run a highly threaded application. They consume messages from queues fed by two different data producers, and those messages can be received out of order. The data they publish cannot go backwards in time, or, as Dan put it, “it would cause bad things to happen.” It would throw off analytics and many other systems that consume the data.

As an example, they might get a for-sale listing message saying a certain property has changed price, and a later message saying the property has sold. If they processed the sold message first, but then processed the earlier price change showing the property was still for sale, it would create confusion.

Such an error might require manual intervention to identify and fix, for example after a consumer complains. “That’s not a great experience for anybody.” Especially not for their analytics team.

Dan showed a visual representation of the problem. The message generated at Time 2 has a later timestamp and should be accepted and processed. “It goes all the way through the system. At that point, we publish it. It’s golden. That’s what we want.” However, an earlier message from Time 1 might then be received from a backfill queue. If it goes through the whole system and gets published, the data it carries is already out of date.

“What’s the solution? You may already be thinking, ‘Well, just don’t write the older message.’ That’s a good solution and I agree with you. So I drew a diagram here that shows a decision tree that I added in the middle. It says ‘if the message is newer than the previous message then write it, otherwise don’t — just skip to the fetch.’”
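In code, that check-then-write flow might look something like the sketch below. This is not Zillow’s code; it assumes a Python client built on the cassandra driver (which also works with ScyllaDB), a hypothetical listing_documents table keyed by property ID and source, and hypothetical fetch_bundle and publish helpers standing in for the bundling and publishing steps.

```python
from cassandra.cluster import Cluster

# Hypothetical schema: PRIMARY KEY ((property_id), source), an updated_at
# timestamp, and a doc blob holding the Avro document. Names are illustrative.
session = Cluster(["scylla.example.com"]).connect("listings")

def handle_message(msg):
    # Write only if this message is newer than what is stored;
    # otherwise skip straight to the fetch-and-publish step.
    row = session.execute(
        "SELECT updated_at FROM listing_documents "
        "WHERE property_id = %s AND source = %s",
        (msg.property_id, msg.source),
    ).one()
    if row is None or msg.generated_at > row.updated_at:
        session.execute(
            "INSERT INTO listing_documents (property_id, source, updated_at, doc) "
            "VALUES (%s, %s, %s, %s)",
            (msg.property_id, msg.source, msg.generated_at, msg.doc),
        )
    publish(fetch_bundle(msg.property_id))  # hypothetical: bundle all sources, pick a winner, publish
```

Note that without a lock this read-then-write is racy: two threads can both read the old row and both decide their message is newer, which is exactly the gap the approaches below deal with.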

Problems with Other Methods

“Now you might be wondering, ‘Well, why fetch at all? Just stop there.’” If you’re grabbing a transaction lock and doing this in a SQL server type of way, that’s a good solution.

Dan then explained why you wouldn’t just implement this in the traditional SQL manner: spinning up a SQL server instance, grabbing a lock, and running a stored procedure that returns a result code. “Was this written? Was this not written? Then act on that information.”

The problem with the traditional SQL method, Dan pointed out, is the locking: “you don’t have the scalability of something like ScyllaDB.” He also noted that one could implement this with ScyllaDB’s Lightweight Transactions (LWT), but those cost roughly three round trips where a normal write needs one, and Zillow wanted to avoid them for performance reasons.
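For comparison, the LWT variant the team decided against might look roughly like the sketch below (same assumed session, table, and message fields as the earlier sketch); each such conditional statement pays the extra Paxos round trips.

```python
# Conditional write: only apply if the stored row is older than this message.
lwt = session.prepare(
    "UPDATE listing_documents SET doc = ?, updated_at = ? "
    "WHERE property_id = ? AND source = ? "
    "IF updated_at < ?"
)

def handle_message_with_lwt(msg):
    # A first-ever write would also need IF NOT EXISTS handling, omitted here.
    result = session.execute(
        lwt, (msg.doc, msg.generated_at, msg.property_id, msg.source, msg.generated_at)
    )
    return result.was_applied  # the LWT response reports whether the condition passed
```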

“So the second method is ‘store everything.’ That means we get every record and we store it, no matter when it was given to us. Then we have the application layer pull all the data at once, do the filtering, and then act on it.”

However, this does lead to data bloat: “The problem with that is we have to trim data because we don’t need it all. We just didn’t want to have to deal with running a trim job or paying to store all that data we just didn’t need. Our system just needs to store the latest, most current state.”

Insert Using Timestamp


“Then finally there’s the ‘overwrite only-if-newer, but without a lock method.’”

The downside, Dan pointed out, is that you don’t know whether your write actually committed to the database, so you have to republish the results every single time.

Dan then showed the following CQL statement:

INSERT … USING TIMESTAMP ?

“‘USING TIMESTAMP’ allows you to replace the timestamp that ScyllaDB uses [by default] to decide which record to write, or not write, as it were. This is normally a great thing that you would want. You have a couple of nodes. Two messages come in. Those messages have different timestamps — though only the latest one will survive [based on] the consistency.”

“You can use USING TIMESTAMP to supply your own timestamp, and that’s what we’re doing here. This allows us to pick a timestamp that we decided as the best arbiter for what is written and what is not. The upside is that it solves our problem without having to use LWT or an application lock, [or a cache] like Redis.”

“The downside is that we don’t know if the record was actually written to the database. So the workaround for that is to pretend that the record always has been written.”
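A sketch of what that looks like with the Python driver, under the same assumptions as the earlier sketches: the producer-supplied timestamp (here assumed to arrive as milliseconds) is bound to USING TIMESTAMP, since CQL write timestamps are microseconds since the epoch, and the bundle is re-read and republished whether or not this particular write “won.”

```python
from cassandra.cluster import Cluster

session = Cluster(["scylla.example.com"]).connect("listings")

# The bound value replaces the server-assigned write timestamp, so whichever
# write carries the newest timestamp wins: no lock, no LWT.
insert = session.prepare(
    "INSERT INTO listing_documents (property_id, source, doc) "
    "VALUES (?, ?, ?) USING TIMESTAMP ?"
)

def process(msg):
    ts_micros = msg.source_fetch_time_ms * 1000  # producer-supplied time, promoted to microseconds
    session.execute(insert, (msg.property_id, msg.source, msg.doc, ts_micros))
    # The response doesn't say whether this row survived, so always act as if it
    # did: re-read the whole bundle and republish the current state.
    publish(fetch_bundle(msg.property_id))       # hypothetical helpers, as before
```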


“We have to ensure that we get the latest documents back when we ask for all the documents. For us to do that we have to write at QUORUM and we have to read at QUORUM — at least for just these two statements. This allows us to avoid conditions where you might write to just a single node and then fetch — even if that fetch is at QUORUM. If you don’t write at QUORUM you might pull from the other two nodes and get an older result set, which would mean that we could flicker and send an old result that we don’t want. QUORUM here solves that problem. We didn’t use ALL because if one of the nodes goes down we wouldn’t be able to process at all.”
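With the Python driver, setting that consistency level on the two statements is a one-liner each. Again a sketch, assuming the prepared insert and session from the previous example and a read that pulls every source’s document for a property:

```python
from cassandra import ConsistencyLevel

# Write at QUORUM so the new document reaches a majority of replicas...
insert.consistency_level = ConsistencyLevel.QUORUM

# ...and read at QUORUM so the fetch overlaps that majority and sees the
# latest write, even when a single node is down or lagging.
select_bundle = session.prepare(
    "SELECT source, doc FROM listing_documents WHERE property_id = ?"
)
select_bundle.consistency_level = ConsistencyLevel.QUORUM
```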

Putting It All Together


“This is my message flow diagram. You can see at the top there’s a Data Service that has its own data set. It publishes to its own real-time queue and its own backfill queue. There’s a more complicated service down at the bottom that mostly streams data in from Flink, but has a manual data retrieval service that allows people to correct issues or manually request individual messages or properties. And then there’s a Flink state at the bottom that uses some magic to fill a backfill queue.”

“Another interesting thing here is that the listing processor — which is the service that my team has written — takes messages from those queues and publishes them differently. It publishes real-time messages to a Kinesis stream and backfill messages to an S3 bucket.”

“We have some additional required producer actions here. These different systems operate in different ways and might theoretically carry older timestamps. Backfill is a great example. You generate a message, it goes into your backfill bucket, and then you publish a real-time change. Well, if we happen to process the real-time change first and the backfill later, that backfill message would have an older timestamp.”

“Now if they were produced at roughly the same time, and their systems internally generated those messages in different ways, then the message generation time could be wrong.

“So we’ve required our producers to give us a timestamp in their messages that is the time they fetched the data from their database. Because their database is the arbiter of what the most recent record is that they send to us.”
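Concretely, that contract just means the producer stamps each message with the moment it read the record from its own database, and the listing processor uses that value as the write timestamp. A sketch with illustrative field names and a hypothetical publisher object:

```python
# Producer side: the timestamp reflects when the record was read from the
# producer's database, not when this particular message happened to be built.
def publish_listing_change(publisher, record, fetched_at_ms):
    publisher.send({
        "property_id": record["property_id"],
        "source": record["source"],
        "doc": record["doc"],
        "source_fetch_time_ms": fetched_at_ms,  # becomes USING TIMESTAMP on the consumer side
    })

# Consumer side: promote that fetch time to the microsecond write timestamp.
def write_timestamp_micros(message) -> int:
    return message["source_fetch_time_ms"] * 1000
```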