applications, application inside the datacenter
- Example: Uber Ride uses multiple microservices for supply, demand, and incentives
- Applications typically use several microservices and systems
  - Potentially using different databases as underlying storage
  - Potentially written in different programming languages depending on group/organization
- Application developers write:
  - Business logic
  - Code to “glue” services together, using messaging APIs, service APIs, etc.
at-most-once vs. at-least-once messaging
- Durability guarantees differ between systems, might not match specification
APIs not specified
- Mostly defined by implementation
- Semantics not well defined, and may differ depending on the implementation
- Might not actually work according to specification
Composition doesn’t preserve isolated semantics
- Apache Kafka and Apache ZooKeeper experimentally evaluated in isolation
- Under composition, Kafka loses data (Kingsbury 2015)
programming with co-designed runtime system
- Implemented as an Erlang library
- Dataflow programming model
- CRDTs: ADTs for distributed programming
  - Data types containing a binary merge function for joining two replicas or updates
  - Used for value convergence under divergence introduced by concurrent modifications
- Programming model assumes eventual consistency
  - Updates will be eventually delivered
  - Updates may be delivered more than once
  - Updates may be delivered in any order
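The delivery assumptions above (duplicates, arbitrary order) are tolerable because the merge function is idempotent, commutative, and associative. A minimal sketch, assuming a grow-only set whose merge is set union; the names `merge` and `deliver` are illustrative and not part of Lasp:

```python
import itertools


def merge(a: frozenset, b: frozenset) -> frozenset:
    """Join of two grow-only set states: plain set union."""
    return a | b


def deliver(updates):
    """Fold a sequence of delivered states into a replica that starts empty."""
    state = frozenset()
    for update in updates:
        state = merge(state, update)
    return state


updates = [frozenset({"a"}), frozenset({"b"}), frozenset({"a", "c"})]

# Collect the final state for every delivery order ...
results = {deliver(order) for order in itertools.permutations(updates)}
# ... and for a run in which every update is delivered twice.
results.add(deliver(updates + updates))
```

If the merge behaves as claimed, `results` contains a single value: the union of all updates.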
merge
- Replicas propagate full objects to one another
Delta-state based (2015)
- Bounded join-semilattices with join as merge
- Replicas propagate minimal change representation (join-irreducible) based on knowledge
- Requires FIFO ordering between nodes
Operation-based (2011)
- Concurrent updates are commutative
- Propagated updates are generated effectors including causal metadata
Pure operation-based (2015)
- Sequential data type stored
- Operations applied to data type at causal stability (requires causality)
- Concurrent updates are commutative
- Propagated updates are the original updates

Conflict-Free Replicated Data Types come in a variety of different implementations, depending on the guarantees the system can get from the underlying system and network topology. For our examples, we focus on state-based CRDTs.
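To make the difference between shipping full state and shipping a delta concrete, here is a minimal sketch using a grow-only counter stored as a dict from replica id to count. The function names are assumptions for illustration; a real delta-state protocol also needs the buffering and ordering machinery mentioned above, which this sketch omits.

```python
def join(a: dict, b: dict) -> dict:
    """Join of two G-Counter states: coordinate-wise maximum."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


def increment_delta(state: dict, replica_id: str) -> dict:
    """Return only the entry that changes (the delta), not the full state."""
    return {replica_id: state.get(replica_id, 0) + 1}


def value(state: dict) -> int:
    """The counter's value is the sum over all replica entries."""
    return sum(state.values())


# Replica "a" holds a three-entry state but changes a single entry.
a = {"a": 3, "b": 7, "c": 2}
delta = increment_delta(a, "a")
a = join(a, delta)

# Another replica can catch up from the small delta or from the full state.
b = {"a": 3, "b": 7, "c": 2}
b_from_delta = join(b, delta)
b_from_full = join(b, a)
```

Both paths use the same `join`, so the receiving replica ends in the same state either way; only the size of the message differs.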
join
- Two-Phase Set (2P-Set)
  - Pair of G-Sets (add and remove G-Sets)
  - Coordinate-wise union as join
- Observed-Remove Set (OR-Set)
  - Tag updates with unique identifier marking add/remove
  - Coordinate-wise union of add and remove tokens per item
Counters
- Grow-Only Counter (G-Counter)
  - Vector clock
  - Coordinate-wise maximum
- Positive-Negative Counter (PN-Counter)
  - Pair of vector clocks
  - Coordinate-wise maximum
Others
- Maps, Registers, Booleans, Graphs…

We will consider only one implementation of CRDTs at the moment: the state-based implementation, formalized as bounded join-semilattices. As a brief presentation, we cover sets and counters only.
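Two of the types listed above can be sketched in a few lines. This is an illustrative state-based version under simplifying assumptions (immutable values, string replica ids); the class and method names are hypothetical and do not mirror any particular library.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TwoPSet:
    """2P-Set: a pair of grow-only sets, joined coordinate-wise by union."""
    added: frozenset = frozenset()
    removed: frozenset = frozenset()

    def add(self, x):
        return TwoPSet(self.added | {x}, self.removed)

    def remove(self, x):
        # Only an element that was added can be removed; removal is permanent.
        return TwoPSet(self.added, self.removed | {x}) if x in self.added else self

    def join(self, other):
        return TwoPSet(self.added | other.added, self.removed | other.removed)

    def value(self):
        return self.added - self.removed


def vmax(a: dict, b: dict) -> dict:
    """Coordinate-wise maximum of two per-replica count vectors."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


@dataclass(frozen=True)
class PNCounter:
    """PN-Counter: one vector for increments, one for decrements."""
    incs: dict = field(default_factory=dict)
    decs: dict = field(default_factory=dict)

    def increment(self, replica):
        return PNCounter({**self.incs, replica: self.incs.get(replica, 0) + 1}, self.decs)

    def decrement(self, replica):
        return PNCounter(self.incs, {**self.decs, replica: self.decs.get(replica, 0) + 1})

    def join(self, other):
        return PNCounter(vmax(self.incs, other.incs), vmax(self.decs, other.decs))

    def value(self):
        return sum(self.incs.values()) - sum(self.decs.values())


# Two replicas diverge, then merge.
s1 = TwoPSet().add("x").add("y")
s2 = s1.remove("x")          # one replica removes x
s1 = s1.add("z")             # the other concurrently adds z
merged_set = s1.join(s2)

c1 = PNCounter().increment("r1").increment("r1")
c2 = PNCounter().increment("r2").decrement("r2")
merged_counter = c1.join(c2)
```

In both cases the join is applied field by field, which is what "coordinate-wise" means on the slide.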
%% Derive a new set
B = product(A, filter(P, A))
%% Create concurrent process
%% to insert into set
process do insert(A, random()) end

- Creates a join-semilattice representation of a set (formalized as CRDT)
- Creates a morphism to a join-semilattice B under image of product/filter
- Concurrent additions produce a ‘join’ with A’s state; triggers update of B

Variables are asynchronously replicated across all nodes in the system
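The dataflow behaviour described above can be approximated in a single process. This is a rough sketch, not Lasp's implementation: `SetVar`, `watch`, and `derive` are hypothetical names, the predicate `is_even` stands in for `P`, and replication across nodes is left out entirely.

```python
from itertools import product


class SetVar:
    """A grow-only set variable that notifies dependents when a join changes it."""

    def __init__(self):
        self.state = frozenset()
        self._watchers = []

    def watch(self, fn):
        self._watchers.append(fn)
        fn(self.state)

    def join(self, other):
        new = self.state | frozenset(other)
        if new != self.state:          # only a strict growth is propagated
            self.state = new
            for fn in self._watchers:
                fn(new)


def derive(source, fn):
    """Create a variable whose state is kept up to date as fn(source.state)."""
    out = SetVar()
    source.watch(lambda s: out.join(fn(s)))
    return out


def is_even(n):
    """Hypothetical predicate standing in for P."""
    return n % 2 == 0


A = SetVar()
B = derive(A, lambda s: product(s, filter(is_even, s)))

A.join({1, 2})   # triggers an update of B
A.join({3})      # triggers another update of B
A.join({2})      # duplicate: A does not change, so B is not recomputed
```

Because `product` and `filter` are monotone over a growing set, joining each recomputed result into `B` gives the same state as recomputing `B` from scratch.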
to a G-Set?
- If a single set removal: Can we specialize to a 2P-Set?
- If we don’t decrement the counter: Can we specialize to a G-Counter?

Can we choose an appropriate CRDT based on some knowledge of the program?
Currently, this must be specified manually by the programmer by declaring the type.
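One way to picture the specialization question is as a function from facts about a program to the weakest sufficient type. The sketch below is purely hypothetical: the inputs are assumed to come from some program analysis that the slides pose as an open problem, and the fallback choices (OR-Set, PN-Counter) are assumptions.

```python
def specialize_set(uses_remove: bool, readds_after_remove: bool) -> str:
    """Pick the simplest set CRDT that supports what the program does."""
    if not uses_remove:
        return "G-Set"       # elements are only ever added
    if not readds_after_remove:
        return "2P-Set"      # each element is removed at most once, for good
    return "OR-Set"          # assumed general fallback


def specialize_counter(uses_decrement: bool) -> str:
    """Pick the simplest counter CRDT that supports what the program does."""
    return "PN-Counter" if uses_decrement else "G-Counter"


choices = [
    specialize_set(uses_remove=False, readds_after_remove=False),
    specialize_set(uses_remove=True, readds_after_remove=False),
    specialize_set(uses_remove=True, readds_after_remove=True),
    specialize_counter(uses_decrement=False),
]
```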
Only requires eventual delivery of messages
Delta-state based (2015)
- Only transmits change representation as join-irreducible element
- Requires per-node FIFO delivery
- Ideal for partially connected peer-to-peer due to buffering for FIFO delivery
Pure operation-based (2015)
- Requires causal delivery and protocol for causal stability which may be problematic in certain high-churn environments
- Most efficient in transmission and storage

Can we choose an appropriate CRDT based on some knowledge of the program?
Delta can be enabled at runtime through configuration parameter. Pure operation-based is WIP.
any consistency model
- Eventual consistency is assumed by the programming model
- May be replicated or not, depending on required durability guarantees
- Supported storage engines
  - Riak
  - Redis
  - Erlang DETS/ETS
- Lasp KV provides replication between nodes
  - Full replication assumed
  - Partial replication provided based on “topics”

Underlying storage for the Lasp programming system is pluggable. Topics provide “opt-in” replication for certain variables, grouped by a namespace.
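A pluggable backend with topic-based opt-in replication might look roughly like the following. Everything here is an assumption made for illustration: the `MemoryBackend` and `Node` classes, the `get`/`put` interface, and the convention that a variable's topic is the part of its key before the first `/`. None of it is Lasp KV's actual API.

```python
class MemoryBackend:
    """Hypothetical in-memory store; any object with get/put could replace it."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key, frozenset())

    def put(self, key, value):
        self._data[key] = value


class Node:
    """A node that only accepts replicated state for topics it subscribed to."""

    def __init__(self, backend, topics):
        self.backend = backend
        self.topics = set(topics)

    def receive(self, key, state):
        topic = key.split("/", 1)[0]      # assumed namespace convention
        if topic not in self.topics:
            return False                   # not opted in: ignore the update
        self.backend.put(key, self.backend.get(key) | state)  # join = union
        return True


node = Node(MemoryBackend(), topics=["ads"])
accepted = node.receive("ads/clicks", frozenset({"u1"}))
ignored = node.receive("users/active", frozenset({"u2"}))
node.receive("ads/clicks", frozenset({"u3"}))
```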
- Full mesh (via Distributed Erlang)
- Full mesh (without Distributed Erlang)
- Client-server
- Peer-to-peer (via HyParView-inspired protocol)
- Publish-subscribe (via external AMQP endpoint)
- Programming model is topology-agnostic
- Peer-to-peer topology will forward messages to other nodes on behalf of others to provide point-to-point messaging
- Assumes best-effort delivery

Standalone system for Erlang (built by our group for Lasp and adopted in industry). Partisan (our library) operates using a simple message forwarding API that Lasp builds on.
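Forwarding "on behalf of others" can be illustrated with a breadth-first search over a partially connected overlay. This is only a sketch of the idea; it is not the HyParView protocol or Partisan's forwarding logic, and the `route` function and overlay layout are invented for the example.

```python
from collections import deque


def route(overlay: dict, src: str, dst: str):
    """Find a forwarding path from src to dst over the overlay, or None."""
    parents = {src: None}
    queue = deque([src])
    while queue:
        node = queue.popleft()
        if node == dst:
            path = []
            while node is not None:       # walk back to the source
                path.append(node)
                node = parents[node]
            return path[::-1]
        for peer in overlay.get(node, ()):
            if peer not in parents:
                parents[peer] = node
                queue.append(peer)
    return None  # best-effort delivery: no route means the message is dropped


# Each node knows only its direct neighbours.
overlay = {"a": ["b"], "b": ["a", "c"], "c": ["b", "d"], "d": ["c"]}
path = route(overlay, "a", "d")
missing = route(overlay, "a", "z")
```

Node `a` has no direct link to `d`, so `b` and `c` relay the message, which gives the sender the appearance of point-to-point messaging.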
choice of the underlying data store and network topologies
- Relies on Conflict-Free Replicated Data Types to deal with inherent issues of EC
  - Out-of-order message delivery
  - Duplicate message delivery
- Limits the types of applications that can be written with the programming model
  - Eventual consistency is not strong enough for some operations
  - Not all operations can have a sensible merge operation (assignment to register)
  - Atomically changing multiple variables (transactions, etc.)
  - Causality required for ordering of some operations (secondary indexing, references)
  - Coordination required for precondition-based operations (check-and-set, etc.)
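The register limitation is easiest to see next to a counter. In this sketch the register is merged with a last-writer-wins rule, which is an assumption chosen for illustration (the slides do not name a register merge); the counter uses the coordinate-wise maximum described earlier.

```python
def merge_lww(a: tuple, b: tuple) -> tuple:
    """Last-writer-wins register: keep the (timestamp, value) with the larger timestamp."""
    return max(a, b)


def merge_counter(a: dict, b: dict) -> dict:
    """Grow-only counter: coordinate-wise maximum of per-replica counts."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


# Both replicas read 10 and concurrently "add one" by assigning 11.
reg_1 = (1, 11)                       # (timestamp, value) written at replica 1
reg_2 = (2, 11)                       # (timestamp, value) written at replica 2
register = merge_lww(reg_1, reg_2)    # one of the two increments is lost

# The same intent expressed as increments on a counter.
cnt_1 = {"r0": 10, "r1": 1}
cnt_2 = {"r0": 10, "r2": 1}
counter = sum(merge_counter(cnt_1, cnt_2).values())
```

The register ends at 11 while the counter ends at 12: assignment carries no information about intent, so no merge can recover both updates.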
model. [Figure: layered architecture diagram. Labels: Application State; Storage Data Types; Distribution Layer; TCP; UDP; RabbitMQ; P2P; Full; Client/Server; Storage Backend; Traditional Storage; Ephemeral Storage; Distributed Storage; Riak.]
Focus on the programming model and the open problems.
- Microsoft Orleans, Bernstein et al., MSR-TR-2016-1001
  - Transactions integrated into an actor language on arbitrary storage
- Argus, Liskov, CACM 1988
  - Transactions via guardians in distributed language on CLU
Causal ordering of updates for sequential reasoning about application code
- Quelea, Sivaramakrishnan et al., PLDI 2015
  - Assumes eventually consistent storage, providing causal guarantees on top
Analysis to determine where coordination is required for precondition invariants
- CISE, Gotsman et al., POPL 2016
  - Assumes causality in underlying storage engine
possible to determine where sequential data type instances can be replaced with CRDTs?
- For example: compatible data types, integer with assignment vs. counter with increment, blind writes
Leverage EC where possible
- Causal reasoning is important to observe effects immediately
- For example: creating a reference to an object, inserting an item into a collection based on a monotonic condition
Automatically specialize types
- Is it possible to analyze programs to determine the right CRDT implementation or type?
- For example: no removals, use a G-Set; no decrements, use a G-Counter
based on the topology, prevent the compilation of certain programs?
- For example, certain network topologies might not be able to provide transactions:
  - Peer-to-peer, when the network is partially connected
  - Publish-subscribe topologies, where clients may not receive messages until they come online
Storage-aware compiler
- When operating on storage that is weaker than causal consistency, what should happen?
- For example: On Riak, causality is not possible. Therefore, what should happen?
  - Prevent compilation, disallowing operations that require causality?
  - Allow compilation, but use a middleware layer to ensure causality?
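The "prevent compilation" option could be pictured as a capability check. The sketch below is hypothetical throughout: the operation names, the consistency ordering, and both tables are assumptions. The only entry taken from the slides is that Riak is treated as not providing causality; the other two stores are placeholders.

```python
from enum import IntEnum


class Consistency(IntEnum):
    EVENTUAL = 0
    CAUSAL = 1
    STRONG = 2


# What each backend is assumed to provide (placeholder names except Riak).
PROVIDES = {
    "riak": Consistency.EVENTUAL,
    "causal_store": Consistency.CAUSAL,
    "strong_store": Consistency.STRONG,
}

# What each kind of operation is assumed to need.
REQUIRES = {
    "crdt_update": Consistency.EVENTUAL,
    "create_reference": Consistency.CAUSAL,
    "check_and_set": Consistency.STRONG,
}


def unsupported(program_ops, store):
    """Return the operations the store cannot support; empty means 'compiles'."""
    available = PROVIDES[store]
    return [op for op in program_ops if REQUIRES[op] > available]


program = ["crdt_update", "create_reference"]
on_riak = unsupported(program, "riak")
on_causal = unsupported(program, "causal_store")
```

The alternative raised on the slide, allowing compilation and adding a middleware layer, would correspond to raising the store's entry in `PROVIDES` instead of rejecting the program.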
data stores, how do I know which one to pick?
- Applications might require different stores: strong consistency, causal consistency, weak consistency?
- Desire the most available, weakest consistency model: is this compatible with application invariants?
- Given a choice of underlying network / communication layers, which are compatible?
- What stores should not be allowed given particular storage engines? What about the inverse?
Infrastructure ‘glue’ correctness
- How do I know the code used to interact with the underlying data store is correct?
- For example: Am I interacting with the underlying storage system correctly?
- How do I designate which consistency levels and/or guarantees the underlying infrastructure provides?
runtimes for distribution can hardly compete with existing systems solutions
- Systems are designed with a particular use case in mind, with an optimized algorithm for that use case
- Systems industrialization has higher adoption in industry than new programming models/languages
Existing languages are too primitive
- Established, proven runtime systems
- Mostly concurrent only (and even that is recent)
- Lack of primitives for distributed computing, with some notable exceptions
Can we establish a joint systems-PL research agenda?
- Find a way to leverage existing systems infrastructure (and/or algorithms)
- Build a language (and/or programming model) that can leverage existing systems for large-scale distributed applications