diff --git a/lib/flipper/cloud/configuration.rb b/lib/flipper/cloud/configuration.rb index 70c087b4f..769f203de 100644 --- a/lib/flipper/cloud/configuration.rb +++ b/lib/flipper/cloud/configuration.rb @@ -1,3 +1,4 @@ +require "logger" require "socket" require "flipper/adapters/http" require "flipper/adapters/poll" @@ -5,8 +6,8 @@ require "flipper/adapters/memory" require "flipper/adapters/dual_write" require "flipper/adapters/sync/synchronizer" -require "flipper/cloud/instrumenter" require "flipper/cloud/telemetry" +require "flipper/cloud/telemetry/instrumenter" module Flipper module Cloud @@ -19,8 +20,6 @@ class Configuration DEFAULT_URL = "https://www.flippercloud.io/adapter".freeze - MIN_TELEMETRY_INTERVAL = 10 - # Public: The token corresponding to an environment on flippercloud.io. attr_accessor :token @@ -44,6 +43,10 @@ class Configuration # configuration.debug_output = STDOUT attr_accessor :debug_output + # Public: Set the url for http adapter. Really should only be customized + # by me and you are not me. + attr_accessor :url + # Public: Instrumenter to use for the Flipper instance returned by # Flipper::Cloud.new (default: Flipper::Instrumenters::Noop). # @@ -72,12 +75,15 @@ class Configuration # Public: The telemetry instance to use for tracking feature usage. attr_accessor :telemetry + # Public: The telemetry logger to use for debugging telemetry inner workings. + attr_accessor :telemetry_logger + # Public: The Integer for Float number of seconds between submission of - # telemetry to Cloud. + # telemetry to Cloud (default: 60, minimum: 10). attr_accessor :telemetry_interval # Public: The Integer or Float number of seconds to wait for telemetry - # to shutdown. + # to shutdown (default: 5). attr_accessor :telemetry_shutdown_timeout def initialize(options = {}) @@ -87,34 +93,63 @@ def initialize(options = {}) raise ArgumentError, "Flipper::Cloud token is missing. Please set FLIPPER_CLOUD_TOKEN or provide the token (e.g. Flipper::Cloud.new(token: 'token'))." end - @read_timeout = options.fetch(:read_timeout) { ENV.fetch("FLIPPER_CLOUD_READ_TIMEOUT", 5).to_f } - @open_timeout = options.fetch(:open_timeout) { ENV.fetch("FLIPPER_CLOUD_OPEN_TIMEOUT", 5).to_f } - @write_timeout = options.fetch(:write_timeout) { ENV.fetch("FLIPPER_CLOUD_WRITE_TIMEOUT", 5).to_f } - - @sync_interval = options.fetch(:sync_interval) { ENV.fetch("FLIPPER_CLOUD_SYNC_INTERVAL", 10).to_f } - @sync_secret = options.fetch(:sync_secret) { ENV["FLIPPER_CLOUD_SYNC_SECRET"] } - - @local_adapter = options.fetch(:local_adapter) { Adapters::Memory.new } + # Http related setup. + @url = options.fetch(:url) { ENV.fetch("FLIPPER_CLOUD_URL", DEFAULT_URL) } @debug_output = options[:debug_output] + @read_timeout = options.fetch(:read_timeout) { + ENV.fetch("FLIPPER_CLOUD_READ_TIMEOUT", 5).to_f + } + @open_timeout = options.fetch(:open_timeout) { + ENV.fetch("FLIPPER_CLOUD_OPEN_TIMEOUT", 5).to_f + } + @write_timeout = options.fetch(:write_timeout) { + ENV.fetch("FLIPPER_CLOUD_WRITE_TIMEOUT", 5).to_f + } + enforce_minimum(:read_timeout, 0.1) + enforce_minimum(:open_timeout, 0.1) + enforce_minimum(:write_timeout, 0.1) + + # Sync setup. + @sync_interval = options.fetch(:sync_interval) { + ENV.fetch("FLIPPER_CLOUD_SYNC_INTERVAL", 10).to_f + } + @sync_secret = options.fetch(:sync_secret) { + ENV["FLIPPER_CLOUD_SYNC_SECRET"] + } + enforce_minimum(:sync_interval, 10) + + # Adapter setup. + @local_adapter = options.fetch(:local_adapter) { Adapters::Memory.new } @adapter_block = ->(adapter) { adapter } - self.url = options.fetch(:url) { ENV.fetch("FLIPPER_CLOUD_URL", DEFAULT_URL) } - instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) - - @telemetry_interval = options.fetch(:telemetry_interval) { ENV.fetch("FLIPPER_CLOUD_TELEMETRY_INTERVAL", 60).to_f } - if @telemetry_interval < MIN_TELEMETRY_INTERVAL - @telemetry_interval = MIN_TELEMETRY_INTERVAL - end - @telemetry_shutdown_timeout = options.fetch(:telemetry_shutdown_timeout) { ENV.fetch("FLIPPER_CLOUD_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f } + # Telemetry setup. + @telemetry_logger = options.fetch(:telemetry_logger) { + if ENV["FLIPPER_CLOUD_TELEMETRY_LOGGER"] == "0" + Logger.new("/dev/null") + else + Logger.new(STDOUT) + end + } + @telemetry_interval = options.fetch(:telemetry_interval) { + ENV.fetch("FLIPPER_CLOUD_TELEMETRY_INTERVAL", 60).to_f + } + @telemetry_shutdown_timeout = options.fetch(:telemetry_shutdown_timeout) { + ENV.fetch("FLIPPER_CLOUD_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f + } # Needs to be after url and other telemetry config assignments. @telemetry = options.fetch(:telemetry) { Telemetry.instance_for(self) } + enforce_minimum(:telemetry_interval, 10) + enforce_minimum(:telemetry_shutdown_timeout, 0) # This is alpha. Don't use this unless you are me. And you are not me. - cloud_instrument = options.fetch(:cloud_instrument) { ENV["FLIPPER_CLOUD_INSTRUMENT"] == "1" } + provided_instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) + cloud_instrument = options.fetch(:cloud_instrument) { + ENV["FLIPPER_CLOUD_INSTRUMENT"] == "1" + } @instrumenter = if cloud_instrument - Instrumenter.new(telemetry: telemetry, instrumenter: instrumenter) + Telemetry::Instrumenter.new(self, provided_instrumenter) else - instrumenter + provided_instrumenter end end @@ -137,9 +172,7 @@ def adapter(&block) end end - # Public: Set url for the http adapter. - attr_writer :url - + # Public: Force a sync. def sync Flipper::Adapters::Sync::Synchronizer.new(local_adapter, http_adapter, { instrumenter: instrumenter, @@ -190,6 +223,15 @@ def http_adapter }, }) end + + # Enforce minimum interval for tasks that run on a timer. + def enforce_minimum(name, minimum) + provided = send(name) + if provided < minimum + warn "Flipper::Cloud##{name} must be at least #{minimum} seconds but was #{provided}. Using #{minimum} seconds." + send("#{name}=", minimum) + end + end end end end diff --git a/lib/flipper/cloud/instrumenter.rb b/lib/flipper/cloud/instrumenter.rb deleted file mode 100644 index 05d532dbd..000000000 --- a/lib/flipper/cloud/instrumenter.rb +++ /dev/null @@ -1,26 +0,0 @@ -require "delegate" -require "flipper/instrumenters/noop" - -module Flipper - module Cloud - class Instrumenter < SimpleDelegator - def initialize(options = {}) - @telemetry = options.fetch(:telemetry) - @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) - super @instrumenter - end - - def instrument(name, payload = {}, &block) - begin - return_value = @instrumenter.instrument(name, payload, &block) - - if name == Flipper::Feature::InstrumentationName && payload[:operation] == :enabled? - @telemetry.record_enabled(payload[:feature_name].to_s, payload[:result]) - end - ensure - return_value - end - end - end - end -end diff --git a/lib/flipper/cloud/telemetry.rb b/lib/flipper/cloud/telemetry.rb index c511132d4..e048a7902 100644 --- a/lib/flipper/cloud/telemetry.rb +++ b/lib/flipper/cloud/telemetry.rb @@ -1,4 +1,5 @@ require "json" +require "forwardable" require "concurrent/timer_task" require "concurrent/executor/fixed_thread_pool" require "flipper/cloud/telemetry/metric" @@ -7,15 +8,20 @@ module Flipper module Cloud class Telemetry - SCHEMA_VERSION = "V1".freeze + extend Forwardable - attr_reader :cloud_configuration, :metric_storage + SCHEMA_VERSION = "V1".freeze + # Internal: Map of instances of telemetry. def self.instances @instances ||= Concurrent::Map.new end private_class_method :instances + def self.reset + instances.each { |_,instance| instance.stop }.clear + end + # Internal: Fetch an instance of telemetry once per process per url + # token (aka cloud endpoint). Should only ever be one instance unless you # are doing some funky stuff. @@ -25,23 +31,30 @@ def self.instance_for(cloud_configuration) end end + attr_reader :cloud_configuration, :metric_storage + + def_delegator :@cloud_configuration, :telemetry_logger, :logger + def initialize(cloud_configuration) @pid = $$ - @logger = Logger.new(STDOUT) @cloud_configuration = cloud_configuration start at_exit { stop } end - # Records enabled metrics based on feature key and resulting value. - def record_enabled(feature_key, result) + # Public: Records telemetry events based on active support notifications. + def record(name, payload) + return unless name == Flipper::Feature::InstrumentationName + return unless payload[:operation] == :enabled? detect_forking - @metric_storage&.increment Metric.new(feature_key, result) + + metric = Metric.new(payload[:feature_name].to_s.freeze, payload[:result]) + @metric_storage.increment metric end def start - @logger.info "pid=#{@pid} name=flipper_telemetry action=start" + logger.info "pid=#{@pid} name=flipper_telemetry action=start" @metric_storage = MetricStorage.new @pool = Concurrent::FixedThreadPool.new(5, pool_options) @timer = Concurrent::TimerTask.execute(timer_options) { post_to_pool } @@ -49,25 +62,25 @@ def start # Shuts down all the tasks and tries to flush any remaining info to Cloud. def stop - @logger.info "pid=#{@pid} name=flipper_telemetry action=stop" + logger.info "pid=#{@pid} name=flipper_telemetry action=stop" if @timer - @logger.info "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_start" + logger.debug "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_start" @timer.shutdown # no need to wait long for timer, all it does is drain in memory metric # storage and post to the pool of background workers timer_termination_result = @timer.wait_for_termination(1) @timer.kill unless timer_termination_result - @logger.info "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_end result=#{timer_termination_result}" + logger.debug "pid=#{@pid} name=flipper_telemetry action=timer_shutdown_end result=#{timer_termination_result}" end if @pool post_to_pool # one last drain - @logger.info "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_start" + logger.debug "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_start" @pool.shutdown pool_termination_result = @pool.wait_for_termination(@cloud_configuration.telemetry_shutdown_timeout) @pool.kill unless pool_termination_result - @logger.info "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_end result=#{pool_termination_result}" + logger.debug "pid=#{@pid} name=flipper_telemetry action=pool_shutdown_end result=#{pool_termination_result}" end end @@ -80,14 +93,14 @@ def restart def detect_forking if @pid != $$ - @logger.info "pid=#{@pid} name=flipper_telemetry action=fork_detected pid_was#{@pid} pid_is=#{$$}" + logger.info "pid=#{@pid} name=flipper_telemetry action=fork_detected pid_was#{@pid} pid_is=#{$$}" restart @pid = $$ end end def post_to_pool - @logger.info "pid=#{@pid} name=flipper_telemetry action=post_to_pool" + logger.debug "pid=#{@pid} name=flipper_telemetry action=post_to_pool" drained = @metric_storage.drain return if drained.empty? @pool.post { post_to_cloud(drained) } @@ -95,7 +108,7 @@ def post_to_pool def post_to_cloud(drained) return if drained.empty? - @logger.info "pid=#{@pid} name=flipper_telemetry action=post_to_cloud" + logger.debug "pid=#{@pid} name=flipper_telemetry action=post_to_cloud" enabled_metrics = drained.inject([]) do |array, (metric, value)| array << { diff --git a/lib/flipper/cloud/telemetry/instrumenter.rb b/lib/flipper/cloud/telemetry/instrumenter.rb new file mode 100644 index 000000000..1842cb2f2 --- /dev/null +++ b/lib/flipper/cloud/telemetry/instrumenter.rb @@ -0,0 +1,27 @@ +require "delegate" + +module Flipper + module Cloud + class Telemetry + class Instrumenter < SimpleDelegator + def initialize(cloud_configuration, instrumenter) + super cloud_configuration + @instrumenter = instrumenter + end + + def instrument(name, payload = {}, &block) + return_value = @instrumenter.instrument(name, payload, &block) + config.telemetry.record(name, payload) + return_value + end + + private + + # Flipper::Cloud::Configuration instance passed to this instrumenter. + def config + __getobj__ + end + end + end + end +end diff --git a/spec/flipper/cloud/configuration_spec.rb b/spec/flipper/cloud/configuration_spec.rb index fcbd42815..d60a40f86 100644 --- a/spec/flipper/cloud/configuration_spec.rb +++ b/spec/flipper/cloud/configuration_spec.rb @@ -61,14 +61,14 @@ end it "can set sync_interval" do - instance = described_class.new(required_options.merge(sync_interval: 1)) - expect(instance.sync_interval).to eq(1) + instance = described_class.new(required_options.merge(sync_interval: 15)) + expect(instance.sync_interval).to eq(15) end it "can set sync_interval from ENV var" do - with_env "FLIPPER_CLOUD_SYNC_INTERVAL" => "5" do + with_env "FLIPPER_CLOUD_SYNC_INTERVAL" => "15" do instance = described_class.new(required_options.reject { |k, v| k == :sync_interval }) - expect(instance.sync_interval).to eq(5) + expect(instance.sync_interval).to eq(15) end end @@ -76,9 +76,9 @@ # The initial sync of http to local invokes this web request. stub_request(:get, /flippercloud\.io/).to_return(status: 200, body: "{}") - instance = described_class.new(required_options.merge(sync_interval: 1)) + instance = described_class.new(required_options.merge(sync_interval: 20)) poller = instance.send(:poller) - expect(poller.interval).to eq(1) + expect(poller.interval).to eq(20) end it "can set telemetry_interval" do diff --git a/spec/flipper/cloud/telemetry_spec.rb b/spec/flipper/cloud/telemetry_spec.rb index 835a77d22..7c69f3a15 100644 --- a/spec/flipper/cloud/telemetry_spec.rb +++ b/spec/flipper/cloud/telemetry_spec.rb @@ -7,16 +7,40 @@ stub_request(:post, "https://www.flippercloud.io/adapter/telemetry").to_return(status: 200) begin - telemetry = described_class.new(Flipper::Cloud::Configuration.new(token: "test")) - telemetry.record_enabled(:foo, true) - telemetry.record_enabled(:foo, true) - telemetry.record_enabled(:bar, true) - telemetry.record_enabled(:baz, true) - telemetry.record_enabled(:foo, false) + config = Flipper::Cloud::Configuration.new(token: "test") + telemetry = described_class.new(config) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :bar, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :baz, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: false, + }) + drained = telemetry.metric_storage.drain - foo_true_sum = drained.keys.select { |metric| metric.key == :foo }.select { |metric| metric.result }.map { |metric| drained[metric] }.sum + + foo_true_sum = drained.keys.select { |metric| metric.key == "foo" }.select { |metric| metric.result }.map { |metric| drained[metric] }.sum expect(foo_true_sum).to be(2) - foo_false_sum = drained.keys.select { |metric| metric.key == :foo }.select { |metric| !metric.result }.map { |metric| drained[metric] }.sum + + foo_false_sum = drained.keys.select { |metric| metric.key == "foo" }.select { |metric| !metric.result }.map { |metric| drained[metric] }.sum expect(foo_false_sum).to be(1) ensure telemetry.stop diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d765b08e5..b532de224 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -20,8 +20,12 @@ Dir[FlipperRoot.join('spec/support/**/*.rb')].sort.each { |f| require f } +# Disable telemetry logging in specs. +ENV["FLIPPER_CLOUD_TELEMETRY_LOGGER"] = "0" + RSpec.configure do |config| config.before(:example) do + Flipper::Cloud::Telemetry.reset if defined?(Flipper::Cloud::Telemetry) Flipper::Poller.reset if defined?(Flipper::Poller) Flipper.unregister_groups Flipper.configuration = nil