-
Notifications
You must be signed in to change notification settings - Fork 91
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1e01c89
commit 9d8cc25
Showing
17 changed files
with
540 additions
and
82 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
package http | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"sync" | ||
"time" | ||
|
||
"github.com/sirupsen/logrus" | ||
|
||
"github.com/teslamotors/fleet-telemetry/metrics" | ||
"github.com/teslamotors/fleet-telemetry/metrics/adapter" | ||
"github.com/teslamotors/fleet-telemetry/telemetry" | ||
) | ||
|
||
// Producer client to handle http | ||
type Producer struct { | ||
namespace string | ||
metricsCollector metrics.MetricCollector | ||
logger *logrus.Logger | ||
records chan *telemetry.Record | ||
address string | ||
httpClient http.Client | ||
} | ||
|
||
// Metrics stores metrics reported from this package | ||
type Metrics struct { | ||
produceCount adapter.Counter | ||
byteTotal adapter.Counter | ||
errorCount adapter.Counter | ||
bufferSize adapter.Gauge | ||
} | ||
|
||
// Config contains the data necessary to configure an http producer. | ||
type Config struct { | ||
// WorkerCount is the number of http producer routines running. | ||
// This number should be increased if `http_produce_buffer_size` is growing. | ||
WorkerCount int `json:"worker_count"` | ||
|
||
// Address is the address to produce requests to. | ||
Address string `json:"address"` | ||
|
||
// Timeout is the number of seconds to wait for a response. Defaults to 10. | ||
Timeout int `json:"timeout"` | ||
} | ||
|
||
const ( | ||
statusCodePreSendErr = "NOT_SENT" | ||
bufferSize = 128 | ||
) | ||
|
||
var ( | ||
metricsRegistry Metrics | ||
metricsOnce sync.Once | ||
) | ||
|
||
// NewProducer sets up an HTTP producer | ||
func NewProducer(config *Config, metricsCollector metrics.MetricCollector, namespace string, logger *logrus.Logger) (telemetry.Producer, error) { | ||
registerMetricsOnce(metricsCollector) | ||
|
||
err := validateConfig(config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
requests := make(chan *telemetry.Record, bufferSize) | ||
|
||
producer := &Producer{ | ||
namespace: namespace, | ||
metricsCollector: metricsCollector, | ||
records: requests, | ||
address: config.Address, | ||
logger: logger, | ||
httpClient: http.Client{ | ||
Timeout: time.Duration(config.Timeout) * time.Second, | ||
}, | ||
} | ||
|
||
for i := 0; i < config.WorkerCount; i++ { | ||
go producer.worker(requests) | ||
} | ||
|
||
logger.Infof("registered http producer for namespace: %s", namespace) | ||
return producer, nil | ||
} | ||
|
||
// validateConfig validates configuration values and sets defaults if value not set | ||
func validateConfig(c *Config) error { | ||
if c.WorkerCount < 0 { | ||
return errors.New("invalid http worker count") | ||
} | ||
if c.WorkerCount == 0 { | ||
c.WorkerCount = 5 | ||
} | ||
if c.Timeout < 0 { | ||
return errors.New("invalid http timeout") | ||
} | ||
if c.Timeout == 0 { | ||
c.Timeout = 10 | ||
} | ||
return nil | ||
} | ||
|
||
func (p *Producer) worker(records <-chan *telemetry.Record) { | ||
for record := range records { | ||
metricsRegistry.bufferSize.Sub(1, map[string]string{}) | ||
p.sendHTTP(record) | ||
} | ||
} | ||
|
||
func (p *Producer) sendHTTP(record *telemetry.Record) { | ||
url := fmt.Sprintf("%s?namespace=%s&type=%s", p.address, p.namespace, record.TxType) | ||
payloadJSON, err := record.ToJSON() | ||
if err != nil { | ||
p.logError(fmt.Errorf("serialize_body_err %s", err.Error())) | ||
return | ||
} | ||
|
||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(json.RawMessage(payloadJSON))) | ||
if err != nil { | ||
p.logError(fmt.Errorf("create_request_err %s", err.Error())) | ||
return | ||
} | ||
|
||
req.Header.Set("Content-Type", "application/json") | ||
for key, value := range record.Metadata() { | ||
req.Header.Set(key, value) | ||
} | ||
|
||
resp, err := p.httpClient.Do(req) | ||
if err != nil { | ||
p.logError(fmt.Errorf("send_request_err %s", err.Error())) | ||
return | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode >= 200 && resp.StatusCode <= 299 { | ||
metricsRegistry.produceCount.Inc(map[string]string{"record_type": record.TxType}) | ||
metricsRegistry.byteTotal.Add(int64(record.Length()), map[string]string{"record_type": record.TxType}) | ||
return | ||
} | ||
p.logError(fmt.Errorf("response_status_code %d", resp.StatusCode)) | ||
} | ||
|
||
// Produce asynchronously sends the record payload to http endpoint | ||
func (p *Producer) Produce(record *telemetry.Record) { | ||
p.records <- record | ||
metricsRegistry.bufferSize.Inc(map[string]string{}) | ||
} | ||
|
||
func (p *Producer) logError(err error) { | ||
p.logger.Errorf("http_producer_err err: %v", err) | ||
metricsRegistry.errorCount.Inc(map[string]string{}) | ||
} | ||
|
||
func registerMetricsOnce(metricsCollector metrics.MetricCollector) { | ||
metricsOnce.Do(func() { registerMetrics(metricsCollector) }) | ||
} | ||
|
||
func registerMetrics(metricsCollector metrics.MetricCollector) { | ||
metricsRegistry.produceCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ | ||
Name: "http_produce_total", | ||
Help: "The number of records produced to http.", | ||
Labels: []string{"record_type"}, | ||
}) | ||
|
||
metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{ | ||
Name: "http_produce_total_bytes", | ||
Help: "The number of bytes produced to http.", | ||
Labels: []string{"record_type"}, | ||
}) | ||
|
||
metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ | ||
Name: "http_produce_err", | ||
Help: "The number of errors while producing to http.", | ||
Labels: []string{}, | ||
}) | ||
|
||
metricsRegistry.bufferSize = metricsCollector.RegisterGauge(adapter.CollectorOptions{ | ||
Name: "http_produce_buffer_size", | ||
Help: "The number of records waiting to be produced.", | ||
Labels: []string{}, | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package http_test | ||
|
||
import ( | ||
"testing" | ||
|
||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
) | ||
|
||
func TestConfigs(t *testing.T) { | ||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "Http Producer Suite Tests") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package http_test | ||
|
||
import ( | ||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
|
||
"github.com/sirupsen/logrus" | ||
"github.com/teslamotors/fleet-telemetry/datastore/http" | ||
"github.com/teslamotors/fleet-telemetry/metrics" | ||
) | ||
|
||
var _ = Describe("HTTP Producer", func() { | ||
var ( | ||
mockLogger *logrus.Logger | ||
mockCollector metrics.MetricCollector | ||
mockConfig *http.Config | ||
) | ||
|
||
BeforeEach(func() { | ||
mockLogger = logrus.New() | ||
mockCollector = metrics.NewCollector(nil, mockLogger) | ||
mockConfig = &http.Config{ | ||
WorkerCount: 2, | ||
Address: "https://tesla.com", | ||
Timeout: 5, | ||
} | ||
}) | ||
|
||
Context("NewProducer", func() { | ||
It("creates a new HTTP producer with valid config", func() { | ||
producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(producer).ToNot(BeNil()) | ||
}) | ||
|
||
It("returns an error for negative worker count", func() { | ||
mockConfig.WorkerCount = -1 | ||
producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger) | ||
Expect(err).To(HaveOccurred()) | ||
Expect(producer).To(BeNil()) | ||
}) | ||
|
||
It("returns an error for negative timeout", func() { | ||
mockConfig.Timeout = -1 | ||
producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger) | ||
Expect(err).To(HaveOccurred()) | ||
Expect(producer).To(BeNil()) | ||
}) | ||
}) | ||
}) |
Oops, something went wrong.