Skip to content

Commit

Permalink
cache pubsub topic and reuse it to publish future messages
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Jan 3, 2025
1 parent 4034cea commit 9614fed
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 25 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Pubsub struct {
// GCP Project ID
ProjectID string `json:"gcp_project_id,omitempty"`

// TopicCheckRefreshIntervalSeconds -1: fetch only once, 0: always check, non zero value signifies refresh interval for check
TopicCheckRefreshIntervalSeconds int `json:"topic_check_refresh_interval_seconds,omitempty"`

Publisher *pubsub.Client
}

Expand Down
87 changes: 62 additions & 25 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@ import (
"github.com/teslamotors/fleet-telemetry/telemetry"
)

type TopicMetadata struct {
lastCheck time.Time
topic *pubsub.Topic
}

// Producer client to handle google pubsub interactions
type Producer struct {
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.Handler
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.Handler
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
topicCheckRefreshIntervalSeconds int
topicMetadataMap map[string]*TopicMetadata
}

// Metrics stores metrics reported from this package
Expand Down Expand Up @@ -57,45 +64,75 @@ func configurePubsub(projectID string) (*pubsub.Client, error) {
}

// NewProducer establishes the pubsub connection and define the dispatch method
func NewProducer(prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(prometheusEnabled bool, projectID string, namespace string, topicCheckRefreshIntervalSeconds int, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)
pubsubClient, err := configurePubsub(projectID)
if err != nil {
return nil, fmt.Errorf("pubsub_connect_error %s", err)
}

p := &Producer{
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
topicCheckRefreshIntervalSeconds: topicCheckRefreshIntervalSeconds,
topicMetadataMap: make(map[string]*TopicMetadata, 0),
}
p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()
func (p *Producer) cachedTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic {
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)

topicMetadata, ok := p.topicMetadataMap[topicName]
now := time.Now()

if ok && p.topicCheckRefreshIntervalSeconds < 0 {
return topicMetadata.topic
}

if !ok || now.Sub(topicMetadata.lastCheck) > time.Duration(p.topicCheckRefreshIntervalSeconds)*time.Second {
return p.updateTopicCheck(ctx, entry)
}

return topicMetadata.topic
}

func (p *Producer) updateTopicCheck(ctx context.Context, entry *telemetry.Record) *pubsub.Topic {
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
return nil
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return nil
}

p.topicMetadataMap[topicName] = &TopicMetadata{lastCheck: time.Now(), topic: pubsubTopic}
return pubsubTopic
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}

pubsubTopic := p.cachedTopic(ctx, entry)
if pubsubTopic == nil {
return
}

Expand All @@ -104,7 +141,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {
if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
Expand Down

0 comments on commit 9614fed

Please sign in to comment.