Conflict-free Replicated Data Types

This page documents how to implement Cloudstate CRDT entities in Java. For information on what Cloudstate CRDT entities are, please read the general Conflict-free Replicated Data Type documentation first.

A CRDT entity is declared by annotating its class with the @CrdtEntity annotation.

@CrdtEntity
public class ShoppingCartEntity {
  // The CRDT field, constructor and command handlers are shown in the following sections
}

Accessing and creating an entity’s CRDT

Each CRDT entity manages one root CRDT. Either the proxy supplies that CRDT to the entity when the entity starts, or, if no CRDT exists for the entity yet, the entity can create it using a context that extends CrdtFactory.

There are multiple ways a CRDT entity can access its CRDT. The CRDT can be injected directly into the entity’s constructor or into a command handler. The injected value can be wrapped in an Optional to distinguish between CRDTs that have already been created and CRDTs that have not been created yet. If it is not wrapped in Optional, the CRDT will be created automatically according to its type. The CRDT can also be read from any CrdtContext via the state method.

An entity’s CRDT can be created from the entity’s constructor using the CrdtFactory methods on CrdtCreationContext, or from a command handler using the same methods on CommandContext. Note that the CRDT may only be created once, and only if the Cloudstate proxy has not already provided it. Any attempt to create a CRDT when one already exists will throw an IllegalStateException.
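The create-once rule can be modelled in a few lines of plain Java. The sketch below has no Cloudstate dependency: ToyCrdtSlot, suppliedByProxy, state and create are hypothetical names chosen for illustration, not part of the Cloudstate API. The slot holds an Optional that stays empty until the proxy supplies a CRDT or the entity creates one, and any further creation attempt throws IllegalStateException, mirroring the behaviour described above.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical model of an entity's single root CRDT slot (not the Cloudstate API). */
final class ToyCrdtSlot<T> {
  // Empty until the proxy supplies a CRDT or the entity creates one
  private Optional<T> crdt = Optional.empty();

  /** Models the proxy supplying an existing CRDT when the entity starts. */
  static <U> ToyCrdtSlot<U> suppliedByProxy(U existing) {
    ToyCrdtSlot<U> slot = new ToyCrdtSlot<>();
    slot.crdt = Optional.of(existing);
    return slot;
  }

  /** Like injecting Optional<T>: an empty result means no CRDT has been created yet. */
  Optional<T> state() {
    return crdt;
  }

  /** Creates the root CRDT; only allowed while no CRDT exists. */
  T create(Supplier<T> factory) {
    if (crdt.isPresent()) {
      throw new IllegalStateException("This entity already has a CRDT");
    }
    T created = factory.get();
    crdt = Optional.of(created);
    return created;
  }
}
```

In this model, as in the description above, it makes no difference whether the existing CRDT came from the proxy or from an earlier create call: the second attempt fails either way.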

For most use cases, injecting the CRDT directly into the constructor and storing it in a field is the most convenient and straightforward way to use a CRDT. In our shopping cart example we use an LWWRegisterMap, which can be injected as follows:

private final LWWRegisterMap<String, Shoppingcart.LineItem> items;

public ShoppingCartEntity(LWWRegisterMap<String, Shoppingcart.LineItem> items) {
  this.items = items;
}

In addition to the CRDT, the constructor may accept a CrdtCreationContext.

Handling commands

Command handlers are declared by annotating a method with @CommandHandler. A command handler may take a context parameter of type CommandContext.

By default, the name of the command a method handles is the method name with its first letter capitalized, so a method called getCart handles the gRPC service call named GetCart. This can be overridden by setting the name parameter on the @CommandHandler annotation.
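The default naming rule is a plain string transformation. The helper below is a hypothetical sketch of that rule, not code from the Cloudstate library: it upper-cases only the first character and leaves the rest of the method name unchanged.

```java
/** Hypothetical helper mirroring the default command-naming rule (not a Cloudstate class). */
final class CommandNames {
  private CommandNames() {}

  /** Returns the method name with its first letter upper-cased, e.g. getCart -> GetCart. */
  static String defaultCommandName(String methodName) {
    if (methodName.isEmpty()) {
      return methodName;
    }
    // char + String concatenation yields a String
    return Character.toUpperCase(methodName.charAt(0)) + methodName.substring(1);
  }
}
```

A method whose name doesn’t map onto the gRPC call this way can set the name parameter instead, for example @CommandHandler(name = "GetCart").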

A command handler can also take the gRPC service call’s input type as a parameter to receive the command message. This is optional, since not every handler needs the message. For example, our GetCart service call doesn’t need any information from the message, because it just returns the current state as is. The AddItem service call, on the other hand, needs the product ID, name and quantity from the message in order to add the item to the cart.

The return type of the command handler must be the output type of the gRPC service call; the returned value is sent as the reply.

The following shows the implementation of the GetCart command handler. This handler is read-only; it doesn’t update the CRDT, it only returns the current state:

@CommandHandler
public Shoppingcart.Cart getCart() {
  return Shoppingcart.Cart.newBuilder().addAllItems(items.values()).build();
}

Updating a CRDT

Because of Cloudstate’s take-in-turns approach, CRDTs may only be updated in command handlers and stream cancellation callbacks.

Here’s a command handler for the AddItem command that adds the item to the shopping cart:

@CommandHandler
public Empty addItem(Shoppingcart.AddLineItem item, CommandContext ctx) {
  // Reject zero or negative quantities with an error for the client
  if (item.getQuantity() <= 0) {
    ctx.fail("Quantity must be greater than zero.");
  }
  if (items.containsKey(item.getProductId())) {
    // Product already in the cart: store a new LineItem with the combined quantity
    items.computeIfPresent(
        item.getProductId(),
        (id, old) -> old.toBuilder().setQuantity(old.getQuantity() + item.getQuantity()).build());
  } else {
    // New product: add a fresh LineItem built from the command message
    items.put(
        item.getProductId(),
        Shoppingcart.LineItem.newBuilder()
            .setProductId(item.getProductId())
            .setName(item.getName())
            .setQuantity(item.getQuantity())
            .build());
  }
  // Empty is com.google.protobuf.Empty
  return Empty.getDefaultInstance();
}

Deleting a CRDT

A CRDT can be deleted by invoking CommandContext.delete. Once a CRDT is deleted, the entity will be shut down, and all subsequent commands for the entity will be rejected.

Take care when deleting CRDTs. The reference implementation of the proxy must maintain a tombstone for each deleted CRDT, so if many CRDTs are created and deleted over time, the proxy may eventually run out of memory, and network usage will increase because the tombstones still need to be gossiped through the cluster for replication.

Streamed command handlers

Streamed commands can be used to receive and publish updates to the state. If a gRPC service call has a streamed result type, the handler for that call can accept a StreamedCommandContext and use it to register callbacks.

Responding to changes

If the command handler wishes to publish changes to the stream, it can register a callback with onChange, which will be invoked every time the CRDT changes.

The callback returns an Optional containing a message to send to the client, or an empty Optional if no message should be sent in response to that particular change. The callback may not modify the CRDT itself, but it may emit effects that may modify the CRDT.

If the shopping cart service had a WatchCart call, like this:

rpc WatchCart(GetShoppingCart) returns (stream Cart);

that could be implemented like this:

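@CommandHandler
public Shoppingcart.Cart watchCart(StreamedCommandContext<Shoppingcart.Cart> ctx) {
  // Publish the full cart to the client every time the CRDT changes
  ctx.onChange(subscription -> Optional.of(getCart()));
  // Reply with the current cart when the call starts
  return getCart();
}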

Ending the stream

The onChange callback can end the stream by invoking endStream on the SubscriptionContext it is passed. If it does so, no onCancel callback will be invoked for that stream.
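The interplay between onChange, endStream and onCancel can be modelled in plain Java. In the sketch below, ToyStream and its inner Subscription class are hypothetical stand-ins for the streamed command context and SubscriptionContext, and crdtChanged and clientCancelled are hypothetical hooks standing in for the proxy. The model only mirrors the rules described above: an empty Optional sends nothing, and a stream ended with endStream never triggers its onCancel callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical model of onChange/endStream/onCancel semantics (not the Cloudstate API). */
final class ToyStream<M> {
  /** Stand-in for the SubscriptionContext passed to the onChange callback. */
  final class Subscription {
    void endStream() {
      ended = true;
    }
  }

  private final List<M> sent = new ArrayList<>();
  private Function<Subscription, Optional<M>> changeCallback = s -> Optional.empty();
  private Runnable cancelCallback = () -> {};
  private boolean ended = false;
  private boolean cancelled = false;

  void onChange(Function<Subscription, Optional<M>> callback) {
    changeCallback = callback;
  }

  void onCancel(Runnable callback) {
    cancelCallback = callback;
  }

  /** Proxy hook: the CRDT changed. A message is sent only if the callback returns one. */
  void crdtChanged() {
    if (ended || cancelled) {
      return;
    }
    changeCallback.apply(new Subscription()).ifPresent(sent::add);
  }

  /** Proxy hook: the client cancelled. Skipped if the stream was already ended. */
  void clientCancelled() {
    if (ended || cancelled) {
      return;
    }
    cancelled = true;
    cancelCallback.run();
  }

  List<M> sent() {
    return sent;
  }
}
```

In this model, a callback that returns an empty Optional for one change and calls endStream on a later change leaves only the earlier messages in sent(), and a subsequent client cancellation runs no onCancel callback.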

Responding to stream cancellation

A streamed command handler may also register an onCancel callback to be notified when the stream is cancelled. The cancellation callback handler may update the CRDT. This is useful if the CRDT is being used to track connections, for example, when using Vote CRDTs to track a user’s online status.

Types of CRDTs

The Cloudstate Java support library offers Java classes for each of the CRDTs available in Cloudstate.

Counters and flags

GCounter, PNCounter and Flag are available, offering operations relevant to each CRDT.
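The general CRDT documentation describes what these types guarantee. As a rough illustration of why a PNCounter converges, the plain-Java model below keeps separate per-node increment and decrement totals and merges replicas by taking the per-node maximum. ToyPNCounter and its node-name parameters are hypothetical; this is a textbook PN-counter sketch, not Cloudstate’s PNCounter class, which hides these details behind its own operations.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical PN-counter model: two grow-only per-node tallies (not Cloudstate's PNCounter). */
final class ToyPNCounter {
  // Per-node totals; each node only ever grows its own entries
  private final Map<String, Long> increments = new HashMap<>();
  private final Map<String, Long> decrements = new HashMap<>();

  /** Records a non-negative increment made on the given node. */
  void increment(String node, long by) {
    increments.merge(node, by, Long::sum);
  }

  /** Records a non-negative decrement made on the given node. */
  void decrement(String node, long by) {
    decrements.merge(node, by, Long::sum);
  }

  /** Current value: total increments minus total decrements across all nodes. */
  long value() {
    long up = increments.values().stream().mapToLong(Long::longValue).sum();
    long down = decrements.values().stream().mapToLong(Long::longValue).sum();
    return up - down;
  }

  /** Per-node maximum, so merging is idempotent and order-independent. */
  void merge(ToyPNCounter other) {
    other.increments.forEach((node, n) -> increments.merge(node, n, Math::max));
    other.decrements.forEach((node, n) -> decrements.merge(node, n, Math::max));
  }
}
```

Taking the maximum rather than the sum on merge is what lets the same update arrive more than once without being double-counted.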

Vote

Vote is available for the Vote CRDT. The current node’s vote can be updated using the vote method and queried using the getSelfVote method.

To determine the result of a vote, use getVoters and getVotesFor, which return the total number of nodes and the number of nodes that have voted for the condition, respectively. In addition, convenience methods are provided for common vote decision approaches: isAtLeastOne returns true if at least one node has voted for the condition, isMajority returns true if the number of votes for is more than half the number of voters, and isUnanimous returns true if the number of votes for equals the number of voters.
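The decision helpers are simple functions of the two counts. As a sketch, here they are as pure functions over int arguments. VoteDecisions is a hypothetical class, not part of Cloudstate; in an entity you would call the corresponding methods on the Vote instance instead.

```java
/** Hypothetical pure functions mirroring the Vote decision helpers (not a Cloudstate class). */
final class VoteDecisions {
  private VoteDecisions() {}

  /** True if at least one node votes for the condition. */
  static boolean isAtLeastOne(int votesFor) {
    return votesFor > 0;
  }

  /** True if strictly more than half of the voters vote for the condition. */
  static boolean isMajority(int votesFor, int voters) {
    return votesFor * 2 > voters;
  }

  /** True if every voter votes for the condition. */
  static boolean isUnanimous(int votesFor, int voters) {
    return votesFor == voters;
  }
}
```

For example, with four voters, two votes for the condition is exactly half, so isMajority(2, 4) is false while isAtLeastOne(2) is true.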

Registers

LWWRegister provides the LWWRegister CRDT. It can be interacted with using the set and get methods. If you wish to use a custom clock, you can use the set overload that allows passing a custom clock and custom clock value.
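With a custom clock, the caller decides which write counts as the "last" one. The plain-Java model below stamps each write with an explicit long clock value (for example, a version number from your domain) and breaks ties by node name. ToyLwwRegister is hypothetical and only illustrates the last-writer-wins rule; it is not how Cloudstate implements LWWRegister, and it does not use Cloudstate’s clock types.

```java
/** Hypothetical last-writer-wins register model (not Cloudstate's LWWRegister). */
final class ToyLwwRegister<T> {
  private T value;
  private long clock;    // clock value of the write that produced the current value
  private String writer; // node that made that write, used only to break clock ties

  ToyLwwRegister(T initial, long clock, String writer) {
    this.value = initial;
    this.clock = clock;
    this.writer = writer;
  }

  /** Applies a write stamped with an explicit clock value; an older write loses. */
  void set(T newValue, long clockValue, String node) {
    boolean newer = clockValue > clock || (clockValue == clock && node.compareTo(writer) > 0);
    if (newer) {
      value = newValue;
      clock = clockValue;
      writer = node;
    }
  }

  /** Merging with another replica uses the same rule, so merge order does not matter. */
  void merge(ToyLwwRegister<T> other) {
    set(other.value, other.clock, other.writer);
  }

  T get() {
    return value;
  }
}
```

In this model, a replica that receives version 1 after version 2 still converges on version 2, which is one reason you might supply a domain-specific clock value instead of relying on wall-clock time.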

Direct mutations to LWWRegister and LWWRegisterMap values will not be replicated to other nodes; only mutations made through the set and put methods will be replicated. Hence, the following update will not be replicated:

myLwwRegister.get().setSomeField("foo");

This update however will be replicated:

MyValue myValue = myLwwRegister.get();
myValue.setSomeField("foo");
myLwwRegister.set(myValue);

In general, we recommend that these values be immutable, as this will prevent accidentally mutating without realising the update won’t be applied. If using protobufs as values, this will be straightforward, since compiled protobuf classes are immutable.
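Direct mutation is lost because replication is driven by the set and put calls, not by observing the value’s fields. The plain-Java model below makes that visible. TrackedRegister, MutableValue and ImmutableValue are hypothetical stand-ins (MutableValue plays the role of MyValue above), and the list of replicated writes stands in for the updates that would be sent to other nodes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical register that, like the rule above, only replicates writes made through set(). */
final class TrackedRegister<T> {
  private T value;
  // Stand-in for the updates that would be sent to other nodes
  private final List<T> replicatedWrites = new ArrayList<>();

  TrackedRegister(T initial) {
    this.value = initial;
  }

  T get() {
    return value;
  }

  void set(T newValue) {
    value = newValue;
    replicatedWrites.add(newValue);
  }

  int replicatedWriteCount() {
    return replicatedWrites.size();
  }
}

/** Mutable value: easy to change in place without anything being replicated. */
final class MutableValue {
  private String someField;

  MutableValue(String someField) {
    this.someField = someField;
  }

  void setSomeField(String someField) {
    this.someField = someField;
  }
}

/** Immutable value: the only way to "change" it is to build a new instance and call set(). */
final class ImmutableValue {
  private final String someField;

  ImmutableValue(String someField) {
    this.someField = someField;
  }

  String someField() {
    return someField;
  }

  ImmutableValue withSomeField(String someField) {
    return new ImmutableValue(someField);
  }
}
```

With ImmutableValue there is no setter to call by mistake, so every change necessarily goes through set() and is recorded.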

Sets and Maps

Cloudstate Java support provides GSet and ORSet, which implement the java.util.Set interface, and ORMap, which implements the java.util.Map interface. However, not all operations are implemented: GSet doesn’t support any removal operations, and ORMap doesn’t support any operations that would replace an existing value in the map.
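To show what "no removal operations" can mean for a java.util.Set, here is a minimal grow-only set in plain Java. ToyGSet is hypothetical and only illustrates the contract: additions and merges (set union) are allowed, while removal methods throw UnsupportedOperationException. Whether Cloudstate’s GSet signals unsupported operations in exactly this way is an assumption of this sketch.

```java
import java.util.AbstractSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Hypothetical grow-only set: elements can be added and merged, never removed. */
final class ToyGSet<E> extends AbstractSet<E> {
  private final Set<E> elements = new HashSet<>();

  @Override
  public boolean add(E element) {
    return elements.add(element);
  }

  @Override
  public boolean contains(Object o) {
    return elements.contains(o);
  }

  @Override
  public int size() {
    return elements.size();
  }

  /** Read-only iterator, so removal through the iterator also fails. */
  @Override
  public Iterator<E> iterator() {
    return Collections.unmodifiableSet(elements).iterator();
  }

  @Override
  public boolean remove(Object o) {
    throw new UnsupportedOperationException("GSet does not support removal");
  }

  @Override
  public void clear() {
    throw new UnsupportedOperationException("GSet does not support removal");
  }

  /** Merging two replicas is set union. */
  void merge(ToyGSet<E> other) {
    elements.addAll(other.elements);
  }
}
```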

To insert a value into an ORMap, use the getOrCreate method. The callback you pass in receives a CrdtFactory that you can use to create the CRDT value you wish to store.

With all maps and sets, map keys and set values must be immutable: Cloudstate does not detect in-place mutation of a key or value, so such changes are not replicated to other nodes. Furthermore, their serialized form must be stable. The Cloudstate proxy uses the serialized form of the values to track changes in the set or map, so if the same value serializes to two different sets of bytes on different occasions, the two serializations will be treated as different elements of the set or map.

This is particularly relevant when using protobufs. The ordering of map entries in a serialized protobuf is undefined, and it will often differ between two equal maps. Hence, protobuf messages containing map fields should never be used as keys in ORMap or as values in GSet or ORSet. For the rest of the protobuf format, the protobuf specification itself makes no guarantees about serialization stability, but the Java libraries do produce stable field ordering and stable output for non-map values. Take care when changing the protobuf structure: many changes that are backwards compatible from a protobuf standpoint do not necessarily preserve a stable serialization.

If using JSON serialization, it is recommended that you explicitly define the field ordering using Jackson’s @JsonPropertyOrder annotation, and as with protobufs, never use Map or Set in your JSON objects since the ordering of those is not stable.
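Identity by serialized form behaves differently from Java equality. The plain-Java analogy below uses a hypothetical serialize function that, like a serializer with undefined map ordering, writes entries in whatever order it iterates them. Two maps that are equal according to equals() then produce different strings, so a store keyed on the serialized form counts them as two elements. Sorting the keys first (the hypothetical serializeCanonical) removes the difference in this toy, but you usually don’t control how protobuf or Jackson order map entries, so the advice above to keep maps and sets out of keys and set values still applies.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical analogy: identity by serialized form vs. Java equality. */
final class SerializationStability {
  private SerializationStability() {}

  /** Writes entries in the map's own iteration order, so output depends on that order. */
  static String serialize(Map<String, Integer> map) {
    StringBuilder out = new StringBuilder();
    map.forEach((k, v) -> out.append(k).append('=').append(v).append(';'));
    return out.toString();
  }

  /** Canonical form: sort the keys first so equal maps always serialize identically. */
  static String serializeCanonical(Map<String, Integer> map) {
    return serialize(new TreeMap<>(map));
  }

  /** Counts distinct elements the way a store keyed on serialized form would. */
  static int distinctBySerializedForm(Iterable<Map<String, Integer>> values, boolean canonical) {
    Set<String> seen = new HashSet<>();
    for (Map<String, Integer> v : values) {
      seen.add(canonical ? serializeCanonical(v) : serialize(v));
    }
    return seen.size();
  }
}
```

For example, two LinkedHashMaps holding the same entries inserted in different orders are equal, yet distinctBySerializedForm counts them as two elements unless the canonical form is used.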

Some wrapper classes are also provided for ORMap. These provide more convenient APIs for working with values of particular CRDT types. They are:

LWWRegisterMap

A map of LWWRegister values. This exposes the LWWRegister values as values directly in the map.

PNCounterMap

A map of PNCounter values. This exposes the current value of the PNCounters directly as values in the map, and offers increment and decrement methods to update the values.

Registering the entity

Once you’ve created your entity, you can register it with the Cloudstate server by invoking the registerCrdtEntity method. Pass your entity class and service descriptor. If you use protobuf for serialization and any protobuf message definitions are missing from your service descriptor (that is, they are declared neither directly in the file nor in its dependencies), you’ll also need to pass those protobuf descriptors.

public static void main(String... args) {
  new CloudState()
      .registerCrdtEntity(
          ShoppingCartEntity.class,
          Shoppingcart.getDescriptor().findServiceByName("ShoppingCartService"))
      .start();
}