diff --git a/README.md b/README.md index 5267b8a..a0b81b0 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,7 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s * Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }` * Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V` * Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS` + * Fleet Telemetry server will try to create topics on startup. If it topic doesn't exists or creation of topic fails, server will fail to startup and crash. * ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) * Logger: This is a simple STDOUT logger that serializes the protos to json. diff --git a/config/config.go b/config/config.go index d23865a..f69906e 100644 --- a/config/config.go +++ b/config/config.go @@ -250,6 +250,7 @@ func (c *Config) prometheusEnabled() bool { // ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger) func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) { + var pubsubTxTypes []string reliableAckSources, err := c.configureReliableAckSources() if err != nil { return nil, nil, err @@ -319,6 +320,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l for recordName, dispatchRules := range c.Records { var dispatchFuncs []telemetry.Producer for _, dispatchRule := range dispatchRules { + if dispatchRule == telemetry.Pubsub { + pubsubTxTypes = append(pubsubTxTypes, recordName) + } dispatchFuncs = append(dispatchFuncs, producers[dispatchRule]) } dispatchProducerRules[recordName] = dispatchFuncs @@ -328,6 +332,12 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l } } + if len(pubsubTxTypes) > 0 { + if err := producers[telemetry.Pubsub].(*googlepubsub.Producer).ProvisionTopics(pubsubTxTypes); err != nil { + return nil, nil, err + } + } + return producers, dispatchProducerRules, nil } diff --git a/datastore/googlepubsub/publisher.go b/datastore/googlepubsub/publisher.go index ea0f026..a0753ea 100644 --- a/datastore/googlepubsub/publisher.go +++ b/datastore/googlepubsub/publisher.go @@ -79,32 +79,37 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met return p, nil } +func (p *Producer) ProvisionTopics(txTypes []string) error { + ctx := context.Background() + for _, txType := range txTypes { + topicName := telemetry.BuildTopicName(p.namespace, txType) + logInfo := logrus.LogInfo{"topic_name": topicName, "txType": txType} + _, err := p.createTopicIfNotExists(ctx, topicName) + if err != nil { + p.ReportError("pubsub_topic_creation_error", err, logInfo) + metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": txType}) + return err + } + p.logger.ActivityLog("pubsub_topic_created", logInfo) + } + return nil +} + // Produce sends the record payload to pubsub func (p *Producer) Produce(entry *telemetry.Record) { ctx := context.Background() topicName := telemetry.BuildTopicName(p.namespace, entry.TxType) + pubsubTopic := p.pubsubClient.Topic(topicName) logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid} - pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName) - - if err != nil { - p.ReportError("pubsub_topic_creation_error", err, logInfo) - metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType}) - return - } - - if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil { - p.ReportError("pubsub_topic_check_error", err, logInfo) - metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType}) - return - } entry.ProduceTime = time.Now() result := pubsubTopic.Publish(ctx, &pubsub.Message{ Data: entry.Payload(), Attributes: entry.Metadata(), }) - if _, err = result.Get(ctx); err != nil { + + if _, err := result.Get(ctx); err != nil { p.ReportError("pubsub_err", err, logInfo) metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType}) return