Skip to content

Commit

Permalink
Telemetry improvements
Browse files Browse the repository at this point in the history
* reset telemetry stuff in tests so they are always stopped
* allow logging telemetry thread stuff but don't in specs
* enforce minimum seconds around communication via polling or telemetry
  • Loading branch information
jnunemaker committed Oct 18, 2023
1 parent 838f5a6 commit d8e85fa
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 82 deletions.
96 changes: 69 additions & 27 deletions lib/flipper/cloud/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
require "logger"
require "socket"
require "flipper/adapters/http"
require "flipper/adapters/poll"
require "flipper/poller"
require "flipper/adapters/memory"
require "flipper/adapters/dual_write"
require "flipper/adapters/sync/synchronizer"
require "flipper/cloud/instrumenter"
require "flipper/cloud/telemetry"
require "flipper/cloud/telemetry/instrumenter"

module Flipper
module Cloud
Expand All @@ -19,8 +20,6 @@ class Configuration

DEFAULT_URL = "https://www.flippercloud.io/adapter".freeze

MIN_TELEMETRY_INTERVAL = 10

# Public: The token corresponding to an environment on flippercloud.io.
attr_accessor :token

Expand All @@ -44,6 +43,10 @@ class Configuration
# configuration.debug_output = STDOUT
attr_accessor :debug_output

# Public: Set the url for http adapter. Really should only be customized
# by me and you are not me.
attr_accessor :url

# Public: Instrumenter to use for the Flipper instance returned by
# Flipper::Cloud.new (default: Flipper::Instrumenters::Noop).
#
Expand Down Expand Up @@ -72,12 +75,15 @@ class Configuration
# Public: The telemetry instance to use for tracking feature usage.
attr_accessor :telemetry

# Public: The telemetry logger to use for debugging telemetry inner workings.
attr_accessor :telemetry_logger

# Public: The Integer for Float number of seconds between submission of
# telemetry to Cloud.
# telemetry to Cloud (default: 60, minimum: 10).
attr_accessor :telemetry_interval

# Public: The Integer or Float number of seconds to wait for telemetry
# to shutdown.
# to shutdown (default: 5).
attr_accessor :telemetry_shutdown_timeout

def initialize(options = {})
Expand All @@ -87,34 +93,63 @@ def initialize(options = {})
raise ArgumentError, "Flipper::Cloud token is missing. Please set FLIPPER_CLOUD_TOKEN or provide the token (e.g. Flipper::Cloud.new(token: 'token'))."
end

@read_timeout = options.fetch(:read_timeout) { ENV.fetch("FLIPPER_CLOUD_READ_TIMEOUT", 5).to_f }
@open_timeout = options.fetch(:open_timeout) { ENV.fetch("FLIPPER_CLOUD_OPEN_TIMEOUT", 5).to_f }
@write_timeout = options.fetch(:write_timeout) { ENV.fetch("FLIPPER_CLOUD_WRITE_TIMEOUT", 5).to_f }

@sync_interval = options.fetch(:sync_interval) { ENV.fetch("FLIPPER_CLOUD_SYNC_INTERVAL", 10).to_f }
@sync_secret = options.fetch(:sync_secret) { ENV["FLIPPER_CLOUD_SYNC_SECRET"] }

@local_adapter = options.fetch(:local_adapter) { Adapters::Memory.new }
# Http related setup.
@url = options.fetch(:url) { ENV.fetch("FLIPPER_CLOUD_URL", DEFAULT_URL) }
@debug_output = options[:debug_output]
@read_timeout = options.fetch(:read_timeout) {
ENV.fetch("FLIPPER_CLOUD_READ_TIMEOUT", 5).to_f
}
@open_timeout = options.fetch(:open_timeout) {
ENV.fetch("FLIPPER_CLOUD_OPEN_TIMEOUT", 5).to_f
}
@write_timeout = options.fetch(:write_timeout) {
ENV.fetch("FLIPPER_CLOUD_WRITE_TIMEOUT", 5).to_f
}
enforce_minimum(:read_timeout, 0.1)
enforce_minimum(:open_timeout, 0.1)
enforce_minimum(:write_timeout, 0.1)

# Sync setup.
@sync_interval = options.fetch(:sync_interval) {
ENV.fetch("FLIPPER_CLOUD_SYNC_INTERVAL", 10).to_f
}
@sync_secret = options.fetch(:sync_secret) {
ENV["FLIPPER_CLOUD_SYNC_SECRET"]
}
enforce_minimum(:sync_interval, 10)

