Heart of the SwarmKit - Store, Topology & Object Model by Aaron Lehman, Andrea Luzzardi and Stephen Day (Docker)

Written by GemmaG (Gemma Gordon)
Published: 2016-10-06 (last updated: 2016-10-07)

Outline

SwarmKit is a toolkit for orchestrating distributed systems at any scale. Includes primitives for node discovery, raft-based consensus and task scheduling.

There is an extensive ReadMe for SwarmKit, and see here for an interesting article on the relationship between Docker Swarm and Docker SwarmKit, and here for an experiential account of using SwarmKit in practice.

Essentially, the tool aims to provide easy management of large-scale distributed systems, and extends beyond Swarm to support different build environments. SwarmKit coordinates via the Raft Consensus Algorithm to maintain a consistent and replicated view of the cluster; uses mutual TLS for authentication and transport encryption; and offers node scheduling and monitoring, allowing tasks to be quickly redistributed from failing nodes to healthy nodes.


3 talks Stephen Day: data model in SwarmKit Andrea Luzzardi: architecture Aaron Lehmann: architecture

Stephen Day

Club Mate for the win! Mushroom hunter

Object Model

New orchestration features in docker 1.12 are built on top of swarmkit

simply a control system for your cluster: feedback control system.

  • desired state -> orchestrator -> cluster -> state at time

The simple system gets more complicated with clusters

want to minimise the difference between state at time and desired state = convergence.

Observability and controllability

Main problems. When you have a cluster of machines you have a continuum of data systems with variable observability. E.g. failed node has very limited observability, whereas connectivity state is easier to observe. User input observed via Raft.

Data model requirements

  • represent difference in cluster state
  • maximise observability
  • support convergence
  • be reliable and scalable

Services - a set of containers

  • express desired state of cluster
  • abstraction to control a set of containers
  • enumerates resources, network availability, placement
  • leave details of runtime to container process
  • implement these services by distributing processes across a cluster

Reconciliation

spec (desired state) -> object (current state) the loop continuously takes the current state and tries to match it to the desired state

Task model

Atomic scheduling unit of swarmkit - can be used for unikernels or vms - it's a one shot operation not specific to containers.

runtime

  • prepare: setup resources
  • start: start task
  • wait: wait until task exits
  • shutdown: stop task, cleanly

orchestrator will check presence of desired state and send to scheduler

Service spec input example service object example

Dataflow

user input (task spec) -> create tasks from that

Consistency model - for communicating tasks back to the cluster A strong field ownership model - only one component of the system can write to a field. This model can be inflexible in some cases, but

user - owns task spec (not touched in orchestrator, could be signed by author) orchestrator - owns ids, desired state allocator - owns networks

task status is shared - handoff for field ownership of task status. Once assigned it is owned by the worker

Manager -> pre-run -> worker manager: new, allocated, assigned

This process state is a remote operation but can be observed well using remote data structures.

Conclusion

Swarmkit doesn't quit - they don't error out, they just never converge.

Andreas Luzzardi

Topology management in SwarmKit

Core component of swarm a cluster = a bunch of machines that you want to connect to each-other

Workers and managers

push vs pull model

applies to any kind of distributed system. Both have pros and cons

  • Push worker registers to discovered system manager and then manager contacts later Good: provides better control over communication rate - managers decide when to contact workers Bad: requires a discovery mechanism - more failure scenarios, harder to troubleshoot

  • Pull worker connects to manager, registers itself good: simpler: wokers connect to managers bad: connection needs to be alive at all times

SwarmKit uses pull model favoured operational simplicity, but want to take back rate control

Rate Control

  • using heartbeats - ping and pong

  • manager dictates heartbeat rate to workers

  • rate is configurable

  • managers to agree on same rate by consensus (raft)

  • managers add jitter so pings are spread over time (avoid bursts)

  • Workloads

  • worker connects to manager, opens gRPC stream to receive workloads

  • manager buffers workloads and sends in batches to avoid congestion (either 100 or every 100ms whichever occurs first)

  • adds a little delay but gain efficiency in communication

Replication

running multiple managers - for distributed systems

  • worker can connect to any manager

  • followers will forward traffic to the leader (leader manager and follower managers)

  • followers mulitplex all workers to the leader using a single connectino

  • backed by gRPC channels (HTTP/2 streams)

  • reduces leader networkign load (learned from Swarm)

  • upon leader failure, a new one is elected

  • all managers start redirecting worker traffic to new one

  • transparent to all workers

  • manager sends list of all manager addresses to workers

  • when new manager joins, all workers are notified

  • upon manager failure, workers spread connections over time (gRPC exponential backup and reconnection jitter)

  • manager weights: not used extensively yet, but will be in future. Add a preference to prioritise/deprioritise managers as needed

Presence

