cache pubsub topic and reuse it to publish future messages
agbpatro committed Jan 4, 2025
1 parent 4034cea commit d952732
Showing 3 changed files with 72 additions and 29 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -156,6 +156,10 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* If you have already created the relevant topics, use `topic_check_refresh_interval_seconds` to control the server's behavior (see the config sketch below this list):
* 0 (default): always attempts to create the topic and check its existence before publishing
* -1: dispatches messages without checking whether the topic exists. Set this to -1 if topics were created before deploying
* x: checks the topic's existence only once every x seconds. If the topic was deleted, it is recreated after at most x seconds
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.
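For concreteness, here is a minimal sketch of the pubsub section of a config.json. The field names `gcp_project_id` and `topic_check_refresh_interval_seconds` come straight from [config/config.go](./config/config.go); the top-level key and the project ID are illustrative placeholders, so treat ./test/integration/config.json as the authoritative example.

```json
{
  "pubsub": {
    "gcp_project_id": "my-gcp-project",
    "topic_check_refresh_interval_seconds": 300
  }
}
```

With a value of 300, the server re-verifies each topic at most once every five minutes instead of on every publish.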

5 changes: 4 additions & 1 deletion config/config.go
@@ -131,6 +131,9 @@ type Pubsub struct {
// GCP Project ID
ProjectID string `json:"gcp_project_id,omitempty"`

// TopicCheckRefreshIntervalSeconds -1: never check whether the topic exists, 0: check on every publish, >0: number of seconds between existence checks
TopicCheckRefreshIntervalSeconds int `json:"topic_check_refresh_interval_seconds,omitempty"`

Publisher *pubsub.Client
}
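The three settings collapse into a single freshness test in the producer. The helper below is a hypothetical illustration of that logic, not code from this commit; `interval`, `cached`, and `lastCheck` stand in for the config value and the producer's per-topic cache state.

```go
package example

import "time"

// shouldCheckTopic illustrates how topic_check_refresh_interval_seconds is interpreted
// (hypothetical helper, not part of fleet-telemetry).
func shouldCheckTopic(interval int, cached bool, lastCheck time.Time) bool {
	if interval < 0 {
		return false // -1: trust that topics already exist, never check
	}
	if !cached {
		return true // first publish to this topic: create/check it once
	}
	// 0 re-checks on every publish; a positive value re-checks once it has elapsed
	return time.Since(lastCheck) > time.Duration(interval)*time.Second
}
```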

@@ -281,7 +284,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
if c.Pubsub == nil {
return nil, nil, errors.New("expected Pubsub to be configured")
}
googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.Pubsub.TopicCheckRefreshIntervalSeconds, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
if err != nil {
return nil, nil, err
}
92 changes: 64 additions & 28 deletions datastore/googlepubsub/publisher.go
@@ -17,17 +17,24 @@ import (
"github.com/teslamotors/fleet-telemetry/telemetry"
)

// TopicMetadata stores a cached pubsub topic handle and the time its existence was last checked
type TopicMetadata struct {
lastCheck time.Time
topic *pubsub.Topic
}

// Producer client to handle google pubsub interactions
type Producer struct {
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.Handler
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
pubsubClient *pubsub.Client
projectID string
namespace string
metricsCollector metrics.MetricCollector
prometheusEnabled bool
logger *logrus.Logger
airbrakeHandler *airbrake.Handler
ackChan chan (*telemetry.Record)
reliableAckTxTypes map[string]interface{}
topicCheckRefreshIntervalSeconds int
topicMetadataMap map[string]*TopicMetadata
}

// Metrics stores metrics reported from this package
@@ -57,45 +64,74 @@ func configurePubsub(projectID string) (*pubsub.Client, error) {
}

// NewProducer establishes the pubsub connection and defines the dispatch method
func NewProducer(prometheusEnabled bool, projectID string, namespace string, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(prometheusEnabled bool, projectID string, namespace string, topicCheckRefreshIntervalSeconds int, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)
pubsubClient, err := configurePubsub(projectID)
if err != nil {
return nil, fmt.Errorf("pubsub_connect_error %s", err)
}

p := &Producer{
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
airbrakeHandler: airbrakeHandler,
ackChan: ackChan,
reliableAckTxTypes: reliableAckTxTypes,
topicCheckRefreshIntervalSeconds: topicCheckRefreshIntervalSeconds,
topicMetadataMap: make(map[string]*TopicMetadata, 0),
}
p.logger.ActivityLog("pubsub_registered", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
}

// cachedTopic returns the pubsub topic for a record, reusing the cached handle whenever possible
func (p *Producer) cachedTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic {
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
topicMetadata, ok := p.topicMetadataMap[topicName]
now := time.Now()

// Negative interval: no need to check for existence again, reuse the cached handle as-is
if ok && p.topicCheckRefreshIntervalSeconds < 0 {
return topicMetadata.topic
}

// Negative interval, first use of this topic: cache a bare handle without an existence check
if p.topicCheckRefreshIntervalSeconds < 0 {
topic := p.pubsubClient.Topic(topicName)
p.topicMetadataMap[topicName] = &TopicMetadata{lastCheck: now, topic: topic}
return topic
}

// Zero or positive interval: (re)verify the topic when the cache entry is missing or stale
if !ok || now.Sub(topicMetadata.lastCheck) > time.Duration(p.topicCheckRefreshIntervalSeconds)*time.Second {
return p.upsertTopic(ctx, entry)
}

return topicMetadata.topic
}
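One design note on the cache: topicMetadataMap is a plain map owned by the Producer, which is fine as long as each producer's Produce is driven from a single goroutine. If Produce may be called concurrently, a lock-guarded cache along the lines of this hypothetical sketch (not part of the commit) would avoid map races:

```go
package example

import (
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
)

// topicEntry mirrors TopicMetadata: a cached handle plus its last existence check.
type topicEntry struct {
	lastCheck time.Time
	topic     *pubsub.Topic
}

// topicCache is a hypothetical mutex-guarded version of topicMetadataMap.
type topicCache struct {
	mu     sync.RWMutex
	topics map[string]*topicEntry
}

func (c *topicCache) get(name string) (*topicEntry, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	entry, ok := c.topics[name]
	return entry, ok
}

func (c *topicCache) put(name string, entry *topicEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.topics[name] = entry
}
```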

// upsertTopic creates the topic if it does not already exist and caches it in the map
func (p *Producer) upsertTopic(ctx context.Context, entry *telemetry.Record) *pubsub.Topic {
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return nil
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return nil
}

p.topicMetadataMap[topicName] = &TopicMetadata{lastCheck: time.Now(), topic: pubsubTopic}
return pubsubTopic
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}

pubsubTopic := p.cachedTopic(ctx, entry)
if pubsubTopic == nil {
return
}

@@ -104,7 +140,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
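Taken together, the change moves the CreateTopic/Exists admin calls out of the hot path: they now run only when a cache entry is missing or stale, while Publish keeps reusing the cached handle. The standalone sketch below illustrates the same cache-and-reuse pattern against the Google pubsub client directly; the project ID, namespace, topic name, and hard-coded refresh interval are placeholders, so treat it as an illustration rather than fleet-telemetry code.

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

// cachedTopic mirrors the commit's idea: keep a topic handle plus the time
// its existence was last verified.
type cachedTopic struct {
	topic     *pubsub.Topic
	lastCheck time.Time
}

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-gcp-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	cache := map[string]*cachedTopic{}
	refresh := 300 * time.Second // would come from topic_check_refresh_interval_seconds

	publish := func(topicName string, payload []byte) error {
		entry, ok := cache[topicName]
		if !ok || time.Since(entry.lastCheck) > refresh {
			// Admin calls (Exists/CreateTopic) happen only here, not on every publish.
			topic := client.Topic(topicName)
			exists, err := topic.Exists(ctx)
			if err != nil {
				return err
			}
			if !exists {
				if topic, err = client.CreateTopic(ctx, topicName); err != nil {
					return err
				}
			}
			entry = &cachedTopic{topic: topic, lastCheck: time.Now()}
			cache[topicName] = entry
		}
		// Publishing reuses the cached handle and its batching goroutines.
		_, err := entry.topic.Publish(ctx, &pubsub.Message{Data: payload}).Get(ctx)
		return err
	}

	if err := publish("my-namespace_V", []byte(`{"example":"payload"}`)); err != nil {
		log.Fatal(err)
	}
	log.Println("published")
}
```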