Event sourcing

This page documents how to implement Cloudstate event sourced entities in Go. For information on what Cloudstate event sourced entities are, please read the general Event sourcing documentation first.

An event sourced entity can be created by embedding the cloudstate.EventEmitter type and also implementing the cloudstate.Entity interface.

type ShoppingCart struct {
	// our domain object
	cart []*domain.LineItem
	// as an Emitter we can emit events
	cloudstate.EventEmitter
}

Then, by composing the entity with a cloudstate.EventSourcedEntity and registering it with the Cloudstate instance through RegisterEventSourcedEntity, your entity is configured as an event sourced entity and is handled by that Cloudstate instance from then on.

type EventSourcedEntity struct {
	// ServiceName is the fully qualified name of the service that implements this entity’s interface.
	// Setting it is mandatory.
	ServiceName string
	// PersistenceID is used to namespace events in the journal, useful for
	// when you share the same database between multiple entities. It defaults to
	// the simple name for the entity type.
	// It’s good practice to select one explicitly, so that your database
	// doesn’t depend on type names in your code.
	// Setting it is mandatory.
	PersistenceID string
	// The snapshotEvery parameter controls how often snapshots are taken,
	// so that the entity doesn't need to be recovered from the whole journal
	// each time it’s loaded. If left unset, it defaults to 100.
	// Setting it to a negative number will result in snapshots never being taken.
	SnapshotEvery int64

	// EntityFunc is a factory function which generates a new Entity.
	EntityFunc func() Entity

	// internal
	registerOnce sync.Once
}

The ServiceName is the fully qualified name of the gRPC service that implements this entity’s interface. Setting it is mandatory.

The PersistenceID is used to namespace events in the journal, which is useful when you share the same database between multiple entities. The recommended value is the name of the entity type (in this case, ShoppingCart). Setting it is mandatory.

The SnapshotEvery parameter controls how often snapshots are taken, so that the entity doesn’t need to be recovered from the whole journal each time it’s loaded. If left unset, it defaults to 100. Setting it to a negative number will result in snapshots never being taken.

The EntityFunc is a factory method which generates a new Entity whenever Cloudstate has to initialize a new entity.
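To make the SnapshotEvery rules above concrete, the following is a minimal sketch of how the described defaulting could be expressed. The helpers effectiveSnapshotEvery and shouldSnapshot are hypothetical and not part of the Cloudstate library. Treating the zero value as "unset" and taking a snapshot whenever the event count reaches a multiple of the interval are assumptions made for illustration only.

```go
package main

import "fmt"

// defaultSnapshotEvery mirrors the documented default of 100.
const defaultSnapshotEvery int64 = 100

// effectiveSnapshotEvery is a hypothetical helper. It assumes that the zero
// value means "unset" and maps it to the default. Other values pass through.
func effectiveSnapshotEvery(configured int64) int64 {
	if configured == 0 {
		return defaultSnapshotEvery
	}
	return configured
}

// shouldSnapshot reports whether a snapshot would be due once eventCount
// events have been persisted, assuming one snapshot every N events.
// A negative setting disables snapshots entirely.
func shouldSnapshot(configured, eventCount int64) bool {
	every := effectiveSnapshotEvery(configured)
	if every < 0 || eventCount <= 0 {
		return false
	}
	return eventCount%every == 0
}

func main() {
	fmt.Println("unset, 100 events:", shouldSnapshot(0, 100))
	fmt.Println("unset, 99 events:", shouldSnapshot(0, 99))
	fmt.Println("negative, 100 events:", shouldSnapshot(-1, 100))
	fmt.Println("every 10, 30 events:", shouldSnapshot(10, 30))
}
```

The actual point at which the support library and proxy take a snapshot may differ; the sketch only illustrates the three documented cases (unset, positive, negative).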

Persistence types and serialization

Event sourced entities persist events and snapshots, and these need to be serialized when persisted. The most straightforward way to persist events and snapshots is to use protobufs. Cloudstate will automatically detect if an emitted event is a protobuf, and serialize it as such. For other serialization options, including JSON, see Serialization.

While protobufs are the recommended format for persisting events, it is recommended that you do not persist your service’s protobuf messages. Instead, create new messages for persistence, even if they are identical to the service’s. This may introduce some overhead in converting from one type to the other, but it allows the service’s public interface to evolve independently of its data storage format, which should be private.
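The conversion between the service’s messages and the persisted messages is usually a pair of small mapping functions. The sketch below illustrates the idea with plain structs standing in for the generated protobuf types. APILineItem, PersistedLineItem, toPersisted and toAPI are hypothetical names used only for this example.

```go
package main

import "fmt"

// APILineItem stands in for the generated service message
// (shoppingcart.LineItem in this page's examples).
type APILineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

// PersistedLineItem stands in for the generated persistence message
// (domain.LineItem in this page's examples).
type PersistedLineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

// toPersisted maps a service message to its persistence counterpart.
func toPersisted(in *APILineItem) *PersistedLineItem {
	if in == nil {
		return nil
	}
	return &PersistedLineItem{
		ProductId: in.ProductId,
		Name:      in.Name,
		Quantity:  in.Quantity,
	}
}

// toAPI maps a persisted message back to the service message.
func toAPI(in *PersistedLineItem) *APILineItem {
	if in == nil {
		return nil
	}
	return &APILineItem{
		ProductId: in.ProductId,
		Name:      in.Name,
		Quantity:  in.Quantity,
	}
}

func main() {
	stored := toPersisted(&APILineItem{ProductId: "p1", Name: "Widget", Quantity: 3})
	fmt.Printf("persisted: %+v\n", *stored)
	fmt.Printf("api: %+v\n", *toAPI(stored))
}
```

Keeping the mapping in one place means that a later change to either message only touches these two functions.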

For our shopping cart example, we’ll create a new file called domain.proto. The name domain is chosen to indicate that these are the application’s domain objects:

// These are the messages that get persisted - the events, plus the current state (Cart) for snapshots.
syntax = "proto3";

package com.example.shoppingcart.persistence;

option go_package = "persistence";

message LineItem {
    string productId = 1;
    string name = 2;
    int32 quantity = 3;
}

// The item added event.
message ItemAdded {
    LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
    string productId = 1;
}

// The shopping cart state.
message Cart {
    repeated LineItem items = 1;
}

State

Each entity should store its state locally in a mutable variable, either a mutable field or a mutable structure such as an array or slice. For our shopping cart, the state is a list of products, so we’ll create a slice of LineItems to contain that:

cart []*domain.LineItem

Constructing

The Cloudstate Go Support Library needs to know how to construct and initialize entities. For this, an entity has to provide a factory function, EntityFunc, which is set during registration of the event sourced entity.

err = server.RegisterEventSourcedEntity(
	&cloudstate.EventSourcedEntity{
		ServiceName:   "com.example.shoppingcart.ShoppingCart",
		PersistenceID: "ShoppingCart",
		EntityFunc:    NewShoppingCart,
	},
	cloudstate.DescriptorConfig{
		Service: "shoppingcart/shoppingcart.proto",
	}.AddDomainDescriptor("domain.proto"),
)

The entity factory function returns a cloudstate.Entity, which is composed of two interfaces: one to handle commands and one to handle events.

func NewShoppingCart() cloudstate.Entity {
	return &ShoppingCart{
		cart:         make([]*domain.LineItem, 0),
		EventEmitter: cloudstate.NewEmitter(), // TODO: the EventEmitter could be provided by the event sourced handler
	}
}

Handling commands

An event sourced entity implements the composed cloudstate.Entity interface. Because cloudstate.Entity includes an interface for handling commands alongside cloudstate.EventHandler, entities implementing it receive commands from Cloudstate through the HandleCommand method.

The command types received by an event sourced entity are declared by the gRPC Server interface which is generated from the protobuf definitions. The Cloudstate Go Support library together with the registered cloudstate.EventSourcedEntity is then able to dispatch commands it gets from the Cloudstate proxy to the event sourced entity.

func (sc *ShoppingCart) HandleCommand(ctx context.Context, command interface{}) (handled bool, reply interface{}, err error) {
	switch cmd := command.(type) {
	case *shoppingcart.GetShoppingCart:
		reply, err := sc.GetCart(ctx, cmd)
		return true, reply, err
	case *shoppingcart.RemoveLineItem:
		reply, err := sc.RemoveItem(ctx, cmd)
		return true, reply, err
	case *shoppingcart.AddLineItem:
		reply, err := sc.AddItem(ctx, cmd)
		return true, reply, err
	default:
		// Unknown command type: report it as not handled.
		return false, nil, nil
	}
}

By definition of the service interface, the return type of a command handler is the output type of the corresponding gRPC service call. The returned value is sent as the reply.

The following shows the implementation of the GetCart command handler. This handler is read-only, so it emits no events and simply returns the current state:

func (sc *ShoppingCart) GetCart(_ context.Context, _ *shoppingcart.GetShoppingCart) (*shoppingcart.Cart, error) {
	cart := &shoppingcart.Cart{}
	for _, item := range sc.cart {
		cart.Items = append(cart.Items, &shoppingcart.LineItem{
			ProductId: item.ProductId,
			Name:      item.Name,
			Quantity:  item.Quantity,
		})
	}
	return cart, nil
}

Emitting events

Commands that modify the state must do so by emitting events. This is the only way a command handler may modify the entity’s state. Any modifications made directly to the state from the command handler will not be persisted, and when the entity is passivated and next reloaded, those modifications will not be present.

A command handler may emit an event by using the embedded cloudstate.EventEmitter and invoking the Emit method on it. Calling Emit immediately invokes the associated event handler for that event. This both validates that the event can be applied to the current state and updates the state, so that subsequent processing in the command handler can use it.

Here’s an example of a command handler that emits an event:

func (sc *ShoppingCart) AddItem(_ context.Context, li *shoppingcart.AddLineItem) (*empty.Empty, error) {
	if li.GetQuantity() <= 0 {
		return nil, fmt.Errorf("cannot add a non-positive quantity of item %s", li.GetProductId())
	}
	sc.Emit(&domain.ItemAdded{
		Item: &domain.LineItem{
			ProductId: li.ProductId,
			Name:      li.Name,
			Quantity:  li.Quantity,
		}})
	return &empty.Empty{}, nil
}

This command handler also validates the command, ensuring the quantity of items added is greater than zero. Returning an error fails the command and the support library takes care of signaling that back to the requesting proxy as a Failure reply.
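The "apply immediately on Emit" behaviour described in this section can be illustrated with a small, self-contained sketch. The emitter type below is a hypothetical stand-in, not the real cloudstate.EventEmitter. Its field names, and the fact that its Emit returns an error, are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// itemAdded is a stand-in for a persisted event such as domain.ItemAdded.
type itemAdded struct {
	ProductId string
	Quantity  int32
}

// emitter is a hypothetical, simplified stand-in for cloudstate.EventEmitter.
type emitter struct {
	apply   func(event interface{}) error // the entity's event handler
	pending []interface{}                 // events accepted and awaiting persistence
}

// Emit applies the event to the entity state first and records it for
// persistence only if the event handler accepted it.
func (e *emitter) Emit(event interface{}) error {
	if err := e.apply(event); err != nil {
		return err
	}
	e.pending = append(e.pending, event)
	return nil
}

// cart is a minimal entity that embeds the emitter.
type cart struct {
	quantities map[string]int32
	emitter
}

func newCart() *cart {
	c := &cart{quantities: map[string]int32{}}
	c.emitter.apply = c.handleEvent
	return c
}

// handleEvent is the only place where the state is mutated.
func (c *cart) handleEvent(event interface{}) error {
	switch e := event.(type) {
	case *itemAdded:
		c.quantities[e.ProductId] += e.Quantity
		return nil
	default:
		return errors.New("unhandled event type")
	}
}

// addItem is a command handler: it validates the command, emits an event,
// and can then read the already updated state to build its reply.
func (c *cart) addItem(productID string, quantity int32) (int32, error) {
	if quantity <= 0 {
		return 0, fmt.Errorf("cannot add a non-positive quantity of item %s", productID)
	}
	if err := c.Emit(&itemAdded{ProductId: productID, Quantity: quantity}); err != nil {
		return 0, err
	}
	return c.quantities[productID], nil
}

func main() {
	c := newCart()
	total, err := c.addItem("apple", 2)
	fmt.Println(total, err, len(c.pending))
}
```

Because the state is updated inside Emit, the command handler sees the new quantity right after emitting, while a rejected command leaves both the state and the list of pending events untouched.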

Handling events

Event handlers are invoked at two points: when restoring an entity from the journal, before any commands are handled, and each time a new event is emitted. An event handler’s responsibility is to update the state of the entity according to the event. Event handlers are the only place where it’s safe to mutate the state of the entity.

Event handlers are declared by implementing the cloudstate.EventHandler interface.

type EventHandler interface {
	HandleEvent(ctx context.Context, event interface{}) (handled bool, err error)
}

Events emitted by command handlers get dispatched to the implemented event handler which then decides how to proceed with the event.

func (sc *ShoppingCart) HandleEvent(_ context.Context, event interface{}) (handled bool, err error) {
	switch e := event.(type) {
	case *domain.ItemAdded:
		return true, sc.ItemAdded(e)
	case *domain.ItemRemoved:
		return true, sc.ItemRemoved(e)
	default:
		return false, nil
	}
}

Here’s an example of a concrete event handler for the ItemAdded event.

func (sc *ShoppingCart) ItemAdded(added *domain.ItemAdded) error { // TODO: enable handling for values
	if item, _ := sc.find(added.Item.ProductId); item != nil {
		item.Quantity += added.Item.Quantity
	} else {
		sc.cart = append(sc.cart, &domain.LineItem{
			ProductId: added.Item.ProductId,
			Name:      added.Item.Name,
			Quantity:  added.Item.Quantity,
		})
	}
	return nil
}
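The ItemAdded handler relies on a find helper that is not shown on this page, and the ItemRemoved handler is not shown either. The following is a minimal sketch of how both might look, using local stand-in types instead of the generated domain package. That find returns the index as its second value is an assumption, as is the error returned for an unknown product.

```go
package main

import "fmt"

// LineItem and ItemRemoved stand in for the generated domain messages.
type LineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

type ItemRemoved struct {
	ProductId string
}

// ShoppingCart holds only the state needed for this sketch.
type ShoppingCart struct {
	cart []*LineItem
}

// find returns the line item with the given product ID together with its
// index in the cart, or nil and -1 if the product is not in the cart.
func (sc *ShoppingCart) find(productID string) (*LineItem, int) {
	for i, item := range sc.cart {
		if item.ProductId == productID {
			return item, i
		}
	}
	return nil, -1
}

// ItemRemoved is the event handler for the ItemRemoved event. It deletes the
// matching line item from the cart.
func (sc *ShoppingCart) ItemRemoved(removed *ItemRemoved) error {
	_, idx := sc.find(removed.ProductId)
	if idx < 0 {
		return fmt.Errorf("item %s is not in the cart", removed.ProductId)
	}
	// Remove the element at idx while preserving the order of the rest.
	sc.cart = append(sc.cart[:idx], sc.cart[idx+1:]...)
	return nil
}

func main() {
	sc := &ShoppingCart{cart: []*LineItem{
		{ProductId: "a", Name: "Apple", Quantity: 1},
		{ProductId: "b", Name: "Banana", Quantity: 2},
	}}
	err := sc.ItemRemoved(&ItemRemoved{ProductId: "a"})
	fmt.Println(err, len(sc.cart))
}
```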

Producing and handling snapshots

Snapshots are an important optimisation for event sourced entities that may contain many events, ensuring that they can be loaded quickly even when they have very long journals. To produce a snapshot, the entity implements the cloudstate.Snapshotter interface, whose Snapshot method returns the current state in serializable form.

type Snapshotter interface {
	Snapshot() (snapshot interface{}, err error)
}

Here is how the TCK shopping cart example creates a snapshot of the current shopping cart state as a domain.Cart.

func (sc *ShoppingCart) Snapshot() (snapshot interface{}, err error) {
	return domain.Cart{
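		// Copy into a new slice with zero length and sufficient capacity, so that
		// the snapshot contains exactly the current items and no leading nil entries.
		Items: append(make([]*domain.LineItem, 0, len(sc.cart)), sc.cart...),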
	}, nil
}

When the entity is loaded again, the snapshot is loaded first, before any events are received, and passed to a snapshot handler. Snapshot handlers are declared by implementing the cloudstate.SnapshotHandler interface.

type SnapshotHandler interface {
	HandleSnapshot(snapshot interface{}) (handled bool, err error)
}

A snapshot handler can then type-switch over the snapshot types that the corresponding Snapshot method produces.

func (sc *ShoppingCart) HandleSnapshot(snapshot interface{}) (handled bool, err error) {
	switch value := snapshot.(type) {
	case domain.Cart:
		sc.cart = append(sc.cart[:0], value.Items...)
		return true, nil
	default:
		return false, nil
	}
}
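The recovery order described in this section, snapshot first and then the events recorded after it, can be sketched as follows. The recoverEntity function is a hypothetical illustration of what a support library might do when loading an entity; it is not the Cloudstate implementation. The types are local stand-ins for the generated domain messages, and the handlers omit the context parameter for brevity.

```go
package main

import "fmt"

// LineItem, ItemAdded and Cart stand in for the generated domain messages.
type LineItem struct {
	ProductId string
	Name      string
	Quantity  int32
}

type ItemAdded struct {
	Item *LineItem
}

type Cart struct {
	Items []*LineItem
}

type ShoppingCart struct {
	cart []*LineItem
}

// HandleSnapshot replaces the state with a copy of the snapshot's items.
func (sc *ShoppingCart) HandleSnapshot(snapshot interface{}) (bool, error) {
	switch value := snapshot.(type) {
	case Cart:
		sc.cart = sc.cart[:0]
		for _, item := range value.Items {
			copied := *item // copy so later events do not mutate the snapshot
			sc.cart = append(sc.cart, &copied)
		}
		return true, nil
	default:
		return false, nil
	}
}

// HandleEvent applies a single event to the state.
func (sc *ShoppingCart) HandleEvent(event interface{}) (bool, error) {
	switch e := event.(type) {
	case *ItemAdded:
		for _, item := range sc.cart {
			if item.ProductId == e.Item.ProductId {
				item.Quantity += e.Item.Quantity
				return true, nil
			}
		}
		copied := *e.Item
		sc.cart = append(sc.cart, &copied)
		return true, nil
	default:
		return false, nil
	}
}

// recoverEntity is a hypothetical recovery loop: it applies the snapshot, if
// there is one, and then replays the events that were persisted after it.
func recoverEntity(sc *ShoppingCart, snapshot interface{}, journal []interface{}) error {
	if snapshot != nil {
		handled, err := sc.HandleSnapshot(snapshot)
		if err != nil {
			return err
		}
		if !handled {
			return fmt.Errorf("unhandled snapshot type %T", snapshot)
		}
	}
	for _, event := range journal {
		handled, err := sc.HandleEvent(event)
		if err != nil {
			return err
		}
		if !handled {
			return fmt.Errorf("unhandled event type %T", event)
		}
	}
	return nil
}

func main() {
	sc := &ShoppingCart{}
	snapshot := Cart{Items: []*LineItem{{ProductId: "a", Name: "Apple", Quantity: 2}}}
	journal := []interface{}{&ItemAdded{Item: &LineItem{ProductId: "a", Name: "Apple", Quantity: 1}}}
	err := recoverEntity(sc, snapshot, journal)
	fmt.Println(err, len(sc.cart), sc.cart[0].Quantity)
}
```

With a snapshot in place, only the events emitted after the snapshot need to be replayed, which is what keeps loading fast for entities with long journals.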

Registering the entity

Once you’ve created your entity, you can register it with the cloudstate.Cloudstate server by invoking the RegisterEventSourcedEntity method of a Cloudstate instance. In addition to passing your entity type and service name, you also need to pass any descriptors that you use for persisting events, for example the domain.proto descriptor.

During registration, the optional ServiceName and ServiceVersion can be configured. (TODO: give an example of how to pick values for these after the spec defines semantics.)

err = server.RegisterEventSourcedEntity(
	&cloudstate.EventSourcedEntity{
		ServiceName:   "com.example.shoppingcart.ShoppingCart",
		PersistenceID: "ShoppingCart",
		EntityFunc:    NewShoppingCart,
	},
	cloudstate.DescriptorConfig{
		Service: "shoppingcart/shoppingcart.proto",
	}.AddDomainDescriptor("domain.proto"),
)