Conflict-free Replicated Data Types
This page documents how to implement Cloudstate CRDT entities in Go. For information on what Cloudstate CRDT entities are, please read the general Conflict-free Replicated Data Type documentation first.
A CRDT entity can be created by implementing the crdt.EntityHandler interface and registering an entity with a Cloudstate instance.
type EntityHandler interface {
	HandleCommand(ctx *CommandContext, name string, msg proto.Message) (*any.Any, error)
	Default(ctx *Context) (CRDT, error)
	Set(ctx *Context, state CRDT) error
}
Accessing and creating an entity’s CRDT
Each CRDT entity manages one root CRDT. When the entity is started, the proxy either supplies that CRDT through crdt.EntityHandler.Set or, if no CRDT exists for the entity yet, the entity has to create it using the crdt.EntityHandler.Default factory method.
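func (s *ShoppingCart) Default(ctx *crdt.Context) (crdt.CRDT, error) {
	// No CRDT exists yet for this entity: start with an empty ORMap.
	return crdt.NewORMap(), nil
}

func (s *ShoppingCart) Set(ctx *crdt.Context, state crdt.CRDT) error {
	// Keep a reference to the root CRDT, rejecting unexpected CRDT types.
	items, ok := state.(*crdt.ORMap)
	if !ok {
		return fmt.Errorf("unexpected CRDT type %T", state)
	}
	s.items = items
	return nil
}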
The state, the CRDT, supplied with …
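The Default and Set methods above imply a start-up sequence in which the value returned by Default only reaches s.items through Set. The following stdlib-only model sketches that sequence under that assumption. CRDT, ORMap, Handler and startEntity are hypothetical stand-ins written for illustration, not the Cloudstate Go API, and the context parameters are omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// CRDT is a hypothetical stand-in for crdt.CRDT.
type CRDT interface{ Kind() string }

// ORMap is a toy placeholder for crdt.ORMap.
type ORMap struct{ entries map[string]string }

func (*ORMap) Kind() string { return "ORMap" }

// Handler mirrors the Default/Set half of crdt.EntityHandler (assumed shape,
// without the context parameters).
type Handler interface {
	Default() (CRDT, error)
	Set(state CRDT) error
}

type ShoppingCart struct{ items *ORMap }

func (s *ShoppingCart) Default() (CRDT, error) {
	return &ORMap{entries: map[string]string{}}, nil
}

func (s *ShoppingCart) Set(state CRDT) error {
	items, ok := state.(*ORMap)
	if !ok {
		return fmt.Errorf("unexpected CRDT type %T", state)
	}
	s.items = items
	return nil
}

// startEntity models the assumed start-up sequence: use the state supplied by
// the proxy if there is one, otherwise create it with Default; either way the
// entity receives it through Set.
func startEntity(h Handler, supplied CRDT) error {
	state := supplied
	if state == nil {
		var err error
		if state, err = h.Default(); err != nil {
			return err
		}
	}
	if state == nil {
		return errors.New("no CRDT to set")
	}
	return h.Set(state)
}

func main() {
	cart := &ShoppingCart{}
	if err := startEntity(cart, nil); err != nil {
		panic(err)
	}
	fmt.Println(cart.items.Kind()) // ORMap
}
```

Under this assumption, Default never needs to store anything itself. Set is the single place where the entity takes ownership of its root CRDT.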
Handling commands
Command handlers are implemented with the crdt.EntityHandler.HandleCommand method.
The command handler receives a CommandContext, the command’s name, and the gRPC message as a protobuf message.
The combination of the command’s name and the message’s type, as defined by the gRPC service, determines which command is being handled.
So to handle the shopping cart service method GetCart:
rpc GetCart (GetShoppingCart) returns (Cart);
the command name GetCart, together with its message type GetShoppingCart, has to be matched.
A type switch or a type assertion on the protobuf command message is a convenient way to get the command’s data, process it, and then return the appropriate type, Cart in our example.
The command handler must return the output type of the gRPC service call, wrapped in an *any.Any; this is sent as the reply.
The following shows the implementation of the GetCart command.
This command handler is read-only: it doesn’t update the CRDT, it just returns some state:
func (s *ShoppingCart) HandleCommand(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
	switch msg.(type) {
	case *GetShoppingCart:
		// Read-only: build the Cart from the current CRDT state.
		cart, err := s.getCart()
		if err != nil {
			return nil, err
		}
		return encoding.MarshalAny(cart)
	}
	return nil, fmt.Errorf("unhandled command: %s", name)
}
Updating a CRDT
Due to Cloudstate’s take-in-turns approach, CRDTs may only be updated in command handlers and stream cancellation callbacks.
Here’s a snippet for handling the AddLineItem message, which adds the item to the shopping cart.
If a gRPC service method’s message type identifies the method unambiguously, there is no need to dispatch on the command name as well.
func (s *ShoppingCart) HandleCommand(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
	...
	switch m := msg.(type) {
	case *AddLineItem:
		// Reject zero and negative quantities.
		if m.GetQuantity() <= 0 {
			return nil, errors.New("cannot add a non-positive quantity of items")
		}
		// Serialize the line item so it can be stored in an LWWRegister.
		item, err := encoding.MarshalAny(&LineItem{
			ProductId: m.GetProductId(),
			Name:      m.GetName(),
			Quantity:  m.GetQuantity(),
		})
		if err != nil {
			return nil, err
		}
		// Each product is keyed by its ID in the cart's ORMap.
		key := encoding.String(m.GetProductId())
		reg, err := s.items.LWWRegister(key)
		if err != nil {
			return nil, err
		}
		if reg != nil {
			// The product is already in the cart: overwrite its line item.
			reg.Set(item)
		} else {
			// First time this product is added.
			reg = crdt.NewLWWRegister(item)
		}
		s.items.Set(key, reg)
		return encoding.Empty, nil
	...
It’s good practice to dispatch commands on both their command name and their message type.
Unfortunately, the Go gRPC implementation does not provide type-safe service descriptors out of a compiled service definition, so command names have to be matched as plain strings.
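Dispatching on both the command name and the message type can be sketched with the standard library alone. The message structs, the AddItem method name, and the plain map used as cart state are hypothetical stand-ins for the generated protobuf types and the CRDT. Only the dispatch pattern is meant to carry over.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for generated protobuf message types.
type GetShoppingCart struct{ UserID string }

type AddLineItem struct {
	UserID    string
	ProductID string
	Quantity  int32
}

type Cart struct{ Items map[string]int32 }

type Empty struct{}

// ShoppingCart keeps its state in a plain map for this sketch.
type ShoppingCart struct{ items map[string]int32 }

// handleCommand dispatches on the command name first, then asserts the
// message type that the gRPC method is expected to carry.
func (s *ShoppingCart) handleCommand(name string, msg interface{}) (interface{}, error) {
	switch name {
	case "GetCart":
		if _, ok := msg.(*GetShoppingCart); !ok {
			return nil, fmt.Errorf("GetCart: unexpected message type %T", msg)
		}
		cart := &Cart{Items: map[string]int32{}}
		for id, qty := range s.items {
			cart.Items[id] = qty
		}
		return cart, nil
	case "AddItem": // hypothetical method name
		m, ok := msg.(*AddLineItem)
		if !ok {
			return nil, fmt.Errorf("AddItem: unexpected message type %T", msg)
		}
		if m.Quantity <= 0 {
			return nil, errors.New("quantity must be positive")
		}
		s.items[m.ProductID] += m.Quantity
		return &Empty{}, nil
	default:
		return nil, fmt.Errorf("unknown command %q", name)
	}
}

func main() {
	s := &ShoppingCart{items: map[string]int32{}}
	if _, err := s.handleCommand("AddItem", &AddLineItem{ProductID: "apple", Quantity: 2}); err != nil {
		panic(err)
	}
	out, err := s.handleCommand("GetCart", &GetShoppingCart{})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.(*Cart).Items["apple"]) // 2
	_, err = s.handleCommand("GetCart", &AddLineItem{})
	fmt.Println(err) // GetCart: unexpected message type *main.AddLineItem
}
```

Checking the name first and then the type catches a mismatch between the two, which a type switch alone would silently accept.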
Deleting a CRDT
A CRDT can be deleted by invoking crdt.Context.Delete.
Once a CRDT is deleted, the entity will be shut down, and all subsequent commands for the entity will be rejected.
Caution should be taken when deleting CRDTs: the Reference Implementation of the proxy needs to maintain a tombstone for each deleted CRDT. Over time, if many CRDTs are created and deleted, this results not only in running out of memory but also in increased network usage, since the tombstones still need to be gossiped through the cluster for replication.
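Here is a toy model of the two consequences described above: commands for a deleted entity are rejected, and tombstones accumulate. The registry type and its methods are hypothetical and only illustrate the bookkeeping. The real proxy must also replicate its tombstones across the cluster, which this sketch does not attempt.

```go
package main

import "fmt"

// registry is a hypothetical toy model, not the proxy: live entities hold a
// counter, and deleted entity IDs are kept as tombstones so that later
// commands for them can be rejected.
type registry struct {
	live       map[string]int
	tombstones map[string]struct{}
}

func newRegistry() *registry {
	return &registry{live: map[string]int{}, tombstones: map[string]struct{}{}}
}

// increment applies a command to an entity, creating it on first use,
// unless the entity has been deleted.
func (r *registry) increment(id string) error {
	if _, deleted := r.tombstones[id]; deleted {
		return fmt.Errorf("entity %q has been deleted", id)
	}
	r.live[id]++
	return nil
}

// remove drops the entity's state but keeps a tombstone; nothing in this
// model ever reclaims tombstones.
func (r *registry) remove(id string) {
	delete(r.live, id)
	r.tombstones[id] = struct{}{}
}

func main() {
	r := newRegistry()
	for i := 0; i < 1000; i++ {
		id := fmt.Sprintf("cart-%d", i)
		_ = r.increment(id)
		r.remove(id)
	}
	fmt.Println(len(r.live), len(r.tombstones)) // 0 1000
	fmt.Println(r.increment("cart-0"))          // entity "cart-0" has been deleted
}
```

After all the churn no live state remains, yet the model still holds one tombstone per deleted entity.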
Streamed command handlers
Streamed commands can be used to receive and publish updates to the state.
If a gRPC service call has a streamed result type, the handler for that call can use its crdt.CommandContext to register handler functions.
Responding to changes
If the command handler wishes to publish changes to the stream, it can register a handler function with crdt.CommandContext.ChangeFunc, which will be invoked every time the CRDT changes.
The handler function can then return a message to be sent to the client (or empty.Empty, if it wishes to send no message in response to that particular change).
The handler function may not modify the CRDT itself, but it may emit effects that may modify the CRDT.
If the shopping cart service had a WatchCart call, like this:
rpc WatchCart (GetShoppingCart) returns (stream Cart);
that could be implemented like this:
switch name {
case "WatchCart":
	// Send the current cart to the client on every subsequent CRDT change.
	ctx.ChangeFunc(func(c *crdt.CommandContext) (*any.Any, error) {
		cart, err := s.getCart()
		if err != nil {
			return nil, err
		}
		return encoding.MarshalAny(cart)
	})
	// The first reply is the cart as it is now.
	cart, err := s.getCart()
	if err != nil {
		return nil, err
	}
	return encoding.MarshalAny(cart)
}
Ending the stream
The ChangeFunc handler function can end the stream by invoking crdt.CommandContext.EndStream on the CommandContext it is passed.
If it does this, it will not receive a cancellation callback.
Responding to stream cancellation
A command handler may register a handler function with crdt.CommandContext.CancelFunc to be notified when the stream is cancelled.
The cancellation handler function may update the CRDT. This is useful if the CRDT is being used to track connections, for example when using crdt.Vote CRDTs to track a user’s online status.
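Here is a minimal model of the stream lifecycle described in the two sections above. It assumes a stream can end either through EndStream or through client cancellation, and that the cancellation callback runs at most once. The stream type is hypothetical, and the online flag stands in for the current node’s vote in a crdt.Vote.

```go
package main

import "fmt"

// stream is a hypothetical model of a streamed command's lifecycle.
type stream struct {
	ended    bool
	onCancel func() // registered by the command handler, like a CancelFunc
}

// endStream models the ChangeFunc ending the stream via EndStream: the stream
// is closed from the server side, so no cancellation callback follows.
func (s *stream) endStream() { s.ended = true }

// clientCancel models the client cancelling the stream; the callback runs at
// most once and only for streams that are still open.
func (s *stream) clientCancel() {
	if s.ended {
		return
	}
	s.ended = true
	if s.onCancel != nil {
		s.onCancel()
	}
}

func main() {
	online := true // stands in for this node's vote in a crdt.Vote
	s := &stream{onCancel: func() { online = false }}
	s.clientCancel()
	fmt.Println(online) // false

	online = true
	closed := &stream{onCancel: func() { online = false }}
	closed.endStream()
	closed.clientCancel()
	fmt.Println(online) // true
}
```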
Types of CRDTs
The Cloudstate Go language support library offers Go types for each of the CRDTs available in Cloudstate.
Counters and flags
crdt.GCounter, crdt.PNCounter and crdt.Flag are available, offering operations relevant to each CRDT.
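The Go types hide replication, but the merge semantics behind these CRDTs are the standard state-based ones. The sketch below is a textbook model written for illustration, not the implementation used by the Cloudstate proxy. A GCounter keeps one slot per node and merges by per-node maximum, a PNCounter pairs two GCounters, and a Flag can only be enabled.

```go
package main

import "fmt"

// GCounter is a grow-only counter: one non-decreasing slot per node.
type GCounter map[string]uint64

func (g GCounter) Increment(node string, n uint64) { g[node] += n }

func (g GCounter) Value() uint64 {
	var sum uint64
	for _, v := range g {
		sum += v
	}
	return sum
}

// Merge takes the per-node maximum, which is commutative, associative and
// idempotent, so replicas converge regardless of delivery order.
func (g GCounter) Merge(other GCounter) {
	for node, v := range other {
		if v > g[node] {
			g[node] = v
		}
	}
}

// PNCounter pairs a GCounter of increments with a GCounter of decrements.
type PNCounter struct{ P, N GCounter }

func NewPNCounter() *PNCounter { return &PNCounter{P: GCounter{}, N: GCounter{}} }

func (c *PNCounter) Increment(node string, n uint64) { c.P.Increment(node, n) }

func (c *PNCounter) Decrement(node string, n uint64) { c.N.Increment(node, n) }

func (c *PNCounter) Value() int64 { return int64(c.P.Value()) - int64(c.N.Value()) }

func (c *PNCounter) Merge(other *PNCounter) {
	c.P.Merge(other.P)
	c.N.Merge(other.N)
}

// Flag can only go from false to true; merging is a logical OR.
type Flag struct{ enabled bool }

func (f *Flag) Enable() { f.enabled = true }

func (f *Flag) Merge(other Flag) { f.enabled = f.enabled || other.enabled }

func main() {
	a, b := NewPNCounter(), NewPNCounter()
	a.Increment("node-a", 5)
	b.Decrement("node-b", 2)
	a.Merge(b)
	b.Merge(a)
	fmt.Println(a.Value(), b.Value()) // 3 3
}
```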
Vote
crdt.Vote is available for the Vote CRDT. The Vote CRDT allows updating the current node’s vote using the crdt.Vote.Vote method; the current node’s vote can be queried using the crdt.Vote.SelfVote method.
To determine the result of a vote, crdt.Vote.Voters and crdt.Vote.VotesFor can be used to check the total number of nodes and the number of nodes that have voted for the condition, respectively.
In addition, convenience methods are provided for common vote decision approaches: crdt.Vote.AtLeastOne returns true if there is at least one vote for the condition, crdt.Vote.Majority returns true if the number of votes for is more than half the number of voters, and crdt.Vote.All returns true if the number of votes for equals the number of voters.
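To make the decision rules concrete, here is a stdlib-only model of a vote as a map from node ID to that node’s vote. The vote type is hypothetical. Its methods mirror the descriptions of Voters, VotesFor, AtLeastOne, Majority and All above, not the actual crdt.Vote signatures.

```go
package main

import "fmt"

// vote is a hypothetical model: node ID -> that node's vote for the condition.
type vote map[string]bool

// Voters is the total number of nodes taking part.
func (v vote) Voters() int { return len(v) }

// VotesFor counts the nodes that voted for the condition.
func (v vote) VotesFor() int {
	n := 0
	for _, yes := range v {
		if yes {
			n++
		}
	}
	return n
}

func (v vote) AtLeastOne() bool { return v.VotesFor() > 0 }

// Majority requires strictly more than half of the voters.
func (v vote) Majority() bool { return 2*v.VotesFor() > v.Voters() }

func (v vote) All() bool { return v.VotesFor() == v.Voters() }

func main() {
	online := vote{"node-1": true, "node-2": false, "node-3": true}
	fmt.Println(online.AtLeastOne(), online.Majority(), online.All()) // true true false
}
```

In this model a tie is not a majority, and All is trivially true when there are no voters at all.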
Registers
crdt.LWWRegister provides the LWWRegister CRDT. It can be interacted with using the crdt.LWWRegister.Set and crdt.LWWRegister.Value methods.
If you wish to use a custom clock, you can use the crdt.LWWRegister.SetWithClock method, which allows passing a custom clock and custom clock value.
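Here is a textbook last-write-wins register, sketched for illustration. Each write carries a clock value, and merging keeps the write with the highest clock. The setWithClock helper and the tie-break on node ID are assumptions of this sketch. They do not describe how crdt.LWWRegister.SetWithClock resolves ties.

```go
package main

import "fmt"

// lwwRegister is a hypothetical last-write-wins register: the write with the
// highest clock wins, and equal clocks are broken by node ID (an assumption).
type lwwRegister struct {
	value string
	clock int64
	node  string
}

// setWithClock records a write with a caller-supplied clock value.
func (r *lwwRegister) setWithClock(value string, clock int64, node string) {
	r.value, r.clock, r.node = value, clock, node
}

// merge keeps whichever of the two writes is "later".
func (r *lwwRegister) merge(o lwwRegister) {
	if o.clock > r.clock || (o.clock == r.clock && o.node > r.node) {
		*r = o
	}
}

func main() {
	a, b := lwwRegister{}, lwwRegister{}
	a.setWithClock("red", 10, "node-a")
	b.setWithClock("blue", 12, "node-b")
	a.merge(b)
	b.merge(a)
	fmt.Println(a.value, b.value) // blue blue
}
```

A custom clock is useful when a domain value, such as a version number, should decide which write wins instead of the node's wall clock.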
Sets and Maps
The Cloudstate Go support provides crdt.GSet, crdt.ORSet and crdt.ORMap.
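A GSet only grows, so the interesting case is the ORSet (observed-remove set). The sketch below is a textbook add-wins model, assumed here for illustration rather than taken from the Cloudstate proxy. Every add gets a unique tag, and a remove only deletes the tags it has already observed, so an add that happens concurrently with a remove survives the merge. The orSet type is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// orSet is a hypothetical observed-remove set with per-add tags.
type orSet struct {
	adds    map[string]map[string]bool // element -> add tags
	removed map[string]bool            // tags that have been removed
	node    string
	seq     int
}

func newORSet(node string) *orSet {
	return &orSet{adds: map[string]map[string]bool{}, removed: map[string]bool{}, node: node}
}

// Add tags the element with a fresh, node-unique tag.
func (s *orSet) Add(e string) {
	s.seq++
	if s.adds[e] == nil {
		s.adds[e] = map[string]bool{}
	}
	s.adds[e][fmt.Sprintf("%s:%d", s.node, s.seq)] = true
}

// Remove only removes the tags this replica has observed so far.
func (s *orSet) Remove(e string) {
	for tag := range s.adds[e] {
		s.removed[tag] = true
	}
}

func (s *orSet) Contains(e string) bool {
	for tag := range s.adds[e] {
		if !s.removed[tag] {
			return true
		}
	}
	return false
}

// Merge is the union of add tags and the union of removed tags.
func (s *orSet) Merge(o *orSet) {
	for e, tags := range o.adds {
		if s.adds[e] == nil {
			s.adds[e] = map[string]bool{}
		}
		for tag := range tags {
			s.adds[e][tag] = true
		}
	}
	for tag := range o.removed {
		s.removed[tag] = true
	}
}

func (s *orSet) Elements() []string {
	var out []string
	for e := range s.adds {
		if s.Contains(e) {
			out = append(out, e)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	a, b := newORSet("a"), newORSet("b")
	a.Add("apple")
	b.Merge(a)
	// Concurrently: a removes apple while b adds it again.
	a.Remove("apple")
	b.Add("apple")
	a.Merge(b)
	b.Merge(a)
	fmt.Println(a.Elements(), b.Elements()) // [apple] [apple]
}
```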
With Cloudstate maps and sets, the serialized form of map keys and set values must be stable. The Cloudstate proxy uses the serialized form of the values to track changes in the set or map. If the same value serializes to two different sets of bytes on different occasions, they will be treated as different elements in the set or map. This is particularly relevant when using protocol buffers.
The ordering of map entries in serialized protocol buffers is undefined, and very often will be different for two equal maps.
Hence, maps should never be used as keys in an ORMap or as values in a GSet or ORSet.
Care should be taken when changing the structure of protocol buffers. Many changes that are backwards compatible from a protocol buffer standpoint do not necessarily translate into stable serializations.
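The stability requirement can be illustrated without protocol buffers. Ranging over a Go map yields a randomized order, so an encoder that walks a map directly can turn equal values into different bytes. Sorting the keys first makes the encoding deterministic. canonicalKey is a hypothetical helper for this illustration only, and it does not make protobuf map fields safe to use as keys.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// canonicalKey encodes a map deterministically by sorting its keys first.
// Ranging over the map directly would visit keys in a randomized order.
func canonicalKey(m map[string]string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%q=%q;", k, m[k])
	}
	return b.String()
}

func main() {
	x := map[string]string{"size": "L", "color": "red"}
	y := map[string]string{"color": "red", "size": "L"}
	fmt.Println(canonicalKey(x) == canonicalKey(y)) // true
	fmt.Println(canonicalKey(x))                    // "color"="red";"size"="L";
}
```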
Registering the entity
Once you’ve created your entity, you can register it with the cloudstate.CloudState server by invoking the cloudstate.CloudState.RegisterCRDT method.
In addition to passing your entity and service descriptor, if you use protocol buffers for serialization and any protobuf message definitions are missing from your service descriptor (they are not declared directly in the file, nor as dependencies), then you’ll need to pass those protocol buffer descriptors as well.
err = server.RegisterCRDT(&crdt.Entity{
	ServiceName: "example.shoppingcart.ShoppingCartService",
	EntityFunc:  shoppingcart.NewShoppingCart,
}, protocol.DescriptorConfig{
	Service: "shoppingcart.proto",
}.AddDomainDescriptor("domain.proto", "hotitems.proto"))
if err != nil {
	log.Fatalf("failed to register CRDT entity: %v", err)
}