
Motivation

We need a highly available message queuing system. The initial requirements are the following:

  • Highly available - essentially all the benefits of storing data in Cassandra
  • Ability to handle large spikes of messages coming into the queue. Processing of messages may lag, but don't want to turn away new messages
  • Support simple commit and rollback semantics. (No ACID, no XA, no isolation required.)
  • A message is delivered once and only once unless a rollback happens
  • Can handle many writers and many readers working autonomously in a distributed environment
  • Need statistics regarding the number of messages waiting, delivered, etc.
  • Easy administration and statistical reporting
  • And of course, it should be fast!

Design Challenges

By definition a queue suggests an ordering, usually FIFO, but not always. (For instance, priorities can cause messages to be popped in a different order than they were pushed.) Let's assume for now that we want FIFO without any priorities and that we only have a single pusher and a single popper. Cassandra provides two methods for ordering data: the OrderPreservingPartitioner (OPP), or sorted columns within a single row. Let's choose the "column" approach with the RandomPartitioner (RP), because the nature of a queue doesn't suggest a key distribution across the cluster that would work well with OPP. So a naive approach is to simply use a single row with one column per message. Assign a time-based UUID to each message and order the columns by TimeUUIDType (which also gives us a unique message identifier). This gives us a FIFO queue! This idea is the basis for the design; however, using a single row creates its own challenges:

  • A single row will only be distributed across N (ReplicationFactor) nodes. So if I have an 8-node cluster with N=3, then a single row only uses 3 nodes and the other 5 are wasted. If the cluster is hosting a lot of queues then the other nodes can be used, but not by a single queue
  • As columns are added and removed from a single row, the deleted columns create a substantial slowdown that increases over time
  • Cassandra does provide atomicity for operations on a single row, so having all pushers and poppers hitting the same row is likely to create a bottleneck. I am not sure how severe it is, because the previous point prevents pushing enough volume to test atomicity contention
  • Pop clients need to lock the row, pop the message, delete the message, update stats, and then release the lock. This creates a big bottleneck for poppers, making it impossible to get decent performance
  • Likewise with push clients: if every push requires a row lock to update stats, then the bottleneck will be just as bad as for pop clients.
  • And let's not forget, Cassandra doesn't provide any of this "locking" I keep talking about. It will need to be implemented on top of Cassandra

It is common practice to have more than one client pushing messages onto the queue, and likewise more than one client popping messages from the queue. This isn't a problem for a single row per queue (except for atomicity contention), but it does pose a challenge for tracking stats. If every push must lock the queue, insert the message, increase a counter, then unlock, a big bottleneck will form. Not to mention that more clients means more load, and now we're back to only N nodes being used for the queue.

Design Goals

To overcome the previous challenges I propose the following goals (requirements):

  • Strict FIFO is not required - messages can (and will) be popped "out of order." In other words, the oldest message is not always popped before newer ones
  • Messages cannot be starved and will be popped close to other messages pushed around the same time
  • A push onto the queue should not require any locking and should create as little database contention as possible
  • Use many Cassandra rows to distribute load across the cluster

Pipes

The solution to most of the challenges is to use multiple rows per queue, or as I call them, Pipes. I'll define a pipe as:

pipe : A group of 2 rows, one for messages "waiting to be popped" and another for messages "pending commit". Each column in the pipe represents a single message, and a message can only be "waiting" or "commit pending". The Cassandra schema defines each column name as a time-based UUID, which gives us chronological ordering and a unique identifier for each message. Each pipe is assigned a time-based UUID as an identifier for the duration of its life.
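As a rough illustration of that definition, here is an in-memory sketch of a pipe in plain Java (a sketch only, not the real schema or code). The BY_TIME comparator models the chronological ordering that TimeUUIDType gives the columns. Note that rollback here simply puts the message back under the same ID; the actual design re-pushes it as a new message (see the Edge Cases section).

```java
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;

// Conceptual model of a single pipe: a pipe ID plus two time-ordered rows.
// A message ID is either in "waiting" or in "pendingCommit", never both.
public class PipeModel {

    // chronological order for version-1 (time-based) UUIDs, ties broken deterministically
    private static final Comparator<UUID> BY_TIME = new Comparator<UUID>() {
        public int compare(UUID a, UUID b) {
            int cmp = Long.compare(a.timestamp(), b.timestamp());
            return cmp != 0 ? cmp : a.compareTo(b);
        }
    };

    final UUID pipeId;   // time-based UUID, fixed for the pipe's whole life
    final ConcurrentSkipListMap<UUID, byte[]> waiting =
            new ConcurrentSkipListMap<UUID, byte[]>(BY_TIME);
    final ConcurrentSkipListMap<UUID, byte[]> pendingCommit =
            new ConcurrentSkipListMap<UUID, byte[]>(BY_TIME);

    PipeModel(UUID pipeId) { this.pipeId = pipeId; }

    void push(UUID msgId, byte[] payload) {       // one column per message
        waiting.put(msgId, payload);
    }

    Map.Entry<UUID, byte[]> pop() {               // oldest waiting message moves to pending
        Map.Entry<UUID, byte[]> oldest = waiting.pollFirstEntry();
        if (oldest != null) pendingCommit.put(oldest.getKey(), oldest.getValue());
        return oldest;
    }

    void commit(UUID msgId)   { pendingCommit.remove(msgId); }

    void rollback(UUID msgId) {                   // put the message back into "waiting"
        byte[] payload = pendingCommit.remove(msgId);
        if (payload != null) waiting.put(msgId, payload);
    }
}
```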

The first and most obvious win is that by using multiple pipes, the queue's data can be distributed around the Cassandra cluster. But how we use them is important. We can't simply create a new pipe, push messages to it until it is "full", then create another pipe and start over. This does distribute the data over time, but does not distribute the load.

So let's assume we can use P pipes for pushing and popping, and that each pipe can only hold a max of M messages. When a pipe is full, the pusher that caused the overflow creates a new pipe and moves on. Since this is a distributed system, each client must be able to discern which pipes are being used for pushing and which pipes are being used for popping. This can be determined by querying a column family (which means it must be updated), or maybe an algorithm could be used to provide a "good guess" of where to start looking. At first thought, this seems like a lot of record keeping that could get messy in a distributed system, especially if we are trying to keep accurate stats.

Let's stay with multiple pipes, but now assume that each "push" client creates its own pipe. What are the advantages:

  • No database contention for pushing to a pipe, each client has its own row
  • Statistics can easily be kept for each pipe, and aggregated later if needed
  • It is easy to determine when to create a new pipe. The limitation on pipe size is based purely on the number of messages pushed to it; it doesn't matter whether they have been popped or not. This becomes easy when using "pipe per client" because the client can simply count the number of messages it has pushed and create a new pipe when a threshold is met.

What are the disadvantages:

  • Pop clients must know about the available pipes, so there must be a well known location for storing a descriptor for the pipe
  • The pipe descriptor must constantly be updated while actively pushing to it. This doesn't require any reads, and writes are super fast in Cassandra, so it shouldn't be a problem
  • Pop clients must coordinate themselves such that only one of them is reading from a given pipe at a given time. This is a distributed system so there is no coordinator, the responsibility lies with each client. From a read perspective, this design requires some overhead of distributed locking and tracking of pipe status, but from a push perspective it is straightforward and writes are fast.

Cassandra Config

QueueDescriptors ColumnFamily

Contains attributes of the queue shared by all clients and rarely if ever updated.

Type = standard

Column comparator = BytesType

Key = queue name

PipeDescriptors ColumnFamily

Contains attributes of the pipe shared by all clients. Keeps track of push count, push/pop status, etc

Type = standard

Column comparator = BytesType

Key = queue name

QueuePipeCnxn ColumnFamily

Connects a pipe to a queue. One row per queue, so it can create a slight bottleneck in high-volume situations.

Type = standard

Column comparator = TimeUUIDType

Key = queue name

QueueStats ColumnFamily

Contains current statistics for the queue.

Type = standard

Column comparator = BytesType

Key = queue name

Fixed number of columns: totalMsgsPushed, totalMsgsPopped, pushPerSec, popPerSec

pushPerSec and popPerSec are rolling averages to capture the current rate, not the rate since the queue was created. This is a simple scheme for keeping stats, but it could be extended to keep rows for certain times of the day, etc.
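One simple way to keep such a rolling rate (this is an assumption on my part, not necessarily how the code does it) is to count events over a fixed recent time window:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative rolling-rate counter: count events in the last windowMillis
// and divide by the window length.
public class RollingRate {
    private final long windowMillis;
    private final Deque<Long> eventTimes = new ArrayDeque<Long>();

    public RollingRate(long windowMillis) { this.windowMillis = windowMillis; }

    public synchronized void mark() {              // call on every push (or pop)
        long now = System.currentTimeMillis();
        eventTimes.addLast(now);
        evictOld(now);
    }

    public synchronized double perSecond() {       // current rate over the window
        long now = System.currentTimeMillis();
        evictOld(now);
        return eventTimes.size() * 1000.0 / windowMillis;
    }

    private void evictOld(long now) {              // drop timestamps outside the window
        while (!eventTimes.isEmpty() && now - eventTimes.peekFirst() > windowMillis) {
            eventTimes.removeFirst();
        }
    }
}
```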

PipeStats ColumnFamily

Contains the status of a pipe and the number of messages pushed to it. As you can guess this data is updated with every push of a new message, so it will not benefit from caching, unless the pipe is PF (see below) before popping starts.

Type = standard

Column comparator = TimeUUIDType

Key = queue name

Column per pipe, referenced by the pipe's ID, which is a time-based UUID. The value is a complex type containing Status, MsgCount, and a timestamp separated by "/" (i.e. "E/2201/201009082176.123"). Available statuses are: Push Active (PA), Push Finished (PF), and Finished and Empty (E). The timestamp is used by the statistics calculations.
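A small sketch of encoding and decoding that value format. The class and field names are mine; the "/" separator and the status codes come from the description above.

```java
// Encodes/decodes the PipeStats column value "<status>/<msgCount>/<timestamp>",
// e.g. "E/2201/201009082176.123". Statuses: PA (Push Active), PF (Push Finished),
// E (Finished and Empty).
public class PipeStatsValue {
    public enum Status { PA, PF, E }

    public final Status status;
    public final long msgCount;
    public final String timestamp;   // kept as text here; the exact format is up to the stats code

    public PipeStatsValue(Status status, long msgCount, String timestamp) {
        this.status = status;
        this.msgCount = msgCount;
        this.timestamp = timestamp;
    }

    public String encode() {
        return status.name() + "/" + msgCount + "/" + timestamp;
    }

    public static PipeStatsValue decode(String raw) {
        String[] parts = raw.split("/", 3);        // status / count / timestamp
        return new PipeStatsValue(Status.valueOf(parts[0]),
                                  Long.parseLong(parts[1]),
                                  parts[2]);
    }
}
```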

MessageDescriptors ColumnFamily

Meta-data and the actual payload of a message in the queue. Only message IDs are moved between "waiting" and "pending" rows.

Type = standard

Column comparator = BytesType

Key = Msg ID (UUID)

<QueueName>_Waiting ColumnFamily

Holds pipes containing messages waiting to be popped for the queue <QueueName>. Since a column family is created for each queue, Cassandra 0.7 is required.

Type = standard

Column comparator = TimeUUIDType

Key = pipe ID (UUID)

Column per message referenced by the message's ID, which is a time-based UUID. The value of the column is the actual message.

<QueueName>_Pending ColumnFamily

Holds pipes containing messages waiting to be committed or rolled back for the queue <QueueName>. Since a column family is created for each queue, Cassandra 0.7 is required. The life of a message in this ColumnFamily is hopefully very short, and it is only read on rollback, therefore caching isn't very useful.

Type = standard

Column comparator = TimeUUIDType

Key = pipe ID (UUID)

Column per message referenced by the message's ID, which is a time-based UUID. The value of the column is the actual message.

Pushing to the Queue

Since the design states "assign a single pipe per pusher," pushing to the queue is straightforward. The algorithm looks something like this:

  1. Client calls "push" with new message data
  2. Create CassQMsg object assigning a message ID (time-based UUID)
  3. If a new pipe is needed (because max messages reached in pipe or pipe timeout has been exceeded), create a new one and mark the old one as "push finished"
  4. Using batch_mutate, insert the new message into the "Waiting" pipe and update the pipe's "push count" in the PipeStats ColumnFamily

The algorithm doesn't require any synchronization locks which keeps pushing a new message fast, straightforward, and easily distributed. Pipes are kept until their statistics are aggregated into QueueStats ColumnFamily.
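Here is a sketch of the push path in Java. QueueDao, PipeDescriptor, and TimeUuids are hypothetical stand-ins for the real persistence layer and time-based UUID generator; the flow simply mirrors the numbered steps above.

```java
import java.util.UUID;

// Sketch of the push path; not the actual code, just the numbered steps above.
public class Pusher {
    private final QueueDao dao;              // wraps the Cassandra batch_mutate calls
    private final String queueName;
    private final int maxMsgsPerPipe;        // "max messages reached in pipe"
    private final long maxPushTimeOfPipe;    // pipe timeout, from the queue descriptor

    private PipeDescriptor currentPipe;      // this pusher's private pipe
    private int pushedToCurrentPipe;

    public Pusher(QueueDao dao, String queueName, int maxMsgsPerPipe, long maxPushTimeOfPipe) {
        this.dao = dao;
        this.queueName = queueName;
        this.maxMsgsPerPipe = maxMsgsPerPipe;
        this.maxPushTimeOfPipe = maxPushTimeOfPipe;
        this.currentPipe = dao.createPipe(queueName);          // first pipe for this pusher
    }

    public UUID push(byte[] msgData) {
        UUID msgId = TimeUuids.newTimeUuid();                  // step 2: time-based UUID = msg ID

        // step 3: roll to a new pipe if the current one is full or too old
        boolean full = pushedToCurrentPipe >= maxMsgsPerPipe;
        boolean expired = System.currentTimeMillis() - currentPipe.createdAt() > maxPushTimeOfPipe;
        if (full || expired) {
            dao.markPushFinished(currentPipe);
            currentPipe = dao.createPipe(queueName);
            pushedToCurrentPipe = 0;
        }

        // step 4: one batch mutation inserts the message into the pipe's "waiting"
        // row and bumps the pipe's push count
        dao.insertWaitingAndIncrementPushCount(currentPipe, msgId, msgData);
        pushedToCurrentPipe++;
        return msgId;
    }

    // --- hypothetical collaborators, defined only so the sketch is self-contained ---
    public interface QueueDao {
        PipeDescriptor createPipe(String queueName);
        void markPushFinished(PipeDescriptor pipe);
        void insertWaitingAndIncrementPushCount(PipeDescriptor pipe, UUID msgId, byte[] msgData);
    }
    public interface PipeDescriptor { UUID pipeId(); long createdAt(); }
    public static class TimeUuids {
        // stand-in: real code would use a version-1 (time-based) UUID generator
        public static UUID newTimeUuid() { return UUID.randomUUID(); }
    }
}
```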

Popping from Queue

Popping is slightly more complicated, but still fairly straightforward. The biggest difference between pushing and popping is locking. Pop clients must synchronize on a common lock so multiple poppers are not accessing the same pipe at the same time. Pop clients can operate in single JVM or distributed mode. In single JVM mode, it is assumed all pop clients are running in a single JVM and normal Java synchronization constructs can be used. In distributed mode (the intended environment), a distributed coordinator (like ZooKeeper or Hazelcast) is required as the common locking mechanism. The algorithm looks something like this:

  1. A client is ready to pop a message from the queue and calls "pop"
  2. If we need to read the list of pipe descriptors, do it. At most maxPopWidth descriptors are loaded
  3. Iterate over the pipe descriptors (from where we left off last time) trying to lock a pipe for reading
  4. If pipe cannot be locked (because another client is using it) then try the next pipe in round robin order
  5. Read the oldest message from the pipe
  6. If message found, move from "waiting" to "pending" ColumnFamily, release lock, and return message. Finished
  7. If we reach this step, no message is available in the pipe, so check the pipe's status.
  8. If pipe status is "push finished" or maxPushTimeOfPipe is exceeded, then mark pipe as "finished and empty"
  9. Release lock, try again until all pipes exhausted
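Here is a sketch of that round-robin pop loop. PopDao, PipeDescriptor, Message, and DistributedLock are hypothetical stand-ins; in distributed mode the lock would be backed by something like ZooKeeper or Hazelcast, and in single JVM mode by plain Java locks.

```java
import java.util.List;
import java.util.UUID;

// Sketch of the original round-robin pop algorithm described above; not the actual code.
public class Popper {
    private final PopDao dao;
    private final String queueName;
    private final int maxPopWidth;           // max number of pipe descriptors to consider
    private final long maxPushTimeOfPipe;

    private List<PipeDescriptor> pipes;      // cached descriptors, refreshed as needed
    private int nextPipeIndex;               // "where we left off last time"

    public Popper(PopDao dao, String queueName, int maxPopWidth, long maxPushTimeOfPipe) {
        this.dao = dao;
        this.queueName = queueName;
        this.maxPopWidth = maxPopWidth;
        this.maxPushTimeOfPipe = maxPushTimeOfPipe;
    }

    public Message pop() {
        if (pipes == null || pipes.isEmpty()) {
            pipes = dao.loadPipeDescriptors(queueName, maxPopWidth);   // step 2
        }
        for (int i = 0; i < pipes.size(); i++) {                       // steps 3-9
            PipeDescriptor pipe = pipes.get((nextPipeIndex + i) % pipes.size());
            DistributedLock lock = dao.lockFor(pipe);
            if (!lock.tryLock()) continue;                             // step 4: someone else has it
            try {
                Message msg = dao.oldestWaitingMessage(pipe);          // step 5
                if (msg != null) {
                    dao.moveWaitingToPending(pipe, msg);               // step 6
                    nextPipeIndex = (nextPipeIndex + i + 1) % pipes.size();
                    return msg;
                }
                // steps 7-8: empty pipe; retire it if pushing is done or it timed out
                if (pipe.isPushFinished()
                        || System.currentTimeMillis() - pipe.createdAt() > maxPushTimeOfPipe) {
                    dao.markFinishedAndEmpty(pipe);
                }
            } finally {
                lock.unlock();                                         // step 9
            }
        }
        return null;                                                   // all pipes exhausted
    }

    // --- hypothetical collaborators ---
    public interface Message { UUID msgId(); byte[] payload(); }
    public interface PipeDescriptor { UUID pipeId(); long createdAt(); boolean isPushFinished(); }
    public interface DistributedLock { boolean tryLock(); void unlock(); }
    public interface PopDao {
        List<PipeDescriptor> loadPipeDescriptors(String queueName, int maxPopWidth);
        DistributedLock lockFor(PipeDescriptor pipe);
        Message oldestWaitingMessage(PipeDescriptor pipe);
        void moveWaitingToPending(PipeDescriptor pipe, Message msg);
        void markFinishedAndEmpty(PipeDescriptor pipe);
    }
}
```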

Update: With this method the lock contention using ZooKeeper was too much and performance suffered a lot. I did not try other mechanisms like Hazelcast. Instead I resorted to having each popper grab a pipe and use it until it was expired or full. The advantage is that there is a lot less lock contention, which yields better performance. The disadvantage is that messages can be processed in "less" FIFO order than before, depending on how many pushers and poppers there are in the system.

Message Ordering

The ordering outlined below is based on the initial popper design and does not accurately reflect the code. I've left it this way because the algorithm is more desirable and interesting than the actual implementation.

As mentioned before, this queue is designed to be "mostly FIFO". What does that mean? It means that the oldest message is not always processed when a client pops a message. However messages are not starved, and to be more clear (vague) a message that is newly pushed to the queue will not be processed before a really old message. How long an "old" message can sit in the queue being passed over is dependent on the maxPopWidth and the number of push clients.

As an example, let's assume we have 8 push clients, therefore 8 pipes in "push active" status at any one time - call them P1 thru P8. (There could be more pipes in "push finished" and "finished and empty" status.) Let's also assume we have a maxPopWidth of 8 (the maxPopWidth should be equal to or larger than the number of expected push clients.) Clients are pushing messages to the queue in parallel and clients are trying to pop messages in parallel. Push clients start pushing messages to their pipes in the following chronological order:

M1:P4 M2:P1 M3:P6 M4:P1 M5:P1 M6:P7 M7:P8 M8:P1 M9:P2 M10:P3 M11:P5 ....

Clients start trying to pop messages at this time, they read the pipe descriptors in order P1 thru P8, and start trying to lock pipes in round robin order and read the messages. Each client will try to lock pipe P1, then P2, etc so the messages will come out approximately in the following "round robin pipe order":

M2, M9, M10, M1, M11, M3, M6, M7 .. (at this point round robin start again at P1) .. M4, M5, M8

As you can see, M1, the oldest message, isn't processed until the 4th pop. The guarantee that a new message isn't processed before a really old one holds because each push client fills its pipe before starting a new one, and pop clients process pipes in chronological order.
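The ordering above can be reproduced with a tiny simulation (purely illustrative, no Cassandra involved):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Tiny simulation of the worked example: distribute M1..M11 to pipes P1..P8
// as listed above, then drain the pipes in round-robin order.
public class RoundRobinOrderDemo {
    public static void main(String[] args) {
        // message -> pipe assignments, in push (chronological) order
        String[][] pushes = {
            {"M1","P4"}, {"M2","P1"}, {"M3","P6"}, {"M4","P1"}, {"M5","P1"}, {"M6","P7"},
            {"M7","P8"}, {"M8","P1"}, {"M9","P2"}, {"M10","P3"}, {"M11","P5"}
        };

        // 8 pipes, each a FIFO of message names
        List<Deque<String>> pipes = new ArrayList<Deque<String>>();
        for (int i = 0; i < 8; i++) pipes.add(new ArrayDeque<String>());
        for (String[] push : pushes) {
            int pipeIndex = Integer.parseInt(push[1].substring(1)) - 1;   // "P4" -> index 3
            pipes.get(pipeIndex).addLast(push[0]);
        }

        // pop in round-robin pipe order: P1, P2, ..., P8, then around again
        List<String> popOrder = new ArrayList<String>();
        while (popOrder.size() < pushes.length) {
            for (Deque<String> pipe : pipes) {
                if (!pipe.isEmpty()) popOrder.add(pipe.removeFirst());
            }
        }
        // prints: [M2, M9, M10, M1, M11, M3, M6, M7, M4, M5, M8]
        System.out.println(popOrder);
    }
}
```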

With the current design, messages can be processed in strict FIFO order by using a single pusher, a single popper, and maxPopWidth = 1. The design could be changed such that the entire queue is locked for every pop, always taking the oldest message from the pipes with "push active" status; maxPopWidth would then always have to equal the number of push clients.

Edge Cases

There are some scenarios that create complexity:

What if a push client terminates without marking the pipe as "push finished"?

The time the pipe is created is stored in the pipe descriptor, and the queue descriptor contains the property maxPushTimeOfPipe. This information is used by the pop client and the "pipe reaper" to clean up stale pipes.

What if a message is popped, but never committed or rolled back?

The queue descriptor defines transactionTimeout, which is the amount of time a message can be in "pending". An external process is required to scour the pipes looking for "pending" messages that are past the timeout period and "roll them back" into "waiting". (The rollback process merely removes the message from the pending row and pushes a new message. This might seem dumb, and it might be, but it also has the benefit of guaranteeing no "poison" messages.)
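A sketch of what that external rollback process could look like. PendingScannerDao and PendingMessage are hypothetical stand-ins, and how the time a message entered "pending" is tracked is an assumption of the sketch.

```java
import java.util.List;
import java.util.UUID;

// Sketch of the external process that rolls back timed-out "pending" messages.
public class PendingMessageReaper {
    private final PendingScannerDao dao;
    private final long transactionTimeoutMillis;   // transactionTimeout from the queue descriptor

    public PendingMessageReaper(PendingScannerDao dao, long transactionTimeoutMillis) {
        this.dao = dao;
        this.transactionTimeoutMillis = transactionTimeoutMillis;
    }

    public void sweep(String queueName) {
        long now = System.currentTimeMillis();
        for (UUID pipeId : dao.knownPipes(queueName)) {
            for (PendingMessage msg : dao.pendingMessages(queueName, pipeId)) {
                // enteredPendingAt() is assumed; the real code tracks this some other way
                if (now - msg.enteredPendingAt() > transactionTimeoutMillis) {
                    // "rollback" = remove from the pending row and push the payload
                    // as a brand new message (new msg ID), per the description above
                    dao.removePending(queueName, pipeId, msg.msgId());
                    dao.pushAsNewMessage(queueName, msg.payload());
                }
            }
        }
    }

    // --- hypothetical collaborators ---
    public interface PendingMessage { UUID msgId(); long enteredPendingAt(); byte[] payload(); }
    public interface PendingScannerDao {
        List<UUID> knownPipes(String queueName);
        List<PendingMessage> pendingMessages(String queueName, UUID pipeId);
        void removePending(String queueName, UUID pipeId, UUID msgId);
        void pushAsNewMessage(String queueName, byte[] payload);
    }
}
```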

Statistics

Currently only 3 useful stats are accessible: total number of pushed messages, total number of popped messages, and the queue depth.

Since the system is distributed, and the goal is to reduce lock contention, it is nearly impossible to reconcile stats. Each time a client wants to know these stats, they must be calculated, which can take a few seconds if the queue is deep. Since other clients may be pushing and popping at the same time, the calculations are only accurate from the perspective of the client requesting them. If another client asks for the same stats, it is highly unlikely the two results will reconcile on an active system.

Atomicity

The queue relies upon the atomicity of writing data associated with a single key. (However, Cassandra operations are never isolated - to use RDBMS terminology.) This holds true for a single "insert" as well as for "batch_mutate". And to be clear, batch_mutate is only atomic for a particular key; it doesn't matter if there are mutations to multiple ColumnFamilies, as long as the key is the same. So if the code deletes from QueueName_Waiting and inserts into QueueName_Pending for the same key, this is atomic, but not isolated.
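To illustrate the "one key, many column families, one atomic batch" point, here is a simplified sketch. The Mutation and BatchClient types are stand-ins, not the real Thrift or Hector API; only the grouping by row key matters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Illustration only: both mutations below target the same row key (the pipe ID),
// so Cassandra applies them atomically, but not in isolation from readers.
public class AtomicMoveExample {

    public static void moveWaitingToPending(BatchClient client, String queueName,
                                            UUID pipeId, UUID msgId, byte[] payload) {
        List<Mutation> mutations = new ArrayList<Mutation>();
        mutations.add(Mutation.delete(queueName + "_Waiting", msgId));   // remove from waiting row
        mutations.add(Mutation.insert(queueName + "_Pending", msgId, payload)); // add to pending row

        client.batchMutate(pipeId, mutations);   // one batch, one key, two column families
    }

    // --- simplified stand-ins ---
    public interface BatchClient { void batchMutate(UUID rowKey, List<Mutation> mutations); }

    public static class Mutation {
        final String columnFamily; final UUID columnName; final byte[] value; final boolean isDelete;
        private Mutation(String cf, UUID col, byte[] value, boolean isDelete) {
            this.columnFamily = cf; this.columnName = col; this.value = value; this.isDelete = isDelete;
        }
        static Mutation insert(String cf, UUID col, byte[] value) { return new Mutation(cf, col, value, false); }
        static Mutation delete(String cf, UUID col) { return new Mutation(cf, col, null, true); }
    }
}
```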

Tuning

As a general rule, more pop clients are needed than push clients. Push is very fast and pop can take a while, but of course this depends on your Cassandra cluster and usage pattern.

  • It is a good rule of thumb to have more poppers than pushers. Popping messages is slow compared to how fast they can be pushed. How many poppers you need is dependent on your volume
  • The amount of time required to pop a message is for the most part only based on the max size of a pipe. Therefore you can add more poppers to increase throughput without significantly slowing the time to pop a message
  • If you have N pushers and M poppers such that M > N, and the poppers are keeping pace with the pushers, you'll find that M-N poppers are constantly checking for a new pipe. This is nothing to worry about; in fact, when a pusher needs to create another pipe, the extra poppers will start using it immediately
  • Between the time you pop a message and the time it is committed is the "transaction time". If you find messages being automatically rolled back, you should extend the transaction timeout or optimize the operation you're performing on the data.

Notes About the Distributed Queue

In a distributed system, about the only resource that must be consistent is time. It is very important to have an accurate system clock that all clients and Cassandra nodes synchronize against. There is no central server or coordinator for the queuing system; the clients performing the pushing and popping are the "system"! Each client has the ability to reap stale pipes, roll back old messages, and roll up stats. If clocks are allowed to drift on the clients, this will cause very weird results that are hard to troubleshoot. For instance, client X pops a message, M1, and starts an operation on it. Client Y awakes and thinks that message M1's transaction has timed out because Y's clock has drifted too far into the future. Here is an article explaining the issue in a real-world problem regarding the use of VMs for clients:

http://ria101.wordpress.com/2011/02/08/cassandra-the-importance-of-system-clocks-avoiding-oom-and-how-to-escape-oom-meltdown/

The good news is that since the queuing system is completely distributed, there is no single point of failure and scaling is relatively easy! Just add more clients, watch the JMX stats on the Cassandra cluster, and let it ride!

Enjoy!