Skip to content

Commit

Permalink
Fix monitoring metrics for individual collectors (#389)
Browse files Browse the repository at this point in the history
* fix monitoring metrics for individual collectors

Signed-off-by: Ananya Kumar Mallik <ananya.mallik@paymentsense.com>

* add collector cache TTL

Signed-off-by: Ananya Kumar Mallik <ananya.mallik@paymentsense.com>

* reuse

Signed-off-by: Ananya Kumar Mallik <ananya.mallik@paymentsense.com>

* Add collector cache test

Signed-off-by: Ananya Kumar Mallik <ananya.mallik@paymentsense.com>

* Fix caching logic and TTL for cache

Signed-off-by: Ananya Kumar Mallik <ananya.mallik@paymentsense.com>

---------

Signed-off-by: Ananya Kumar Mallik <ananya.mallik@paymentsense.com>
  • Loading branch information
ananya-mallik-ps authored Dec 12, 2024
1 parent 231987c commit 6a15a74
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 12 deletions.
76 changes: 76 additions & 0 deletions collectors/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,79 @@ func (d *descriptorCache) Store(prefix string, data []*monitoring.MetricDescript
defer d.lock.Unlock()
d.cache[prefix] = &entry
}

// CollectorCache is an in-memory cache of MonitoringCollectors addressed by a
// string key. Each entry carries its own expiry time derived from ttl.
type CollectorCache struct {
	// lock is acquired by the cache's methods before they touch cache.
	lock  sync.RWMutex
	cache map[string]*collectorCacheEntry

	// ttl is the lifetime given to an entry when it is stored, and again
	// whenever it is successfully fetched.
	ttl time.Duration
}

// collectorCacheEntry pairs a cached MonitoringCollector with the instant
// after which the cache treats it as expired.
type collectorCacheEntry struct {
	// collector is the cached value handed back to callers.
	collector *MonitoringCollector
	// expiry is the point in time after which the entry is no longer served.
	expiry time.Time
}

// NewCollectorCache builds an empty CollectorCache whose entries live for ttl.
//
// It also launches the background cleanup goroutine that periodically evicts
// expired entries; that goroutine is started here and is not handed back to
// the caller.
func NewCollectorCache(ttl time.Duration) *CollectorCache {
	cc := new(CollectorCache)
	cc.ttl = ttl
	cc.cache = map[string]*collectorCacheEntry{}

	go cc.cleanup()
	return cc
}

// Get returns the MonitoringCollector stored under key and true when the key
// is present and its entry has not expired. Otherwise it returns nil and false.
//
// A successful lookup pushes the entry's expiry out to now+ttl. An entry found
// to be expired is removed from the cache before returning.
func (c *CollectorCache) Get(key string) (*MonitoringCollector, bool) {
	// The exclusive lock is required, not RLock: both outcomes below mutate
	// shared state (delete on the map, or a write to entry.expiry). Doing
	// that under a read lock lets concurrent Gets race and can crash the
	// process with "concurrent map writes".
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, ok := c.cache[key]
	if !ok {
		return nil, false
	}

	now := time.Now()
	if now.After(entry.expiry) {
		delete(c.cache, key)
		return nil, false
	}

	entry.expiry = now.Add(c.ttl)
	return entry.collector, true
}

// Store caches collector under key, replacing any entry already present.
// The new entry expires ttl after the moment Store is called.
func (c *CollectorCache) Store(key string, collector *MonitoringCollector) {
	// Compute the deadline before taking the lock to keep the critical
	// section down to the map write.
	expiresAt := time.Now().Add(c.ttl)

	c.lock.Lock()
	defer c.lock.Unlock()
	c.cache[key] = &collectorCacheEntry{collector: collector, expiry: expiresAt}
}

// cleanup evicts expired entries every five minutes. It blocks forever on the
// ticker and is meant to run in its own goroutine.
func (c *CollectorCache) cleanup() {
	const sweepInterval = 5 * time.Minute

	sweep := time.NewTicker(sweepInterval)
	defer sweep.Stop()

	for {
		<-sweep.C
		c.removeExpired()
	}
}

// removeExpired deletes every entry whose expiry lies before the current
// time. It holds the write lock for the whole sweep.
func (c *CollectorCache) removeExpired() {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Take one timestamp so all entries are judged against the same instant.
	cutoff := time.Now()
	for k, e := range c.cache {
		if e.expiry.Before(cutoff) {
			delete(c.cache, k)
		}
	}
}
53 changes: 53 additions & 0 deletions collectors/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,56 @@ func TestDescriptorCache(t *testing.T) {
t.Error("cache entries should have expired")
}
}