stable presence difficult in a distributed environment

  • leader commits worker state into raft which propogates consensus to all managers, is recoverable in case of re-election
  • heartbeat TLLs kept in leader memory. Can't store last ping time in raft, so keep TLLs in memory heap. Upon leader failover, workers are given a grace period to reconnect (and can be flagged accordingly)

Using swarmkit for a year, can now easily scale to a dozen machines on modest hardware

Aaron

Distributed data store

We store in an object model

  • state of the cluster
  • user-defined config
  • organised into objects
    • cluster
    • node
    • service etc

Needs to be durable and redundant storage. Can use external service, but we decided to embed our own distributed data store

  • want to make it easy to set up (remove dependency on a key value store like zookeeper and other external services)
  • fewer round trips (maintain synchronised copies allowing quick lookups)
  • can maintain local indices

In-memory data structures

  • objects are protocol buffers messages
  • stored in go-memdb in-memory database (uses Radix tree for indexing)

radix tree - is a prefix tree - eg hello and helpful share the prefix "hel" - store objects and build multiple indices on top of the tree. Provides lightweight in-memory snapshots - as long as the nodes in the tree are immutable, we can have a consistent view of the entire tree from a root node.

Basis for building transactions

Transactions

  • transactional interface to read or write data in the store
  • read transactions are just atomic snapshots
  • write transaction: take a snapshot, make changes, replace tree root with modified tree's root (atomic pointer swap)
  • one write transaction at a time to prevent conflict
  • commit of write transaction blocks until changes are committed to Raft (acknowledged by cluster quorum)

transaction example code: read (find image) transaction example code: write (find image)

Watches

See if any changes to objects

  • code can register to receive specific creation, update and deletion events e.g. can call a watch function to check on a particular state e.g. > running. You can add lots of things to your list of selectors together with node IDs.

Replication

  • only Raft leader of cluster does writes. Prob with conflicts if all managers can write. Only commit when you know they won't conflict.
    • during write transaction, log every change as well as updating radix tree
    • transaction log is serialised and replicated through Raft
    • internal types are protobuf types which makes serialisation easy
    • followers replay the log entries into the radix tree (de-serialise and replicate)

Sequencer To prevent writes stepping on each-other and provide consistency

  • every object in the store has a version field which is checked when you update the object
    • version stores the raft index when object was last updated
    • updates provide a base version and rejected if out of date
    • similar to CAS
    • also exposed through API calls that change objects in the store (swarmkit or docker apis to fetch services and nodes - include version numbers)

Write batching

  • every write transaction involves a raft roundtrip to get consensus
    • costly to do many, and want to limit size of a write otherwise the write might take too long
    • SO use batch primitive which lets the store automatically split a group of changes across multiple nodes. It loops over the nodes it knows exists and applies changes where necessary

Future work

  • multi-valued indices: can't index multiple networks easily at the moment
  • watch API: as a docker api
  • version control: within the distributed data store to allow presentation of historical data. For each object stored, we would like to track all changes made to it over time.


Q&A

Q: I like the time-out response with the pong mechanism - how do you handle situations where messages are lost? Andrea: the manager is expecting the worker to ping after x time - with a huge buffer (x3) so we get to miss 2 messages before kicking out the node. The system is built to deal with it and will buffer out the heartbeat rate. All the messages are going out over TCP which is reliable.

Q: is there scope for using swarm api for monitoring nodes more formally - for things we would usually use cadvisor for? We are looking into that at the moment, and getting data in place in order to observe more interesting parts of the system. Issue is getting the data in place and making it available - more complex than making decisions on the data. What about just monitoring for metrics, eg like node exporter? We need more monitoring in place but we don't want to use that data for feedback.

Q: what is the biggest swarm deployment you have had and for how long did it run? Biggest one was right before dockercon with 10,000 machines overnight to see if it could sustain the load beforehand. A few community scale tests 2200, and now trying with 3000. Trying to reach the limits - AWS accounts limit has been hit - need to file requests to get 10,000 machines. Need a ton of servers to handle that many machines, but servers can easily manage 100,000 connections. No bottlenecks visible atm but going to run monthly scale tests to check this.

Q: datastore and watch api - the examples looked like you were monitoring individual nodes. will those fire if children are updated? They have filters to decide when to trigger events to be sent. Sometimes we put no filters on it to get an entire stream - e.g. all service updates. Have you thought about using that to model a hierarchical nested keyspace? Not specifically. The keys I showed in the presentation are a low level detail of how it's entered in the databse. In the API we address objects by their ids or their names so it's not a hierarchical keyspace in terms of how users experience it. We want to be able to index our data, and once you put it into a hierarchy, the queries become less flexible. For your use case we could group into boxes or cubes and query events based on that, but we aren't building a database, and I'm not sure about the application in swarmkit at the moment.

BOF session tomorrow.