Conflict-free Replicated Data Types

This page documents how to implement Cloudstate CRDT entities in Go. For information on what Cloudstate CRDT entities are, please read the general Conflict-free Replicated Data Type documentation first.

A CRDT entity can be created by implementing the crdt.EntityHandler interface and registering an entity with a Cloudstate instance.

type EntityHandler interface {
	HandleCommand(ctx *CommandContext, name string, msg proto.Message) (*any.Any, error)
	Default(ctx *Context) (CRDT, error)
	Set(ctx *Context, state CRDT) error
}

Accessing and creating an entity’s CRDT

Each CRDT entity manages one root CRDT. That CRDT will either be supplied to the entity by the proxy when it is started through crdt.EntityHandler.Set, or, if no CRDT exists for the entity when it is started, it has to be created by the entity using the crdt.EntityHandler.Default factory method.

func (s *ShoppingCart) Default(ctx *crdt.Context) (crdt.CRDT, error) {
	return crdt.NewORMap(), nil
}

func (s *ShoppingCart) Set(ctx *crdt.Context, state crdt.CRDT) error {
	s.items = state.(*crdt.ORMap)
	return nil
}

The state (the CRDT) that the Cloudstate instance supplies through Set will never be of a different type than the one Default returns for the same CRDT entity, because both methods are used symmetrically for the same instance and version of the entity. A checked type assertion is therefore not necessary.
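For readers who still prefer a defensive check, the two-value form of a type assertion returns an error instead of panicking if the state is ever of an unexpected type. The following is a minimal, self-contained sketch. The CRDT interface, the ORMap and GCounter types, and the simplified method signatures are stand-ins declared only for this illustration; they are not the library's definitions.

```go
package main

import "fmt"

// CRDT, ORMap and GCounter are hypothetical stand-ins for the library's
// types, declared here only so the sketch compiles on its own.
type CRDT interface{ isCRDT() }

type ORMap struct{}

func (*ORMap) isCRDT() {}

type GCounter struct{}

func (*GCounter) isCRDT() {}

// ShoppingCart mirrors the entity above and holds its root CRDT.
type ShoppingCart struct {
	items *ORMap
}

// Default creates the root CRDT when none exists yet for the entity.
func (s *ShoppingCart) Default() (CRDT, error) {
	return &ORMap{}, nil
}

// Set stores the supplied state. The comma-ok assertion turns an
// unexpected type into an error rather than a panic.
func (s *ShoppingCart) Set(state CRDT) error {
	m, ok := state.(*ORMap)
	if !ok {
		return fmt.Errorf("unexpected CRDT type %T", state)
	}
	s.items = m
	return nil
}

func main() {
	cart := &ShoppingCart{}
	state, _ := cart.Default()
	fmt.Println(cart.Set(state))       // state of the matching type
	fmt.Println(cart.Set(&GCounter{})) // state of a mismatched type
}
```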

Handling commands

Command handlers are implemented with the crdt.EntityHandler.HandleCommand method. The command handler receives a CommandContext, the command's name, and the gRPC message as a protobuf message. The combination of a command's name and the message type from the defined gRPC service can be used to select how to handle the command. So to handle the shopping cart service method GetCart:

rpc GetCart (GetShoppingCart) returns (Cart);

the command's name GetCart together with its message type GetShoppingCart has to be matched. A type switch or a type assertion on the protobuf command message is useful to get the command's data, process it, and then return the appropriate return type, Cart in our example.

The return type of the command handler must be the output type of the gRPC service call. It will be sent as the reply.

The following shows the implementation of the GetCart command. This command handler is read-only: it doesn’t update the CRDT, it just returns some state:

func (s *ShoppingCart) HandleCommand(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
	switch m := msg.(type) {
	case *GetShoppingCart:
		// Read-only: build the reply from the current state without updating the CRDT.
		cart, err := s.getCart()
		if err != nil {
			return nil, err
		}
		return encoding.MarshalAny(cart)
...

Updating a CRDT

Due to Cloudstate’s take-in-turns approach, CRDTs may only be updated in command handlers and stream cancellation callbacks.

Here’s a snippet for handling the AddLineItem message, which adds the item to the shopping cart. If the message type of a gRPC service method identifies the method unambiguously, there is no need to dispatch on the command's name.

func (s *ShoppingCart) HandleCommand(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
...
	switch m := msg.(type) {
	case *AddLineItem:
		if m.GetQuantity() <= 0 {
			return nil, errors.New("cannot add a non-positive quantity of items")
		}

		item, err := encoding.MarshalAny(&LineItem{
			ProductId: m.GetProductId(),
			Name:      m.GetName(),
			Quantity:  m.GetQuantity(),
		})
		if err != nil {
			return nil, err
		}
		key := encoding.String(m.GetProductId())
		reg, err := s.items.LWWRegister(key)
		if err != nil {
			return nil, err
		}
		if reg != nil {
			reg.Set(item)
		} else {
			reg = crdt.NewLWWRegister(item)
		}
		s.items.Set(key, reg)
		return encoding.Empty, nil
...

It’s good practice to dispatch commands using both their command name and their message type. Unfortunately, the Go gRPC implementation does not provide type-safe service descriptors out of a compiled *.proto file, as they are not exported and so are not accessible to users.

While the Go runtime function runtime.FuncForPC makes it possible to get the name of the function that a function pointer refers to, and there are ways to get the shoppingcart.ShoppingCartServiceServer.AddItem command name from the gRPC interface of the shopping cart service at runtime, we do not encourage doing so. In general, the Cloudstate Go User Support library uses explicit ways to implement the gRPC service of a Cloudstate entity. It could have been implemented with heavy use of reflection, but that would be non-idiomatic Go and would leave this library a strange citizen in the Go ecosystem.
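Dispatching explicitly on both the command name and the message type can be sketched without any reflection. The example below is self-contained and only illustrates the pattern: the message structs are simplified stand-ins for the generated protobuf types, the dispatch function and its string results are hypothetical, and the command names GetCart and AddItem are taken from the service methods mentioned on this page.

```go
package main

import "fmt"

// GetShoppingCart and AddLineItem are hypothetical stand-ins for the
// generated protobuf command messages.
type GetShoppingCart struct{ UserId string }

type AddLineItem struct {
	ProductId string
	Quantity  int32
}

// dispatch matches the command name first and then asserts the message
// type that the service definition pairs with that name. Any other
// combination falls through to the error at the end.
func dispatch(name string, msg interface{}) (string, error) {
	switch name {
	case "GetCart":
		if m, ok := msg.(*GetShoppingCart); ok {
			return "get cart for " + m.UserId, nil
		}
	case "AddItem":
		if m, ok := msg.(*AddLineItem); ok {
			return fmt.Sprintf("add %d of %s", m.Quantity, m.ProductId), nil
		}
	}
	return "", fmt.Errorf("unhandled command %q with message type %T", name, msg)
}

func main() {
	out, err := dispatch("AddItem", &AddLineItem{ProductId: "p1", Quantity: 2})
	fmt.Println(out, err)

	// A known name paired with the wrong message type is rejected.
	_, err = dispatch("GetCart", &AddLineItem{})
	fmt.Println(err)
}
```

Matching the name first keeps the handler correct even if two service methods later share the same request message type.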

Deleting a CRDT

A CRDT can be deleted by invoking crdt.Context.Delete. Once a CRDT is deleted, the entity will be shut down, and all subsequent commands for the entity will be rejected.

Caution should be taken when deleting CRDTs: the Reference Implementation of the proxy needs to maintain a tombstone for each deleted CRDT. Over time, if many CRDTs are created and deleted, this increases not only memory usage, potentially to the point of running out of memory, but also network usage, as the tombstones still need to be gossiped through the cluster for replication.

Streamed command handlers

Streamed commands can be used to receive and publish updates to the state. If a gRPC service call has a streamed result type, the handler for that call can use the crdt.CommandContext it is passed to register handler functions.

Responding to changes

If the command handler wishes to publish changes to the stream, it can register a handler function with crdt.CommandContext.ChangeFunc, which will be invoked every time the CRDT changes.

The handler function is then able to return a message to be sent to the client (or empty.Empty, if it wishes to send no message in response to that particular change). The handler function may not modify the CRDT itself, but it may emit effects that may modify the CRDT.

If the shopping cart service had a WatchCart call, like this:

rpc WatchCart (GetShoppingCart) returns (stream Cart);

that could be implemented like this:

switch name {
case "WatchCart":
	ctx.ChangeFunc(func(c *crdt.CommandContext) (*any.Any, error) {
		cart, err := s.getCart()
		if err != nil {
			return nil, err
		}
		return encoding.MarshalAny(cart)
	})
	cart, err := s.getCart()
	if err != nil {
		return nil, err
	}
	return encoding.MarshalAny(cart)
}

Ending the stream

The ChangeFunc handler function can end the stream by invoking crdt.CommandContext.EndStream on the CommandContext it is passed. If it does this, it will not receive a cancellation callback.

Responding to stream cancellation

A command handler may register a crdt.CommandContext.CancelFunc handler function to be notified when the stream is cancelled. The cancellation handler function may update the CRDT. This is useful if the CRDT is being used to track connections, for example, when using crdt.Vote CRDTs to track a user’s online status.
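The interplay of the change, end-of-stream, and cancellation callbacks described in this section can be modelled in a few lines. This is only a toy simulation under stated assumptions: streamContext is a hypothetical stand-in for crdt.CommandContext, its callback signatures are simplified, and the change and cancel methods stand in for events that the proxy would deliver in a real deployment.

```go
package main

import "fmt"

// streamContext is a hypothetical stand-in for crdt.CommandContext. It
// models callback registration only, not the library's behaviour.
type streamContext struct {
	onChange func(c *streamContext) (string, error)
	onCancel func(c *streamContext) error
	ended    bool
}

func (c *streamContext) ChangeFunc(f func(c *streamContext) (string, error)) { c.onChange = f }
func (c *streamContext) CancelFunc(f func(c *streamContext) error)           { c.onCancel = f }
func (c *streamContext) EndStream()                                          { c.ended = true }

// change simulates a CRDT change being reported on a live stream.
func (c *streamContext) change() (string, error) {
	if c.ended || c.onChange == nil {
		return "", nil
	}
	return c.onChange(c)
}

// cancel simulates the client cancelling the stream. A stream that the
// handler already ended does not receive the cancellation callback.
func (c *streamContext) cancel() error {
	if c.ended || c.onCancel == nil {
		return nil
	}
	c.ended = true
	return c.onCancel(c)
}

func main() {
	online := true // stands in for this node's vote in a Vote CRDT
	ctx := &streamContext{}
	ctx.ChangeFunc(func(c *streamContext) (string, error) {
		return "cart changed", nil
	})
	ctx.CancelFunc(func(c *streamContext) error {
		online = false // the cancellation handler may update the CRDT
		return nil
	})

	msg, _ := ctx.change()
	fmt.Println(msg)
	_ = ctx.cancel()
	fmt.Println(online)
}
```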

Types of CRDTs

The Cloudstate Go language support library offers Go types for each of the CRDTs available in Cloudstate.

Counters and flags

crdt.GCounter, crdt.PNCounter and crdt.Flag are available, offering operations relevant to each CRDT.
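As background on why a grow-only counter converges, the general idea can be sketched with a state-based counter in which every node increments only its own slot and a merge takes the per-node maximum. This is a textbook illustration, not Cloudstate's implementation or API: the gCounter type and its methods are hypothetical, and in Cloudstate the proxy takes care of replication.

```go
package main

import "fmt"

// gCounter is a hypothetical, state-based grow-only counter. Each node
// only ever increments its own slot.
type gCounter map[string]uint64

// Increment adds to the slot owned by the given node.
func (g gCounter) Increment(node string, by uint64) { g[node] += by }

// Value is the sum of all per-node slots.
func (g gCounter) Value() uint64 {
	var total uint64
	for _, v := range g {
		total += v
	}
	return total
}

// Merge takes the per-node maximum, which makes merging commutative,
// associative and idempotent.
func (g gCounter) Merge(other gCounter) {
	for node, v := range other {
		if v > g[node] {
			g[node] = v
		}
	}
}

func main() {
	a, b := gCounter{}, gCounter{}
	a.Increment("node-a", 2)
	b.Increment("node-b", 3)

	// After exchanging state in both directions the replicas agree.
	a.Merge(b)
	b.Merge(a)
	fmt.Println(a.Value(), b.Value())
}
```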

Vote

crdt.Vote is available for the Vote CRDT. The Vote CRDT allows updating the current node’s vote using the crdt.Vote.Vote method. The current node’s vote can be queried using the crdt.Vote.SelfVote method.

To determine the result of a vote, crdt.Vote.Voters and crdt.Vote.VotesFor can be used to check the total number of nodes and the number of nodes that have voted for the condition, respectively. In addition, convenience methods are provided for common vote decision approaches: crdt.Vote.AtLeastOne returns true if there is at least one vote for the condition, crdt.Vote.Majority returns true if the number of votes for is more than half the number of voters, and crdt.Vote.All returns true if the number of votes for equals the number of voters.
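The three decision rules reduce to simple comparisons between the number of voters and the number of votes for the condition. The sketch below spells them out on a hypothetical vote struct that holds just those two numbers. It illustrates the arithmetic described above and is not the library's type.

```go
package main

import "fmt"

// vote is a hypothetical stand-in holding the total number of voters and
// the number of votes for the condition.
type vote struct {
	voters   uint32
	votesFor uint32
}

// AtLeastOne is true if there is at least one vote for the condition.
func (v vote) AtLeastOne() bool { return v.votesFor > 0 }

// Majority is true if more than half of the voters voted for the
// condition. Integer division is safe here: for an odd number of voters
// the half is rounded down, and votesFor must still exceed it.
func (v vote) Majority() bool { return v.votesFor > v.voters/2 }

// All is true if every voter voted for the condition.
func (v vote) All() bool { return v.votesFor == v.voters }

func main() {
	v := vote{voters: 5, votesFor: 3}
	fmt.Println(v.AtLeastOne(), v.Majority(), v.All())
}
```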

Registers

crdt.LWWRegister provides the LWWRegister CRDT. It can be interacted with using the crdt.LWWRegister.Set and crdt.LWWRegister.Value methods. If you wish to use a custom clock, you can use the crdt.LWWRegister.SetWithClock method, which allows passing a custom clock and a custom clock value.
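To illustrate what a caller-supplied clock value means for a last-write-wins register, here is a small sketch in which the write with the highest clock value is kept, for example when a version number is used as the clock. The lwwRegister type and its setAt method are hypothetical and do not mirror the signature of SetWithClock. The rule that a write with an equal clock value is ignored is an assumption of this sketch.

```go
package main

import "fmt"

// lwwRegister is a hypothetical, simplified last-write-wins register.
type lwwRegister struct {
	value string
	clock int64
}

// setAt keeps the write only if its clock value is ahead of the stored
// one and reports whether the write was applied. Ignoring writes with an
// equal clock value is an assumption of this sketch.
func (r *lwwRegister) setAt(value string, clock int64) bool {
	if clock <= r.clock {
		return false
	}
	r.value, r.clock = value, clock
	return true
}

func main() {
	r := &lwwRegister{}
	r.setAt("newer", 2)
	r.setAt("older", 1) // ignored: its clock value is behind the stored one
	fmt.Println(r.value)
}
```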

Sets and Maps

The Cloudstate Go support provides crdt.GSet, crdt.ORSet and crdt.ORMap.

With Cloudstate maps and sets, the serialized form of map keys and set values must be stable. The Cloudstate proxy uses the serialized form of the values to track changes in the set or map. If the same value serializes to two different sets of bytes on different occasions, they will be treated as different elements in the set or map.

This is particularly relevant when using protocol buffers. The ordering of map entries in a serialized protocol buffer is undefined, and very often will be different for two equal maps. Hence, maps should never be used as keys in ORMap or as values in GSet or ORSet. For the rest of the protocol buffers specification, while the specification itself makes no guarantees about stability, the Go protocol buffer libraries do produce stable orderings of fields and stable output of non-map values.
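The ordering problem is easy to reproduce in plain Go, because Go also randomizes map iteration order. If map-like data has to take part in a key or a set element, one possible approach is to flatten it into a deterministic form first, for example by sorting the keys. The stableKey helper below is a hypothetical illustration of that idea and is not part of the Cloudstate API.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// stableKey builds a deterministic string from a map by sorting its keys
// before writing them out. Ranging over the map directly could produce
// different output for two equal maps.
func stableKey(attrs map[string]string) string {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	for _, k := range keys {
		// %q quotes both parts so the separators stay unambiguous.
		fmt.Fprintf(&b, "%q=%q;", k, attrs[k])
	}
	return b.String()
}

func main() {
	a := map[string]string{"color": "red", "size": "L"}
	b := map[string]string{"size": "L", "color": "red"}
	fmt.Println(stableKey(a) == stableKey(b))
}
```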

Care should be taken when changing the structure of protocol buffers. Many changes that are backwards compatible from a protocol buffer standpoint do not necessarily translate into stable serializations.

Registering the entity

Once you’ve created your entity, you can register it with the cloudstate.CloudState server by invoking the cloudstate.CloudState.RegisterCRDT method. In addition to passing your entity and service descriptor, if you use protocol buffers for serialization and any protobuf message definitions are missing from your service descriptor (they are declared neither directly in the file nor as dependencies), then you’ll need to pass those protocol buffer descriptors as well.

err = server.RegisterCRDT(&crdt.Entity{
	ServiceName: "example.shoppingcart.ShoppingCartService",
	EntityFunc:  shoppingcart.NewShoppingCart,
}, protocol.DescriptorConfig{
	Service: "shoppingcart.proto",
}.AddDomainDescriptor("domain.proto", "hotitems.proto"))