Event sourcing

This page documents how to implement Cloudstate event sourced entities in Go. For information on what Cloudstate event sourced entities are, please read the general Event sourcing documentation first.

An event sourced entity can be created by implementing the eventsourced.EntityHandler interface.

// An EntityHandler implements methods to handle commands and events.
type EntityHandler interface {
	// HandleCommand is the code that handles a command. It
	// may validate the command using the current state, and
	// may emit events as part of its processing. A command
	// handler must not update the state of the entity directly,
	// only indirectly by emitting events. If a command handler
	// does update the state, then when the entity is passivated
	// (removed from memory), those updates will be lost.
	HandleCommand(ctx *Context, name string, cmd proto.Message) (reply proto.Message, err error)
	// HandleEvent is the only piece of code that is allowed
	// to update the state of the entity. It receives events,
	// and, according to the event, updates the state.
	HandleEvent(ctx *Context, event interface{}) error
}

Then register the entity with cloudstate.CloudState.RegisterEventSourced. This configures it as an event sourced entity that the Cloudstate instance handles from then on.

err = server.RegisterEventSourced(&eventsourced.Entity{
	ServiceName:   "com.example.shoppingcart.ShoppingCart",
	PersistenceID: "ShoppingCart",
	EntityFunc:    shoppingcart.NewShoppingCart,
}, protocol.DescriptorConfig{
	Service: "shoppingcart.proto",
}.AddDomainDescriptor("domain.proto"))
if err != nil {
	log.Fatalf("CloudState failed to register entity: %s", err)
}

The ServiceName is the fully qualified name of the gRPC service that implements this entity’s interface. Setting it is mandatory.

The PersistenceID is used to namespace events in the journal, which is useful when multiple entities share the same database. The recommended value is the name of the entity type (in this case, ShoppingCart). Setting it is mandatory.

The SnapshotEvery parameter controls how often snapshots are taken, so that the entity doesn’t need to be recovered from the whole journal each time it’s loaded. If left unset, it defaults to 100. Setting it to a negative number will result in snapshots never being taken.
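
As a rough illustration of the SnapshotEvery rules described above, the following sketch decides whether a snapshot is due after a given number of events. The function shouldSnapshot, the constant defaultSnapshotEvery, and the treatment of zero as "unset" are assumptions made for this example; they are not taken from the library’s implementation.

```go
package main

// defaultSnapshotEvery mirrors the documented default of 100.
const defaultSnapshotEvery = 100

// shouldSnapshot reports whether a snapshot is due once eventCount events
// have been persisted since the last snapshot. It is a hypothetical helper:
// zero is treated as "unset" and a negative value disables snapshots.
func shouldSnapshot(snapshotEvery, eventCount int64) bool {
	if snapshotEvery < 0 {
		return false // snapshots are never taken
	}
	if snapshotEvery == 0 {
		snapshotEvery = defaultSnapshotEvery
	}
	return eventCount >= snapshotEvery
}
```

With these assumptions, shouldSnapshot(0, 100) reports true, while shouldSnapshot(-1, n) reports false for any n.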

The EntityFunc is a factory function which generates a new entity whenever Cloudstate has to initialize one.

func NewShoppingCart(eventsourced.EntityID) eventsourced.EntityHandler {
	return &ShoppingCart{
		cart: make([]*domain.LineItem, 0),
	}
}

Persistence types and serialization

Event sourced entities persist events and snapshots, and these need to be serialized when persisted. The most straightforward way to persist events and snapshots is to use protocol buffers (protobuf). Cloudstate will automatically detect if an emitted event is a protobuf, and serialize it as such. For other serialization options, including JSON, see Serialization.

While protobuf is the recommended format for persisting events, you should not persist your service’s protobuf messages directly. Instead, create separate messages for persistence, even if they are identical to the service’s. Converting between the two types adds some overhead, but it allows the service’s public interface to evolve independently of its data storage format, which should be private.

For our shopping cart example, we’ll create a new file called domain.proto. The name domain indicates that these are our application’s domain objects:

syntax = "proto3";

package com.example.shoppingcart.persistence;

option go_package = "github.com/cloudstateio/go-support/example/shoppingcart/persistence;persistence";

message LineItem {
    string productId = 1;
    string name = 2;
    int32 quantity = 3;
}

// The item added event.
message ItemAdded {
    LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
    string productId = 1;
}

// The shopping cart state.
message Cart {
    repeated LineItem items = 1;
}
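
Keeping service messages and persisted messages separate means converting between them at the boundary. The following is a minimal sketch of such a conversion, using plain structs in place of the generated protobuf types. The names APILineItem, DomainLineItem, toDomain, and toAPI are hypothetical and chosen only for this example.

```go
package main

// APILineItem stands in for a message generated from the service's public
// protobuf definition. The name is hypothetical.
type APILineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

// DomainLineItem stands in for the LineItem message generated from
// domain.proto. The name is hypothetical.
type DomainLineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

// toDomain converts a service message into its persistence counterpart.
func toDomain(in *APILineItem) *DomainLineItem {
	if in == nil {
		return nil
	}
	return &DomainLineItem{ProductId: in.ProductId, Name: in.Name, Quantity: in.Quantity}
}

// toAPI converts a persisted item back into the service's message type.
func toAPI(in *DomainLineItem) *APILineItem {
	if in == nil {
		return nil
	}
	return &APILineItem{ProductId: in.ProductId, Name: in.Name, Quantity: in.Quantity}
}
```

Because each conversion copies field by field, a field can later be added to or renamed in one type without forcing the same change on the other.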

State

Each entity should store its state locally in a mutable variable, either a mutable field or a mutable structure such as an array or slice. For our shopping cart, the state is a slice of line items, so we’ll create a slice of LineItems to hold it:

type ShoppingCart struct {
	// our domain object
	cart []*domain.LineItem
}

Constructing

The Cloudstate Go Support Library needs to know how to construct and initialize entities. For this, an entity has to provide a factory function, EntityFunc, which is set during registration of the event sourced entity.

err = server.RegisterEventSourced(&eventsourced.Entity{
	ServiceName:   "com.example.shoppingcart.ShoppingCart",
	PersistenceID: "ShoppingCart",
	EntityFunc:    shoppingcart.NewShoppingCart,
}, protocol.DescriptorConfig{
	Service: "shoppingcart.proto",
}.AddDomainDescriptor("domain.proto"))

The entity factory function returns an eventsourced.EntityHandler, which handles commands and events.

func NewShoppingCart(eventsourced.EntityID) eventsourced.EntityHandler {
	return &ShoppingCart{
		cart: make([]*domain.LineItem, 0),
	}
}
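
To show why a factory function is needed, here is a simplified sketch of a registry that creates one entity instance per entity ID on first use and creates a fresh one after passivation. The registry type, its methods, and the handler interface are assumptions made for illustration; they do not describe how the support library manages entities internally.

```go
package main

// EntityID and handler are stand-ins for the library's types.
type EntityID string

type handler interface {
	ID() EntityID
}

// cart is a hypothetical entity that remembers its own ID.
type cart struct {
	id    EntityID
	items []string
}

func (c *cart) ID() EntityID { return c.id }

// newCart plays the role of the EntityFunc factory.
func newCart(id EntityID) handler {
	return &cart{id: id, items: make([]string, 0)}
}

// registry lazily creates one entity instance per entity ID.
type registry struct {
	factory  func(EntityID) handler
	entities map[EntityID]handler
}

func newRegistry(factory func(EntityID) handler) *registry {
	return &registry{factory: factory, entities: make(map[EntityID]handler)}
}

// get returns the live instance for id, calling the factory only when no
// instance is currently in memory.
func (r *registry) get(id EntityID) handler {
	if h, ok := r.entities[id]; ok {
		return h
	}
	h := r.factory(id)
	r.entities[id] = h
	return h
}

// passivate removes the instance from memory. The next get for the same ID
// calls the factory again and starts from an empty state.
func (r *registry) passivate(id EntityID) {
	delete(r.entities, id)
}
```

In this sketch the factory always returns an empty entity, so anything not recoverable from events or snapshots is gone once the instance has been passivated.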

Handling commands

An event sourced entity implements the eventsourced.EntityHandler interface, and command handling goes through its HandleCommand method. The command types received by an event sourced entity are declared by the gRPC server interface, which is generated from the protobuf definitions. Together with the registered eventsourced.Entity, the Cloudstate Go Support Library is then able to dispatch the commands it receives from the Cloudstate proxy.

func (sc *ShoppingCart) HandleCommand(ctx *eventsourced.Context, name string, cmd proto.Message) (proto.Message, error) {
	switch c := cmd.(type) {
	case *GetShoppingCart:
		return sc.GetCart(ctx, c)
	case *RemoveLineItem:
		return sc.RemoveItem(ctx, c)
	case *AddLineItem:
		return sc.AddItem(ctx, c)
	default:
		return nil, nil
	}
}

The return type of a command handler is, by definition of the service interface, the output type of the gRPC service call. This value is sent as the reply.
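
The HandleCommand example above returns nil, nil in its default branch. One alternative, sketched here with stand-in types instead of generated protobuf messages, is to return an error so that unexpected commands are not silently answered with an empty reply. The types cartEntity, GetShoppingCart, and Cart below are simplified placeholders, and handleCommand is not the library’s method.

```go
package main

import "fmt"

// GetShoppingCart and Cart are stand-ins for generated protobuf messages.
type GetShoppingCart struct{}

type Cart struct {
	ProductIds []string
}

// cartEntity is a hypothetical entity holding the product IDs in the cart.
type cartEntity struct {
	productIds []string
}

// handleCommand dispatches on the concrete type of cmd. Unknown command
// types produce an error instead of an empty reply.
func (c *cartEntity) handleCommand(name string, cmd interface{}) (interface{}, error) {
	switch cmd.(type) {
	case *GetShoppingCart:
		// Read-only command: reply with a copy of the current state.
		return &Cart{ProductIds: append([]string(nil), c.productIds...)}, nil
	default:
		return nil, fmt.Errorf("unknown command %q of type %T", name, cmd)
	}
}
```

Whether an unknown command should fail or be ignored is a design choice. Failing makes a mismatch between the service definition and the handler visible early.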

The following shows the implementation of the GetCart command handler. It is a read-only command handler: it doesn’t emit any events, it just returns some state:

func (sc *ShoppingCart) GetCart(*eventsourced.Context, *GetShoppingCart) (*Cart, error) {
	cart := &Cart{}
	for _, item := range sc.cart {
		cart.Items = append(cart.Items, &LineItem{
			ProductId: item.ProductId,
			Name:      item.Name,
			Quantity:  item.Quantity,
		})
	}
	return cart, nil
}

Emitting events

Commands that modify the state may do so by emitting events.

The only way a command handler may modify its state is by emitting an event. Any modifications made directly to the state from the command handler will not be persisted, and when the entity is passivated and next reloaded, those modifications will not be present.

A command handler may emit an event by invoking the Emit method on the eventsourced.Context. Calling Emit immediately invokes the associated event handler for that event. This both validates that the event can be applied to the current state and updates the state, so that subsequent processing in the command handler can use it.
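
The behaviour of Emit described above can be pictured with a small stand-in context. This is a sketch under stated assumptions: sketchContext, emit, and the cart type are hypothetical and only model the idea that an event is applied to the entity immediately and recorded for persistence if the event handler accepts it.

```go
package main

import "errors"

// ItemAdded is a stand-in for the persisted event message.
type ItemAdded struct {
	ProductId string
	Quantity  int32
}

// cart is a stand-in entity whose state is changed only by handleEvent.
type cart struct {
	quantities map[string]int32
}

func (c *cart) handleEvent(event interface{}) error {
	switch e := event.(type) {
	case *ItemAdded:
		c.quantities[e.ProductId] += e.Quantity
		return nil
	default:
		return errors.New("unknown event type")
	}
}

// sketchContext is a simplified, hypothetical stand-in for eventsourced.Context.
type sketchContext struct {
	entity  *cart
	emitted []interface{} // events accepted by the event handler
	err     error         // first error returned by the event handler
}

// emit applies the event to the entity right away and records it only if
// the event handler accepted it.
func (ctx *sketchContext) emit(event interface{}) {
	if ctx.err != nil {
		return
	}
	if err := ctx.entity.handleEvent(event); err != nil {
		ctx.err = err
		return
	}
	ctx.emitted = append(ctx.emitted, event)
}

// addItem is a command handler. It validates the command, emits an event,
// and then reads the state that the event handler has already updated.
func addItem(ctx *sketchContext, productID string, quantity int32) (int32, error) {
	if quantity <= 0 {
		return 0, errors.New("quantity must be greater than zero")
	}
	ctx.emit(&ItemAdded{ProductId: productID, Quantity: quantity})
	if ctx.err != nil {
		return 0, ctx.err
	}
	return ctx.entity.quantities[productID], nil
}
```

Note that addItem never writes to quantities itself. A rejected command returns before emit is called, so it leaves both the state and the list of emitted events untouched.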

Here’s an example of a command handler that emits an event:

func (sc *ShoppingCart) AddItem(ctx *eventsourced.Context, li *AddLineItem) (*empty.Empty, error) {
	if li.GetQuantity() <= 0 {
		return nil, fmt.Errorf("quantity for item %q must be greater than zero", li.GetProductId())
	}
	ctx.Emit(&domain.ItemAdded{
		Item: &domain.LineItem{
			ProductId: li.ProductId,
			Name:      li.Name,
			Quantity:  li.Quantity,
		}})
	return &empty.Empty{}, nil
}

This command handler also validates the command, ensuring the quantity of items added is greater than zero. Returning an error fails the command and the support library takes care of signaling that back to the requesting proxy as a protocol.Failure reply.

Handling events

Event handlers are invoked at two points: when restoring an entity from the journal, before any commands are handled, and each time a new event is emitted. An event handler’s responsibility is to update the state of the entity according to the event. Event handlers are the only place where it is safe to mutate the state of the entity.
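
Restoring an entity from the journal amounts to applying the stored events, in order, to a freshly constructed entity. The sketch below shows that idea with stand-in types. The replay function and the map-based cart are assumptions for illustration and do not reflect the library’s recovery code.

```go
package main

import "fmt"

// ItemAdded and ItemRemoved are stand-ins for the persisted event messages.
type ItemAdded struct {
	ProductId string
	Quantity  int32
}

type ItemRemoved struct {
	ProductId string
}

// cart is a hypothetical entity keeping a quantity per product ID.
type cart struct {
	quantities map[string]int32
}

func newCart() *cart {
	return &cart{quantities: make(map[string]int32)}
}

// handleEvent is the only place where the state is mutated.
func (c *cart) handleEvent(event interface{}) error {
	switch e := event.(type) {
	case *ItemAdded:
		c.quantities[e.ProductId] += e.Quantity
	case *ItemRemoved:
		delete(c.quantities, e.ProductId)
	default:
		return fmt.Errorf("unknown event type %T", event)
	}
	return nil
}

// replay rebuilds an entity by applying the journal's events in order to
// a new, empty entity.
func replay(journal []interface{}) (*cart, error) {
	c := newCart()
	for i, event := range journal {
		if err := c.handleEvent(event); err != nil {
			return nil, fmt.Errorf("replaying event %d: %w", i, err)
		}
	}
	return c, nil
}
```

Because the same handler runs during replay and when a command emits an event, the state after recovery matches the state the entity had before it was removed from memory, provided the handler is deterministic.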

Every event sourced entity implements the HandleEvent method of the eventsourced.EntityHandler interface. Events emitted by command handlers are dispatched to this method, which then decides how to handle each event.

func (sc *ShoppingCart) HandleEvent(ctx *eventsourced.Context, event interface{}) error {
	switch e := event.(type) {
	case *domain.ItemAdded:
		return sc.ItemAdded(e)
	case *domain.ItemRemoved:
		return sc.ItemRemoved(e)
	default:
		return nil
	}
}

Here’s an example of a concrete event handler for the ItemAdded event.

func (sc *ShoppingCart) ItemAdded(added *domain.ItemAdded) error {
	if added.Item.GetName() == "FAIL" {
		return errors.New("boom: forced an unexpected error")
	}
	if item, _ := sc.find(added.Item.ProductId); item != nil {
		item.Quantity += added.Item.Quantity
		return nil
	}
	sc.cart = append(sc.cart, &domain.LineItem{
		ProductId: added.Item.ProductId,
		Name:      added.Item.Name,
		Quantity:  added.Item.Quantity,
	})
	return nil
}
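
The ItemAdded handler above relies on a find helper that is not shown on this page. One possible shape for it, together with a matching removal handler, is sketched below using a stand-in LineItem type. Returning -1 as the index of a missing item and the itemRemoved method itself are assumptions made for this example.

```go
package main

import "fmt"

// LineItem is a stand-in for the generated domain.LineItem message.
type LineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

// ShoppingCart mirrors the entity state used throughout this page.
type ShoppingCart struct {
	cart []*LineItem
}

// find returns the item with the given product ID and its index, or nil
// and -1 if the cart does not contain it.
func (sc *ShoppingCart) find(productID string) (*LineItem, int) {
	for i, item := range sc.cart {
		if item.ProductId == productID {
			return item, i
		}
	}
	return nil, -1
}

// itemRemoved is a hypothetical handler for an ItemRemoved event. It
// deletes the matching item and reports an error if there is none.
func (sc *ShoppingCart) itemRemoved(productID string) error {
	_, i := sc.find(productID)
	if i < 0 {
		return fmt.Errorf("item %q is not in the cart", productID)
	}
	sc.cart = append(sc.cart[:i], sc.cart[i+1:]...)
	return nil
}
```

Since find returns a pointer into the cart, a caller such as the ItemAdded handler can update the quantity of an existing item in place.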

Producing and handling snapshots

Snapshots are an important optimisation for event sourced entities that may contain many events, ensuring that they can be loaded quickly even when they have very long journals. To produce snapshots, an entity has to implement the eventsourced.Snapshooter interface, whose Snapshot method must return the current state in a serializable form.

// A Snapshooter enables event sourced snapshots to be taken and
// handles snapshots provided by the Cloudstate proxy.
type Snapshooter interface {
	// Snapshot is a recording of the entire current state of an entity,
	// persisted periodically (eg, every 100 events), as an optimization.
	// With snapshots, when the entity is reloaded from the journal, the
	// entire journal doesn't need to be replayed, just the changes since
	// the last snapshot.
	Snapshot(ctx *Context) (snapshot interface{}, err error)
	// HandleSnapshot is used to apply snapshots provided by the Cloudstate
	// proxy.
	HandleSnapshot(ctx *Context, snapshot interface{}) error
}

Here is an example of the shopping cart entity creating a snapshot of its current state as a domain.Cart.

func (sc *ShoppingCart) Snapshot(*eventsourced.Context) (snapshot interface{}, err error) {
	return &domain.Cart{
		Items: append(make([]*domain.LineItem, 0, len(sc.cart)), sc.cart...),
	}, nil
}

When the entity is loaded again, the snapshot is loaded first, before any other events are received, and passed to the HandleSnapshot method of eventsourced.Snapshooter. The snapshot handler can then type-switch over the snapshot types that the entity produces.

func (sc *ShoppingCart) HandleSnapshot(ctx *eventsourced.Context, snapshot interface{}) error {
	switch value := snapshot.(type) {
	case *domain.Cart:
		sc.cart = append(sc.cart[:0], value.Items...)
		return nil
	default:
		return fmt.Errorf("unknown snapshot type: %T", value)
	}
}
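
Taken together, recovery with snapshots means applying the latest snapshot first and then only the events recorded after it. The following sketch checks that this gives the same state as replaying the whole journal. All names here (CartSnapshot, recoverCart, the map-based cart) are hypothetical stand-ins, and the copying of the map in snapshot and handleSnapshot is a design choice of this example rather than something the library requires.

```go
package main

// ItemAdded is a stand-in for the persisted event message.
type ItemAdded struct {
	ProductId string
	Quantity  int32
}

// CartSnapshot is a stand-in for the persisted domain.Cart message.
type CartSnapshot struct {
	Quantities map[string]int32
}

// cart is a hypothetical entity keeping a quantity per product ID.
type cart struct {
	quantities map[string]int32
}

func (c *cart) handleEvent(e *ItemAdded) {
	c.quantities[e.ProductId] += e.Quantity
}

// snapshot returns a copy of the state, so that later state changes
// cannot alter a snapshot that has already been taken.
func (c *cart) snapshot() *CartSnapshot {
	copied := make(map[string]int32, len(c.quantities))
	for id, q := range c.quantities {
		copied[id] = q
	}
	return &CartSnapshot{Quantities: copied}
}

// handleSnapshot replaces the state with a copy of the snapshot's content.
func (c *cart) handleSnapshot(s *CartSnapshot) {
	c.quantities = make(map[string]int32, len(s.Quantities))
	for id, q := range s.Quantities {
		c.quantities[id] = q
	}
}

// recoverCart restores an entity from an optional snapshot followed by the
// events that were persisted after that snapshot.
func recoverCart(s *CartSnapshot, tail []*ItemAdded) *cart {
	c := &cart{quantities: make(map[string]int32)}
	if s != nil {
		c.handleSnapshot(s)
	}
	for _, e := range tail {
		c.handleEvent(e)
	}
	return c
}
```

Calling recoverCart(nil, events) replays everything, while recoverCart(snap, events[n:]) starts from a snapshot taken after the first n events. Both paths are expected to end in the same state.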

Registering the entity

Once you’ve created your entity, you can register it with the cloudstate.CloudState server by invoking the RegisterEventSourced method of a CloudState instance. In addition to passing your entity type and service name, you also need to pass any descriptors that you use for persisting events, for example, the domain.proto descriptor.

During registration, the optional ServiceName and ServiceVersion can also be configured. (TODO: give an example of how to pick values for these once the spec defines their semantics.)

err = server.RegisterEventSourced(&eventsourced.Entity{
	ServiceName:   "com.example.shoppingcart.ShoppingCart",
	PersistenceID: "ShoppingCart",
	EntityFunc:    shoppingcart.NewShoppingCart,
}, protocol.DescriptorConfig{
	Service: "shoppingcart.proto",
}.AddDomainDescriptor("domain.proto"))