From d95273243214a005b7b70f46b87f47e86bee60fa Mon Sep 17 00:00:00 2001 From: Abhishek Patro Date: Fri, 3 Jan 2025 15:58:23 -0800 Subject: [PATCH] cache pubsub topic and reuse it to publish future messages --- README.md | 4 ++ config/config.go | 5 +- datastore/googlepubsub/publisher.go | 92 ++++++++++++++++++++--------- 3 files changed, 72 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 5267b8a..5c5986b 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,10 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s * Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }` * Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V` * Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS` + * If you have already created the relevant topics, you can use `topic_check_refresh_interval_seconds` to control the server's behavior: + * 0 (default) : Will always attempt to create and check the existence of the topic before publishing + * -1 : Will attempt to dispatch the message without checking the existence of the topic. Set this to -1 if the topics were created before deployment + * x : Will only attempt to check the existence of the topic every x seconds. If the topic was deleted, it will attempt to recreate it after x seconds. * ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) * Logger: This is a simple STDOUT logger that serializes the protos to json. 
diff --git a/config/config.go b/config/config.go index d23865a..416e398 100644 --- a/config/config.go +++ b/config/config.go @@ -131,6 +131,9 @@ type Pubsub struct { // GCP Project ID ProjectID string `json:"gcp_project_id,omitempty"` + // TopicCheckRefreshIntervalSeconds -1: do not check if topic exists, 0: always check, non zero value signifies refresh interval for check + TopicCheckRefreshIntervalSeconds int `json:"topic_check_refresh_interval_seconds,omitempty"` + Publisher *pubsub.Client } @@ -281,7 +284,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l if c.Pubsub == nil { return nil, nil, errors.New("expected Pubsub to be configured") } - googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger) + googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.Pubsub.TopicCheckRefreshIntervalSeconds, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger) if err != nil { return nil, nil, err } diff --git a/datastore/googlepubsub/publisher.go b/datastore/googlepubsub/publisher.go index ea0f026..31da0cc 100644 --- a/datastore/googlepubsub/publisher.go +++ b/datastore/googlepubsub/publisher.go @@ -17,17 +17,24 @@ import ( "github.com/teslamotors/fleet-telemetry/telemetry" ) +type TopicMetadata struct { + lastCheck time.Time + topic *pubsub.Topic +} + // Producer client to handle google pubsub interactions type Producer struct { - pubsubClient *pubsub.Client - projectID string - namespace string - metricsCollector metrics.MetricCollector - prometheusEnabled bool - logger *logrus.Logger - airbrakeHandler *airbrake.Handler - ackChan chan (*telemetry.Record) - reliableAckTxTypes map[string]interface{} + pubsubClient *pubsub.Client + projectID string + namespace string + metricsCollector metrics.MetricCollector + 
prometheusEnabled bool + logger *logrus.Logger + airbrakeHandler *airbrake.Handler + ackChan chan (*telemetry.Record) + reliableAckTxTypes map[string]interface{} + topicCheckRefreshIntervalSeconds int + topicMetadataMap map[string]*TopicMetadata } // Metrics stores metrics reported from this package @@ -57,7 +64,7 @@ func configurePubsub(projectID string) (*pubsub.Client, error) { } // NewProducer establishes the pubsub connection and define the dispatch method -func NewProducer(prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) { +func NewProducer(prometheusEnabled bool, projectID string, namespace string, topicCheckRefreshIntervalSeconds int, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) { registerMetricsOnce(metricsCollector) pubsubClient, err := configurePubsub(projectID) if err != nil { @@ -65,37 +72,66 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met } p := &Producer{ - projectID: projectID, - namespace: namespace, - pubsubClient: pubsubClient, - prometheusEnabled: prometheusEnabled, - metricsCollector: metricsCollector, - logger: logger, - airbrakeHandler: airbrakeHandler, - ackChan: ackChan, - reliableAckTxTypes: reliableAckTxTypes, + projectID: projectID, + namespace: namespace, + pubsubClient: pubsubClient, + prometheusEnabled: prometheusEnabled, + metricsCollector: metricsCollector, + logger: logger, + airbrakeHandler: airbrakeHandler, + ackChan: ackChan, + reliableAckTxTypes: reliableAckTxTypes, + topicCheckRefreshIntervalSeconds: topicCheckRefreshIntervalSeconds, + topicMetadataMap: make(map[string]*TopicMetadata, 0), } 
p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace}) return p, nil } -// Produce sends the record payload to pubsub -func (p *Producer) Produce(entry *telemetry.Record) { - ctx := context.Background() +func (p *Producer) cachedTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic { + // No need to check for existence again + topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) + if p.topicCheckRefreshIntervalSeconds < 0 { + p.pubsubClient.Topic(topicName) + } + topicMetadata, ok := p.topicMetadataMap[topicName] + now := time.Now() + + if ok && p.topicCheckRefreshIntervalSeconds < 0 { + return topicMetadata.topic + } + + if !ok || now.Sub(topicMetadata.lastCheck) > time.Duration(p.topicCheckRefreshIntervalSeconds)*time.Second { + return p.upsertTopic(ctx, entry) + } + + return topicMetadata.topic +} + +// upsertTopic creates topic if doesn't exist and adds it to map +func (p *Producer) upsertTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic { topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid} pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName) - if err != nil { p.ReportError("pubsub_topic_creation_error", err, logInfo) metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType}) - return + return nil } - if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil { - p.ReportError("pubsub_topic_check_error", err, logInfo) - metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType}) + p.topicMetadataMap[topicName] = &TopicMetadata{lastCheck: time.Now(), topic: pubsubTopic} + return pubsubTopic +} + +// Produce sends the record payload to pubsub +func (p *Producer) Produce(entry *telemetry.Record) { + ctx := context.Background() + topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) + logInfo := 
logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid} + + pubsubTopic := p.cachedTopic(ctx, entry) + if pubsubTopic == nil { return } @@ -104,7 +140,7 @@ func (p *Producer) Produce(entry *telemetry.Record) { Data: entry.Payload(), Attributes: entry.Metadata(), }) - if _, err = result.Get(ctx); err != nil { + if _, err := result.Get(ctx); err != nil { p.ReportError("pubsub_err", err, logInfo) metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType}) return