diff --git a/CHANGELOG.md b/CHANGELOG.md
index f656e8e2..b4325795 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
 #### Version 0.14.0 (TBD)
 
 Implemented:
+* Added HTTP API endpoints to list topics, optionally with partitions and
+  configuration.
 * Ack timeout can now be greater than subscription timeout. So in the absence of
   incoming consume requests for a topic, rebalancing may be delayed until ack
   timeout expires for all offered messages. If a new request comes while
diff --git a/README.md b/README.md
index 8c9ec565..b2e08da1 100644
--- a/README.md
+++ b/README.md
@@ -293,7 +293,7 @@ group inactivity on all Kafka-Pixy working with the Kafka cluster.
 
 ```
 GET /topics/<topic>/consumers
-GET /clusters/<cluster>/topics/<topic>/consumers
+GET /clusters/<cluster>/topics/<topic>/consumers
 ```
 
 Returns a list of consumers that are subscribed to a topic.
@@ -344,6 +344,35 @@ yields:
 }
 ```
 
+### List Topics
+
+```
+GET /topics
+GET /clusters/<cluster>/topics
+```
+
+Returns a list of topics, optionally with detailed configuration and partitions.
+
+ Parameter      | Opt | Description
+----------------|-----|------------------------------------------------
+ cluster        | yes | The name of a cluster to operate on. By default the cluster mentioned first in the `proxies` section of the config file is used.
+ withPartitions | yes | Whether a list of partitions should be returned for every topic.
+ withConfig     | yes | Whether configuration should be returned for every topic.
+
+### Get Topic Config
+
+```
+GET /topics/<topic>
+GET /clusters/<cluster>/topics/<topic>
+```
+
+Returns topic configuration, optionally with a list of partitions.
+
+ Parameter      | Opt | Description
+----------------|-----|------------------------------------------------
+ cluster        | yes | The name of a cluster to operate on. By default the cluster mentioned first in the `proxies` section of the config file is used.
+ withPartitions | yes | Whether a list of partitions should be returned.
+
 ## Configuration
 
 Kafka-Pixy is designed to be very simple to run. It consists of a single
diff --git a/admin/admin.go b/admin/admin.go
index f01c10c0..226373ee 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -1,6 +1,7 @@
 package admin
 
 import (
+	"encoding/json"
 	"fmt"
 	"sort"
 	"strconv"
@@ -65,15 +66,20 @@ type PartitionMetadata struct {
 	ID       int32
 	Leader   int32
 	Replicas []int32
-	Isr      []int32
+	ISR      []int32
 }
 
 type TopicMetadata struct {
 	Topic      string
-	Config     []byte
+	Config     *TopicConfig
 	Partitions []PartitionMetadata
 }
 
+type TopicConfig struct {
+	Version int32             `json:"version"`
+	Config  map[string]string `json:"config"`
+}
+
 type indexedPartition struct {
 	index     int
 	partition int32
@@ -173,7 +179,7 @@ func (a *T) getGroupOffsets(group, topic string) ([]PartitionOffset, error) {
 
 	// Fetch the last committed offsets for all partitions of the group/topic.
coordinator, err := kafkaClt.Coordinator(group) if err != nil { - return nil, errors.Wrapf(err, "failed to get coordinator") + return nil, errors.Wrap(err, "failed to get coordinator") } req := sarama.OffsetFetchRequest{ConsumerGroup: group, Version: ProtocolVer1} for _, p := range partitions { @@ -181,7 +187,7 @@ func (a *T) getGroupOffsets(group, topic string) ([]PartitionOffset, error) { } res, err := coordinator.FetchOffset(&req) if err != nil { - return nil, errors.Wrapf(err, "failed to fetch offsets") + return nil, errors.Wrap(err, "failed to fetch offsets") } for i, p := range partitions { block := res.GetBlock(topic, p) @@ -212,7 +218,7 @@ func (a *T) setGroupOffsets(group, topic string, offsets []PartitionOffset) erro } coordinator, err := kafkaClt.Coordinator(group) if err != nil { - return errors.Wrapf(err, "failed to get coordinator") + return errors.Wrap(err, "failed to get coordinator") } req := sarama.OffsetCommitRequest{ @@ -225,7 +231,7 @@ func (a *T) setGroupOffsets(group, topic string, offsets []PartitionOffset) erro } res, err := coordinator.CommitOffset(&req) if err != nil { - return errors.Wrapf(err, "failed to commit offsets") + return errors.Wrap(err, "failed to commit offsets") } for p, err := range res.Errors[topic] { if err != sarama.ErrNoError { @@ -249,7 +255,7 @@ func (a *T) GetTopicConsumers(group, topic string) (map[string][]int32, error) { if err == zk.ErrNoNode { return nil, ErrInvalidParam(errors.New("either group or topic is incorrect")) } - return nil, errors.Wrapf(err, "failed to fetch partition owners data") + return nil, errors.Wrap(err, "failed to fetch partition owners data") } consumers := make(map[string][]int32) @@ -261,7 +267,7 @@ func (a *T) GetTopicConsumers(group, topic string) (map[string][]int32, error) { partitionPath := fmt.Sprintf("%s/%s", consumedPartitionsPath, partitionNode) partitionNodeData, _, err := zkConn.Get(partitionPath) if err != nil { - return nil, errors.Wrapf(err, "failed to fetch partition owner") + return nil, errors.Wrap(err, "failed to fetch partition owner") } clientID := string(partitionNodeData) consumers[clientID] = append(consumers[clientID], int32(partition)) @@ -285,7 +291,7 @@ func (a *T) GetAllTopicConsumers(topic string) (map[string]map[string][]int32, e groupsPath := fmt.Sprintf("%s/consumers", a.cfg.ZooKeeper.Chroot) groups, _, err := kzConn.Children(groupsPath) if err != nil { - return nil, errors.Wrapf(err, "failed to fetch consumer groups") + return nil, errors.Wrap(err, "failed to fetch consumer groups") } consumers := make(map[string]map[string][]int32) @@ -342,7 +348,8 @@ func getOffsetResult(res *sarama.OffsetResponse, topic string, partition int32) return block.Offsets[0], nil } -func (a *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]TopicMetadata, error) { +// ListTopics returns a list of all topics existing in the Kafka cluster. 
+func (a *T) ListTopics(withPartitions, withConfig bool) ([]TopicMetadata, error) {
 	kafkaClt, err := a.lazyKafkaClt()
 	if err != nil {
 		return nil, err
 	}
@@ -355,15 +362,28 @@ func (a *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]TopicMetadata,
 
 	topicsMetadata := make([]TopicMetadata, len(topics))
 	for i, topic := range topics {
-		tm := &topicsMetadata[i]
-		tm.Topic = topic
-		if !withPartitions {
-			continue
+		tm, err := a.GetTopicMetadata(topic, withPartitions, withConfig)
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to get %s topic metadata", topic)
 		}
+		topicsMetadata[i] = tm
+	}
+	return topicsMetadata, nil
+}
+
+// GetTopicMetadata returns metadata for a topic. Partition metadata and/or
+// detailed topic configuration can optionally be requested as well.
+func (a *T) GetTopicMetadata(topic string, withPartitions, withConfig bool) (TopicMetadata, error) {
+	kafkaClt, err := a.lazyKafkaClt()
+	if err != nil {
+		return TopicMetadata{}, errors.Wrap(err, "failed to connect to Kafka")
+	}
+	tm := TopicMetadata{Topic: topic}
+	if withPartitions {
 		partitions, err := kafkaClt.Partitions(topic)
 		if err != nil {
-			return nil, errors.Wrap(err, "failed to get partitions")
+			return TopicMetadata{}, errors.Wrap(err, "failed to get partitions")
 		}
 
 		tm.Partitions = make([]PartitionMetadata, len(partitions))
@@ -375,44 +395,38 @@ func (a *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]TopicMetadata,
 			if err == sarama.ErrLeaderNotAvailable {
 				pm.Leader = -1
 			} else if err != nil {
-				return nil, errors.Wrap(err, "failed to get leader")
+				return TopicMetadata{}, errors.Wrap(err, "failed to get leader")
 			}
 			pm.Leader = leader.ID()
 
 			isr, err := kafkaClt.InSyncReplicas(topic, partition)
 			if err != nil {
-				return nil, errors.Wrap(err, "failed to get ISR")
+				return TopicMetadata{}, errors.Wrap(err, "failed to get ISR")
 			}
-			pm.Isr = isr
+			pm.ISR = isr
 
 			replicas, err := kafkaClt.Replicas(topic, partition)
 			if err != nil {
-				return nil, errors.Wrap(err, "failed to get replicas")
+				return TopicMetadata{}, errors.Wrap(err, "failed to get replicas")
 			}
 			pm.Replicas = replicas
 		}
 	}
-
 	if withConfig {
 		kzConn, err := a.lazyZKConn()
 		if err != nil {
-			return nil, errors.Wrap(err, "failed to connect to zookeeper")
+			return TopicMetadata{}, errors.Wrap(err, "failed to connect to zookeeper")
 		}
-		for i := range topicsMetadata {
-			t := &topicsMetadata[i]
-			cfgPath := fmt.Sprintf("%s/config/topics/%s", a.cfg.ZooKeeper.Chroot, t.Topic)
-			cfg, _, err := kzConn.Get(cfgPath)
-			if err != nil {
-				return nil, errors.Wrapf(err, "failed to fetch topic configuration")
-			}
-			t.Config = cfg
+		cfgPath := fmt.Sprintf("%s/config/topics/%s", a.cfg.ZooKeeper.Chroot, topic)
+		cfg, _, err := kzConn.Get(cfgPath)
+		if err != nil {
+			return TopicMetadata{}, errors.Wrap(err, "failed to fetch topic configuration")
 		}
+		topicConfig := TopicConfig{}
+		if err = json.Unmarshal(cfg, &topicConfig); err != nil {
+			return TopicMetadata{}, errors.Wrap(err, "bad config")
+		}
+		tm.Config = &topicConfig
 	}
-	return topicsMetadata, nil
+	return tm, nil
 }
-
-type int32Slice []int32
-
-func (p int32Slice) Len() int { return len(p) }
-func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
-func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
diff --git a/config/config.go b/config/config.go
index 4e0dcb47..99bc2f38 100644
--- a/config/config.go
+++ b/config/config.go
@@ -126,10 +126,13 @@ type Proxy struct {
 	// errors, until some of the pending messages are acknowledged.
 	MaxPendingMessages int `yaml:"max_pending_messages"`
 
-	// The maximum number of times a message can be offered to a consumer.
-	// If a message was offered that many times and no acknowledgment has
-	// been received, then it is considered to be acknowledged and will
-	// never be offered again.
+	// The maximum number of retries Kafka-Pixy will make to offer an
+	// unacked message. Messages that exceed the number of retries are
+	// discarded by Kafka-Pixy and acknowledged in Kafka. Zero retries
+	// means that messages will be offered just once.
+	//
+	// If you want Kafka-Pixy to retry indefinitely, then set this
+	// parameter to -1.
 	MaxRetries int `yaml:"max_retries"`
 
 	// How frequently to commit offsets to Kafka.
@@ -367,8 +370,8 @@ func (p *Proxy) validate() error {
 		return errors.New("consumer.long_polling_timeout must be > 0")
 	case p.Consumer.MaxPendingMessages <= 0:
 		return errors.New("consumer.max_pending_messages must be > 0")
-	case p.Consumer.MaxRetries <= 0:
-		return errors.New("consumer.max_retries must be > 0")
+	case p.Consumer.MaxRetries < -1:
+		return errors.New("consumer.max_retries must be >= -1")
 	case p.Consumer.OffsetsCommitInterval <= 0:
 		return errors.New("consumer.offsets_commit_interval must be > 0")
 	case p.Consumer.OffsetsCommitTimeout <= 0:
@@ -422,7 +425,7 @@ func defaultProxyWithClientID(clientID string) *Proxy {
 	c.Consumer.FetchMaxWait = 250 * time.Millisecond
 	c.Consumer.LongPollingTimeout = 3 * time.Second
 	c.Consumer.MaxPendingMessages = 300
-	c.Consumer.MaxRetries = 3
+	c.Consumer.MaxRetries = -1
 	c.Consumer.OffsetsCommitInterval = 500 * time.Millisecond
 	c.Consumer.OffsetsCommitTimeout = 1500 * time.Millisecond
 	c.Consumer.RebalanceDelay = 250 * time.Millisecond
diff --git a/consumer/offsettrk/offsettrk.go b/consumer/offsettrk/offsettrk.go
index 4b72e74e..f57805ee 100644
--- a/consumer/offsettrk/offsettrk.go
+++ b/consumer/offsettrk/offsettrk.go
@@ -127,8 +127,8 @@ func (ot *T) OnAcked(offset int64) (offsetmgr.Offset, int) {
 	offerRemoved := ot.removeOffer(offset)
 	ackedRangesUpdated := ot.updateAckedRanges(offset)
 	if !offerRemoved || !ackedRangesUpdated {
-		ot.actDesc.Log().Errorf("Bad ack: offerMissing=%t, duplicateAck=%t",
-			!offerRemoved, !ackedRangesUpdated)
+		ot.actDesc.Log().Errorf("Bad ack: offset=%d, offerMissing=%t, duplicateAck=%t",
+			offset, !offerRemoved, !ackedRangesUpdated)
 	}
 	if ackedRangesUpdated {
 		ot.offset.Meta = encodeAckedRanges(ot.offset.Val, ot.ackedRanges)
diff --git a/consumer/partitioncsm/partitioncsm.go b/consumer/partitioncsm/partitioncsm.go
index 5885df17..ffd75d4b 100644
--- a/consumer/partitioncsm/partitioncsm.go
+++ b/consumer/partitioncsm/partitioncsm.go
@@ -247,7 +247,7 @@ func (pc *T) runFetchLoop() bool {
 
 // returned or there are no more messages to be retried.
 func (pc *T) nextRetry() (consumer.Message, bool) {
 	msg, retryNo, ok := pc.offsetTrk.NextRetry()
-	for ok && retryNo > pc.cfg.Consumer.MaxRetries {
+	for ok && pc.cfg.Consumer.MaxRetries >= 0 && retryNo > pc.cfg.Consumer.MaxRetries {
 		pc.actDesc.Log().Errorf("Too many retries: retryNo=%d, offset=%d, key=%s, msg=%s",
 			retryNo, msg.Offset, string(msg.Key), base64.StdEncoding.EncodeToString(msg.Value))
 		pc.submittedOffset, _ = pc.offsetTrk.OnAcked(msg.Offset)
diff --git a/consumer/partitioncsm/partitioncsm_test.go b/consumer/partitioncsm/partitioncsm_test.go
index 05bc90a5..ee8c16dc 100644
--- a/consumer/partitioncsm/partitioncsm_test.go
+++ b/consumer/partitioncsm/partitioncsm_test.go
@@ -325,6 +325,72 @@ func (s *PartitionCsmSuite) TestSparseAckedAfterStop(c *C) {
 	c.Assert(offsettrk.SparseAcks2Str(offsetsAfter[partition]), Equals, "1-3,4-7")
 }
 
+// If the max retries limit is set to 0, then messages are offered only once.
+func (s *PartitionCsmSuite) TestZeroRetries(c *C) {
+	offsetsBefore := s.kh.GetOldestOffsets(topic)
+	s.cfg.Consumer.AckTimeout = 100 * time.Millisecond
+	s.cfg.Consumer.MaxRetries = 0
+	s.kh.SetOffsets(group, topic, []offsetmgr.Offset{{Val: sarama.OffsetOldest}})
+
+	pc := Spawn(s.ns, group, topic, partition, s.cfg, s.groupMember, s.msgFetcherF, s.offsetMgrF)
+
+	msg0 := <-pc.Messages()
+	log.Infof("*** First: offset=%v", msg0.Offset)
+	sendEvOffered(msg0)
+
+	// Wait for the retry timeout to expire.
+	time.Sleep(200 * time.Millisecond)
+
+	// When/Then: maxRetries has been reached, only new messages are returned.
+	for i := 0; i < 5; i++ {
+		msg := <-pc.Messages()
+		log.Infof("*** Next: offset=%v", msg.Offset)
+		c.Assert(msg.Offset, Equals, msg0.Offset+int64(1+i), Commentf("i=%d", i))
+		sendEvOffered(msg)
+		sendEvAcked(msg)
+	}
+
+	pc.Stop()
+	offsetsAfter := s.kh.GetCommittedOffsets(group, topic)
+	c.Assert(offsetsAfter[partition].Val, Equals, offsetsBefore[partition]+6)
+	c.Assert(offsettrk.SparseAcks2Str(offsetsAfter[partition]), Equals, "")
+}
+
+// If the max retries limit is set to -1, then retries never stop.
+func (s *PartitionCsmSuite) TestIndefiniteRetries(c *C) {
+	offsetsBefore := s.kh.GetOldestOffsets(topic)
+	s.cfg.Consumer.AckTimeout = 100 * time.Millisecond
+	s.cfg.Consumer.MaxRetries = -1
+	s.kh.SetOffsets(group, topic, []offsetmgr.Offset{{Val: sarama.OffsetOldest}})
+
+	pc := Spawn(s.ns, group, topic, partition, s.cfg, s.groupMember, s.msgFetcherF, s.offsetMgrF)
+
+	msg0 := <-pc.Messages()
+	sendEvOffered(msg0)
+
+	// The logic is such that even when an offer has expired, a freshly fetched
+	// message is offered first, and then retries follow.
+	base := offsetsBefore[partition] + int64(1)
+	for i := 0; i < 5; i++ {
+		time.Sleep(100 * time.Millisecond)
+		// Newly fetched message is acknowledged...
+		msgI := <-pc.Messages()
+		c.Assert(msgI.Offset, Equals, base+int64(i))
+		sendEvOffered(msgI)
+		sendEvAcked(msgI)
+		// ...but retried messages are not.
+		msg0_i := <-pc.Messages()
+		c.Assert(msg0_i, DeepEquals, msg0, Commentf(
+			"got: %d, want: %d", msg0_i.Offset, msg0.Offset))
+		sendEvOffered(msg0)
+	}
+
+	pc.Stop()
+	offsetsAfter := s.kh.GetCommittedOffsets(group, topic)
+	c.Assert(offsetsAfter[partition].Val, Equals, offsetsBefore[partition])
+	c.Assert(offsettrk.SparseAcks2Str(offsetsAfter[partition]), Equals, "1-6")
+}
+
 // If the max retries limit is reached for a message, that results in
 // termination of the partition consumer. Note that offset is properly
 // committed to reflect sparsely acknowledged regions.
diff --git a/default.yaml b/default.yaml
index dbb8e428..8e8edb30 100644
--- a/default.yaml
+++ b/default.yaml
@@ -114,11 +114,14 @@ proxies:
       # the pending messages are acknowledged.
       max_pending_messages: 300
 
-      # The maximum number of times a message can be offered to a consumer.
-      # If a message had been offered that many times and no acknowledgment has
-      # been received, then it is forcefully acknowledged and will never be
-      # offered again. Such messages are lost from the Kafka-Pixy point of view.
-      max_retries: 3
+      # The maximum number of retries Kafka-Pixy will make to offer an
+      # unacked message. Messages that exceed the number of retries are
+      # discarded by Kafka-Pixy and acknowledged in Kafka. Zero retries
+      # means that messages will be offered just once.
+      #
+      # If you want Kafka-Pixy to retry indefinitely, then set this
+      # parameter to -1.
+      max_retries: -1
 
       # How frequently to commit offsets to Kafka.
       offsets_commit_interval: 500ms
diff --git a/proxy/proxy.go b/proxy/proxy.go
index ab237ef8..daf5d7e4 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -310,6 +310,23 @@ func (p *T) GetAllTopicConsumers(topic string) (map[string]map[string][]int32, e
 	return p.admin.GetAllTopicConsumers(topic)
 }
 
-func (p *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]admin.TopicMetadata, error) {
-	return p.admin.GetTopicsMetadata(withPartitions, withConfig)
+// ListTopics returns a list of all topics existing in the Kafka cluster.
+func (p *T) ListTopics(withPartitions, withConfig bool) ([]admin.TopicMetadata, error) {
+	p.adminMu.RLock()
+	defer p.adminMu.RUnlock()
+	if p.admin == nil {
+		return nil, ErrUnavailable
+	}
+	return p.admin.ListTopics(withPartitions, withConfig)
+}
+
+// GetTopicMetadata returns metadata for a topic. Partition metadata and/or
+// detailed topic configuration can optionally be requested as well.
+func (p *T) GetTopicMetadata(topic string, withPartitions, withConfig bool) (admin.TopicMetadata, error) { + p.adminMu.RLock() + defer p.adminMu.RUnlock() + if p.admin == nil { + return admin.TopicMetadata{}, ErrUnavailable + } + return p.admin.GetTopicMetadata(topic, withPartitions, withConfig) } diff --git a/server/httpsrv/httpsrv.go b/server/httpsrv/httpsrv.go index 334e51c4..83e60b7c 100644 --- a/server/httpsrv/httpsrv.go +++ b/server/httpsrv/httpsrv.go @@ -110,8 +110,11 @@ func New(addr string, proxySet *proxy.Set) (*T, error) { router.HandleFunc(fmt.Sprintf("/clusters/{%s}/topics/{%s}/consumers", prmCluster, prmTopic), hs.handleGetTopicConsumers).Methods("GET") router.HandleFunc(fmt.Sprintf("/topics/{%s}/consumers", prmTopic), hs.handleGetTopicConsumers).Methods("GET") - router.HandleFunc(fmt.Sprintf("/clusters/{%s}/topics", prmCluster), hs.handleGetTopicsMetadata).Methods("GET") - router.HandleFunc("/topics", hs.handleGetTopicsMetadata).Methods("GET") + router.HandleFunc(fmt.Sprintf("/clusters/{%s}/topics", prmCluster), hs.handleListTopics).Methods("GET") + router.HandleFunc("/topics", hs.handleListTopics).Methods("GET") + + router.HandleFunc(fmt.Sprintf("/clusters/{%s}/topics/{%s}", prmCluster, prmTopic), hs.handleGetTopicMetadata).Methods("GET") + router.HandleFunc(fmt.Sprintf("/topics/{%s}", prmTopic), hs.handleGetTopicMetadata).Methods("GET") router.HandleFunc("/_ping", hs.handlePing).Methods("GET") return hs, nil @@ -458,8 +461,8 @@ func (s *T) handleGetTopicConsumers(w http.ResponseWriter, r *http.Request) { } } -// handleGetTopics is an HTTP request handler for `GET /topics` -func (s *T) handleGetTopicsMetadata(w http.ResponseWriter, r *http.Request) { +// handleListTopics is an HTTP request handler for `GET /topics` +func (s *T) handleListTopics(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() var err error @@ -478,40 +481,19 @@ func (s *T) handleGetTopicsMetadata(w http.ResponseWriter, r *http.Request) { _, withConfig := r.Form[prmTopicsWithConfig] _, withPartitions := r.Form[prmTopicsWithPartitions] - topicsMetadata, err := pxy.GetTopicsMetadata(withPartitions, withConfig) + topicsMetadata, err := pxy.ListTopics(withPartitions, withConfig) if err != nil { s.respondWithJSON(w, http.StatusInternalServerError, errorRs{err.Error()}) return } if withPartitions || withConfig { - topicsMetadataView := make(map[string]*topicMetadata) + topicMetadataViews := make(map[string]*topicMetadata) for _, tm := range topicsMetadata { - tm_view := new(topicMetadata) - if withPartitions { - for _, p := range tm.Partitions { - p_view := partitionMetadata{ - ID: p.ID, - Leader: p.Leader, - Replicas: p.Replicas, - Isr: p.Isr, - } - tm_view.Partitions = append(tm_view.Partitions, p_view) - } - } - if withConfig { - cfg := new(topicConfig) - err = json.Unmarshal(tm.Config, &cfg) - if err != nil { - s.respondWithJSON(w, http.StatusInternalServerError, errorRs{err.Error()}) - return - } - - tm_view.Config = cfg - } - topicsMetadataView[tm.Topic] = tm_view + topicMetadataView := newTopicMetadataView(withPartitions, withConfig, tm) + topicMetadataViews[tm.Topic] = &topicMetadataView } - s.respondWithJSON(w, http.StatusOK, topicsMetadataView) + s.respondWithJSON(w, http.StatusOK, topicMetadataViews) return } @@ -522,6 +504,37 @@ func (s *T) handleGetTopicsMetadata(w http.ResponseWriter, r *http.Request) { s.respondWithJSON(w, http.StatusOK, topics) } +// handleGetTopicMetadata is an HTTP request handler for `GET /topics/{topic}` +func (s *T) handleGetTopicMetadata(w http.ResponseWriter, 
r *http.Request) { + defer r.Body.Close() + var err error + + pxy, err := s.getProxy(r) + if err != nil { + s.respondWithJSON(w, http.StatusBadRequest, errorRs{err.Error()}) + return + } + topic := mux.Vars(r)[prmTopic] + + err = r.ParseForm() + if err != nil { + s.respondWithJSON(w, http.StatusBadRequest, errorRs{err.Error()}) + return + } + + withConfig := true + _, withPartitions := r.Form[prmTopicsWithPartitions] + + tm, err := pxy.GetTopicMetadata(topic, withPartitions, withConfig) + if err != nil { + s.respondWithJSON(w, http.StatusInternalServerError, errorRs{err.Error()}) + return + } + + tm_view := newTopicMetadataView(withPartitions, withConfig, tm) + s.respondWithJSON(w, http.StatusOK, tm_view) +} + func (s *T) handlePing(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() w.WriteHeader(http.StatusOK) @@ -556,19 +569,19 @@ type errorRs struct { } type topicConfig struct { - Version int32 `json:"version"` - ConfigMap map[string]string `json:"config"` + Version int32 `json:"version"` + Config map[string]string `json:"config"` } type partitionMetadata struct { ID int32 `json:"partition"` Leader int32 `json:"leader"` Replicas []int32 `json:"replicas"` - Isr []int32 `json:"isr"` + ISR []int32 `json:"isr"` } type topicMetadata struct { - Config *topicConfig `json:"topic_config,omitempty"` + Config *topicConfig `json:"config,omitempty"` Partitions []partitionMetadata `json:"partitions,omitempty"` } @@ -662,3 +675,26 @@ func parseAck(r *http.Request, isConsReq bool) (proxy.Ack, error) { } return proxy.NoAck(), errors.Errorf("%s and %s either both should be provided or neither", partitionPrmName, offsetPrmName) } + +func newTopicMetadataView(withPartitions, withConfig bool, tm admin.TopicMetadata) topicMetadata { + topicMetadataView := topicMetadata{} + if withPartitions { + for _, p := range tm.Partitions { + partitionView := partitionMetadata{ + ID: p.ID, + Leader: p.Leader, + Replicas: p.Replicas, + ISR: p.ISR, + } + topicMetadataView.Partitions = append(topicMetadataView.Partitions, partitionView) + } + } + if withConfig { + topicConfig := topicConfig{ + Version: tm.Config.Version, + Config: tm.Config.Config, + } + topicMetadataView.Config = &topicConfig + } + return topicMetadataView +} diff --git a/service/service_grpc_test.go b/service/service_grpc_test.go index 32268041..e6167be1 100644 --- a/service/service_grpc_test.go +++ b/service/service_grpc_test.go @@ -55,11 +55,11 @@ func (s *ServiceGRPCSuite) TearDownTest(c *C) { func (s *ServiceGRPCSuite) TestProduceWithKey(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) offsetsBefore := s.kh.GetNewestOffsets("test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -89,11 +89,11 @@ func (s *ServiceGRPCSuite) TestProduceWithKey(c *C) { func (s *ServiceGRPCSuite) TestProduceKeyUndefined(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) offsetsBefore := s.kh.GetNewestOffsets("test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -124,11 +124,11 @@ func (s *ServiceGRPCSuite) TestProduceKeyUndefined(c *C) { func (s *ServiceGRPCSuite) TestProduceDefaultKey(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + 
s.waitSvcUp(c, 5*time.Second) offsetsBefore := s.kh.GetNewestOffsets("test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -159,11 +159,11 @@ func (s *ServiceGRPCSuite) TestProduceSync(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) defer svc.Stop() - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) offsetsBefore := s.kh.GetNewestOffsets("test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -183,9 +183,9 @@ func (s *ServiceGRPCSuite) TestProduceInvalidProxy(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) defer svc.Stop() - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -207,14 +207,14 @@ func (s *ServiceGRPCSuite) TestProduceInvalidProxy(c *C) { func (s *ServiceGRPCSuite) TestConsumeAutoAck(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) s.kh.ResetOffsets("foo", "test.4") produced := s.kh.PutMessages("auto-ack", "test.4", map[string]int{"A": 17, "B": 19, "C": 23, "D": 29}) consumed := make(map[string][]*pb.ConsRs) offsetsBefore := s.kh.GetCommittedOffsets("foo", "test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -248,13 +248,13 @@ func (s *ServiceGRPCSuite) TestConsumeNoAck(c *C) { s.proxyCfg.Consumer.SubscriptionTimeout = 500 * time.Millisecond svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) s.kh.ResetOffsets("foo", "test.1") s.kh.PutMessages("no-ack", "test.1", map[string]int{"A": 1}) offsetsBefore := s.kh.GetCommittedOffsets("foo", "test.1") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -276,7 +276,7 @@ func (s *ServiceGRPCSuite) TestConsumeNoAck(c *C) { func (s *ServiceGRPCSuite) TestGetOffsets(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) s.kh.ResetOffsets("foo", "test.4") s.kh.PutMessages("auto-ack", "test.4", map[string]int{"A": 1}) @@ -316,14 +316,14 @@ func (s *ServiceGRPCSuite) TestGetOffsets(c *C) { func (s *ServiceGRPCSuite) TestConsumeExplicitAck(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) s.kh.ResetOffsets("foo", "test.4") produced := s.kh.PutMessages("explicit-ack", "test.4", map[string]int{"A": 17, "B": 19, "C": 23, "D": 29}) consumed := make(map[string][]*pb.ConsRs) offsetsBefore := s.kh.GetCommittedOffsets("foo", "test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When: @@ -377,11 +377,11 @@ func (s *ServiceGRPCSuite) TestConsumeExplicitProxy(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) defer svc.Stop() - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) s.kh.ResetOffsets("foo", "test.4") - ctx, cancel := 
context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() prodReq := pb.ProdRq{ @@ -412,11 +412,11 @@ func (s *ServiceGRPCSuite) TestConsumeKeyUndefined(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) defer svc.Stop() - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) s.kh.ResetOffsets("foo", "test.4") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() prodReq := pb.ProdRq{ @@ -446,9 +446,9 @@ func (s *ServiceGRPCSuite) TestConsumeInvalidProxy(c *C) { svc, err := Spawn(s.cfg) c.Assert(err, IsNil) defer svc.Stop() - s.waitSvcUp(c, 1*time.Second) + s.waitSvcUp(c, 5*time.Second) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // When @@ -474,5 +474,6 @@ func (s *ServiceGRPCSuite) waitSvcUp(c *C, timeout time.Duration) { c.Errorf("Service is not up") return } + time.Sleep(100 * time.Millisecond) } } diff --git a/service/service_http_test.go b/service/service_http_test.go index 9cb33f5c..3c878b55 100644 --- a/service/service_http_test.go +++ b/service/service_http_test.go @@ -920,7 +920,7 @@ func (s *ServiceHTTPSuite) TestGetTopicsWithConfig(c *C) { c.Error(fmt.Sprintf("Can't obtain config for %s error %v", topic, err)) } - cfg := metadata["topic_config"].(map[string]interface{}) + cfg := metadata["config"].(map[string]interface{}) version := int(cfg["version"].(float64)) c.Assert(version, Equals, 1) @@ -959,7 +959,7 @@ func (s *ServiceHTTPSuite) TestGetTopicsWithPartitionsAndWithConfig(c *C) { metadata := raw_metadata.(map[string]interface{}) c.Assert(metadata["partitions"], NotNil) - c.Assert(metadata["topic_config"], NotNil) + c.Assert(metadata["config"], NotNil) } }
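Not part of the patch above — a minimal, hypothetical sketch of how a client could call the new `GET /topics/<topic>` endpoint and decode its JSON response. The listen address `localhost:19092`, the topic name `test.4`, and the local view structs are assumptions made for illustration only; the JSON field names mirror the `topicMetadata`, `partitionMetadata`, and `topicConfig` views defined in `server/httpsrv/httpsrv.go` by this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Local mirrors of the response views; the field tags follow the JSON names
// ("partition", "leader", "replicas", "isr", "version", "config") used by
// the httpsrv view structs in this diff.
type partitionView struct {
	ID       int32   `json:"partition"`
	Leader   int32   `json:"leader"`
	Replicas []int32 `json:"replicas"`
	ISR      []int32 `json:"isr"`
}

type configView struct {
	Version int32             `json:"version"`
	Config  map[string]string `json:"config"`
}

type topicView struct {
	Config     *configView     `json:"config,omitempty"`
	Partitions []partitionView `json:"partitions,omitempty"`
}

func main() {
	// Assumed address and topic name; adjust them to your Kafka-Pixy deployment.
	// The withPartitions flag is presence-based, so no value is required.
	rs, err := http.Get("http://localhost:19092/topics/test.4?withPartitions")
	if err != nil {
		panic(err)
	}
	defer rs.Body.Close()

	var tm topicView
	if err := json.NewDecoder(rs.Body).Decode(&tm); err != nil {
		panic(err)
	}
	if tm.Config != nil {
		fmt.Printf("config version: %d, overrides: %v\n", tm.Config.Version, tm.Config.Config)
	}
	fmt.Printf("partitions returned: %d\n", len(tm.Partitions))
}
```

Listing all topics works the same way against `GET /topics` (optionally with `?withPartitions&withConfig`), except that when either flag is given the handler responds with a JSON object keyed by topic name rather than a single topic view.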