diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 9c5e1a9f63..d2127c4d4f 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/apache/arrow/go/v8/arrow/memory" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" @@ -314,6 +315,10 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p requestedFeatureViewNames []string, requestedFeatureNames []string, ) ([][]onlinestore.FeatureData, error) { + // Create a Datadog span from context + span, _ := tracer.StartSpanFromContext(ctx, "fs.readFromOnlineStore") + defer span.Finish() + numRows := len(entityRows) entityRowsValue := make([]*prototypes.EntityKey, numRows) for index, entityKey := range entityRows { diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index e9f5ed0fd5..b640e96d4f 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -18,6 +18,7 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -145,39 +146,58 @@ type getOnlineFeaturesRequest struct { func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *httpServer { return &httpServer{fs: fs, loggingService: loggingService} } +func logWithSpanContext(span tracer.Span) zerolog.Logger { + spanContext := span.Context() + + var logger = zerolog.New(os.Stderr).With(). + Int64("trace_id", int64(spanContext.TraceID())). + Int64("span_id", int64(spanContext.SpanID())). + Timestamp(). + Logger() + + return logger +} func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { + var err error + + span, ctx := tracer.StartSpanFromContext(r.Context(), "getOnlineFeatures", tracer.ResourceName("/get-online-features")) + defer span.Finish(tracer.WithError(err)) + + logSpanContext := logWithSpanContext(span) + if r.Method != "POST" { http.NotFound(w, r) return } statusQuery := r.URL.Query().Get("status") + status := false if statusQuery != "" { var err error status, err = strconv.ParseBool(statusQuery) if err != nil { - log.Error().Err(err).Msg("Error parsing status query parameter") - http.Error(w, fmt.Sprintf("Error parsing status query parameter: %+v", err), http.StatusBadRequest) + logSpanContext.Error().Err(err).Msg("Error parsing status query parameter") + writeJSONError(w, fmt.Errorf("Error parsing status query parameter: %+v", err), http.StatusBadRequest) return } } decoder := json.NewDecoder(r.Body) var request getOnlineFeaturesRequest - err := decoder.Decode(&request) + err = decoder.Decode(&request) if err != nil { - log.Error().Err(err).Msg("Error decoding JSON request data") - http.Error(w, fmt.Sprintf("Error decoding JSON request data: %+v", err), http.StatusInternalServerError) + logSpanContext.Error().Err(err).Msg("Error decoding JSON request data") + writeJSONError(w, fmt.Errorf("Error decoding JSON request data: %+v", err), http.StatusInternalServerError) return } var featureService *model.FeatureService if request.FeatureService != nil { featureService, err = s.fs.GetFeatureService(*request.FeatureService) if err != nil { - log.Error().Err(err).Msg("Error getting feature service from registry") - http.Error(w, fmt.Sprintf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError) + logSpanContext.Error().Err(err).Msg("Error getting feature service from registry") + writeJSONError(w, fmt.Errorf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError) return } } @@ -191,7 +211,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { } featureVectors, err := s.fs.GetOnlineFeatures( - r.Context(), + ctx, request.Features, featureService, entitiesProto, @@ -199,8 +219,8 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { request.FullFeatureNames) if err != nil { - log.Error().Err(err).Msg("Error getting feature vector") - http.Error(w, fmt.Sprintf("Error getting feature vector: %+v", err), http.StatusInternalServerError) + logSpanContext.Error().Err(err).Msg("Error getting feature vector") + writeJSONError(w, fmt.Errorf("Error getting feature vector: %+v", err), http.StatusInternalServerError) return } @@ -241,16 +261,16 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { err = json.NewEncoder(w).Encode(response) if err != nil { - log.Error().Err(err).Msg("Error encoding response") - http.Error(w, fmt.Sprintf("Error encoding response: %+v", err), http.StatusInternalServerError) + logSpanContext.Error().Err(err).Msg("Error encoding response") + writeJSONError(w, fmt.Errorf("Error encoding response: %+v", err), http.StatusInternalServerError) return } if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil { logger, err := s.loggingService.GetOrCreateLogger(featureService) if err != nil { - log.Error().Err(err).Msgf("Couldn't instantiate logger for feature service %s", featureService.Name) - http.Error(w, fmt.Sprintf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError) + logSpanContext.Error().Err(err).Msgf("Couldn't instantiate logger for feature service %s", featureService.Name) + writeJSONError(w, fmt.Errorf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError) return } @@ -262,8 +282,8 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { for _, vector := range featureVectors[len(request.Entities):] { values, err := types.ArrowValuesToProtoValues(vector.Values) if err != nil { - log.Error().Err(err).Msg("Couldn't convert arrow values into protobuf") - http.Error(w, fmt.Sprintf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError) + logSpanContext.Error().Err(err).Msg("Couldn't convert arrow values into protobuf") + writeJSONError(w, fmt.Errorf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError) return } featureVectorProtos = append(featureVectorProtos, &serving.GetOnlineFeaturesResponse_FeatureVector{ @@ -275,7 +295,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { err = logger.Log(entitiesProto, featureVectorProtos, featureNames[len(request.Entities):], requestContextProto, requestId) if err != nil { - http.Error(w, fmt.Sprintf("LoggerImpl error[%s]: %+v", featureService.Name, err), http.StatusInternalServerError) + writeJSONError(w, fmt.Errorf("LoggerImpl error[%s]: %+v", featureService.Name, err), http.StatusInternalServerError) return } } @@ -304,15 +324,27 @@ func logStackTrace() { } } +func writeJSONError(w http.ResponseWriter, err error, statusCode int) { + errMap := map[string]interface{}{ + "error": fmt.Sprintf("%+v", err), + "status_code": statusCode, + } + errJSON, _ := json.Marshal(errMap) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + w.Write(errJSON) +} + func recoverMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() { if r := recover(); r != nil { - log.Printf("Panic recovered: %v", r) + log.Error().Err(fmt.Errorf("Panic recovered: %v", r)).Msg("A panic occurred in the server") // Log the stack trace logStackTrace() - http.Error(w, "Internal Server Error", http.StatusInternalServerError) + writeJSONError(w, fmt.Errorf("Internal Server Error: %v", r), http.StatusInternalServerError) } }() next.ServeHTTP(w, r) @@ -321,8 +353,7 @@ func recoverMiddleware(next http.Handler) http.Handler { func (s *httpServer) Serve(host string, port int) error { if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" { - - tracer.Start(tracer.WithRuntimeMetrics()) + tracer.Start() defer tracer.Stop() } mux := httptrace.NewServeMux() @@ -334,7 +365,7 @@ func (s *httpServer) Serve(host string, port int) error { if err == http.ErrServerClosed { return nil } - log.Error().Stack().Err(err).Msg("Startup failed") + log.Fatal().Stack().Err(err).Msg("Failed to start HTTP server") return err } diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 9d986abb89..65a19fb856 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -14,6 +14,7 @@ import ( "github.com/apache/arrow/go/v8/arrow/memory" "github.com/rs/zerolog/log" "google.golang.org/protobuf/types/known/timestamppb" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" @@ -45,6 +46,9 @@ func AugmentResponseWithOnDemandTransforms( fullFeatureNames bool, ) ([]*onlineserving.FeatureVector, error) { + span, _ := tracer.StartSpanFromContext(ctx, "transformation.AugmentResponseWithOnDemandTransforms") + defer span.Finish() + result := make([]*onlineserving.FeatureVector, 0) var err error