Skip to content
This repository has been archived by the owner on Mar 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10 from marcellanz/feature/issues6_7_8
Browse files Browse the repository at this point in the history
feature/issues[6,7,8]
  • Loading branch information
marcellanz authored Nov 5, 2019
2 parents b89ecf5 + f8b3059 commit faaff6d
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 327 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# CloudState stateful service support in Go
[![Build Status](https://travis-ci.com/marcellanz/go-support.svg?branch=feature%2Fgo-support)](https://travis-ci.com/marcellanz/go-support)
[![codecov](https://codecov.io/gh/marcellanz/go-support/branch/master/graph/badge.svg)](https://codecov.io/gh/marcellanz/go-support)
[![GoDoc](https://godoc.org/github.com/marcellanz/go-support?status.svg)](https://godoc.org/github.com/marcellanz/go-support)
# Cloudstate stateful service support in Go
[![Build Status](https://travis-ci.com/cloudstateio/go-support.svg)](https://travis-ci.com/cloudstateio/go-support)
[![codecov](https://codecov.io/gh/cloudstateio/go-support/branch/master/graph/badge.svg)](https://codecov.io/gh/cloudstateio/go-support)
[![GoDoc](https://godoc.org/github.com/cloudstateio/go-support/cloudstate?status.svg)](https://godoc.org/github.com/cloudstateio/go-support/cloudstate)

This package provides support for writing [CloudState](https://github.com/cloudstateio/cloudstate) stateful functions in Go.
This package provides support for writing [Cloudstate](https://github.com/cloudstateio/cloudstate) stateful functions in Go.

For more information see https://cloudstate.io.
90 changes: 45 additions & 45 deletions cloudstate/cloudstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,30 @@ type CloudState struct {
}

// New returns a new CloudState instance.
func New(options Options) (*CloudState, error) {
func New(config Config) (*CloudState, error) {
eds, err := newEntityDiscoveryServer(config)
if err != nil {
return nil, err
}
cs := &CloudState{
server: grpc.NewServer(),
entityDiscoveryServer: newEntityDiscoveryResponder(options),
entityDiscoveryServer: eds,
eventSourcedServer: newEventSourcedServer(),
}
protocol.RegisterEntityDiscoveryServer(cs.server, cs.entityDiscoveryServer)
protocol.RegisterEventSourcedServer(cs.server, cs.eventSourcedServer)
return cs, nil
}

// Options go get a CloudState instance configured.
type Options struct {
// Config go get a CloudState instance configured.
type Config struct {
ServiceName string
ServiceVersion string
}

// DescriptorConfig configures service and dependent descriptors.
type DescriptorConfig struct {
Service string
ServiceMsg descriptor.Message
Domain []string
DomainMessages []descriptor.Message
}
Expand All @@ -83,7 +86,7 @@ func (dc DescriptorConfig) AddDomainDescriptor(filename string) DescriptorConfig
// RegisterEventSourcedEntity registers an event sourced entity for CloudState.
func (cs *CloudState) RegisterEventSourcedEntity(ese *EventSourcedEntity, config DescriptorConfig) (err error) {
ese.registerOnce.Do(func() {
if err = ese.initZeroValue(); err != nil {
if err = ese.init(); err != nil {
return
}
if err = cs.eventSourcedServer.registerEntity(ese); err != nil {
Expand Down Expand Up @@ -116,60 +119,46 @@ func (cs *CloudState) Run() error {
return nil
}

// EntityDiscoveryServer implements the CloudState discovery protocol.
// EntityDiscoveryServer implements the Cloudstate discovery protocol.
type EntityDiscoveryServer struct {
fileDescriptorSet *filedescr.FileDescriptorSet
entitySpec *protocol.EntitySpec
message *descriptor.Message
}

// newEntityDiscoveryResponder returns a new and initialized EntityDiscoveryServer.
func newEntityDiscoveryResponder(options Options) *EntityDiscoveryServer {
responder := &EntityDiscoveryServer{}
responder.entitySpec = &protocol.EntitySpec{
// newEntityDiscoveryServer returns a new and initialized EntityDiscoveryServer.
func newEntityDiscoveryServer(config Config) (*EntityDiscoveryServer, error) {
svr := &EntityDiscoveryServer{}
svr.entitySpec = &protocol.EntitySpec{
Entities: make([]*protocol.Entity, 0),
ServiceInfo: &protocol.ServiceInfo{
ServiceName: options.ServiceName,
ServiceVersion: options.ServiceVersion,
ServiceName: config.ServiceName,
ServiceVersion: config.ServiceVersion,
ServiceRuntime: fmt.Sprintf("%s %s/%s", runtime.Version(), runtime.GOOS, runtime.GOARCH),
SupportLibraryName: SupportLibraryName,
SupportLibraryVersion: SupportLibraryVersion,
},
}
responder.fileDescriptorSet = &filedescr.FileDescriptorSet{
svr.fileDescriptorSet = &filedescr.FileDescriptorSet{
File: make([]*filedescr.FileDescriptorProto, 0),
}
return responder
return svr, nil
}

// Discover returns an entity spec for
func (r *EntityDiscoveryServer) Discover(c context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) {
log.Printf("Received discovery call from sidecar [%s w%s] supporting CloudState %v.%v\n",
// Discover returns an entity spec for registered entities.
func (r *EntityDiscoveryServer) Discover(_ context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) {
log.Printf("Received discovery call from sidecar [%s w%s] supporting Cloudstate %v.%v\n",
pi.ProxyName,
pi.ProxyVersion,
pi.ProtocolMajorVersion,
pi.ProtocolMinorVersion,
)
for _, filename := range []string{
"google/protobuf/empty.proto",
"google/protobuf/any.proto",
"google/protobuf/descriptor.proto",
"google/api/annotations.proto",
"google/api/http.proto",
"cloudstate/event_sourced.proto",
"cloudstate/entity.proto",
"cloudstate/entity_key.proto",
} {
if err := r.registerFileDescriptorProto(filename); err != nil {
return nil, err
}
}
log.Printf("Responding with: %v\n", r.entitySpec.GetServiceInfo())
return r.entitySpec, nil
}

// ReportError logs any user function error reported by the CloudState proxy.
func (r *EntityDiscoveryServer) ReportError(c context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) {
// ReportError logs any user function error reported by the Cloudstate proxy.
func (r *EntityDiscoveryServer) ReportError(_ context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) {
log.Printf("ReportError: %v\n", fe)
return &empty.Empty{}, nil
}
Expand All @@ -189,12 +178,6 @@ func (r *EntityDiscoveryServer) resolveFileDescriptors(dc DescriptorConfig) erro
if err := r.registerFileDescriptorProto(dc.Service); err != nil {
return err
}
} else {
if dc.ServiceMsg != nil {
if err := r.registerFileDescriptor(dc.ServiceMsg); err != nil {
return err
}
}
}
// and dependent domain descriptors
for _, dp := range dc.Domain {
Expand All @@ -214,24 +197,38 @@ func (r *EntityDiscoveryServer) registerEntity(e *EventSourcedEntity, config Des
if err := r.resolveFileDescriptors(config); err != nil {
return fmt.Errorf("failed to resolveFileDescriptor for DescriptorConfig: %+v: %w", config, err)
}
persistenceID := e.entityName
if e.PersistenceID != "" {
persistenceID = e.PersistenceID
}
r.entitySpec.Entities = append(r.entitySpec.Entities, &protocol.Entity{
EntityType: EventSourced,
ServiceName: e.ServiceName,
PersistenceId: persistenceID,
PersistenceId: e.PersistenceID,
})
return r.updateSpec()
}

func (r *EntityDiscoveryServer) hasRegistered(filename string) bool {
for _, f := range r.fileDescriptorSet.File {
if f.GetName() == filename {
return true
}
}
return false
}

func (r *EntityDiscoveryServer) registerFileDescriptorProto(filename string) error {
if r.hasRegistered(filename) {
return nil
}
descriptorProto, err := unpackFile(proto.FileDescriptor(filename))
if err != nil {
return fmt.Errorf("failed to registerFileDescriptorProto for filename: %s: %w", filename, err)
}
r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, descriptorProto)
for _, fn := range descriptorProto.Dependency {
err := r.registerFileDescriptorProto(fn)
if err != nil {
return err
}
}
return r.updateSpec()
}

Expand All @@ -240,6 +237,9 @@ func (r *EntityDiscoveryServer) registerFileDescriptor(msg descriptor.Message) e
if r := recover(); r != nil {
return fmt.Errorf("descriptor.ForMessage panicked (%v) for: %+v", r, msg)
}
if r.hasRegistered(fd.GetName()) {
return nil
}
r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, fd)
return nil
}
12 changes: 6 additions & 6 deletions cloudstate/cloudstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cloudstate implements the CloudState event sourced and entity discovery protocol.
// Package cloudstate implements the Cloudstate event sourced and entity discovery protocol.
package cloudstate

import (
Expand All @@ -29,15 +29,15 @@ import (
)

func TestNewCloudState(t *testing.T) {
cloudState, _ := New(Options{})
cloudState, _ := New(Config{})
si := cloudState.server.GetServiceInfo()
if si == nil {
t.Fail()
}
}

func TestEntityDiscoveryResponderDiscover(t *testing.T) {
responder := newEntityDiscoveryResponder(Options{
server, _ := newEntityDiscoveryServer(Config{
ServiceName: "service.one",
ServiceVersion: "0.0.1",
})
Expand All @@ -48,7 +48,7 @@ func TestEntityDiscoveryResponderDiscover(t *testing.T) {
ProxyVersion: "9.8.7",
SupportedEntityTypes: nil,
}
spec, err := responder.Discover(context.Background(), info)
spec, err := server.Discover(context.Background(), info)
if err != nil {
t.Errorf("responder.Discover returned err: %v", err)
}
Expand Down Expand Up @@ -79,12 +79,12 @@ func captureOutput(f func()) string {
}

func TestEntityDiscoveryResponderReportError(t *testing.T) {
responder := newEntityDiscoveryResponder(Options{
server, _ := newEntityDiscoveryServer(Config{
ServiceName: "service.one",
ServiceVersion: "0.0.1",
})
output := captureOutput(func() {
empty, err := responder.ReportError(context.Background(), &protocol.UserFunctionError{
empty, err := server.ReportError(context.Background(), &protocol.UserFunctionError{
Message: "unable to do XYZ",
})
if err != nil || empty == nil {
Expand Down
2 changes: 1 addition & 1 deletion cloudstate/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cloudstate implements the CloudState event sourced and entity discovery protocol.
// Package cloudstate implements the Cloudstate event sourced and entity discovery protocol.
package cloudstate
11 changes: 9 additions & 2 deletions cloudstate/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

package cloudstate

import "fmt"
import (
"context"
"fmt"
)

type OnNext func(event interface{}) error
type OnErr func(err error)
Expand Down Expand Up @@ -84,7 +87,11 @@ func (e *eventEmitter) Clear() {
}

type EventHandler interface {
HandleEvent(event interface{}) (handled bool, err error)
HandleEvent(ctx context.Context, event interface{}) (handled bool, err error)
}

type CommandHandler interface {
HandleCommand(ctx context.Context, command interface{}) (handled bool, reply interface{}, err error)
}

type Snapshotter interface {
Expand Down
Loading

0 comments on commit faaff6d

Please sign in to comment.