Transaction Ordering and Replication
In this blog post I want to build on the example given by John and dig a little deeper into how VoltDB ensures global ordering and replication in the face of failures.
I am going to drill down on the following paragraph
Site 1 and site 4 both receive the message from the initiator. They then each do the same work in parallel. Since each individual site performs transactions one at a time, the transaction information is put into a queue at site 1 and queue at site 4. Work is pulled from the queues at each site and executed one at a time. These queues are actually quite special, as they ensure all transactions are run in a single global ordering. Another blog post will explain how ordering works in more detail.
The queue being referred to is implemented by RestrictedPriorityQueue (RPQ). This queue will not release a transaction for execution unless two properties have been ensured.
The first property is that every transaction released by the RPQ is globally ordered. That means that there is a guarantee that no transaction with an earlier txn-id will be received later from any initiator in the system. Because txn-ids generated by an initiator are monotonically increasing and TCP orders messages from an initiator; it is guaranteed that txn-ids seen from an initiator will also be monotonically increasing. The RPQ can determine that a transaction is globally ordered if the txn-id of the transaction is < the minimum of the txn-ids seen across all initiators. Initiators regularly send txn-ids to all partitions in the system (and by extension to the RPQs) when distributing transactions, or via heartbeat messages when there are no transactions to distribute.
The second property is that every transaction released by the RPQ is safely replicated to all partitions (and partition replicas) that are involved in the transaction. Partitions ack heartbeat messages from initiators with the txn-id contained in the heartbeat and also piggyback acks with responses to stored procedure invocations. Initiators track the the minimum txn-id acked across all partitions as well as for the replica set for each partition and include that value with heartbeats and stored procedure invocations.
The advantage of this scheme is that it introduces no extra messaging overhead in a busy system. The distribution of stored procedure work and responses to said work, contain the ordering and safety information necessary for the cluster to make progress. Once ordering and safety are ensured it is possible for partitions to execute all ordered/safe procedures independently without further coordination or traditional 2PC.
Fault Detection and Handling
VoltDB maintains an in memory catalog that is replicated at every node. The catalog describes the topology of the cluster including the number of hosts, partitions, replicas of partitions, and their up/down status. When a host fails the cluster transactionally agrees on the set of failed nodes (and by extension the set of failed partitions and initiators) and then deals with the outcome by dropping in-flight transactions that were partially scheduled by the failed initiator and rolling back multi-partition transactions that were partially executed.
If the set of survivors does not contain a replica of every partition then the cluster shuts down because it can no longer provide a consistent view of the database. If a potential network partition is detected and the survivor set does not contain the blessed host then the cluster takes a snapshot (since it can take a consistent snapshot) and then shuts down in order to prevent the cluster from splitting into two viable but diverging instances.
Node failure is detected via a heartbeat mechanism. All nodes are connected in a TCP mesh topology and messages are sent regularly as part of the global ordering process. Every attempt to send a message includes a check to see if a message has not been received for some amount of time. If a node or connection is unresponsive then a fault is signaled.
Every node then broadcasts the set of failed nodes that it is aware of and the last txn-id seen for each partition from each of the failed initiators and the last committed multi-partition txn-id. If a node receives a failure message containing a node failure that it has not observed then the node restarts the failure handling process by rebroadcasting the failure messages with the new larger failed node set. If a node receives a failure message that does not contain the same set of failed nodes it is aware of then the message is dropped because there is no agreement on the set of failed nodes. The node that sent the incongruent set will eventually acknowledge the full set of failures and rebroadcast with the correct set. Eventually the cluster converges on a single set of failed nodes (because nodes stop failing or all failures are acknowledged) with each surviving node having sent a failure message to every other survivor.
At this point each node updates its catalog to reflect the agreed upon state and each partition decides how to resolve in-flight transactions and in progress multi-partition transactions.
Using txn-ids from the failure messages broadcast by the survivors it is possible for each node to determine the last safe txn-id seen from each of the failed initiators. The RPQ at each partition is then walked and every transaction from a failed initiator with a txn-id > the safe txn-id is dropped. In theory these transactions could be replicated without the initiator, but there would be no way to return the response to the client since the initiator and its connection to the client are gone.
If a multi-partition transaction was in progress and the coordinator was on a failed host then the transaction has to be rolled back or committed. If the last committed multi-partition txn-id received in the broadcast failure notice from any node is >= equal to the txn-id of the in progress multi-partition transaction then the commit notice was received by at least one node and it is committed everywhere, otherwise it is rolled back. If the in progress multi-partition transaction was being coordinated by a survivor then the coordinator must remove any expected dependencies from the failed nodes so that the transaction does not stall waiting for dependencies that will never come.
Hopefully this clears up how VoltDB currently implements transaction ordering and replication. This approach scales well and has been tested up to 30 nodes and 3.4 million transactions/sec on gig-E.