diff --git a/README.md b/README.md index 7cb731e..0f0af2b 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ -# CloudState stateful service support in Go -[![Build Status](https://travis-ci.com/marcellanz/go-support.svg?branch=feature%2Fgo-support)](https://travis-ci.com/marcellanz/go-support) -[![codecov](https://codecov.io/gh/marcellanz/go-support/branch/master/graph/badge.svg)](https://codecov.io/gh/marcellanz/go-support) -[![GoDoc](https://godoc.org/github.com/marcellanz/go-support?status.svg)](https://godoc.org/github.com/marcellanz/go-support) +# Cloudstate stateful service support in Go +[![Build Status](https://travis-ci.com/cloudstateio/go-support.svg)](https://travis-ci.com/cloudstateio/go-support) +[![codecov](https://codecov.io/gh/cloudstateio/go-support/branch/master/graph/badge.svg)](https://codecov.io/gh/cloudstateio/go-support) +[![GoDoc](https://godoc.org/github.com/cloudstateio/go-support/cloudstate?status.svg)](https://godoc.org/github.com/cloudstateio/go-support/cloudstate) -This package provides support for writing [CloudState](https://github.com/cloudstateio/cloudstate) stateful functions in Go. +This package provides support for writing [Cloudstate](https://github.com/cloudstateio/cloudstate) stateful functions in Go. For more information see https://cloudstate.io. \ No newline at end of file diff --git a/cloudstate/cloudstate.go b/cloudstate/cloudstate.go index 9813f2d..404851f 100644 --- a/cloudstate/cloudstate.go +++ b/cloudstate/cloudstate.go @@ -45,10 +45,14 @@ type CloudState struct { } // New returns a new CloudState instance. -func New(options Options) (*CloudState, error) { +func New(config Config) (*CloudState, error) { + eds, err := newEntityDiscoveryServer(config) + if err != nil { + return nil, err + } cs := &CloudState{ server: grpc.NewServer(), - entityDiscoveryServer: newEntityDiscoveryResponder(options), + entityDiscoveryServer: eds, eventSourcedServer: newEventSourcedServer(), } protocol.RegisterEntityDiscoveryServer(cs.server, cs.entityDiscoveryServer) @@ -56,8 +60,8 @@ func New(options Options) (*CloudState, error) { return cs, nil } -// Options go get a CloudState instance configured. -type Options struct { +// Config go get a CloudState instance configured. +type Config struct { ServiceName string ServiceVersion string } @@ -65,7 +69,6 @@ type Options struct { // DescriptorConfig configures service and dependent descriptors. type DescriptorConfig struct { Service string - ServiceMsg descriptor.Message Domain []string DomainMessages []descriptor.Message } @@ -83,7 +86,7 @@ func (dc DescriptorConfig) AddDomainDescriptor(filename string) DescriptorConfig // RegisterEventSourcedEntity registers an event sourced entity for CloudState. func (cs *CloudState) RegisterEventSourcedEntity(ese *EventSourcedEntity, config DescriptorConfig) (err error) { ese.registerOnce.Do(func() { - if err = ese.initZeroValue(); err != nil { + if err = ese.init(); err != nil { return } if err = cs.eventSourcedServer.registerEntity(ese); err != nil { @@ -116,60 +119,46 @@ func (cs *CloudState) Run() error { return nil } -// EntityDiscoveryServer implements the CloudState discovery protocol. +// EntityDiscoveryServer implements the Cloudstate discovery protocol. type EntityDiscoveryServer struct { fileDescriptorSet *filedescr.FileDescriptorSet entitySpec *protocol.EntitySpec message *descriptor.Message } -// newEntityDiscoveryResponder returns a new and initialized EntityDiscoveryServer. -func newEntityDiscoveryResponder(options Options) *EntityDiscoveryServer { - responder := &EntityDiscoveryServer{} - responder.entitySpec = &protocol.EntitySpec{ +// newEntityDiscoveryServer returns a new and initialized EntityDiscoveryServer. +func newEntityDiscoveryServer(config Config) (*EntityDiscoveryServer, error) { + svr := &EntityDiscoveryServer{} + svr.entitySpec = &protocol.EntitySpec{ Entities: make([]*protocol.Entity, 0), ServiceInfo: &protocol.ServiceInfo{ - ServiceName: options.ServiceName, - ServiceVersion: options.ServiceVersion, + ServiceName: config.ServiceName, + ServiceVersion: config.ServiceVersion, ServiceRuntime: fmt.Sprintf("%s %s/%s", runtime.Version(), runtime.GOOS, runtime.GOARCH), SupportLibraryName: SupportLibraryName, SupportLibraryVersion: SupportLibraryVersion, }, } - responder.fileDescriptorSet = &filedescr.FileDescriptorSet{ + svr.fileDescriptorSet = &filedescr.FileDescriptorSet{ File: make([]*filedescr.FileDescriptorProto, 0), } - return responder + return svr, nil } -// Discover returns an entity spec for -func (r *EntityDiscoveryServer) Discover(c context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) { - log.Printf("Received discovery call from sidecar [%s w%s] supporting CloudState %v.%v\n", +// Discover returns an entity spec for registered entities. +func (r *EntityDiscoveryServer) Discover(_ context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) { + log.Printf("Received discovery call from sidecar [%s w%s] supporting Cloudstate %v.%v\n", pi.ProxyName, pi.ProxyVersion, pi.ProtocolMajorVersion, pi.ProtocolMinorVersion, ) - for _, filename := range []string{ - "google/protobuf/empty.proto", - "google/protobuf/any.proto", - "google/protobuf/descriptor.proto", - "google/api/annotations.proto", - "google/api/http.proto", - "cloudstate/event_sourced.proto", - "cloudstate/entity.proto", - "cloudstate/entity_key.proto", - } { - if err := r.registerFileDescriptorProto(filename); err != nil { - return nil, err - } - } log.Printf("Responding with: %v\n", r.entitySpec.GetServiceInfo()) return r.entitySpec, nil } -// ReportError logs any user function error reported by the CloudState proxy. -func (r *EntityDiscoveryServer) ReportError(c context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) { +// ReportError logs any user function error reported by the Cloudstate proxy. +func (r *EntityDiscoveryServer) ReportError(_ context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) { log.Printf("ReportError: %v\n", fe) return &empty.Empty{}, nil } @@ -189,12 +178,6 @@ func (r *EntityDiscoveryServer) resolveFileDescriptors(dc DescriptorConfig) erro if err := r.registerFileDescriptorProto(dc.Service); err != nil { return err } - } else { - if dc.ServiceMsg != nil { - if err := r.registerFileDescriptor(dc.ServiceMsg); err != nil { - return err - } - } } // and dependent domain descriptors for _, dp := range dc.Domain { @@ -214,24 +197,38 @@ func (r *EntityDiscoveryServer) registerEntity(e *EventSourcedEntity, config Des if err := r.resolveFileDescriptors(config); err != nil { return fmt.Errorf("failed to resolveFileDescriptor for DescriptorConfig: %+v: %w", config, err) } - persistenceID := e.entityName - if e.PersistenceID != "" { - persistenceID = e.PersistenceID - } r.entitySpec.Entities = append(r.entitySpec.Entities, &protocol.Entity{ EntityType: EventSourced, ServiceName: e.ServiceName, - PersistenceId: persistenceID, + PersistenceId: e.PersistenceID, }) return r.updateSpec() } +func (r *EntityDiscoveryServer) hasRegistered(filename string) bool { + for _, f := range r.fileDescriptorSet.File { + if f.GetName() == filename { + return true + } + } + return false +} + func (r *EntityDiscoveryServer) registerFileDescriptorProto(filename string) error { + if r.hasRegistered(filename) { + return nil + } descriptorProto, err := unpackFile(proto.FileDescriptor(filename)) if err != nil { return fmt.Errorf("failed to registerFileDescriptorProto for filename: %s: %w", filename, err) } r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, descriptorProto) + for _, fn := range descriptorProto.Dependency { + err := r.registerFileDescriptorProto(fn) + if err != nil { + return err + } + } return r.updateSpec() } @@ -240,6 +237,9 @@ func (r *EntityDiscoveryServer) registerFileDescriptor(msg descriptor.Message) e if r := recover(); r != nil { return fmt.Errorf("descriptor.ForMessage panicked (%v) for: %+v", r, msg) } + if r.hasRegistered(fd.GetName()) { + return nil + } r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, fd) return nil } diff --git a/cloudstate/cloudstate_test.go b/cloudstate/cloudstate_test.go index 53b281a..ee1fdc8 100644 --- a/cloudstate/cloudstate_test.go +++ b/cloudstate/cloudstate_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package cloudstate implements the CloudState event sourced and entity discovery protocol. +// Package cloudstate implements the Cloudstate event sourced and entity discovery protocol. package cloudstate import ( @@ -29,7 +29,7 @@ import ( ) func TestNewCloudState(t *testing.T) { - cloudState, _ := New(Options{}) + cloudState, _ := New(Config{}) si := cloudState.server.GetServiceInfo() if si == nil { t.Fail() @@ -37,7 +37,7 @@ func TestNewCloudState(t *testing.T) { } func TestEntityDiscoveryResponderDiscover(t *testing.T) { - responder := newEntityDiscoveryResponder(Options{ + server, _ := newEntityDiscoveryServer(Config{ ServiceName: "service.one", ServiceVersion: "0.0.1", }) @@ -48,7 +48,7 @@ func TestEntityDiscoveryResponderDiscover(t *testing.T) { ProxyVersion: "9.8.7", SupportedEntityTypes: nil, } - spec, err := responder.Discover(context.Background(), info) + spec, err := server.Discover(context.Background(), info) if err != nil { t.Errorf("responder.Discover returned err: %v", err) } @@ -79,12 +79,12 @@ func captureOutput(f func()) string { } func TestEntityDiscoveryResponderReportError(t *testing.T) { - responder := newEntityDiscoveryResponder(Options{ + server, _ := newEntityDiscoveryServer(Config{ ServiceName: "service.one", ServiceVersion: "0.0.1", }) output := captureOutput(func() { - empty, err := responder.ReportError(context.Background(), &protocol.UserFunctionError{ + empty, err := server.ReportError(context.Background(), &protocol.UserFunctionError{ Message: "unable to do XYZ", }) if err != nil || empty == nil { diff --git a/cloudstate/doc.go b/cloudstate/doc.go index fe79b31..929435c 100644 --- a/cloudstate/doc.go +++ b/cloudstate/doc.go @@ -13,5 +13,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package cloudstate implements the CloudState event sourced and entity discovery protocol. +// Package cloudstate implements the Cloudstate event sourced and entity discovery protocol. package cloudstate diff --git a/cloudstate/event.go b/cloudstate/event.go index 24cef4c..134d19e 100644 --- a/cloudstate/event.go +++ b/cloudstate/event.go @@ -15,7 +15,10 @@ package cloudstate -import "fmt" +import ( + "context" + "fmt" +) type OnNext func(event interface{}) error type OnErr func(err error) @@ -84,7 +87,11 @@ func (e *eventEmitter) Clear() { } type EventHandler interface { - HandleEvent(event interface{}) (handled bool, err error) + HandleEvent(ctx context.Context, event interface{}) (handled bool, err error) +} + +type CommandHandler interface { + HandleCommand(ctx context.Context, command interface{}) (handled bool, reply interface{}, err error) } type Snapshotter interface { diff --git a/cloudstate/eventsourced.go b/cloudstate/eventsourced.go index 48b0ecc..7c89001 100644 --- a/cloudstate/eventsourced.go +++ b/cloudstate/eventsourced.go @@ -17,27 +17,23 @@ package cloudstate import ( "context" - "errors" "fmt" - "github.com/cloudstateio/go-support/cloudstate/encoding" - "github.com/cloudstateio/go-support/cloudstate/protocol" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes/any" "io" "net/url" "reflect" "strings" "sync" + + "github.com/cloudstateio/go-support/cloudstate/encoding" + "github.com/cloudstateio/go-support/cloudstate/protocol" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/any" ) // Entity type Entity interface { - EntityInitializer -} - -// An EntityInitializer knows how to initialize an Entity -type EntityInitializer interface { - New() interface{} + CommandHandler + EventHandler } const snapshotEveryDefault = 100 @@ -45,20 +41,15 @@ const snapshotEveryDefault = 100 // EventSourcedEntity captures an Entity, its ServiceName and PersistenceID. // It is used to be registered as an event sourced entity on a CloudState instance. type EventSourcedEntity struct { - // Entity is a nil or Zero-Initialized reference - // to the entity to be event sourced. It has to - // implement the EntityInitializer interface - // so that CloudState can create new entity instances. - Entity Entity - // ServiceName is used to… - // Setting it is optional. + // ServiceName is the fully qualified name of the service that implements this entities interface. + // Setting it is mandatory. ServiceName string // PersistenceID is used to namespace events in the journal, useful for // when you share the same database between multiple entities. It defaults to // the simple name for the entity type. // It’s good practice to select one explicitly, this means your database // isn’t depend on type names in your code. - // Setting it is optional. + // Setting it is mandatory. PersistenceID string // The snapshotEvery parameter controls how often snapshots are taken, // so that the entity doesn't need to be recovered from the whole journal @@ -66,28 +57,17 @@ type EventSourcedEntity struct { // Setting it to a negative number will result in snapshots never being taken. SnapshotEvery int64 + // EntityFactory is a factory method which generates a new Entity. + EntityFunc func() Entity + // internal - entityName string registerOnce sync.Once } -// initZeroValue get its Entity type and Zero-Value it to +// init get its Entity type and Zero-Value it to // something we can use as an initializer. -func (e *EventSourcedEntity) initZeroValue() error { - if reflect.ValueOf(e.Entity).IsNil() { - t := reflect.TypeOf(e.Entity) - if t.Kind() == reflect.Ptr { // TODO: how deep can that go? - t = t.Elem() - } - value := reflect.New(t).Interface() - if ei, ok := value.(EntityInitializer); ok { - e.Entity = ei - } else { - return errors.New("the Entity does not implement EntityInitializer") - } - e.entityName = t.Name() - e.SnapshotEvery = snapshotEveryDefault - } +func (e *EventSourcedEntity) init() error { + e.SnapshotEvery = snapshotEveryDefault return nil } @@ -131,27 +111,25 @@ type EventSourcedServer struct { entities map[string]*EventSourcedEntity // contexts are entity instance contexts indexed by their entity ids contexts map[string]*EntityInstanceContext - // cmdMethodCache is the command handler method cache - cmdMethodCache map[string]reflect.Method } // newEventSourcedServer returns an initialized EventSourcedServer func newEventSourcedServer() *EventSourcedServer { return &EventSourcedServer{ - entities: make(map[string]*EventSourcedEntity), - contexts: make(map[string]*EntityInstanceContext), - cmdMethodCache: make(map[string]reflect.Method), + entities: make(map[string]*EventSourcedEntity), + contexts: make(map[string]*EntityInstanceContext), } } func (esh *EventSourcedServer) registerEntity(ese *EventSourcedEntity) error { + if _, exists := esh.entities[ese.ServiceName]; exists { + return fmt.Errorf("EventSourcedEntity with service name: %s is already registered", ese.ServiceName) + } esh.entities[ese.ServiceName] = ese return nil } -// Handle -// -// The stream. One stream will be established per active entity. +// Handle handles the stream. One stream will be established per active entity. // Once established, the first message sent will be Init, which contains the entity ID, and, // if the entity has previously persisted a snapshot, it will contain that snapshot. It will // then send zero to many event messages, one for each event previously persisted. The entity @@ -190,7 +168,7 @@ func (esh *EventSourcedServer) Handle(stream protocol.EventSourced_HandleServer) continue } if init := msg.GetInit(); init != nil { - if err := esh.handleInit(init, stream); err != nil { + if err := esh.handleInit(init); err != nil { failed = handleFailure(err, stream, 0) } entityId = init.GetEntityId() @@ -199,23 +177,18 @@ func (esh *EventSourcedServer) Handle(stream protocol.EventSourced_HandleServer) } } -func (esh *EventSourcedServer) handleInit(init *protocol.EventSourcedInit, server protocol.EventSourced_HandleServer) error { +func (esh *EventSourcedServer) handleInit(init *protocol.EventSourcedInit) error { eid := init.GetEntityId() if _, present := esh.contexts[eid]; present { return NewFailureError("unable to server.Send") } entity := esh.entities[init.GetServiceName()] - if initializer, ok := entity.Entity.(EntityInitializer); ok { - instance := initializer.New() - esh.contexts[eid] = &EntityInstanceContext{ - EntityInstance: &EntityInstance{ - Instance: instance, - EventSourcedEntity: entity, - }, - active: true, - } - } else { - return fmt.Errorf("unable to handle init entity.Entity does not implement EntityInitializer") + esh.contexts[eid] = &EntityInstanceContext{ + EntityInstance: &EntityInstance{ + Instance: entity.EntityFunc(), + EventSourcedEntity: entity, + }, + active: true, } if err := esh.handleInitSnapshot(init); err != nil { @@ -311,7 +284,7 @@ func (esh *EventSourcedServer) handleEvent(entityId string, event *protocol.Even return err } -// handleCommand handles a command received from the CloudState proxy. +// handleCommand handles a command received from the Cloudstate proxy. // // TODO: remove these following lines of comment // "Unary RPCs where the client sends a single request to the server and @@ -326,142 +299,79 @@ func (esh *EventSourcedServer) handleEvent(entityId string, event *protocol.Even // - a streamed flag, (TODO: for what?) // // together, these properties allow to call a method of the entities registered service and -// return its response as a reply to the CloudState proxy. +// return its response as a reply to the Cloudstate proxy. // // Events: // Beside calling the service method, we have to collect "events" the service might emit. // These events afterwards have to be handled by a EventHandler to update the state of the -// entity. The CloudState proxy can re-play these events at any time +// entity. The Cloudstate proxy can re-play these events at any time func (esh *EventSourcedServer) handleCommand(cmd *protocol.Command, server protocol.EventSourced_HandleServer) error { - // method to call - method, err := esh.methodToCall(cmd) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - entityContext := esh.contexts[cmd.GetEntityId()] - // build the input arguments for the method we're about to call - inputs, err := esh.buildInputs(entityContext, method, cmd, server.Context()) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - // call it - called := method.Func.Call(inputs) - // The gRPC implementation returns the rpc return method - // and an error as a second return value. - errReturned := called[1] - if errReturned.CanInterface() && errReturned.Interface() != nil && errReturned.Type().Name() == "error" { - // TCK says: TODO Expects entity.Failure, but gets lientAction.Action.Failure(Failure(commandId, msg))) - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: errReturned.Interface().(error).Error(), - }) - } - // the reply - callReply, err := marshalAny(called[0].Interface()) - if err != nil { // this should never happen - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: fmt.Errorf("called return value at index 0 is no proto.Message. %w", err).Error(), - }) - } - // emitted events - events, err := marshalEventsAny(entityContext) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - // snapshot - snapshot, err := esh.handleSnapshots(entityContext) - if err != nil { - return NewProtocolFailure(protocol.Failure{ - CommandId: cmd.GetId(), - Description: err.Error(), - }) - } - return sendEventSourcedReply(&protocol.EventSourcedReply{ - CommandId: cmd.GetId(), - ClientAction: &protocol.ClientAction{ - Action: &protocol.ClientAction_Reply{ - Reply: &protocol.Reply{ - Payload: callReply, - }, - }, - }, - Events: events, - Snapshot: snapshot, - }, server) -} - -func (*EventSourcedServer) buildInputs(entityContext *EntityInstanceContext, method reflect.Method, cmd *protocol.Command, ctx context.Context) ([]reflect.Value, error) { - inputs := make([]reflect.Value, method.Type.NumIn()) - inputs[0] = reflect.ValueOf(entityContext.EntityInstance.Instance) - inputs[1] = reflect.ValueOf(ctx) - // create a zero-value for the type of the message we call the method with - arg1 := method.Type.In(2) - ptr := false - for arg1.Kind() == reflect.Ptr { - ptr = true - arg1 = arg1.Elem() + msgName := strings.TrimPrefix(cmd.Payload.GetTypeUrl(), protoAnyBase+"/") + messageType := proto.MessageType(msgName) + if messageType.Kind() != reflect.Ptr { + return fmt.Errorf("messageType: %s is of non Ptr kind", messageType) } - var msg proto.Message - if ptr { - msg = reflect.New(arg1).Interface().(proto.Message) - } else { - msg = reflect.Zero(arg1).Interface().(proto.Message) - } - if err := proto.Unmarshal(cmd.GetPayload().GetValue(), msg); err != nil { - return nil, fmt.Errorf("failed to unmarshal: %w", err) - } - inputs[2] = reflect.ValueOf(msg) - return inputs, nil -} - -func (esh *EventSourcedServer) methodToCall(cmd *protocol.Command) (reflect.Method, error) { - entityContext := esh.contexts[cmd.GetEntityId()] - cacheKey := entityContext.ServiceName() + cmd.Name - method, hit := esh.cmdMethodCache[cacheKey] - // as measured this cache saves us about 75% of a call - // to be prepared with 4.4µs vs. 17.6µs where a typical - // call by reflection like GetCart() with Func.Call() - // takes ~10µs and to get return values processed somewhere 0.7µs. - if !hit { - entityValue := reflect.ValueOf(entityContext.EntityInstance.Instance) - // entities implement the proxied grpc service - // we try to find the method we're called by name with the - // received command. - methodByName := entityValue.MethodByName(cmd.Name) - if !methodByName.IsValid() { - entity := esh.entities[entityContext.ServiceName()] - return reflect.Method{}, fmt.Errorf("no method named: %s found for: %v", cmd.Name, entity) - } - // gRPC services are unary rpc methods, always. - // They have one message in and one message out. - if err := checkUnary(methodByName); err != nil { - return reflect.Method{}, err - } - // The first argument in the gRPC implementation - // is always a context.Context. - methodArg0Type := methodByName.Type().In(0) - contextType := reflect.TypeOf(context.Background()) - if !contextType.Implements(methodArg0Type) { - return reflect.Method{}, fmt.Errorf( - "first argument for method: %s is not of type: %s", - methodByName.String(), contextType.Name(), - ) + // get a zero-ed message of this type + if message, ok := reflect.New(messageType.Elem()).Interface().(proto.Message); ok { + // and marshal onto it what we got as an any.Any onto it + err := proto.Unmarshal(cmd.Payload.Value, message) + if err != nil { + return fmt.Errorf("%s, %w", err, ErrMarshal) + } else { + // we're ready to handle the proto message + entityContext := esh.contexts[cmd.GetEntityId()] + if commandHandler, ok := entityContext.EntityInstance.Instance.(CommandHandler); ok { + // The gRPC implementation returns the rpc return method + // and an error as a second return value. + _, reply, errReturned := commandHandler.HandleCommand(server.Context(), message) + // the error + if errReturned != nil { + // TCK says: TODO Expects entity.Failure, but gets lientAction.Action.Failure(Failure(commandId, msg))) + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: errReturned.Error(), + }) + } + // the reply + callReply, err := marshalAny(reply) + if err != nil { // this should never happen + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: fmt.Errorf("called return value at index 0 is no proto.Message. %w", err).Error(), + }) + } + // emitted events + events, err := marshalEventsAny(entityContext) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + // snapshot + snapshot, err := esh.handleSnapshots(entityContext) + if err != nil { + return NewProtocolFailure(protocol.Failure{ + CommandId: cmd.GetId(), + Description: err.Error(), + }) + } + return sendEventSourcedReply(&protocol.EventSourcedReply{ + CommandId: cmd.GetId(), + ClientAction: &protocol.ClientAction{ + Action: &protocol.ClientAction_Reply{ + Reply: &protocol.Reply{ + Payload: callReply, + }, + }, + }, + Events: events, + Snapshot: snapshot, + }, server) + } } - // we'll find one for sure as we found one on the entityValue - method, _ = reflect.TypeOf(entityContext.EntityInstance.Instance).MethodByName(cmd.Name) - esh.cmdMethodCache[cacheKey] = method } - return method, nil + return nil } func (*EventSourcedServer) handleSnapshots(entityContext *EntityInstanceContext) (*any.Any, error) { @@ -486,13 +396,6 @@ func (*EventSourcedServer) handleSnapshots(entityContext *EntityInstanceContext) return nil, nil } -func checkUnary(methodByName reflect.Value) error { - if methodByName.Type().NumIn() != 2 { - return NewFailureError("method: %s is no unary method", methodByName.String()) - } - return nil -} - // applyEvent applies an event to a local entity func (esh EventSourcedServer) applyEvent(entityInstance *EntityInstance, event interface{}) error { payload, err := marshalAny(event) @@ -502,13 +405,6 @@ func (esh EventSourcedServer) applyEvent(entityInstance *EntityInstance, event i return esh.handleEvents(entityInstance, &protocol.EventSourcedEvent{Payload: payload}) } -// handleEvents handles a list of events encoded as protobuf Any messages. -// -// Event sourced entities persist events and snapshots, and these need to be -// serialized when persisted. The most straight forward way to persist events -// and snapshots is to use protobufs. CloudState will automatically detect if -// an emitted event is a protobuf, and serialize it as such. For other -// serialization options, including JSON, see Serialization. func (EventSourcedServer) handleEvents(entityInstance *EntityInstance, events ...*protocol.EventSourcedEvent) error { eventHandler, implementsEventHandler := entityInstance.Instance.(EventHandler) for _, event := range events { @@ -526,35 +422,12 @@ func (EventSourcedServer) handleEvents(entityInstance *EntityInstance, events .. } else { // we're ready to handle the proto message // and we might have a handler - handled := false if implementsEventHandler { - handled, err = eventHandler.HandleEvent(message) + _, err = eventHandler.HandleEvent(context.Background(), message) // TODO: propagate ctx from callee if err != nil { return err // FIXME/TODO: is this correct? if we fail here, nothing is safe afterwards. } } - // if not, we try to find one - // currently we support a method that has one argument that equals - // to the type of the message received. - if !handled { - // find a concrete handling method - entityValue := reflect.ValueOf(entityInstance.Instance) - entityType := entityValue.Type() - for n := 0; n < entityType.NumMethod(); n++ { - method := entityType.Method(n) - // we expect one argument for now, the domain message - // the first argument is the receiver itself - if method.Func.Type().NumIn() == 2 { - argumentType := method.Func.Type().In(1) - if argumentType.AssignableTo(messageType) { - entityValue.MethodByName(method.Name).Call([]reflect.Value{reflect.ValueOf(message)}) - } - } else { - // we have not found a one-argument method matching the events type as an argument - // TODO: what to do here? we might support more variations of possible handlers we can detect - } - } - } } } } // TODO: what do we do if we haven't handled the events? diff --git a/cloudstate/eventsourced_test.go b/cloudstate/eventsourced_test.go index 5924169..8349cfe 100644 --- a/cloudstate/eventsourced_test.go +++ b/cloudstate/eventsourced_test.go @@ -19,15 +19,16 @@ import ( "context" "errors" "fmt" + "os" + "sync" + "testing" + "github.com/cloudstateio/go-support/cloudstate/encoding" "github.com/cloudstateio/go-support/cloudstate/protocol" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/any" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" - "os" - "sync" - "testing" ) type TestEntity struct { @@ -35,6 +36,18 @@ type TestEntity struct { EventEmitter } +func (inc TestEntity) HandleCommand(_ context.Context, command interface{}) (handled bool, reply interface{}, err error) { + switch cmd := command.(type) { + case *IncrementByCommand: + reply, err := inc.IncrementByCommand(nil, cmd) + return true, reply, err + case *DecrementByCommand: + reply, err := inc.DecrementByCommand(nil, cmd) + return true, reply, err + } + return +} + func (inc TestEntity) String() string { return proto.CompactTextString(inc) } @@ -80,11 +93,6 @@ func resetTestEntity() { } } -func (te *TestEntity) New() interface{} { - testEntity.Value = 0 - return testEntity -} - // IncrementByCommand with value receiver func (te TestEntity) IncrementByCommand(_ context.Context, ibc *IncrementByCommand) (*empty.Empty, error) { te.Emit(&IncrementByEvent{ @@ -134,11 +142,14 @@ func (te *TestEntity) DecrementByEvent(d *DecrementByEvent) error { return err } -func (te *TestEntity) HandleEvent(event interface{}) (handled bool, err error) { +func (te *TestEntity) HandleEvent(_ context.Context, event interface{}) (handled bool, err error) { switch e := event.(type) { case *IncrementByEvent: _, err := te.IncrementBy(e.Value) return true, err + case *DecrementByEvent: + _, err := te.DecrementBy(e.Value) + return true, err default: return false, nil } @@ -191,12 +202,16 @@ func (t TestEventSourcedHandleServer) Recv() (*protocol.EventSourcedStreamIn, er func newHandler(t *testing.T) *EventSourcedServer { handler := newEventSourcedServer() entity := EventSourcedEntity{ - Entity: (*TestEntity)(nil), + EntityFunc: func() Entity { + resetTestEntity() + testEntity.Value = 0 + return testEntity + }, ServiceName: "TestEventSourcedServer-Service", SnapshotEvery: 0, registerOnce: sync.Once{}, } - err := entity.initZeroValue() + err := entity.init() if err != nil { t.Errorf("%v", err) } @@ -211,7 +226,7 @@ func initHandler(handler *EventSourcedServer, t *testing.T) { err := handler.handleInit(&protocol.EventSourcedInit{ ServiceName: "TestEventSourcedServer-Service", EntityId: "entity-0", - }, nil) + }) if err != nil { t.Errorf("%v", err) t.Fail() @@ -230,7 +245,8 @@ func TestMain(m *testing.M) { proto.RegisterType((*IncrementByEvent)(nil), "IncrementByEvent") proto.RegisterType((*DecrementByEvent)(nil), "DecrementByEvent") proto.RegisterType((*TestEntity)(nil), "TestEntity") - resetTestEntity() + proto.RegisterType((*IncrementByCommand)(nil), "IncrementByCommand") + proto.RegisterType((*DecrementByCommand)(nil), "DecrementByCommand") defer resetTestEntity() os.Exit(m.Run()) } @@ -245,9 +261,6 @@ func TestErrSend(t *testing.T) { func TestSnapshot(t *testing.T) { resetTestEntity() handler := newHandler(t) - if testEntity.Value >= 0 { - t.Fatalf("testEntity.Value should be <0 but was not: %+v", testEntity) - } primitive, err := encoding.MarshalPrimitive(int64(987)) if err != nil { t.Fatalf("%v", err) @@ -259,7 +272,7 @@ func TestSnapshot(t *testing.T) { SnapshotSequence: 0, Snapshot: primitive, }, - }, nil) + }) if err != nil { t.Fatalf("%v", err) } @@ -271,13 +284,7 @@ func TestSnapshot(t *testing.T) { func TestEventSourcedServerHandlesCommandAndEvents(t *testing.T) { resetTestEntity() handler := newHandler(t) - if testEntity.Value >= 0 { - t.Fatalf("testEntity.Value should be <0 but was not: %+v", testEntity) - } initHandler(handler, t) - if testEntity.Value != 0 { - t.Fatalf("testEntity.Value should be 0 but was not: %+v", testEntity) - } incrementedTo := int64(7) incrCmdValue, err := marshal(&IncrementByCommand{Amount: incrementedTo}, t) incrCommand := protocol.Command{ diff --git a/cloudstate/marshal_proto.go b/cloudstate/marshal_proto.go index 5e87b7c..3893b30 100644 --- a/cloudstate/marshal_proto.go +++ b/cloudstate/marshal_proto.go @@ -23,7 +23,7 @@ import ( // marshalAny marshals a proto.Message to a any.Any value. func marshalAny(pb interface{}) (*any.Any, error) { - // TODO: protobufs are expected here, but CloudState supports other formats + // TODO: protobufs are expected here, but Cloudstate supports other formats message, ok := pb.(proto.Message) if !ok { return nil, fmt.Errorf("got a non-proto message as protobuf: %v", pb) diff --git a/tck/cmd/tck_shoppingcart/shoppingcart.go b/tck/cmd/tck_shoppingcart/shoppingcart.go index 0741b92..2bcad46 100644 --- a/tck/cmd/tck_shoppingcart/shoppingcart.go +++ b/tck/cmd/tck_shoppingcart/shoppingcart.go @@ -31,17 +31,18 @@ import ( // main creates a CloudState instance and registers the ShoppingCart // as a event sourced entity. func main() { - cloudState, err := cloudstate.New(cloudstate.Options{ + server, err := cloudstate.New(cloudstate.Config{ ServiceName: "shopping-cart", ServiceVersion: "0.1.0", }) if err != nil { log.Fatalf("CloudState.New failed: %v", err) } - err = cloudState.RegisterEventSourcedEntity( + err = server.RegisterEventSourcedEntity( &cloudstate.EventSourcedEntity{ - Entity: (*ShoppingCart)(nil), - ServiceName: "com.example.shoppingcart.ShoppingCart", + ServiceName: "com.example.shoppingcart.ShoppingCart", + PersistenceID: "ShoppingCart", + EntityFunc: NewShoppingCart, }, cloudstate.DescriptorConfig{ Service: "shoppingcart/shoppingcart.proto", @@ -51,7 +52,7 @@ func main() { if err != nil { log.Fatalf("CloudState failed to register entity: %v", err) } - err = cloudState.Run() + err = server.Run() if err != nil { log.Fatalf("CloudState failed to run: %v", err) } @@ -65,15 +66,8 @@ type ShoppingCart struct { cloudstate.EventEmitter } -// New implements EntityInitializer and returns a new -// and initialized instance of the ShoppingCart entity. -func (sc ShoppingCart) New() interface{} { - return NewShoppingCart() -} - -// NewShoppingCart returns a new and initialized -// instance of the ShoppingCart entity. -func NewShoppingCart() *ShoppingCart { +// NewShoppingCart returns a new and initialized instance of the ShoppingCart entity. +func NewShoppingCart() cloudstate.Entity { return &ShoppingCart{ cart: make([]*domain.LineItem, 0), EventEmitter: cloudstate.NewEmitter(), // TODO: the EventEmitter could be provided by the event sourced handler @@ -107,19 +101,19 @@ func (sc *ShoppingCart) ItemRemoved(removed *domain.ItemRemoved) error { // // returns handle set to true if we have handled the event // and any error that happened during the handling -func (sc *ShoppingCart) HandleEvent(event interface{}) (handled bool, err error) { +func (sc *ShoppingCart) HandleEvent(_ context.Context, event interface{}) (handled bool, err error) { switch e := event.(type) { case *domain.ItemAdded: return true, sc.ItemAdded(e) - //case *domain.ItemRemoved: - // *domain.ItemRemoved is handled by reflection + case *domain.ItemRemoved: + return true, sc.ItemRemoved(e) default: return false, nil } } // AddItem implements the AddItem command handling of the shopping cart service. -func (sc *ShoppingCart) AddItem(c context.Context, li *shoppingcart.AddLineItem) (*empty.Empty, error) { +func (sc *ShoppingCart) AddItem(_ context.Context, li *shoppingcart.AddLineItem) (*empty.Empty, error) { if li.GetQuantity() <= 0 { return nil, fmt.Errorf("cannot add negative quantity of to item %s", li.GetProductId()) } @@ -133,7 +127,7 @@ func (sc *ShoppingCart) AddItem(c context.Context, li *shoppingcart.AddLineItem) } // RemoveItem implements the RemoveItem command handling of the shopping cart service. -func (sc *ShoppingCart) RemoveItem(c context.Context, li *shoppingcart.RemoveLineItem) (*empty.Empty, error) { +func (sc *ShoppingCart) RemoveItem(_ context.Context, li *shoppingcart.RemoveLineItem) (*empty.Empty, error) { if item, _ := sc.find(li.GetProductId()); item == nil { return nil, fmt.Errorf("cannot remove item %s because it is not in the cart", li.GetProductId()) } @@ -142,7 +136,7 @@ func (sc *ShoppingCart) RemoveItem(c context.Context, li *shoppingcart.RemoveLin } // GetCart implements the GetCart command handling of the shopping cart service. -func (sc *ShoppingCart) GetCart(c context.Context, _ *shoppingcart.GetShoppingCart) (*shoppingcart.Cart, error) { +func (sc *ShoppingCart) GetCart(_ context.Context, _ *shoppingcart.GetShoppingCart) (*shoppingcart.Cart, error) { cart := &shoppingcart.Cart{} for _, item := range sc.cart { cart.Items = append(cart.Items, &shoppingcart.LineItem{ @@ -154,6 +148,22 @@ func (sc *ShoppingCart) GetCart(c context.Context, _ *shoppingcart.GetShoppingCa return cart, nil } +func (sc *ShoppingCart) HandleCommand(ctx context.Context, command interface{}) (handled bool, reply interface{}, err error) { + switch cmd := command.(type) { + case *shoppingcart.GetShoppingCart: + reply, err := sc.GetCart(ctx, cmd) + return true, reply, err + case *shoppingcart.RemoveLineItem: + reply, err := sc.RemoveItem(ctx, cmd) + return true, reply, err + case *shoppingcart.AddLineItem: + reply, err := sc.AddItem(ctx, cmd) + return true, reply, err + default: + return false, reply, err + } +} + func (sc *ShoppingCart) Snapshot() (snapshot interface{}, err error) { return domain.Cart{ Items: append(make([]*domain.LineItem, len(sc.cart)), sc.cart...),