Skip to content

Commit

Permalink
create http dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickdemers6 committed Dec 21, 2023
1 parent 1e01c89 commit bfeb4d8
Show file tree
Hide file tree
Showing 17 changed files with 564 additions and 81 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* HTTP: Send events to an HTTP endpoint. This should **never** be used in production grade systems as failed requests are discarded and there is limited scalability. It is suitable for owners wishing to stream data directly from their own vehicles. [View configuration options](./datastore/http/http.go#L37).
* Logger: This is a simple STDOUT logger that serializes the protos to json.

>NOTE: To add a new dispatcher, please provide integration tests and updated documentation.
Expand Down
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"

"github.com/teslamotors/fleet-telemetry/datastore/googlepubsub"
"github.com/teslamotors/fleet-telemetry/datastore/http"
"github.com/teslamotors/fleet-telemetry/datastore/kafka"
"github.com/teslamotors/fleet-telemetry/datastore/kinesis"
"github.com/teslamotors/fleet-telemetry/datastore/simple"
Expand Down Expand Up @@ -65,6 +66,9 @@ type Config struct {
// ZMQ configures a zeromq socket
ZMQ *zmq.Config `json:"zmq,omitempty"`

// HTTP is a configuration for HTTP producer
HTTP *http.Config `json:"http,omitempty"`

// Namespace defines a prefix for the kafka/pubsub topic
Namespace string `json:"namespace,omitempty"`

Expand Down Expand Up @@ -251,6 +255,17 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
producers[telemetry.ZMQ] = zmqProducer
}

if _, ok := requiredDispatchers[telemetry.HTTP]; ok {
if c.HTTP == nil {
return nil, errors.New("expected http to be configured")
}
httpProducer, err := http.NewProducer(c.HTTP, c.MetricCollector, c.Namespace, logger)
if err != nil {
return nil, err
}
producers[telemetry.HTTP] = httpProducer
}

dispatchProducerRules := make(map[string][]telemetry.Producer)
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
Expand Down
28 changes: 28 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,34 @@ var _ = Describe("Test full application config", func() {
})
})

Context("configure http", func() {
var httpConfig *Config

BeforeEach(func() {
var err error
httpConfig, err = loadTestApplicationConfig(TestHTTPConfig)
Expect(err).NotTo(HaveOccurred())
})

It("returns an error if http isn't included", func() {
log, _ := test.NewNullLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"http"}}
var err error
producers, err = config.ConfigureProducers(log)
Expect(err).To(MatchError("expected http to be configured"))
Expect(producers).To(BeNil())
producers, err = httpConfig.ConfigureProducers(log)
Expect(err).To(BeNil())
})

It("http config works", func() {
log, _ := test.NewNullLogger()
producers, err := httpConfig.ConfigureProducers(log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).NotTo(BeNil())
})
})

