From 55eccb0a6c7f8963ea98b3cce1d397a1fb5b0e74 Mon Sep 17 00:00:00 2001 From: Maxim Vladimirsky Date: Tue, 3 May 2016 12:36:13 -0700 Subject: [PATCH 1/2] Add incomplete response test for offset manager --- consumer/offsetmgr/offsetmgr.go | 2 +- consumer/offsetmgr/offsetmgr_test.go | 95 ++++++++++++++++++++++------ 2 files changed, 76 insertions(+), 21 deletions(-) diff --git a/consumer/offsetmgr/offsetmgr.go b/consumer/offsetmgr/offsetmgr.go index 03408e2f..a517ef90 100644 --- a/consumer/offsetmgr/offsetmgr.go +++ b/consumer/offsetmgr/offsetmgr.go @@ -326,7 +326,7 @@ func (om *offsetManager) run() { return } case <-commitTicker.C: - isRequestTimeout := time.Now().UTC().Sub(lastSubmitTime) > (om.f.cfg.Consumer.OffsetsCommitInterval << 2) + isRequestTimeout := time.Now().UTC().Sub(lastSubmitTime) > (om.f.cfg.Consumer.OffsetsCommitInterval << 1) if isRequestTimeout && !isSameDecoratedOffset(lastSubmitRequest, lastCommittedOffset) { triggerOrScheduleReassign(ErrRequestTimeout, "offset commit failed") } diff --git a/consumer/offsetmgr/offsetmgr_test.go b/consumer/offsetmgr/offsetmgr_test.go index 4281aecd..7f756da0 100644 --- a/consumer/offsetmgr/offsetmgr_test.go +++ b/consumer/offsetmgr/offsetmgr_test.go @@ -54,17 +54,16 @@ func (s *OffsetMgrSuite) TestInitialOffset(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, nil) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() // When om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 8), "g1", "t1", 8) c.Assert(err, IsNil) + defer om.Stop() // Then - fo := <-om.InitialOffset() - c.Assert(fo, DeepEquals, DecoratedOffset{2000, "bar"}) - - om.Stop() - f.Stop() + initialOffset := <-om.InitialOffset() + c.Assert(initialOffset, DeepEquals, DecoratedOffset{2000, "bar"}) } // A partition offset manager can be closed even while it keeps trying to @@ -87,17 +86,16 @@ func (s *OffsetMgrSuite) TestInitialNoCoordinator(c *C) { client, err := 
sarama.NewClient([]string{broker1.Addr()}, nil) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() // When om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 8), "g1", "t1", 8) c.Assert(err, IsNil) + defer om.Stop() // Then oce := <-om.Errors() c.Assert(oce, DeepEquals, &OffsetCommitError{"g1", "t1", 8, ErrNoCoordinator}) - - om.Stop() - f.Stop() } // A partition offset manager can be closed even while it keeps trying to @@ -122,17 +120,16 @@ func (s *OffsetMgrSuite) TestInitialFetchError(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, nil) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() // When om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) + defer om.Stop() // Then oce := <-om.Errors() c.Assert(oce, DeepEquals, &OffsetCommitError{"g1", "t1", 7, sarama.ErrNotLeaderForPartition}) - - om.Stop() - f.Stop() } // If offset commit fails then the corresponding error is sent down to the @@ -161,6 +158,7 @@ func (s *OffsetMgrSuite) TestCommitError(c *C) { c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) @@ -183,7 +181,65 @@ func (s *OffsetMgrSuite) TestCommitError(c *C) { wg.Wait() committedOffset := lastCommittedOffset(broker1, "g1", "t1", 7) c.Assert(committedOffset, DeepEquals, DecoratedOffset{1000, "foo"}) - f.Stop() +} + +// If a response received from Kafka for an offset commit request does +// not contain information for a submitted offset, then offset manager keeps +// retrying until it succeeds. 
+func (s *OffsetMgrSuite) TestCommitIncompleteResponse(c *C) { + // Given + broker1 := sarama.NewMockBroker(c, 101) + defer broker1.Close() + + offsetCommitResponse := sarama.OffsetCommitResponse{ + Errors: map[string]map[int32]sarama.KError{"t1": {2: sarama.ErrNoError}}} + + broker1.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(c). + SetBroker(broker1.Addr(), broker1.BrokerID()), + "ConsumerMetadataRequest": sarama.NewMockConsumerMetadataResponse(c). + SetCoordinator("g1", broker1), + "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(c). + SetOffset("g1", "t1", 1, 1000, "foo", sarama.ErrNoError). + SetOffset("g1", "t1", 2, 2000, "bar", sarama.ErrNoError), + "OffsetCommitRequest": sarama.NewMockWrapper(&offsetCommitResponse), + }) + + cfg := testhelpers.NewTestConfig("c1") + cfg.Consumer.BackOffTimeout = 1000 * time.Millisecond + cfg.Consumer.OffsetsCommitInterval = 50 * time.Millisecond + cfg.Consumer.ReturnErrors = true + client, err := sarama.NewClient([]string{broker1.Addr()}, nil) + c.Assert(err, IsNil) + + f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() + om1, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 1), "g1", "t1", 1) + c.Assert(err, IsNil) + om2, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 2), "g1", "t1", 2) + c.Assert(err, IsNil) + + // When + om1.SubmitOffset(1001, "foo1") + om2.SubmitOffset(2001, "bar2") + var wg sync.WaitGroup + actor.Spawn(actor.RootID.NewChild("stopper"), &wg, om1.Stop) + actor.Spawn(actor.RootID.NewChild("stopper"), &wg, om2.Stop) + + // Then + oce := <-om1.Errors() + c.Assert(oce, DeepEquals, &OffsetCommitError{"g1", "t1", 1, sarama.ErrIncompleteResponse}) + c.Assert(<-om2.CommittedOffsets(), Equals, DecoratedOffset{2001, "bar2"}) + + broker1.SetHandlerByMap(map[string]sarama.MockResponse{ + "ConsumerMetadataRequest": sarama.NewMockConsumerMetadataResponse(c). 
+ SetCoordinator("g1", broker1), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(c). + SetError("g1", "t1", 1, sarama.ErrNoError), + }) + + wg.Wait() + c.Assert(<-om1.CommittedOffsets(), Equals, DecoratedOffset{1001, "foo1"}) } // It is guaranteed that a partition offset manager commits all pending offsets @@ -207,6 +263,7 @@ func (s *OffsetMgrSuite) TestCommitBeforeClose(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, saramaCfg) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() c.Assert(err, IsNil) om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) @@ -273,7 +330,6 @@ func (s *OffsetMgrSuite) TestCommitBeforeClose(c *C) { committedOffset := lastCommittedOffset(broker1, "g1", "t1", 7) c.Assert(committedOffset, DeepEquals, DecoratedOffset{1001, "foo"}) - f.Stop() } // Different consumer groups can keep different offsets for the same @@ -302,6 +358,7 @@ func (s *OffsetMgrSuite) TestCommitDifferentGroups(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, nil) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() om1, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) om2, err := f.SpawnOffsetManager(s.ns.NewChild("g2", "t1", 7), "g2", "t1", 7) @@ -322,7 +379,6 @@ func (s *OffsetMgrSuite) TestCommitDifferentGroups(c *C) { c.Assert(committedOffset1, DeepEquals, DecoratedOffset{1017, "foo3"}) committedOffset2 := lastCommittedOffset(broker1, "g2", "t1", 7) c.Assert(committedOffset2, DeepEquals, DecoratedOffset{2019, "bar3"}) - f.Stop() } func (s *OffsetMgrSuite) TestCommitNetworkError(c *C) { @@ -351,6 +407,7 @@ func (s *OffsetMgrSuite) TestCommitNetworkError(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, saramaCfg) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() om1, err := f.SpawnOffsetManager(s.ns.NewChild("g1", 
"t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) om2, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 8), "g1", "t1", 8) @@ -389,7 +446,6 @@ func (s *OffsetMgrSuite) TestCommitNetworkError(c *C) { c.Assert(committedOffset2, DeepEquals, DecoratedOffset{2001, "bar2"}) committedOffset3 := lastCommittedOffset(broker1, "g2", "t1", 7) c.Assert(committedOffset3, DeepEquals, DecoratedOffset{3001, "bar3"}) - f.Stop() } func (s *OffsetMgrSuite) TestCommittedChannel(c *C) { @@ -413,6 +469,7 @@ func (s *OffsetMgrSuite) TestCommittedChannel(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, nil) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) @@ -430,7 +487,6 @@ func (s *OffsetMgrSuite) TestCommittedChannel(c *C) { committedOffsets = append(committedOffsets, committedOffset) } c.Assert(committedOffsets, DeepEquals, []DecoratedOffset{{1005, "bar5"}}) - f.Stop() } // Test for issue https://github.com/mailgun/kafka-pixy/issues/29. The problem @@ -460,6 +516,7 @@ func (s *OffsetMgrSuite) TestBugConnectionRestored(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, saramaCfg) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) @@ -496,6 +553,7 @@ func (s *OffsetMgrSuite) TestBugConnectionRestored(c *C) { // It will be assigned the broken connection to broker2. 
om, err = f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7) c.Assert(err, IsNil) + defer om.Stop() log.Infof(" THEN") // Then: the new partition offset manager re-establishes connection with @@ -507,9 +565,6 @@ func (s *OffsetMgrSuite) TestBugConnectionRestored(c *C) { case <-time.After(200 * time.Millisecond): } c.Assert(do.Offset, Equals, int64(1000), Commentf("Failed to retrieve initial offset: %s", oce.Err)) - - om.Stop() - f.Stop() } // Test for issue https://github.com/mailgun/kafka-pixy/issues/62. The problem @@ -537,6 +592,7 @@ func (s *OffsetMgrSuite) TestBugOffsetDroppedOnStop(c *C) { client, err := sarama.NewClient([]string{broker1.Addr()}, nil) c.Assert(err, IsNil) f := SpawnFactory(s.ns.NewChild(), cfg, client) + defer f.Stop() om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 1), "g1", "t1", 1) c.Assert(err, IsNil) time.Sleep(100 * time.Millisecond) @@ -561,7 +617,6 @@ func (s *OffsetMgrSuite) TestBugOffsetDroppedOnStop(c *C) { committedOffsets = append(committedOffsets, committedOffset) } c.Assert(committedOffsets, DeepEquals, []DecoratedOffset{{1001, "bar1"}, {1002, "bar2"}}) - f.Stop() } // lastCommittedOffset traverses the mock broker history backwards searching From bd8d100e4ef11a4eef35c62d1a779b33415faa7b Mon Sep 17 00:00:00 2001 From: Maxim Vladimirsky Date: Tue, 3 May 2016 13:22:03 -0700 Subject: [PATCH 2/2] Prep for v0.11.0 --- CHANGELOG.md | 8 +++++++- README.md | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60eb3a90..1f964be6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,13 @@ # Changelog -#### Version 0.11.0 (TBD) +#### Version 0.11.0 (2016-05-03) +Major overhaul and refactoring of the implementation to make it easier to +understand how the internal components interact with each other. It is an +important step before implementation of explicit acknowledgements can be +started. 
+ +During refactoring the following bugs were detected and fixed: * [#56](https://github.com/mailgun/kafka-pixy/issues/56) Invalid stored offset makes consumer panic. * [#59](https://github.com/mailgun/kafka-pixy/issues/59) Messages are skipped by consumer during rebalancing. * [#62](https://github.com/mailgun/kafka-pixy/issues/62) Messages consumed twice during rebalancing. diff --git a/README.md b/README.md index 6e367595..f30869e1 100644 --- a/README.md +++ b/README.md @@ -262,13 +262,13 @@ pretty much the same on Mac. ### Step 1. Download ``` -curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.10.1/kafka-pixy-v0.10.1-linux-amd64.tar.gz | tar xz +curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.11.0/kafka-pixy-v0.11.0-linux-amd64.tar.gz | tar xz ``` ### Step 2. Start ``` -cd kafka-pixy-v0.10.1-linux-amd64 +cd kafka-pixy-v0.11.0-linux-amd64 ./kafka-pixy --kafkaPeers ":9092,...,:9092" --zookeeperPeers ":2181,...,:2181" ```