Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Choose Your Own Consistency

Choose Your Own Consistency

Joint talk with Tom Santero (@tsantero) for Toronto Erlang Factory Lite 2013.

Christopher Meiklejohn

November 23, 2013
Tweet

More Decks by Christopher Meiklejohn

Other Decks in Programming

Transcript

  1. Eventual Consistency

    “Eventual consistency is a consistency model used in distributed computing that informally guarantees that, if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value.” — Wikipedia
  2. Riak Overview Allow Mult [{a, v1, t1}, {b, v1, t2}]

    [{a, v1, t1}, {b, v1, t2}] [{a, v1, t1}, {b, v1, t2}]
  3. b a c a, b a, c a, b, c

    Set; merge function: union. b, c
  4. [ [{1, a}], [] ] [ [{1, a}], [] ]

    [ [{1, a}], [{1, a}] ] [ [{1, a}], [{1, a}] ]
  5. [ [{1, a}], [] ] [ [{1, a}], [] ]

    [ [{1, a}], [{1, a}] ] [ [{1, a}], [{1, a}] ] [ [{1, a}, {2, a}], [{1, a}] ]
  6. [ [{1, a}], [] ] [ [{1, a}], [] ]

    [ [{1, a}], [{1, a}] ] [ [{1, a}], [{1, a}] ] [ [{1, a}, {2, a}], [{1, a}] ] [ [{1, a}, {2, a}], [{1, a}] ]
  7. [ [{1, a}], [] ] [ [{1, a}], [] ]

    [ [{1, a}, {2, b}], [] ]
  8. [ [{1, a}], [] ] [ [{1, a}], [] ]

    [ [{1, a}, {2, b}], [] ] [ [{1, a}], [{1, a}] ]
  9. [ [{1, a}], [] ] [ [{1, a}], [] ]

    [ [{1, a}, {2, b}], [] ] [ [{1, a}], [{1, a}] ] [ [{1, a}, {2, b}], [{1, a}] ]
  10. -type crdt() :: term().
      -type operation() :: term().
      -type actor() :: term().
      -type value() :: term().
      -type error() :: term().
      -callback new() -> crdt().
      -callback value(crdt()) -> term().
      -callback value(term(), crdt()) -> value().
      -callback update(operation(), actor(), crdt()) -> {ok, crdt()} | {error, error()}.
      -callback merge(crdt(), crdt()) -> crdt().
      -callback equal(crdt(), crdt()) -> boolean().
      -callback to_binary(crdt()) -> binary().
      -callback from_binary(binary()) -> crdt().
      riak_dt/src/riak_dt.erl
  11. riak_dt/src/riak_dt_gcounter.erl
      -module(riak_dt_gcounter).
      -export([new/0, new/2, value/1, value/2, update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
      -export_type([gcounter/0, gcounter_op/0]).
      -opaque gcounter() :: orddict:orddict().
      -type gcounter_op() :: increment | {increment, pos_integer()}.
  12. riak_dt/src/riak_dt_pncounter.erl
      -module(riak_dt_pncounter).
      -export([new/0, new/2, value/1, value/2,
               update/3, merge/2, equal/2, to_binary/1, from_binary/1]).
      -export_type([pncounter/0, pncounter_op/0]).
      -opaque pncounter() :: [{Actor::riak_dt:actor(), Inc::pos_integer(), Dec::pos_integer()}].
      -type pncounter_op() :: riak_dt_gcounter:gcounter_op() | decrement_op().
      -type decrement_op() :: decrement | {decrement, pos_integer()}.
      -type pncounter_q() :: positive | negative.
  13. riak_dt/src/riak_dt_gset.erl
      -module(riak_dt_gset).
      -behaviour(riak_dt).
      %% API
      -export([new/0, value/1, update/3, merge/2, equal/2,
               to_binary/1, from_binary/1, value/2]).
      -export_type([gset/0, binary_gset/0, gset_op/0]).
      -opaque gset() :: members().
      -type binary_gset() :: binary().
      -type gset_op() :: {add, member()}.
  14. riak_dt/src/riak_dt_orset.erl
      -module(riak_dt_orset).
      -behaviour(riak_dt).
      %% API
      -export([new/0, value/1, update/3, merge/2, equal/2,
               to_binary/1, from_binary/1, value/2, precondition_context/1]).
      -export_type([orset/0, binary_orset/0, orset_op/0]).
      -opaque orset() :: orddict:orddict().
      -type binary_orset() :: binary(). %% A binary that from_binary/1 will operate on.
      -type orset_op() :: {add, member()} | {remove, member()} |
                          {add_all, [member()]} | {remove_all, [member()]} |
                          {update, [orset_op()]}.
      -type actor() :: riak_dt:actor().
      -type member() :: term().
  15. riak_dt/src/riak_dt_orswot.erl
      -module(riak_dt_orswot).
      -behaviour(riak_dt).
      -export_type([orswot/0, orswot_op/0, binary_orswot/0]).
      -opaque orswot() :: {riak_dt_vclock:vclock(), entries()}.
      -type binary_orswot() :: binary(). %% A binary that from_binary/1 will operate on.
      -type orswot_op() :: {add, member()} | {remove, member()} |
                           {add_all, [member()]} | {remove_all, [member()]} |
                           {update, [orswot_op()]}.
      -type orswot_q() :: size | {contains, term()}.
      -type actor() :: riak_dt:actor().
      -type entries() :: [{member(), minimal_clock()}].
      -type minimal_clock() :: [dot()].
      -type dot() :: {actor(), Count::pos_integer()}.
      -type member() :: term().
  16. riak_dt/src/riak_dt_map.erl
      -module(riak_dt_map).
      -behaviour(riak_dt).
      %% API
      -export([new/0, value/1, value/2, update/3, merge/2,
               equal/2, to_binary/1, from_binary/1, precondition_context/1]).
      -export_type([map/0, binary_map/0, map_op/0]).
      -type binary_map() :: binary(). %% A binary that from_binary/1 will accept
      -type map() :: {riak_dt_vclock:vclock(), valuelist()}.
      -type field() :: {Name::term(), Type::crdt_mod()}.
      -type crdt_mod() :: riak_dt_pncounter | riak_dt_lwwreg |
                          riak_dt_od_flag |
                          riak_dt_map | riak_dt_orswot.
      -type valuelist() :: [{field(), entry()}].
      -type entry() :: {minimal_clock(), crdt()}.
      -type crdt() :: riak_dt_pncounter:pncounter() | riak_dt_od_flag:od_flag() |
                      riak_dt_lwwreg:lwwreg() |
                      riak_dt_orswot:orswot() |
                      riak_dt_map:map().
      -type map_op() :: {update, [map_field_update() | map_field_op()]}.
  17. -module(riak_dt_lwwreg).
      -export([new/0, value/1, value/2, update/3, merge/2,
               equal/2, to_binary/1, from_binary/1]).
      -export_type([lwwreg/0, lwwreg_op/0]).
      -opaque lwwreg() :: {term(), non_neg_integer()}.
      -type lwwreg_op() :: {assign, term(), non_neg_integer()} | {assign, term()}.
      -type lww_q() :: timestamp.
      riak_dt/src/riak_dt_lwwreg.erl
  18. This project is funded by the European Union, 7th Research

    Framework Programme, ICT call 10, grant agreement n°609551.
  19. Distributed Consensus

    “The problem of reaching agreement among remote processes is one of the most fundamental problems in distributed computing and is at the core of many algorithms for distributed data processing, distributed file management, and fault-tolerant distributed applications.” — Fischer, Lynch, Paterson
  20. Node 1 Node 2 Node 3

    N++
    prepare(N)
    promise(N, V_b)
    promise(N, V_c)
    V_N = f(V_a, V_b, V_c)
    commit(N, V_N)
    accept(N)
  21. Node 1 Node 2 Node 3

    N++; I = 0
    prepare(N, I)
    promise(N, I, V_b)
    promise(N, I, V_c)
    V_N = f(V_a, V_b, V_c)
    commit(N, I, V_N)
    accept(N, I)
  22. Node 1 Node 2 Node 3

    obj.epoch < epoch
    get(key)
    reply(Epoch_b, Seq_b, Val_b)
    Val = latest(Val_a, Val_b, Val_c)
    Val.epoch = epoch
    write(Epoch, ++Seq, Val)
    ack(Epoch, Seq)
    reply(Epoch_c, Seq_c, Val_c)
  23. Node 1 Node 2 Node 3

    obj.epoch < epoch
    get(key)
    reply(Epoch_b, Seq_b, Val_b)
    Latest = latest(Val_a, Val_b, Val_c)
    Val = modify(Latest)
    write(Epoch, ++Seq, Val)
    ack(Epoch, Seq)
    reply(Epoch_c, Seq_c, Val_c)
  24. Node 1 Node 2 Node 3

    obj.epoch == epoch
    Latest = local_get(Key)
    Val = modify(Latest)
    write(Epoch, ++Seq, Val)
    ack(Epoch, Seq)
  25. Existing Ensemble Joining Ensemble riak_01 riak_02 riak_03 riak_07 riak_08 riak_09

    [{riak_01}, {riak_02}, {riak_03}] [{riak_07}, {riak_08}, {riak_09}]