diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 05360c1db..30787ee2c 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -63,6 +63,7 @@ import ( "github.com/conduitio/conduit/pkg/web/ui" apiv1 "github.com/conduitio/conduit/proto/api/v1" grpcruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/jpillora/backoff" "github.com/piotrkowalczuk/promgrpc/v4" promclient "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -201,7 +202,15 @@ func createServices(r *Runtime) error { tokenService, ) - plService := pipeline.NewService(r.logger, r.DB) + errRecovery := r.Config.Pipelines.ErrorRecovery + backoffCfg := &backoff.Backoff{ + Min: errRecovery.MinDelay, + Max: errRecovery.MaxDelay, + Factor: float64(errRecovery.BackoffFactor), + Jitter: true, + } + + plService := pipeline.NewService(r.logger, r.DB, backoffCfg) connService := connector.NewService(r.logger, r.DB, r.connectorPersister) procService := processor.NewService(r.logger, r.DB, procPluginService) diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index e75deb2c9..c2d336514 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -37,6 +37,7 @@ import ( proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin" "github.com/conduitio/conduit/pkg/processor" "github.com/google/go-cmp/cmp" + "github.com/jpillora/backoff" "github.com/matryer/is" "github.com/rs/zerolog" "go.uber.org/mock/gomock" @@ -90,10 +91,12 @@ func TestPipelineSimple(t *testing.T) { nil, ) + b := &backoff.Backoff{} + orc := NewOrchestrator( db, logger, - pipeline.NewService(logger, db), + pipeline.NewService(logger, db, b), connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)), processor.NewService(logger, db, procPluginService), connPluginService, diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index a9c35d98d..db2246198 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -629,9 +629,14 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { pl.SetStatus(StatusUserStopped) } default: - pl.SetStatus(StatusDegraded) - // we use %+v to get the stack trace too - pl.Error = fmt.Sprintf("%+v", err) + if cerrors.IsFatalError(err) { + pl.SetStatus(StatusDegraded) + // we use %+v to get the stack trace too + pl.Error = fmt.Sprintf("%+v", err) + } else { + pl.SetStatus(StatusRecovering) + // TODO: Implement backoff strategy + } } s.logger. diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index a71ae6bf3..124da0e00 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -35,6 +35,7 @@ import ( pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock" "github.com/conduitio/conduit/pkg/processor" "github.com/google/uuid" + "github.com/jpillora/backoff" "github.com/matryer/is" "github.com/rs/zerolog" "go.uber.org/mock/gomock" @@ -50,8 +51,9 @@ func TestServiceLifecycle_buildNodes(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) source := dummySource(persister) destination := dummyDestination(persister) @@ -133,8 +135,9 @@ func TestService_buildNodes_NoSourceNode(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) wantErr := "can't build pipeline without any source connectors" @@ -180,8 +183,9 @@ func TestService_buildNodes_NoDestinationNode(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) wantErr := "can't build pipeline without any destination connectors" @@ -228,8 +232,9 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) defer persister.Wait() + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) // create a host pipeline pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI) @@ -287,8 +292,9 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { logger := log.Test(t) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) // create a host pipeline pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI) @@ -371,8 +377,9 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) // create a host pipeline pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI) @@ -472,8 +479,9 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) // create a host pipeline pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI) @@ -533,8 +541,9 @@ func TestService_Run_Rerun(t *testing.T) { logger := log.Test(t) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + b := &backoff.Backoff{} - ps := NewService(logger, db) + ps := NewService(logger, db, b) // create a host pipeline pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI) @@ -571,7 +580,7 @@ func TestService_Run_Rerun(t *testing.T) { is.NoErr(err) // create a new pipeline service and initialize it - ps = NewService(logger, db) + ps = NewService(logger, db, b) err = ps.Init(ctx) is.NoErr(err) err = ps.Run( diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index ca8373335..f3ca7c30e 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -24,6 +24,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/foundation/metrics/measure" + "github.com/jpillora/backoff" ) var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:.]*$`) @@ -51,15 +52,17 @@ type Service struct { instances map[string]*Instance instanceNames map[string]bool handlers []FailureHandler + backoffCfg *backoff.Backoff } // NewService initializes and returns a pipeline Service. -func NewService(logger log.CtxLogger, db database.DB) *Service { +func NewService(logger log.CtxLogger, db database.DB, backoffCfg *backoff.Backoff) *Service { return &Service{ logger: logger.WithComponent("pipeline.Service"), store: NewStore(db), instances: make(map[string]*Instance), instanceNames: make(map[string]bool), + backoffCfg: backoffCfg, } } diff --git a/pkg/pipeline/service_test.go b/pkg/pipeline/service_test.go index a5a80502a..48533df6e 100644 --- a/pkg/pipeline/service_test.go +++ b/pkg/pipeline/service_test.go @@ -25,6 +25,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/google/uuid" + "github.com/jpillora/backoff" "github.com/matryer/is" "go.uber.org/mock/gomock" ) @@ -34,15 +35,16 @@ func TestService_Init_Simple(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) _, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI) is.NoErr(err) want := service.List(ctx) // create a new pipeline service and initialize it - service = NewService(logger, db) + service = NewService(logger, db, b) err = service.Init(ctx) is.NoErr(err) @@ -61,6 +63,7 @@ func TestService_Check(t *testing.T) { ctx := context.Background() logger := log.Nop() db := mock.NewDB(gomock.NewController(t)) + b := &backoff.Backoff{} testCases := []struct { name string @@ -80,7 +83,7 @@ func TestService_Check(t *testing.T) { t.Run(tc.name, func(t *testing.T) { is := is.New(t) db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr) - service := NewService(logger, db) + service := NewService(logger, db, b) gotErr := service.Check(ctx) is.Equal(tc.wantErr, gotErr) @@ -92,8 +95,9 @@ func TestService_CreateSuccess(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) testCases := []struct { id string @@ -151,8 +155,9 @@ func TestService_Create_ValidateSuccess(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) testCases := []struct { name string @@ -193,8 +198,9 @@ func TestService_Create_ValidateError(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) testCases := []struct { name string @@ -262,8 +268,9 @@ func TestService_Create_PipelineNameExists(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) conf := Config{Name: "test-pipeline"} got, err := service.Create(ctx, uuid.NewString(), conf, ProvisionTypeAPI) @@ -279,8 +286,9 @@ func TestService_CreateEmptyName(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) got, err := service.Create(ctx, uuid.NewString(), Config{Name: ""}, ProvisionTypeAPI) is.True(err != nil) is.Equal(got, nil) @@ -291,8 +299,9 @@ func TestService_GetSuccess(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) want, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI) is.NoErr(err) @@ -306,8 +315,9 @@ func TestService_GetInstanceNotFound(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) // get pipeline instance that does not exist got, err := service.Get(ctx, uuid.NewString()) @@ -321,8 +331,9 @@ func TestService_DeleteSuccess(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI) is.NoErr(err) @@ -339,8 +350,9 @@ func TestService_List(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) want := make(map[string]*Instance) for i := 0; i < 10; i++ { @@ -358,8 +370,9 @@ func TestService_UpdateSuccess(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI) is.NoErr(err) @@ -378,8 +391,9 @@ func TestService_Update_PipelineNameExists(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) _, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI) is.NoErr(err) instance2, err2 := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline2"}, ProvisionTypeAPI) @@ -400,8 +414,9 @@ func TestService_UpdateInvalidConfig(t *testing.T) { ctx := context.Background() logger := log.Nop() db := &inmemory.DB{} + b := &backoff.Backoff{} - service := NewService(logger, db) + service := NewService(logger, db, b) instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI) is.NoErr(err) diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 2491cd27a..e1308df99 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -40,6 +40,7 @@ import ( p4 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines4-integration-test" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/jpillora/backoff" "github.com/matryer/is" "github.com/rs/zerolog" "go.uber.org/mock/gomock" @@ -513,8 +514,13 @@ func TestService_IntegrationTestServices(t *testing.T) { proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry), nil, ) + b := &backoff.Backoff{ + Factor: 2, + Min: time.Millisecond * 100, + Max: time.Second, // 8 tries + } - plService := pipeline.NewService(logger, db) + plService := pipeline.NewService(logger, db, b) connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)) procService := processor.NewService(logger, db, procPluginService)