Skip to content
This repository has been archived by the owner on Mar 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #5 from marcellanz/master
Browse files Browse the repository at this point in the history
Go support for CloudState User Functions
  • Loading branch information
viktorklang authored Oct 22, 2019
2 parents 081e913 + accb0f3 commit b89ecf5
Show file tree
Hide file tree
Showing 42 changed files with 6,834 additions and 2 deletions.
13 changes: 13 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
language: go
os:
- linux
- osx
- windows
go:
- 1.13.x
before_install:
- go get -t -v ./...
script:
- go test -v -race -coverprofile=coverage.txt -covermode=atomic -bench=. ./...
after_success:
- if [ "$TRAVIS_OS_NAME" = "linux" ]; then bash <(curl -s https://codecov.io/bash); fi
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
# go-support
GoLang support for [CloudState](https://github.com/cloudstateio/cloudstate)
# CloudState stateful service support in Go
[![Build Status](https://travis-ci.com/marcellanz/go-support.svg?branch=feature%2Fgo-support)](https://travis-ci.com/marcellanz/go-support)
[![codecov](https://codecov.io/gh/marcellanz/go-support/branch/master/graph/badge.svg)](https://codecov.io/gh/marcellanz/go-support)
[![GoDoc](https://godoc.org/github.com/marcellanz/go-support?status.svg)](https://godoc.org/github.com/marcellanz/go-support)

This package provides support for writing [CloudState](https://github.com/cloudstateio/cloudstate) stateful functions in Go.

For more information see https://cloudstate.io.
23 changes: 23 additions & 0 deletions build/TCK.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM golang:1.13.1-alpine3.10

RUN apk --no-cache add git

WORKDIR /go/src/app
COPY . .

# -race and therefore CGO needs gcc, we don't want it to have in our build
RUN CGO_ENABLED=0 go build -v -o tck_shoppingcart ./tck/cmd/tck_shoppingcart
RUN go install -v ./...

# multistage – copy over the binary
FROM alpine:latest
RUN apk --no-cache add ca-certificates

WORKDIR /root/
COPY --from=0 /go/bin/tck_shoppingcart .

EXPOSE 8080
ENV HOST 0.0.0.0
ENV PORT 8080

CMD ["./tck_shoppingcart"]
8 changes: 8 additions & 0 deletions build/build-and-publish-docker-image-tck.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

set -o nounset
set -o errexit
set -o pipefail

docker build -t gcr.io/mrcllnz/cloudstate-go-tck:latest -f ./build/TCK.Dockerfile .
docker push gcr.io/mrcllnz/cloudstate-go-tck:latest
15 changes: 15 additions & 0 deletions build/compile-pb.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env bash

set -o nounset
set -o errexit
set -o pipefail

# CloudState protocol
protoc --go_out=plugins=grpc,paths=source_relative:. --proto_path=protobuf/frontend/ protobuf/frontend/cloudstate/entity_key.proto
protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ protobuf/protocol/cloudstate/entity.proto
protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ protobuf/protocol/cloudstate/event_sourced.proto
protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ protobuf/protocol/cloudstate/function.proto

# TCK shopping cart sample
protoc --go_out=plugins=grpc:. --proto_path=protobuf/protocol/ --proto_path=protobuf/frontend/ --proto_path=protobuf/proxy/ --proto_path=protobuf/example/ protobuf/example/shoppingcart/shoppingcart.proto
protoc --go_out=plugins=grpc,paths=source_relative:tck/shoppingcart/persistence --proto_path=protobuf/protocol/ --proto_path=protobuf/frontend/ --proto_path=protobuf/proxy/ --proto_path=protobuf/example/shoppingcart/persistence/ protobuf/example/shoppingcart/persistence/domain.proto
32 changes: 32 additions & 0 deletions build/fetch-cloudstate-pb.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env bash

set -o nounset
set -o errexit
set -o pipefail

function fetch() {
local path=$1
local tag=$2
mkdir -p "protobuf/$(dirname $path)"
curl -o "protobuf/${path}" "https://raw.githubusercontent.com/cloudstateio/cloudstate/${tag}/protocols/${path}"
}

tag=$1

# CloudState protocol
fetch "protocol/cloudstate/entity.proto" "${tag}"
fetch "protocol/cloudstate/event_sourced.proto" "${tag}"
fetch "protocol/cloudstate/function.proto" "${tag}"
fetch "protocol/cloudstate/crdt.proto" "${tag}"

# TCK shopping cart example
fetch "example/shoppingcart/shoppingcart.proto" "${tag}"
fetch "example/shoppingcart/persistence/domain.proto" "${tag}"

# CloudState frontend
fetch "frontend/cloudstate/entity_key.proto" "${tag}"

# dependencies
fetch "proxy/grpc/reflection/v1alpha/reflection.proto" "${tag}"
fetch "frontend/google/api/annotations.proto" "${tag}"
fetch "frontend/google/api/http.proto" "${tag}"
245 changes: 245 additions & 0 deletions cloudstate/cloudstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
//
// Copyright 2019 Lightbend Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudstate

import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"runtime"

"github.com/cloudstateio/go-support/cloudstate/protocol"
"github.com/golang/protobuf/descriptor"
"github.com/golang/protobuf/proto"
filedescr "github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)

const (
SupportLibraryVersion = "0.1.0"
SupportLibraryName = "cloudstate-go-support"
)

// CloudState is an instance of a CloudState User Function
type CloudState struct {
server *grpc.Server
entityDiscoveryServer *EntityDiscoveryServer
eventSourcedServer *EventSourcedServer
}

// New returns a new CloudState instance.
func New(options Options) (*CloudState, error) {
cs := &CloudState{
server: grpc.NewServer(),
entityDiscoveryServer: newEntityDiscoveryResponder(options),
eventSourcedServer: newEventSourcedServer(),
}
protocol.RegisterEntityDiscoveryServer(cs.server, cs.entityDiscoveryServer)
protocol.RegisterEventSourcedServer(cs.server, cs.eventSourcedServer)
return cs, nil
}

// Options go get a CloudState instance configured.
type Options struct {
ServiceName string
ServiceVersion string
}

// DescriptorConfig configures service and dependent descriptors.
type DescriptorConfig struct {
Service string
ServiceMsg descriptor.Message
Domain []string
DomainMessages []descriptor.Message
}

func (dc DescriptorConfig) AddDomainMessage(m descriptor.Message) DescriptorConfig {
dc.DomainMessages = append(dc.DomainMessages, m)
return dc
}

func (dc DescriptorConfig) AddDomainDescriptor(filename string) DescriptorConfig {
dc.Domain = append(dc.Domain, filename)
return dc
}

// RegisterEventSourcedEntity registers an event sourced entity for CloudState.
func (cs *CloudState) RegisterEventSourcedEntity(ese *EventSourcedEntity, config DescriptorConfig) (err error) {
ese.registerOnce.Do(func() {
if err = ese.initZeroValue(); err != nil {
return
}
if err = cs.eventSourcedServer.registerEntity(ese); err != nil {
return
}
if err = cs.entityDiscoveryServer.registerEntity(ese, config); err != nil {
return
}
})
return
}

// Run runs the CloudState instance.
func (cs *CloudState) Run() error {
host, ok := os.LookupEnv("HOST")
if !ok {
return fmt.Errorf("unable to get environment variable \"HOST\"")
}
port, ok := os.LookupEnv("PORT")
if !ok {
return fmt.Errorf("unable to get environment variable \"PORT\"")
}
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", host, port))
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}
if e := cs.server.Serve(lis); e != nil {
return fmt.Errorf("failed to grpcServer.Serve for: %v", lis)
}
return nil
}

// EntityDiscoveryServer implements the CloudState discovery protocol.
type EntityDiscoveryServer struct {
fileDescriptorSet *filedescr.FileDescriptorSet
entitySpec *protocol.EntitySpec
message *descriptor.Message
}

// newEntityDiscoveryResponder returns a new and initialized EntityDiscoveryServer.
func newEntityDiscoveryResponder(options Options) *EntityDiscoveryServer {
responder := &EntityDiscoveryServer{}
responder.entitySpec = &protocol.EntitySpec{
Entities: make([]*protocol.Entity, 0),
ServiceInfo: &protocol.ServiceInfo{
ServiceName: options.ServiceName,
ServiceVersion: options.ServiceVersion,
ServiceRuntime: fmt.Sprintf("%s %s/%s", runtime.Version(), runtime.GOOS, runtime.GOARCH),
SupportLibraryName: SupportLibraryName,
SupportLibraryVersion: SupportLibraryVersion,
},
}
responder.fileDescriptorSet = &filedescr.FileDescriptorSet{
File: make([]*filedescr.FileDescriptorProto, 0),
}
return responder
}

// Discover returns an entity spec for
func (r *EntityDiscoveryServer) Discover(c context.Context, pi *protocol.ProxyInfo) (*protocol.EntitySpec, error) {
log.Printf("Received discovery call from sidecar [%s w%s] supporting CloudState %v.%v\n",
pi.ProxyName,
pi.ProxyVersion,
pi.ProtocolMajorVersion,
pi.ProtocolMinorVersion,
)
for _, filename := range []string{
"google/protobuf/empty.proto",
"google/protobuf/any.proto",
"google/protobuf/descriptor.proto",
"google/api/annotations.proto",
"google/api/http.proto",
"cloudstate/event_sourced.proto",
"cloudstate/entity.proto",
"cloudstate/entity_key.proto",
} {
if err := r.registerFileDescriptorProto(filename); err != nil {
return nil, err
}
}
log.Printf("Responding with: %v\n", r.entitySpec.GetServiceInfo())
return r.entitySpec, nil
}

// ReportError logs any user function error reported by the CloudState proxy.
func (r *EntityDiscoveryServer) ReportError(c context.Context, fe *protocol.UserFunctionError) (*empty.Empty, error) {
log.Printf("ReportError: %v\n", fe)
return &empty.Empty{}, nil
}

func (r *EntityDiscoveryServer) updateSpec() (err error) {
protoBytes, err := proto.Marshal(r.fileDescriptorSet)
if err != nil {
return errors.New("unable to Marshal FileDescriptorSet")
}
r.entitySpec.Proto = protoBytes
return nil
}

func (r *EntityDiscoveryServer) resolveFileDescriptors(dc DescriptorConfig) error {
// service
if dc.Service != "" {
if err := r.registerFileDescriptorProto(dc.Service); err != nil {
return err
}
} else {
if dc.ServiceMsg != nil {
if err := r.registerFileDescriptor(dc.ServiceMsg); err != nil {
return err
}
}
}
// and dependent domain descriptors
for _, dp := range dc.Domain {
if err := r.registerFileDescriptorProto(dp); err != nil {
return err
}
}
for _, dm := range dc.DomainMessages {
if err := r.registerFileDescriptor(dm); err != nil {
return err
}
}
return nil
}

func (r *EntityDiscoveryServer) registerEntity(e *EventSourcedEntity, config DescriptorConfig) error {
if err := r.resolveFileDescriptors(config); err != nil {
return fmt.Errorf("failed to resolveFileDescriptor for DescriptorConfig: %+v: %w", config, err)
}
persistenceID := e.entityName
if e.PersistenceID != "" {
persistenceID = e.PersistenceID
}
r.entitySpec.Entities = append(r.entitySpec.Entities, &protocol.Entity{
EntityType: EventSourced,
ServiceName: e.ServiceName,
PersistenceId: persistenceID,
})
return r.updateSpec()
}

func (r *EntityDiscoveryServer) registerFileDescriptorProto(filename string) error {
descriptorProto, err := unpackFile(proto.FileDescriptor(filename))
if err != nil {
return fmt.Errorf("failed to registerFileDescriptorProto for filename: %s: %w", filename, err)
}
r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, descriptorProto)
return r.updateSpec()
}

func (r *EntityDiscoveryServer) registerFileDescriptor(msg descriptor.Message) error {
fd, _ := descriptor.ForMessage(msg) // this can panic
if r := recover(); r != nil {
return fmt.Errorf("descriptor.ForMessage panicked (%v) for: %+v", r, msg)
}
r.fileDescriptorSet.File = append(r.fileDescriptorSet.File, fd)
return nil
}
Loading

0 comments on commit b89ecf5

Please sign in to comment.