Skip to content

Commit

Permalink
Add DD Tracing to gRPC Go Feature Server (#116)
Browse files Browse the repository at this point in the history
* fix: Bytewax engine create configmap from object (feast-dev#3821)

* Add tracing to Go gRPC server

---------

Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
Co-authored-by: Harry <quanghai.ng1512@gmail.com>
Co-authored-by: jacevedo34 <jacevedo34@gatech.edu>
Co-authored-by: Jose Acevedo <v-josacevedo@expediagroup.com>
  • Loading branch information
4 people authored Jul 10, 2024
1 parent b1dfe49 commit 850e97d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 24 deletions.
19 changes: 14 additions & 5 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package embedded
import (
"context"
"fmt"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"log"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand All @@ -29,6 +31,7 @@ import (
"github.com/feast-dev/feast/go/types"
jsonlog "github.com/rs/zerolog/log"
"google.golang.org/grpc/health/grpc_health_v1"
grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
)

type OnlineFeatureService struct {
Expand Down Expand Up @@ -259,7 +262,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(

// StartGprcServer starts gRPC server with disabled feature logging and blocks the thread
func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
return s.StartGprcServerWithLogging(host, port, nil, LoggingOptions{})
return s.StartGrpcServerWithLogging(host, port, nil, LoggingOptions{})
}

// StartGprcServerWithLoggingDefaultOpts starts gRPC server with enabled feature logging but default configuration for logging
Expand All @@ -271,7 +274,7 @@ func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string
WriteInterval: logging.DefaultOptions.WriteInterval,
FlushInterval: logging.DefaultOptions.FlushInterval,
}
return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
return s.StartGrpcServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
}

func (s *OnlineFeatureService) constructLoggingService(writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) (*logging.LoggingService, error) {
Expand All @@ -295,9 +298,14 @@ func (s *OnlineFeatureService) constructLoggingService(writeLoggedFeaturesCallba
return loggingService, nil
}

// StartGprcServerWithLogging starts gRPC server with enabled feature logging
// StartGrpcServerWithLogging starts gRPC server with enabled feature logging
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
tracer.Start(tracer.WithRuntimeMetrics())
defer tracer.Stop()
}

loggingService, err := s.constructLoggingService(writeLoggedFeaturesCallback, loggingOpts)
if err != nil {
return err
Expand All @@ -309,7 +317,8 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
return err
}

grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor()))

serving.RegisterServingServiceServer(grpcServer, ser)
healthService := healthcheck.NewHealthChecker()
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
Expand Down
23 changes: 18 additions & 5 deletions go/internal/feast/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package server
import (
"context"
"fmt"

"github.com/google/uuid"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/google/uuid"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const feastServerVersion = "0.0.1"
Expand All @@ -31,23 +30,34 @@ func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, requ
}, nil
}

// Returns an object containing the response to GetOnlineFeatures.
// Metadata contains featurenames that corresponds to the number of rows in response.Results.
// GetOnlineFeatures Returns an object containing the response to GetOnlineFeatures.
// Metadata contains feature names that corresponds to the number of rows in response.Results.
// Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format.
func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {

span, ctx := tracer.StartSpanFromContext(ctx, "getOnlineFeatures", tracer.ResourceName("ServingService/GetOnlineFeatures"))
defer span.Finish()

logSpanContext := LogWithSpanContext(span)

requestId := GenerateRequestId()
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())

if err != nil {
logSpanContext.Error().Err(err).Msg("Error parsing feature service or feature list from request")
return nil, err
}

featureVectors, err := s.fs.GetOnlineFeatures(
ctx,
featuresOrService.FeaturesRefs,
featuresOrService.FeatureService,
request.GetEntities(),
request.GetRequestContext(),
request.GetFullFeatureNames())

if err != nil {
logSpanContext.Error().Err(err).Msg("Error getting online features")
return nil, err
}

Expand All @@ -66,6 +76,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
featureNames[idx] = vector.Name
values, err := types.ArrowValuesToProtoValues(vector.Values)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error converting Arrow values to proto values")
return nil, err
}
if _, ok := request.Entities[vector.Name]; ok {
Expand All @@ -83,11 +94,13 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
logger, err := s.loggingService.GetOrCreateLogger(featureService)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error to instantiating logger for feature service: " + featuresOrService.FeatureService.Name)
fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err)
}

err = logger.Log(request.Entities, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error to logging to feature service: " + featuresOrService.FeatureService.Name)
fmt.Printf("LoggerImpl error[%s]: %+v", featuresOrService.FeatureService.Name, err)
}
}
Expand Down
15 changes: 1 addition & 14 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
Expand Down Expand Up @@ -146,25 +145,14 @@ type getOnlineFeaturesRequest struct {
func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *httpServer {
return &httpServer{fs: fs, loggingService: loggingService}
}
func logWithSpanContext(span tracer.Span) zerolog.Logger {

spanContext := span.Context()

var logger = zerolog.New(os.Stderr).With().
Int64("trace_id", int64(spanContext.TraceID())).
Int64("span_id", int64(spanContext.SpanID())).
Timestamp().
Logger()

return logger
}
func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
var err error

span, ctx := tracer.StartSpanFromContext(r.Context(), "getOnlineFeatures", tracer.ResourceName("/get-online-features"))
defer span.Finish(tracer.WithError(err))

logSpanContext := logWithSpanContext(span)
logSpanContext := LogWithSpanContext(span)

if r.Method != "POST" {
http.NotFound(w, r)
Expand All @@ -175,7 +163,6 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {

status := false
if statusQuery != "" {
var err error
status, err = strconv.ParseBool(statusQuery)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error parsing status query parameter")
Expand Down
19 changes: 19 additions & 0 deletions go/internal/feast/server/server_commons.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package server

import (
"github.com/rs/zerolog"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"os"
)

func LogWithSpanContext(span tracer.Span) zerolog.Logger {
spanContext := span.Context()

var logger = zerolog.New(os.Stderr).With().
Int64("trace_id", int64(spanContext.TraceID())).
Int64("span_id", int64(spanContext.SpanID())).
Timestamp().
Logger()

return logger
}

0 comments on commit 850e97d

Please sign in to comment.