CMSC 818e Day 6

Reading time ~2 minutes

These are notes taken during CMSC 818e: Distributed And Cloud-Based Storage Systems. Course webpage and syllabus here.

Day Six

Properties in distributed systems are defined by per-process checkpoints. Replicas inside the system take period snapshots, and they communicate by sending “tokens” that communicate to other replicas that they’ve taken a snapshot (which triggers the receiving replica to itself take a snapshot and send out tokens). We assume that the system is piecewise deterministic and that all communication happens through “channels” that are reliable and ordered. Therefore, the state of the system is a combination of each replica’s independent snapshot and the state of the channels (snapshots plus any messages received before receiving other replica’s snapshot tokens). This model, developed in the 90’s, is how we synchronize write and read messages in a distributed system. We can use a similar model to see that global properties are held by the entire system.

Logical time

But how do we know what happens first? Here we need to distinguish between logical time and wall-clock time - the assumption is generally that syncing up wall-clocks is not feasible. We usually us NTP to synchronize machines. There’s also “Truetime” (used by Spanner in Google - which cheats because it uses atomic clocks & satellites); which synchronizes widely distributed systems within nanoseconds. Amazon has something similar called “Time Sync”.

So, failing atomic clocks and suspended disbelief, no matter how close we get to synchronizing events on distributed machines, you’re not going to be able to guarantee that you can order every pair of events that needs to be ordered. So instead we need logical time, which takes into account how processes communicate with each other.

  • Program order (aka reads-from order)
  • How to ensure partial ordering?
  • Lamport clock (“scaler” clock)
    • Assume communicating processes, no communication outside the messages, reliability and order.
    • Increment on each event (either an event, a send, or a receive)
    • Carry on a message
    • Receive time: c = max(msg, local) + d
    • if c(e1) < c(e2) THEN EITHER e1 HB e2 OR concurrent
    • always gets it right if they aren’t concurrent (and if they are concurrent, it doesn’t matter)
  • vector clock?
    • Vi[j] is Pi's view of Pj's events
    • always gets it right and tells us for sure if certain operations are concurrent

Key-value store

Understand that in the context of our hypothetical distributed system, the “processes” we’re referring to are replicas; also, imagine that we have full replication, which means that every object we’re talking about is on every replica (this is different from sharding, which is partial replication). Each message is a write to a key in the key-value store.

But how do we do garbage collection? When can the first process get rid of old data? Vector clock time is no longer sufficient, because we need to be able to do the pairwise maximums of all of the times of all the processes…matrix clocks!

With matrix clocks, update on receive by Pi: M[i][i]++ M[i][j] Vj = max (M[n][j])

Matrix clocks show ordering if concurrent (like the scalar clock), tell if concurrent (like the vector clock), and tell the minimum time of the entire system (minimum of each column). Perfect for garbage collection. But … very few systems use matrix clocks because N^2 gets big quickly. For garbage collection in a distributed context, oftentimes use vector clock, just communicate as much as possible about the lowest counts for each replica. It’s a bit conservative, but it works.

Sharding the Shards

In "Sharding the Shards: Managing Datastore Locality at Scale with Akkio", Annamalai, et al. present Akkio, a locality management service...… Continue reading