# Adapter setup.
@local_adapter = options.fetch(:local_adapter) { Adapters::Memory.new }
@adapter_block = ->(adapter) { adapter }
self.url = options.fetch(:url) { ENV.fetch("FLIPPER_CLOUD_URL", DEFAULT_URL) }

instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)

@telemetry_interval = options.fetch(:telemetry_interval) { ENV.fetch("FLIPPER_CLOUD_TELEMETRY_INTERVAL", 60).to_f }
if @telemetry_interval < MIN_TELEMETRY_INTERVAL
@telemetry_interval = MIN_TELEMETRY_INTERVAL
end
@telemetry_shutdown_timeout = options.fetch(:telemetry_shutdown_timeout) { ENV.fetch("FLIPPER_CLOUD_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f }
# Telemetry setup.
@telemetry_logger = options.fetch(:telemetry_logger) {
if ENV["FLIPPER_CLOUD_TELEMETRY_LOGGER"] == "0"
Logger.new("/dev/null")
else
Logger.new(STDOUT)
end
}
@telemetry_interval = options.fetch(:telemetry_interval) {
ENV.fetch("FLIPPER_CLOUD_TELEMETRY_INTERVAL", 60).to_f
}
@telemetry_shutdown_timeout = options.fetch(:telemetry_shutdown_timeout) {
ENV.fetch("FLIPPER_CLOUD_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f
}
# Needs to be after url and other telemetry config assignments.
@telemetry = options.fetch(:telemetry) { Telemetry.instance_for(self) }
enforce_minimum(:telemetry_interval, 10)
enforce_minimum(:telemetry_shutdown_timeout, 0)

# This is alpha. Don't use this unless you are me. And you are not me.
cloud_instrument = options.fetch(:cloud_instrument) { ENV["FLIPPER_CLOUD_INSTRUMENT"] == "1" }
provided_instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
cloud_instrument = options.fetch(:cloud_instrument) {
ENV["FLIPPER_CLOUD_INSTRUMENT"] == "1"
}
@instrumenter = if cloud_instrument
Instrumenter.new(telemetry: telemetry, instrumenter: instrumenter)
Telemetry::Instrumenter.new(self, provided_instrumenter)
else
instrumenter
provided_instrumenter
end
end

Expand All @@ -137,9 +172,7 @@ def adapter(&block)
end
end

# Public: Set url for the http adapter.
attr_writer :url

# Public: Force a sync.
def sync
Flipper::Adapters::Sync::Synchronizer.new(local_adapter, http_adapter, {
instrumenter: instrumenter,
Expand Down Expand Up @@ -190,6 +223,15 @@ def http_adapter
},
})
end

# Enforce minimum interval for tasks that run on a timer.
def enforce_minimum(name, minimum)
provided = send(name)
if provided < minimum
warn "Flipper::Cloud##{name} must be at least #{minimum} seconds but was #{provided}. Using #{minimum} seconds."
send("#{name}=", minimum)
end
end
end
end
end
26 changes: 0 additions & 26 deletions lib/flipper/cloud/instrumenter.rb

This file was deleted.

43 changes: 28 additions & 15 deletions lib/flipper/cloud/telemetry.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "json"
require "forwardable"
require "concurrent/timer_task"
require "concurrent/executor/fixed_thread_pool"
require "flipper/cloud/telemetry/metric"
Expand All @@ -7,15 +8,20 @@
module Flipper
module Cloud
class Telemetry
SCHEMA_VERSION = "V1".freeze
extend Forwardable

attr_reader :cloud_configuration, :metric_storage
SCHEMA_VERSION = "V1".freeze

# Internal: Map of instances of telemetry.
def self.instances
@instances ||= Concurrent::Map.new
end
private_class_method :instances

def self.reset
instances.each { |_,instance| instance.stop }.clear
end

# Internal: Fetch an instance of telemetry once per process per url +
# token (aka cloud endpoint). Should only ever be one instance unless you
# are doing some funky stuff.
Expand All @@ -25,49 +31,56 @@ def self.instance_for(cloud_configuration)
end
end

attr_reader :cloud_configuration, :metric_storage

def_delegator :@cloud_configuration, :telemetry_logger, :logger

def initialize(cloud_configuration)
@pid = $$
@logger = Logger.new(STDOUT)
@cloud_configuration = cloud_configuration
start

at_exit { stop }
end

# Records enabled metrics based on feature key and resulting value.
def record_enabled(feature_key, result)
# Public: Records telemetry events based on active support notifications.
def record(name, payload)
return unless name == Flipper::Feature::InstrumentationName
return unless payload[:operation] == :enabled?
detect_forking
@metric_storage&.increment Metric.new(feature_key, result)

metric = Metric.new(payload[:feature_name].to_s.freeze, payload[:result])
@metric_storage.increment metric
end

def start
@logger.info "pid=#{@pid} name=flipper_telemetry action=start"
logger.info "pid=#{@pid} name=flipper_telemetry action=start"
@metric_storage = MetricStorage.new
@pool = Concurrent::FixedThreadPool.new(5, pool_options)
@timer = Concurrent::TimerTask.execute(timer_options) { post_to_pool }
end

# Shuts down all the tasks and tries to flush any remaining info to Cloud.
def stop
@logger.info "pid=#{@pid} name=flipper_telemetry action=stop"
logger.info "pid=#{@pid} name=flipper_telemetry action=stop"

if @timer
@logger.info "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_start"
logger.debug "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_start"
@timer.shutdown
# no need to wait long for timer, all it does is drain in memory metric
# storage and post to the pool of background workers
timer_termination_result = @timer.wait_for_termination(1)
@timer.kill unless timer_termination_result
@logger.info "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_end result=#{timer_termination_result}"
logger.debug "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_end result=#{timer_termination_result}"
end

if @pool
post_to_pool # one last drain
@logger.info "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_start"
logger.debug "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_start"
@pool.shutdown
pool_termination_result = @pool.wait_for_termination(@cloud_configuration.telemetry_shutdown_timeout)
@pool.kill unless pool_termination_result
@logger.info "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_end result=#{pool_termination_result}"
logger.debug "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_end result=#{pool_termination_result}"
end
end

Expand All @@ -80,22 +93,22 @@ def restart

def detect_forking
if @pid != $$
@logger.info "pid=#{@pid} name=flipper_telemetry action=fork_detected pid_was#{@pid} pid_is=#{$$}"
logger.info "pid=#{@pid} name=flipper_telemetry action=fork_detected pid_was#{@pid} pid_is=#{$$}"
restart
@pid = $$
end
end

def post_to_pool
@logger.info "pid=#{@pid} name=flipper_telemetry action=post_to_pool"
logger.debug "pid=#{@pid} name=flipper_telemetry action=post_to_pool"
drained = @metric_storage.drain
return if drained.empty?
@pool.post { post_to_cloud(drained) }
end

def post_to_cloud(drained)
return if drained.empty?
@logger.info "pid=#{@pid} name=flipper_telemetry action=post_to_cloud"
logger.debug "pid=#{@pid} name=flipper_telemetry action=post_to_cloud"

enabled_metrics = drained.inject([]) do |array, (metric, value)|
array << {
Expand Down
27 changes: 27 additions & 0 deletions lib/flipper/cloud/telemetry/instrumenter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
require "delegate"

module Flipper
module Cloud
class Telemetry
class Instrumenter < SimpleDelegator
def initialize(cloud_configuration, instrumenter)
super cloud_configuration
@instrumenter = instrumenter
end

def instrument(name, payload = {}, &block)
return_value = @instrumenter.instrument(name, payload, &block)
config.telemetry.record(name, payload)
return_value
end

private

# Flipper::Cloud::Configuration instance passed to this instrumenter.
def config
__getobj__
end
end
end
end
end
12 changes: 6 additions & 6 deletions spec/flipper/cloud/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@
end

it "can set sync_interval" do
instance = described_class.new(required_options.merge(sync_interval: 1))
expect(instance.sync_interval).to eq(1)
instance = described_class.new(required_options.merge(sync_interval: 15))
expect(instance.sync_interval).to eq(15)
end

it "can set sync_interval from ENV var" do
with_env "FLIPPER_CLOUD_SYNC_INTERVAL" => "5" do
with_env "FLIPPER_CLOUD_SYNC_INTERVAL" => "15" do
instance = described_class.new(required_options.reject { |k, v| k == :sync_interval })
expect(instance.sync_interval).to eq(5)
expect(instance.sync_interval).to eq(15)
end
end

it "passes sync_interval into sync adapter" do
# The initial sync of http to local invokes this web request.
stub_request(:get, /flippercloud\.io/).to_return(status: 200, body: "{}")

instance = described_class.new(required_options.merge(sync_interval: 1))
instance = described_class.new(required_options.merge(sync_interval: 20))
poller = instance.send(:poller)
expect(poller.interval).to eq(1)
expect(poller.interval).to eq(20)
end

it "can set telemetry_interval" do
Expand Down
Loading

0 comments on commit d8e85fa

Please sign in to comment.