// TestCollectorCache exercises CollectorCache: a stored collector is served
// before its TTL elapses and rejected afterwards, and several collectors
// stored under distinct keys are each returned for their own key.
func TestCollectorCache(t *testing.T) {
	newCollector := func(projectID string) *MonitoringCollector {
		return &MonitoringCollector{projectID: projectID}
	}

	t.Run("basic cache Op", func(t *testing.T) {
		const (
			ttl = 1 * time.Second
			key = "test-key"
		)

		cache := NewCollectorCache(ttl)
		cache.Store(key, newCollector("test-project"))

		if _, ok := cache.Get(key); !ok {
			t.Error("Collector should be available in cache before TTL")
		}

		// Wait well past the TTL so the entry is unambiguously stale.
		time.Sleep(2 * ttl)
		if _, ok := cache.Get(key); ok {
			t.Error("Collector should have expired")
		}
	})

	t.Run("multiple collectors", func(t *testing.T) {
		cache := NewCollectorCache(1 * time.Second)

		want := map[string]*MonitoringCollector{
			"test-key-1": newCollector("test-project-1"),
			"test-key-2": newCollector("test-project-2"),
			"test-key-3": newCollector("test-project-3"),
		}

		for key, c := range want {
			cache.Store(key, c)
		}

		for key, expected := range want {
			got, ok := cache.Get(key)
			switch {
			case !ok:
				t.Errorf("Collector %s not found in cache", key)
			case got.projectID != expected.projectID:
				t.Errorf("Wrong collector for key %s. Got projectId %s, want %s", key, got.projectID, expected.projectID)
			}
		}
	})
}
56 changes: 44 additions & 12 deletions stackdriver_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"slices"
"strings"
"time"

"github.com/PuerkitoBio/rehttp"
"github.com/alecthomas/kingpin/v2"
Expand Down Expand Up @@ -185,6 +186,7 @@ type handler struct {
metricsExtraFilters []collectors.MetricFilter
additionalGatherer prometheus.Gatherer
m *monitoring.Service
collectors *collectors.CollectorCache
}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -203,35 +205,65 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// newHandler assembles a handler for the given projects, metric prefixes and
// extra filters, creates its collector cache, and installs the default
// (unfiltered) inner handler.
//
// The collector cache TTL is the larger of the deltas TTL and the descriptor
// cache TTL when delta aggregation is enabled or a descriptor cache TTL is
// set; otherwise it falls back to two hours.
func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler {
	// Start from the fallback and override it only when one of the
	// TTL-bearing features is switched on.
	ttl := 2 * time.Hour
	if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 {
		ttl = max(*monitoringMetricsDeltasTTL, *monitoringDescriptorCacheTTL)
	}

	logger.Info("Creating collector cache", "ttl", ttl)

	h := &handler{
		logger:              logger,
		m:                   m,
		projectIDs:          projectIDs,
		metricsPrefixes:     metricPrefixes,
		metricsExtraFilters: metricExtraFilters,
		additionalGatherer:  additionalGatherer,
		collectors:          collectors.NewCollectorCache(ttl),
	}

	// nil filters select the handler that serves every configured prefix.
	h.handler = h.innerHandler(nil)
	return h
}

// getCollector returns the MonitoringCollector for project restricted to the
// metric type prefixes selected by filters. Collectors are cached under a key
// built from the project and the filtered prefixes; on a miss a new collector
// is created, stored in the cache and returned. An error from collector
// construction is passed back unchanged and nothing is cached.
func (h *handler) getCollector(project string, filters map[string]bool) (*collectors.MonitoringCollector, error) {
	prefixes := h.filterMetricTypePrefixes(filters)
	cacheKey := fmt.Sprintf("%s-%v", project, prefixes)

	if cached, ok := h.collectors.Get(cacheKey); ok {
		return cached, nil
	}

	opts := collectors.MonitoringCollectorOptions{
		MetricTypePrefixes:        prefixes,
		ExtraFilters:              h.metricsExtraFilters,
		RequestInterval:           *monitoringMetricsInterval,
		RequestOffset:             *monitoringMetricsOffset,
		IngestDelay:               *monitoringMetricsIngestDelay,
		FillMissingLabels:         *collectorFillMissingLabels,
		DropDelegatedProjects:     *monitoringDropDelegatedProjects,
		AggregateDeltas:           *monitoringMetricsAggregateDeltas,
		DescriptorCacheTTL:        *monitoringDescriptorCacheTTL,
		DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
	}
	// Each new collector gets its own counter and histogram delta stores.
	counters := delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL)
	histograms := delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL)

	created, err := collectors.NewMonitoringCollector(project, h.m, opts, h.logger, counters, histograms)
	if err != nil {
		return nil, err
	}

	h.collectors.Store(cacheKey, created)
	return created, nil
}

func (h *handler) innerHandler(filters map[string]bool) http.Handler {
registry := prometheus.NewRegistry()

for _, project := range h.projectIDs {
monitoringCollector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
MetricTypePrefixes: h.filterMetricTypePrefixes(filters),
ExtraFilters: h.metricsExtraFilters,
RequestInterval: *monitoringMetricsInterval,
RequestOffset: *monitoringMetricsOffset,
IngestDelay: *monitoringMetricsIngestDelay,
FillMissingLabels: *collectorFillMissingLabels,
DropDelegatedProjects: *monitoringDropDelegatedProjects,
AggregateDeltas: *monitoringMetricsAggregateDeltas,
DescriptorCacheTTL: *monitoringDescriptorCacheTTL,
DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
monitoringCollector, err := h.getCollector(project, filters)
if err != nil {
h.logger.Error("error creating monitoring collector", "err", err)
os.Exit(1)
Expand Down

0 comments on commit 6a15a74

Please sign in to comment.