diff --git a/internal/consuming/kafka_test.go b/internal/consuming/kafka_test.go index 7396f33aa..0aa124b00 100644 --- a/internal/consuming/kafka_test.go +++ b/internal/consuming/kafka_test.go @@ -7,14 +7,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/centrifugal/centrifugo/v5/internal/configtypes" "strconv" "strings" "sync/atomic" "testing" "time" - "github.com/centrifugal/centrifuge" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" + "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" @@ -41,19 +41,6 @@ func (m *MockDispatcher) Broadcast(ctx context.Context, req *apiproto.BroadcastR return m.onBroadcast(ctx, req) } -// MockLogger implements the Logger interface for testing. -type MockLogger struct { - // Add necessary fields to simulate behavior or record calls -} - -func (m *MockLogger) LogEnabled(_ centrifuge.LogLevel) bool { - return true // or false based on your test needs -} - -func (m *MockLogger) Log(_ centrifuge.LogEntry) { - // Implement mock logic, e.g., storing log entries for assertions -} - func produceTestMessage(topic string, message []byte, headers []kgo.RecordHeader) error { // Create a new client client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL)) @@ -167,7 +154,7 @@ func TestKafkaConsumer_GreenScenario(t *testing.T) { eventReceived := make(chan struct{}) consumerClosed := make(chan struct{}) - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{ + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { require.Equal(t, testMethod, method) require.Equal(t, testPayload, data) @@ -218,7 +205,7 @@ func TestKafkaConsumer_SeveralConsumers(t *testing.T) { consumerClosed := make(chan struct{}) for i := 0; i < 3; i++ { - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{ + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { require.Equal(t, testMethod, method) require.Equal(t, testPayload, data) @@ -284,7 +271,7 @@ func TestKafkaConsumer_RetryAfterDispatchError(t *testing.T) { return nil }, } - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) require.NoError(t, err) go func() { @@ -354,7 +341,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherTopic(t *testing.T) { return nil }, } - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) require.NoError(t, err) go func() { @@ -427,7 +414,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T return nil }, } - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) require.NoError(t, err) go func() { @@ -507,7 +494,7 @@ func TestKafkaConsumer_PausePartitions(t *testing.T) { return nil }, } - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) require.NoError(t, err) consumer.testOnlyConfig = testConfig @@ -591,7 +578,7 @@ func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) { ConsumerGroup: uuid.New().String(), PartitionBufferSize: tc.partitionBuffer, } - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) require.NoError(t, err) var records []*kgo.Record @@ -690,7 +677,7 @@ func TestKafkaConsumer_TestPauseAfterResumeRace(t *testing.T) { return nil }, } - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) require.NoError(t, err) consumer.testOnlyConfig = testOnlyConfig{ @@ -770,7 +757,7 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) { eventReceived := make(chan struct{}) consumerClosed := make(chan struct{}) - consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{ + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockDispatcher{ onBroadcast: func(ctx context.Context, req *apiproto.BroadcastRequest) error { require.Equal(t, testChannels, req.Channels) require.Equal(t, apiproto.Raw(testPayload), req.Data) diff --git a/internal/consuming/postgresql_test.go b/internal/consuming/postgresql_test.go index 57bd1ea90..efab0c8e5 100644 --- a/internal/consuming/postgresql_test.go +++ b/internal/consuming/postgresql_test.go @@ -134,7 +134,7 @@ func TestPostgresConsumer_GreenScenario(t *testing.T) { PartitionPollInterval: configtypes.Duration(300 * time.Millisecond), PartitionNotificationChannel: testNotificationChannel, } - consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{ + consumer, err := NewPostgresConsumer("test", &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { require.Equal(t, testMethod, method) require.Equal(t, testPayload, data) @@ -191,7 +191,7 @@ func TestPostgresConsumer_SeveralConsumers(t *testing.T) { numConsumers := 10 for i := 0; i < numConsumers; i++ { - consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{ + consumer, err := NewPostgresConsumer("test", &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { require.Equal(t, testMethod, method) require.Equal(t, testPayload, data) @@ -249,7 +249,7 @@ func TestPostgresConsumer_NotificationTrigger(t *testing.T) { numEvents := 0 - consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{ + consumer, err := NewPostgresConsumer("test", &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { require.Equal(t, testMethod, method) require.Equal(t, testPayload, data) @@ -317,7 +317,7 @@ func TestPostgresConsumer_DifferentPartitions(t *testing.T) { var dispatchMu sync.Mutex - consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{ + consumer, err := NewPostgresConsumer("test", &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { dispatchMu.Lock() defer dispatchMu.Unlock()