diff --git a/config/config.go b/config/config.go index d23865a..537fb92 100644 --- a/config/config.go +++ b/config/config.go @@ -131,6 +131,9 @@ type Pubsub struct { // GCP Project ID ProjectID string `json:"gcp_project_id,omitempty"` + // TopicCheckRefreshIntervalSeconds -1: fetch only once, 0: always check, non zero value signifies refresh interval for check + TopicCheckRefreshIntervalSeconds int `json:"topic_check_refresh_interval_seconds,omitempty"` + Publisher *pubsub.Client } diff --git a/datastore/googlepubsub/publisher.go b/datastore/googlepubsub/publisher.go index ea0f026..a380fd6 100644 --- a/datastore/googlepubsub/publisher.go +++ b/datastore/googlepubsub/publisher.go @@ -17,17 +17,24 @@ import ( "github.com/teslamotors/fleet-telemetry/telemetry" ) +type TopicMetadata struct { + lastCheck time.Time + topic *pubsub.Topic +} + // Producer client to handle google pubsub interactions type Producer struct { - pubsubClient *pubsub.Client - projectID string - namespace string - metricsCollector metrics.MetricCollector - prometheusEnabled bool - logger *logrus.Logger - airbrakeHandler *airbrake.Handler - ackChan chan (*telemetry.Record) - reliableAckTxTypes map[string]interface{} + pubsubClient *pubsub.Client + projectID string + namespace string + metricsCollector metrics.MetricCollector + prometheusEnabled bool + logger *logrus.Logger + airbrakeHandler *airbrake.Handler + ackChan chan (*telemetry.Record) + reliableAckTxTypes map[string]interface{} + topicCheckRefreshIntervalSeconds int + topicMetadataMap map[string]*TopicMetadata } // Metrics stores metrics reported from this package @@ -57,7 +64,7 @@ func configurePubsub(projectID string) (*pubsub.Client, error) { } // NewProducer establishes the pubsub connection and define the dispatch method -func NewProducer(prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) { +func NewProducer(prometheusEnabled bool, projectID string, namespace string, topicCheckRefreshIntervalSeconds int, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) { registerMetricsOnce(metricsCollector) pubsubClient, err := configurePubsub(projectID) if err != nil { @@ -65,37 +72,67 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met } p := &Producer{ - projectID: projectID, - namespace: namespace, - pubsubClient: pubsubClient, - prometheusEnabled: prometheusEnabled, - metricsCollector: metricsCollector, - logger: logger, - airbrakeHandler: airbrakeHandler, - ackChan: ackChan, - reliableAckTxTypes: reliableAckTxTypes, + projectID: projectID, + namespace: namespace, + pubsubClient: pubsubClient, + prometheusEnabled: prometheusEnabled, + metricsCollector: metricsCollector, + logger: logger, + airbrakeHandler: airbrakeHandler, + ackChan: ackChan, + reliableAckTxTypes: reliableAckTxTypes, + topicCheckRefreshIntervalSeconds: topicCheckRefreshIntervalSeconds, + topicMetadataMap: make(map[string]*TopicMetadata, 0), } p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace}) return p, nil } -// Produce sends the record payload to pubsub -func (p *Producer) Produce(entry *telemetry.Record) { - ctx := context.Background() +func (p *Producer) cachedTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic { + topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) + + topicMetadata, ok := p.topicMetadataMap[topicName] + now := time.Now() + + if ok && p.topicCheckRefreshIntervalSeconds < 0 { + return topicMetadata.topic + } + + if !ok || now.Sub(topicMetadata.lastCheck) > time.Duration(p.topicCheckRefreshIntervalSeconds)*time.Second { + return p.updateTopicCheck(ctx, entry) + } + + return topicMetadata.topic +} +func (p *Producer) updateTopicCheck(ctx context.Context, entry *telemetry.Record) *pubsub.Topic { topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid} pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName) - if err != nil { p.ReportError("pubsub_topic_creation_error", err, logInfo) metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType}) - return + return nil } if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil { p.ReportError("pubsub_topic_check_error", err, logInfo) metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType}) + return nil + } + + p.topicMetadataMap[topicName] = &TopicMetadata{lastCheck: time.Now(), topic: pubsubTopic} + return pubsubTopic +} + +// Produce sends the record payload to pubsub +func (p *Producer) Produce(entry *telemetry.Record) { + ctx := context.Background() + topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) + logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid} + + pubsubTopic := p.cachedTopic(ctx, entry) + if pubsubTopic == nil { return } @@ -104,7 +141,7 @@ func (p *Producer) Produce(entry *telemetry.Record) { Data: entry.Payload(), Attributes: entry.Metadata(), }) - if _, err = result.Get(ctx); err != nil { + if _, err := result.Get(ctx); err != nil { p.ReportError("pubsub_err", err, logInfo) metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType}) return