Context("configureMetricsCollector", func() {
It("does not fail when TLS is nil ", func() {
log, _ := test.NewNullLogger()
Expand Down
14 changes: 14 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,17 @@ const TestZMQConfig = `
}
}
`

const TestHTTPConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"http": {
"url": "https://tesla.com/webhook"
},
"records": {
"FS": ["http"]
}
}
`
208 changes: 208 additions & 0 deletions datastore/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package http

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/sirupsen/logrus"

"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

// Producer client to handle http
type Producer struct {
namespace string
metricsCollector metrics.MetricCollector
logger *logrus.Logger
records chan *telemetry.Record
address string
httpClient http.Client
}

// Metrics stores metrics reported from this package
type Metrics struct {
produceCount adapter.Counter
byteTotal adapter.Counter
errorCount adapter.Counter
bufferSize adapter.Gauge
}

// Config contains the data necessary to configure an http producer.
type Config struct {
// WorkerCount is the number of http producer routines running.
// This number should be increased if `http_produce_buffer_size` is growing.
WorkerCount int `json:"worker_count"`

// Address is the address to produce requests to.
Address string `json:"address"`

// Timeout is the number of seconds to wait for a response. Defaults to 10.
Timeout int `json:"timeout"`
}

const (
statusCodePreSendErr = "NOT_SENT"
bufferSize = 128
)

var (
metricsRegistry Metrics
metricsOnce sync.Once
)

// NewProducer sets up an HTTP producer
func NewProducer(config *Config, metricsCollector metrics.MetricCollector, namespace string, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

err := validateConfig(config)
if err != nil {
return nil, err
}
requests := make(chan *telemetry.Record, bufferSize)

producer := &Producer{
namespace: namespace,
metricsCollector: metricsCollector,
records: requests,
address: config.Address,
logger: logger,
httpClient: http.Client{
Timeout: time.Duration(config.Timeout) * time.Second,
},
}

for i := 0; i < config.WorkerCount; i++ {
go producer.worker(requests)
}

logger.Infof("registered http producer for namespace: %s", namespace)
return producer, nil
}

// validateConfig validates configuration values and sets defaults if value not set
func validateConfig(c *Config) error {
if c.WorkerCount < 0 {
return errors.New("invalid http worker count")
}
if c.WorkerCount == 0 {
c.WorkerCount = 5
}
if c.Timeout < 0 {
return errors.New("invalid http timeout")
}
if c.Timeout == 0 {
c.Timeout = 10
}
return nil
}

func (p *Producer) worker(records <-chan *telemetry.Record) {
for record := range records {
metricsRegistry.bufferSize.Sub(1, map[string]string{})
p.sendHTTP(record)
}
}

func (p *Producer) sendHTTP(record *telemetry.Record) {
url := fmt.Sprintf("%s?namespace=%s&type=%s", p.address, p.namespace, record.TxType)
body, err := p.makeJSONBody(record)
if err != nil {
p.logError(fmt.Errorf("serialize_body_err %s", err.Error()))
return
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
p.logError(fmt.Errorf("create_request_err %s", err.Error()))
return
}

req.Header.Set("Content-Type", "application/x-protobuf")

resp, err := p.httpClient.Do(req)
if err != nil {
p.logError(fmt.Errorf("send_request_err %s", err.Error()))
return
}

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
metricsRegistry.produceCount.Inc(map[string]string{"record_type": record.TxType})
} else {
p.logError(fmt.Errorf("response_status_code %d", resp.StatusCode))
metricsRegistry.byteTotal.Add(int64(record.Length()), map[string]string{"record_type": record.TxType})
}
_ = resp.Body.Close()
}

func (p *Producer) makeJSONBody(record *telemetry.Record) ([]byte, error) {
payloadJSON, err := record.ToJSON()
if err != nil {
return nil, err
}

metadata := record.Metadata()
metadataJSON, err := json.Marshal(metadata)
if err != nil {
return nil, err
}

outputMap := map[string]json.RawMessage{
"record": json.RawMessage(payloadJSON),
"metadata": json.RawMessage(metadataJSON),
}

outputJSON, err := json.Marshal(outputMap)
if err != nil {
return nil, err
}

return outputJSON, nil
}

// Produce asynchronously sends the record payload to http endpoint
func (p *Producer) Produce(record *telemetry.Record) {
p.records <- record
metricsRegistry.bufferSize.Inc(map[string]string{})
}

func (p *Producer) logError(err error) {
p.logger.Errorf("http_producer_err err: %v", err)
metricsRegistry.errorCount.Inc(map[string]string{})
}

func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
metricsOnce.Do(func() { registerMetrics(metricsCollector) })
}

func registerMetrics(metricsCollector metrics.MetricCollector) {
metricsRegistry.produceCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "http_produce_total",
Help: "The number of records produced to http.",
Labels: []string{"record_type"},
})

metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "http_produce_total_bytes",
Help: "The number of bytes produced to http.",
Labels: []string{"record_type"},
})

metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "http_produce_err",
Help: "The number of errors while producing to http.",
Labels: []string{},
})

metricsRegistry.bufferSize = metricsCollector.RegisterGauge(adapter.CollectorOptions{
Name: "http_produce_buffer_size",
Help: "The number of records waiting to be produced.",
Labels: []string{},
})
}
13 changes: 13 additions & 0 deletions datastore/http/http_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package http_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestConfigs(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Http Producer Suite Tests")
}
50 changes: 50 additions & 0 deletions datastore/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package http_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/sirupsen/logrus"
"github.com/teslamotors/fleet-telemetry/datastore/http"
"github.com/teslamotors/fleet-telemetry/metrics"
)

var _ = Describe("HTTP Producer", func() {
var (
mockLogger *logrus.Logger
mockCollector metrics.MetricCollector
mockConfig *http.Config
)

BeforeEach(func() {
mockLogger = logrus.New()
mockCollector = metrics.NewCollector(nil, mockLogger)
mockConfig = &http.Config{
WorkerCount: 2,
Address: "https://tesla.com",
Timeout: 5,
}
})

Context("NewProducer", func() {
It("creates a new HTTP producer with valid config", func() {
producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger)
Expect(err).ToNot(HaveOccurred())
Expect(producer).ToNot(BeNil())
})

It("returns an error for negative worker count", func() {
mockConfig.WorkerCount = -1
producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger)
Expect(err).To(HaveOccurred())
Expect(producer).To(BeNil())
})

It("returns an error for negative timeout", func() {
mockConfig.Timeout = -1
producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger)
Expect(err).To(HaveOccurred())
Expect(producer).To(BeNil())
})
})
})
Loading

0 comments on commit bfeb4d8

Please sign in to comment.