diff --git a/examples/cloud/backoff_policy.rb b/examples/cloud/backoff_policy.rb new file mode 100644 index 000000000..5f4fb62bb --- /dev/null +++ b/examples/cloud/backoff_policy.rb @@ -0,0 +1,13 @@ +# Just a simple example that shows how the backoff policy works. +require 'bundler/setup' +require 'flipper/cloud/telemetry/backoff_policy' + +intervals = [] +policy = Flipper::Cloud::Telemetry::BackoffPolicy.new + +10.times do |n| + intervals << policy.next_interval +end + +pp intervals.map { |i| i.round(2) } +puts "Total: #{intervals.sum.round(2)}ms (#{(intervals.sum/1_000.0).round(2)} sec)" diff --git a/examples/cloud/cloud_setup.rb b/examples/cloud/cloud_setup.rb index 28d7f6883..f252f4415 100644 --- a/examples/cloud/cloud_setup.rb +++ b/examples/cloud/cloud_setup.rb @@ -3,9 +3,13 @@ exit end -suffix_rails = ENV["RAILS_VERSION"].split(".").take(2).join -suffix_ruby = RUBY_VERSION.split(".").take(2).join -matrix_key = "FLIPPER_CLOUD_TOKEN_#{suffix_ruby}_#{suffix_rails}" +matrix_key = if ENV["CI"] + suffix_rails = ENV["RAILS_VERSION"].split(".").take(2).join + suffix_ruby = RUBY_VERSION.split(".").take(2).join + "FLIPPER_CLOUD_TOKEN_#{suffix_ruby}_#{suffix_rails}" +else + "FLIPPER_CLOUD_TOKEN" +end if matrix_token = ENV[matrix_key] puts "Using #{matrix_key} for FLIPPER_CLOUD_TOKEN" diff --git a/examples/cloud/forked.rb b/examples/cloud/forked.rb index 329d2ce5b..2409267de 100644 --- a/examples/cloud/forked.rb +++ b/examples/cloud/forked.rb @@ -5,11 +5,16 @@ require 'bundler/setup' require 'flipper/cloud' -pids = 5.times.map do |n| +puts Process.pid + +# Make a call in the parent process so we can detect forking. +Flipper.enabled?(:stats) + +pids = 2.times.map do |n| fork { # Check every second to see if the feature is enabled threads = [] - 5.times do + 2.times do threads << Thread.new do loop do sleep rand diff --git a/examples/cloud/threaded.rb b/examples/cloud/threaded.rb index 04e72ac3d..02f105e58 100644 --- a/examples/cloud/threaded.rb +++ b/examples/cloud/threaded.rb @@ -4,33 +4,30 @@ require_relative "./cloud_setup" require 'bundler/setup' require 'flipper/cloud' -require "active_support/notifications" -require "active_support/isolated_execution_state" -ActiveSupport::Notifications.subscribe(/poller\.flipper/) do |*args| - p args: args -end +puts Process.pid Flipper.configure do |config| config.default { - Flipper::Cloud.new(local_adapter: config.adapter, instrumenter: ActiveSupport::Notifications) + Flipper::Cloud.new( + local_adapter: config.adapter, + debug_output: STDOUT, + ) } end +# You might want to do this at some point to see different results: +# Flipper.enable(:search) +# Flipper.disable(:stats) + # Check every second to see if the feature is enabled -threads = [] -10.times do - threads << Thread.new do +5.times.map { |i| + Thread.new { loop do sleep rand - if Flipper[:stats].enabled? - puts "#{Time.now.to_i} Enabled!" - else - puts "#{Time.now.to_i} Disabled!" - end + Flipper.enabled?(:stats) + Flipper.enabled?(:search) end - end -end - -threads.map(&:join) + } +}.each(&:join) diff --git a/flipper.gemspec b/flipper.gemspec index ca48c66a8..0ab8a62f8 100644 --- a/flipper.gemspec +++ b/flipper.gemspec @@ -35,5 +35,4 @@ Gem::Specification.new do |gem| gem.metadata = Flipper::METADATA gem.add_dependency 'concurrent-ruby', '< 2' - gem.add_dependency 'brow', '~> 0.4.1' end diff --git a/lib/flipper/adapters/active_record.rb b/lib/flipper/adapters/active_record.rb index d4932870f..33ef56dbc 100644 --- a/lib/flipper/adapters/active_record.rb +++ b/lib/flipper/adapters/active_record.rb @@ -221,7 +221,7 @@ def set(feature, gate, thing, options = {}) @gate_class.create! do |g| g.feature_key = feature.key g.key = gate.key - g.value = json_feature ? JSON.dump(thing.value) : thing.value.to_s + g.value = json_feature ? Typecast.to_json(thing.value) : thing.value.to_s end rescue ::ActiveRecord::RecordNotUnique # assume this happened concurrently with the same thing and its fine @@ -263,7 +263,7 @@ def result_for_gates(feature, gates) end when :json if row = gates.detect { |key, value| !key.nil? && key.to_sym == gate.key } - JSON.parse(row.last) + Typecast.from_json(row.last) end when :set gates.select { |key, value| !key.nil? && key.to_sym == gate.key }.map(&:last).to_set diff --git a/lib/flipper/adapters/http.rb b/lib/flipper/adapters/http.rb index 1aeefbb9d..cf190b310 100644 --- a/lib/flipper/adapters/http.rb +++ b/lib/flipper/adapters/http.rb @@ -27,7 +27,7 @@ def initialize(options = {}) def get(feature) response = @client.get("/features/#{feature.key}") if response.is_a?(Net::HTTPOK) - parsed_response = JSON.parse(response.body) + parsed_response = Typecast.from_json(response.body) result_for_feature(feature, parsed_response.fetch('gates')) elsif response.is_a?(Net::HTTPNotFound) default_config @@ -41,7 +41,7 @@ def get_multi(features) response = @client.get("/features?keys=#{csv_keys}&exclude_gate_names=true") raise Error, response unless response.is_a?(Net::HTTPOK) - parsed_response = JSON.parse(response.body) + parsed_response = Typecast.from_json(response.body) parsed_features = parsed_response.fetch('features') gates_by_key = parsed_features.each_with_object({}) do |parsed_feature, hash| hash[parsed_feature['key']] = parsed_feature['gates'] @@ -59,7 +59,7 @@ def get_all response = @client.get("/features?exclude_gate_names=true") raise Error, response unless response.is_a?(Net::HTTPOK) - parsed_response = JSON.parse(response.body) + parsed_response = Typecast.from_json(response.body) parsed_features = parsed_response.fetch('features') gates_by_key = parsed_features.each_with_object({}) do |parsed_feature, hash| hash[parsed_feature['key']] = parsed_feature['gates'] @@ -78,7 +78,7 @@ def features response = @client.get('/features?exclude_gate_names=true') raise Error, response unless response.is_a?(Net::HTTPOK) - parsed_response = JSON.parse(response.body) + parsed_response = Typecast.from_json(response.body) parsed_response['features'].map { |feature| feature['key'] }.to_set end diff --git a/lib/flipper/adapters/http/client.rb b/lib/flipper/adapters/http/client.rb index e1bee6656..c1ecc4bde 100644 --- a/lib/flipper/adapters/http/client.rb +++ b/lib/flipper/adapters/http/client.rb @@ -30,6 +30,10 @@ def initialize(options = {}) @debug_output = options[:debug_output] end + def add_header(key, value) + @headers[key] = value + end + def get(path) perform Net::HTTP::Get, path, @headers end @@ -77,13 +81,13 @@ def build_http(uri) def build_request(http_method, uri, headers, options) request_headers = { - "Client-Language" => "ruby", - "Client-Language-Version" => "#{RUBY_VERSION} p#{RUBY_PATCHLEVEL} (#{RUBY_RELEASE_DATE})", - "Client-Platform" => RUBY_PLATFORM, - "Client-Engine" => defined?(RUBY_ENGINE) ? RUBY_ENGINE : "", - "Client-Pid" => Process.pid.to_s, - "Client-Thread" => Thread.current.object_id.to_s, - "Client-Hostname" => Socket.gethostname, + client_language: "ruby", + client_language_version: "#{RUBY_VERSION} p#{RUBY_PATCHLEVEL} (#{RUBY_RELEASE_DATE})", + client_platform: RUBY_PLATFORM, + client_engine: defined?(RUBY_ENGINE) ? RUBY_ENGINE : "", + client_pid: Process.pid.to_s, + client_thread: Thread.current.object_id.to_s, + client_hostname: Socket.gethostname, }.merge(headers) body = options[:body] diff --git a/lib/flipper/adapters/http/error.rb b/lib/flipper/adapters/http/error.rb index 925c6859e..7b4758635 100644 --- a/lib/flipper/adapters/http/error.rb +++ b/lib/flipper/adapters/http/error.rb @@ -11,7 +11,7 @@ def initialize(response) message = "Failed with status: #{response.code}" begin - data = JSON.parse(response.body) + data = Typecast.from_json(response.body) if error_message = data["message"] message << "\n\n#{data["message"]}" diff --git a/lib/flipper/adapters/mongo.rb b/lib/flipper/adapters/mongo.rb index 08976e695..e91dbc91f 100644 --- a/lib/flipper/adapters/mongo.rb +++ b/lib/flipper/adapters/mongo.rb @@ -80,7 +80,7 @@ def enable(feature, gate, thing) } when :json update feature.key, '$set' => { - gate.key.to_s => JSON.dump(thing.value), + gate.key.to_s => Typecast.to_json(thing.value), } else unsupported_data_type gate.data_type @@ -175,7 +175,7 @@ def result_for_feature(feature, doc) doc.fetch(gate.key.to_s) { Set.new }.to_set when :json value = doc[gate.key.to_s] - JSON.parse(value) if value + Typecast.from_json(value) else unsupported_data_type gate.data_type end diff --git a/lib/flipper/adapters/pstore.rb b/lib/flipper/adapters/pstore.rb index 52120eb85..b9a5534f5 100644 --- a/lib/flipper/adapters/pstore.rb +++ b/lib/flipper/adapters/pstore.rb @@ -85,7 +85,7 @@ def enable(feature, gate, thing) when :set set_add key(feature, gate), thing.value.to_s when :json - write key(feature, gate), JSON.dump(thing.value) + write key(feature, gate), Typecast.to_json(thing.value) else raise "#{gate} is not supported by this adapter yet" end @@ -161,7 +161,7 @@ def result_for_feature(feature) set_members key when :json value = read(key) - JSON.parse(value) if value + Typecast.from_json(value) else raise "#{gate} is not supported by this adapter yet" end diff --git a/lib/flipper/adapters/redis.rb b/lib/flipper/adapters/redis.rb index 1edf6267f..e99b7c319 100644 --- a/lib/flipper/adapters/redis.rb +++ b/lib/flipper/adapters/redis.rb @@ -94,7 +94,7 @@ def enable(feature, gate, thing) when :set @client.hset feature_key, to_field(gate, thing), 1 when :json - @client.hset feature_key, gate.key, JSON.dump(thing.value) + @client.hset feature_key, gate.key, Typecast.to_json(thing.value) else unsupported_data_type gate.data_type end @@ -174,7 +174,7 @@ def result_for_feature(feature, doc) fields_to_gate_value fields, gate when :json value = doc[gate.key.to_s] - JSON.parse(value) if value + Typecast.from_json(value) else unsupported_data_type gate.data_type end diff --git a/lib/flipper/adapters/sequel.rb b/lib/flipper/adapters/sequel.rb index 48be789b4..8058db8fd 100644 --- a/lib/flipper/adapters/sequel.rb +++ b/lib/flipper/adapters/sequel.rb @@ -206,7 +206,7 @@ def gate_attrs(feature, gate, thing, json: false) { feature_key: feature.key.to_s, key: gate.key.to_s, - value: json ? JSON.dump(thing.value) : thing.value.to_s, + value: json ? Typecast.to_json(thing.value) : thing.value.to_s, } end @@ -227,7 +227,7 @@ def result_for_feature(feature, db_gates) db_gates.select { |db_gate| db_gate.key == gate.key.to_s }.map(&:value).to_set when :json if detected_db_gate = db_gates.detect { |db_gate| db_gate.key == gate.key.to_s } - JSON.parse(detected_db_gate.value) + Typecast.from_json(detected_db_gate.value) end else unsupported_data_type gate.data_type diff --git a/lib/flipper/api/action.rb b/lib/flipper/api/action.rb index e913ee688..c604c7a63 100644 --- a/lib/flipper/api/action.rb +++ b/lib/flipper/api/action.rb @@ -116,7 +116,7 @@ def halt(response) def json_response(object, status = 200) header 'content-type', Api::CONTENT_TYPE status(status) - body = JSON.dump(object) + body = Typecast.to_json(object) halt [@code, @headers, [body]] end diff --git a/lib/flipper/api/json_params.rb b/lib/flipper/api/json_params.rb index a8c4334d2..6c8792065 100644 --- a/lib/flipper/api/json_params.rb +++ b/lib/flipper/api/json_params.rb @@ -34,7 +34,7 @@ def call(env) # This method accomplishes similar functionality def update_params(env, data) return if data.empty? - parsed_request_body = JSON.parse(data) + parsed_request_body = Typecast.from_json(data) env["parsed_request_body".freeze] = parsed_request_body parsed_query_string = parse_query(env[QUERY_STRING]) parsed_query_string.merge!(parsed_request_body) diff --git a/lib/flipper/cloud/configuration.rb b/lib/flipper/cloud/configuration.rb index f02ffdc65..707236d22 100644 --- a/lib/flipper/cloud/configuration.rb +++ b/lib/flipper/cloud/configuration.rb @@ -1,3 +1,4 @@ +require "logger" require "socket" require "flipper/adapters/http" require "flipper/adapters/poll" @@ -5,8 +6,9 @@ require "flipper/adapters/memory" require "flipper/adapters/dual_write" require "flipper/adapters/sync/synchronizer" -require "flipper/cloud/instrumenter" -require "brow" +require "flipper/cloud/telemetry" +require "flipper/cloud/telemetry/instrumenter" +require "flipper/cloud/telemetry/submitter" module Flipper module Cloud @@ -19,18 +21,13 @@ class Configuration DEFAULT_URL = "https://www.flippercloud.io/adapter".freeze - # Private: Keeps track of brow instances so they can be shared across - # threads. - def self.brow_instances - @brow_instances ||= Concurrent::Map.new - end - # Public: The token corresponding to an environment on flippercloud.io. attr_accessor :token # Public: The url for http adapter. Really should only be customized for - # development work. Feel free to forget you ever saw this. - attr_reader :url + # development work if you are me and you are not me. Feel free to + # forget you ever saw this. + attr_accessor :url # Public: net/http read timeout for all http requests (default: 5). attr_accessor :read_timeout @@ -73,6 +70,23 @@ def self.brow_instances # occur or not. attr_accessor :sync_secret + # Public: The telemetry instance to use for tracking feature usage. + attr_accessor :telemetry + + # Public: The telemetry submitter to use for sending telemetry to Cloud. + attr_accessor :telemetry_submitter + + # Public: The telemetry logger to use for debugging telemetry inner workings. + attr_accessor :telemetry_logger + + # Public: The Integer for Float number of seconds between submission of + # telemetry to Cloud (default: 60, minimum: 10). + attr_reader :telemetry_interval + + # Public: The Integer or Float number of seconds to wait for telemetry + # to shutdown (default: 5). + attr_accessor :telemetry_shutdown_timeout + def initialize(options = {}) @token = options.fetch(:token) { ENV["FLIPPER_CLOUD_TOKEN"] } @@ -80,27 +94,76 @@ def initialize(options = {}) raise ArgumentError, "Flipper::Cloud token is missing. Please set FLIPPER_CLOUD_TOKEN or provide the token (e.g. Flipper::Cloud.new(token: 'token'))." end - @read_timeout = options.fetch(:read_timeout) { ENV.fetch("FLIPPER_CLOUD_READ_TIMEOUT", 5).to_f } - @open_timeout = options.fetch(:open_timeout) { ENV.fetch("FLIPPER_CLOUD_OPEN_TIMEOUT", 5).to_f } - @write_timeout = options.fetch(:write_timeout) { ENV.fetch("FLIPPER_CLOUD_WRITE_TIMEOUT", 5).to_f } - @sync_interval = options.fetch(:sync_interval) { ENV.fetch("FLIPPER_CLOUD_SYNC_INTERVAL", 10).to_f } - @sync_secret = options.fetch(:sync_secret) { ENV["FLIPPER_CLOUD_SYNC_SECRET"] } - @local_adapter = options.fetch(:local_adapter) { Adapters::Memory.new } + # Http related setup. + @url = options.fetch(:url) { ENV.fetch("FLIPPER_CLOUD_URL", DEFAULT_URL) } @debug_output = options[:debug_output] + @read_timeout = options.fetch(:read_timeout) { + ENV.fetch("FLIPPER_CLOUD_READ_TIMEOUT", 5).to_f + } + @open_timeout = options.fetch(:open_timeout) { + ENV.fetch("FLIPPER_CLOUD_OPEN_TIMEOUT", 5).to_f + } + @write_timeout = options.fetch(:write_timeout) { + ENV.fetch("FLIPPER_CLOUD_WRITE_TIMEOUT", 5).to_f + } + enforce_minimum(:read_timeout, 0.1) + enforce_minimum(:open_timeout, 0.1) + enforce_minimum(:write_timeout, 0.1) + + # Sync setup. + @sync_interval = options.fetch(:sync_interval) { + ENV.fetch("FLIPPER_CLOUD_SYNC_INTERVAL", 10).to_f + } + @sync_secret = options.fetch(:sync_secret) { + ENV["FLIPPER_CLOUD_SYNC_SECRET"] + } + enforce_minimum(:sync_interval, 10) + + # Adapter setup. + @local_adapter = options.fetch(:local_adapter) { Adapters::Memory.new } @adapter_block = ->(adapter) { adapter } - self.url = options.fetch(:url) { ENV.fetch("FLIPPER_CLOUD_URL", DEFAULT_URL) } - instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) + # Telemetry setup. + @telemetry_logger = options.fetch(:telemetry_logger) { + if Flipper::Typecast.to_boolean(ENV["FLIPPER_CLOUD_TELEMETRY_LOGGING"]) + Logger.new(STDOUT) + else + Logger.new("/dev/null") + end + } + self.telemetry_interval = options.fetch(:telemetry_interval) { + ENV.fetch("FLIPPER_CLOUD_TELEMETRY_INTERVAL", 60).to_f + } + @telemetry_shutdown_timeout = options.fetch(:telemetry_shutdown_timeout) { + ENV.fetch("FLIPPER_CLOUD_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f + } + @telemetry_submitter = options.fetch(:telemetry_submitter) { + ->(drained) { Telemetry::Submitter.new(self).call(drained) } + } + # Needs to be after url and other telemetry config assignments. + @telemetry = options.fetch(:telemetry) { Telemetry.instance_for(self) } + enforce_minimum(:telemetry_shutdown_timeout, 0) # This is alpha. Don't use this unless you are me. And you are not me. - cloud_instrument = options.fetch(:cloud_instrument) { ENV["FLIPPER_CLOUD_INSTRUMENT"] == "1" } + instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) + cloud_instrument = options.fetch(:cloud_instrument) { + Flipper::Typecast.to_boolean(ENV["FLIPPER_CLOUD_INSTRUMENT"]) + } @instrumenter = if cloud_instrument - Instrumenter.new(brow: brow, instrumenter: instrumenter) + Telemetry::Instrumenter.new(self, instrumenter) else instrumenter end end + # Public: Change the telemetry interval. + def telemetry_interval=(value) + value = value.to_f + @telemetry_interval = value + enforce_minimum(:telemetry_interval, 10) + value + end + # Public: Read or customize the http adapter. Calling without a block will # perform a read. Calling with a block yields the cloud adapter # for customization. @@ -120,38 +183,25 @@ def adapter(&block) end end - # Public: Set url for the http adapter. - attr_writer :url - + # Public: Force a sync. def sync Flipper::Adapters::Sync::Synchronizer.new(local_adapter, http_adapter, { instrumenter: instrumenter, }).call end - def brow - self.class.brow_instances.compute_if_absent(url + token) do - uri = URI.parse(url) - uri.path = "#{uri.path}/events".squeeze("/") - - Brow::Client.new({ - url: uri.to_s, - headers: { - "Accept" => "application/json", - "Content-Type" => "application/json", - "User-Agent" => "Flipper v#{VERSION} via Brow v#{Brow::VERSION}", - "Flipper-Cloud-Token" => @token, - } - }) - end - end - # Public: The method that will be used to synchronize local adapter with # cloud. (default: :poll, will be :webhook if sync_secret is set). def sync_method sync_secret ? :webhook : :poll end + # Internal: The http client used by the http adapter. Exposed so we can + # use the same client for posting telemetry. + def http_client + http_adapter.client + end + private def app_adapter @@ -184,6 +234,15 @@ def http_adapter }, }) end + + # Enforce minimum interval for tasks that run on a timer. + def enforce_minimum(name, minimum) + provided = send(name) + if provided < minimum + warn "Flipper::Cloud##{name} must be at least #{minimum} seconds but was #{provided}. Using #{minimum} seconds." + send(:instance_variable_set, "@#{name}", minimum) + end + end end end end diff --git a/lib/flipper/cloud/instrumenter.rb b/lib/flipper/cloud/instrumenter.rb deleted file mode 100644 index c3fb76dc6..000000000 --- a/lib/flipper/cloud/instrumenter.rb +++ /dev/null @@ -1,48 +0,0 @@ -require "delegate" -require "flipper/instrumenters/noop" - -module Flipper - module Cloud - class Instrumenter < SimpleDelegator - def initialize(options = {}) - @brow = options.fetch(:brow) - @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop) - super @instrumenter - end - - def instrument(name, payload = {}, &block) - result = @instrumenter.instrument(name, payload, &block) - push name, payload - result - end - - private - - def push(name, payload) - return unless name == Flipper::Feature::InstrumentationName - return unless :enabled? == payload[:operation] - - dimensions = { - "feature" => payload[:feature_name].to_s, - "result" => payload[:result].to_s, - } - - if (thing = payload[:thing]) - dimensions["flipper_id"] = thing.value.to_s - end - - if (actors = payload[:actors]) - dimensions["flipper_ids"] = actors.map { |actor| actor.value.to_s } - end - - event = { - type: "enabled", - dimensions: dimensions, - measures: {}, - ts: Time.now.utc, - } - @brow.push event - end - end - end -end diff --git a/lib/flipper/cloud/telemetry.rb b/lib/flipper/cloud/telemetry.rb new file mode 100644 index 000000000..51e9986b5 --- /dev/null +++ b/lib/flipper/cloud/telemetry.rb @@ -0,0 +1,131 @@ +require "forwardable" +require "concurrent/timer_task" +require "concurrent/executor/fixed_thread_pool" +require "flipper/cloud/telemetry/metric" +require "flipper/cloud/telemetry/metric_storage" + +module Flipper + module Cloud + class Telemetry + extend Forwardable + + # Internal: Map of instances of telemetry. + def self.instances + @instances ||= Concurrent::Map.new + end + private_class_method :instances + + def self.reset + instances.each { |_, instance| instance.stop }.clear + end + + # Internal: Fetch an instance of telemetry once per process per url + + # token (aka cloud endpoint). Should only ever be one instance unless you + # are doing some funky stuff. + def self.instance_for(cloud_configuration) + instances.compute_if_absent(cloud_configuration.url + cloud_configuration.token) do + new(cloud_configuration) + end + end + + attr_reader :cloud_configuration, :metric_storage, :pool, :timer + + def_delegator :@cloud_configuration, :telemetry_logger, :logger + + def initialize(cloud_configuration) + @pid = $$ + @cloud_configuration = cloud_configuration + start + at_exit { stop } + end + + # Public: Records telemetry events based on active support notifications. + def record(name, payload) + return unless name == Flipper::Feature::InstrumentationName + return unless payload[:operation] == :enabled? + detect_forking + + metric = Metric.new(payload[:feature_name].to_s.freeze, payload[:result]) + @metric_storage.increment metric + end + + # Start all the tasks and setup new metric storage. + def start + logger.info "name=flipper_telemetry action=start" + + @metric_storage = MetricStorage.new + + @pool = Concurrent::FixedThreadPool.new(2, { + max_queue: 5, + fallback_policy: :discard, + name: "flipper-telemetry-post-to-cloud-pool".freeze, + }) + + @timer = Concurrent::TimerTask.execute({ + execution_interval: @cloud_configuration.telemetry_interval, + name: "flipper-telemetry-post-to-pool-timer".freeze, + }) { post_to_pool } + end + + # Shuts down all the tasks and tries to flush any remaining info to Cloud. + def stop + logger.info "name=flipper_telemetry action=stop" + + if @timer + logger.debug "name=flipper_telemetry action=timer_shutdown_start" + @timer.shutdown + # no need to wait long for timer, all it does is drain in memory metric + # storage and post to the pool of background workers + timer_termination_result = @timer.wait_for_termination(1) + @timer.kill unless timer_termination_result + logger.debug "name=flipper_telemetry action=timer_shutdown_end result=#{timer_termination_result}" + end + + if @pool + post_to_pool # one last drain + logger.debug "name=flipper_telemetry action=pool_shutdown_start" + @pool.shutdown + pool_termination_result = @pool.wait_for_termination(@cloud_configuration.telemetry_shutdown_timeout) + @pool.kill unless pool_termination_result + logger.debug "name=flipper_telemetry action=pool_shutdown_end result=#{pool_termination_result}" + end + end + + def restart + stop + start + end + + private + + def detect_forking + if @pid != $$ + logger.info "name=flipper_telemetry action=fork_detected pid_was#{@pid} pid_is=#{$$}" + restart + @pid = $$ + end + end + + def post_to_pool + logger.debug "name=flipper_telemetry action=post_to_pool" + drained = @metric_storage.drain + return if drained.empty? + @pool.post { post_to_cloud(drained) } + end + + def post_to_cloud(drained) + response, error = @cloud_configuration.telemetry_submitter.call(drained) + # Some of the errors are response code errors which have a response and + # thus may have a telemetry-interval header for us to respect. + response ||= error.response if error.respond_to?(:response) + if response && telemetry_interval = response["telemetry-interval"] + telemetry_interval = telemetry_interval.to_i + @timer.execution_interval = telemetry_interval + @cloud_configuration.telemetry_interval = telemetry_interval + end + rescue => error + logger.debug "name=flipper_telemetry action=post_to_cloud error=#{error.inspect}" + end + end + end +end diff --git a/lib/flipper/cloud/telemetry/backoff_policy.rb b/lib/flipper/cloud/telemetry/backoff_policy.rb new file mode 100644 index 000000000..68349bcce --- /dev/null +++ b/lib/flipper/cloud/telemetry/backoff_policy.rb @@ -0,0 +1,93 @@ +module Flipper + module Cloud + class Telemetry + class BackoffPolicy + # Private: The default minimum timeout between intervals in milliseconds. + MIN_TIMEOUT_MS = 1_000 + + # Private: The default maximum timeout between intervals in milliseconds. + MAX_TIMEOUT_MS = 30_000 + + # Private: The value to multiply the current interval with for each + # retry attempt. + MULTIPLIER = 1.5 + + # Private: The randomization factor to use to create a range around the + # retry interval. + RANDOMIZATION_FACTOR = 0.5 + + # Private + attr_reader :min_timeout_ms, :max_timeout_ms, :multiplier, :randomization_factor + + # Private + attr_reader :attempts + + # Public: Create new instance of backoff policy. + # + # options - The Hash of options. + # :min_timeout_ms - The minimum backoff timeout. + # :max_timeout_ms - The maximum backoff timeout. + # :multiplier - The value to multiply the current interval with for each + # retry attempt. + # :randomization_factor - The randomization factor to use to create a range + # around the retry interval. + def initialize(options = {}) + @min_timeout_ms = options.fetch(:min_timeout_ms) { + ENV.fetch("FLIPPER_BACKOFF_MIN_TIMEOUT_MS", MIN_TIMEOUT_MS).to_i + } + @max_timeout_ms = options.fetch(:max_timeout_ms) { + ENV.fetch("FLIPPER_BACKOFF_MAX_TIMEOUT_MS", MAX_TIMEOUT_MS).to_i + } + @multiplier = options.fetch(:multiplier) { + ENV.fetch("FLIPPER_BACKOFF_MULTIPLIER", MULTIPLIER).to_f + } + @randomization_factor = options.fetch(:randomization_factor) { + ENV.fetch("FLIPPER_BACKOFF_RANDOMIZATION_FACTOR", RANDOMIZATION_FACTOR).to_f + } + + unless @min_timeout_ms >= 0 + raise ArgumentError, ":min_timeout_ms must be >= 0 but was #{@min_timeout_ms.inspect}" + end + + unless @max_timeout_ms >= 0 + raise ArgumentError, ":max_timeout_ms must be >= 0 but was #{@max_timeout_ms.inspect}" + end + + unless @min_timeout_ms <= max_timeout_ms + raise ArgumentError, ":min_timeout_ms (#{@min_timeout_ms.inspect}) must be <= :max_timeout_ms (#{@max_timeout_ms.inspect})" + end + + @attempts = 0 + end + + # Public: Returns the next backoff interval in milliseconds. + def next_interval + interval = @min_timeout_ms * (@multiplier**@attempts) + interval = add_jitter(interval, @randomization_factor) + + @attempts += 1 + + [interval, @max_timeout_ms].min + end + + def reset + @attempts = 0 + end + + private + + def add_jitter(base, randomization_factor) + random_number = rand + max_deviation = base * randomization_factor + deviation = random_number * max_deviation + + if random_number < 0.5 + base - deviation + else + base + deviation + end + end + end + end + end +end diff --git a/lib/flipper/cloud/telemetry/instrumenter.rb b/lib/flipper/cloud/telemetry/instrumenter.rb new file mode 100644 index 000000000..8e70d237d --- /dev/null +++ b/lib/flipper/cloud/telemetry/instrumenter.rb @@ -0,0 +1,26 @@ +require "delegate" + +module Flipper + module Cloud + class Telemetry + class Instrumenter < SimpleDelegator + def initialize(cloud_configuration, instrumenter) + super instrumenter + @cloud_configuration = cloud_configuration + end + + def instrument(name, payload = {}, &block) + return_value = instrumenter.instrument(name, payload, &block) + @cloud_configuration.telemetry.record(name, payload) + return_value + end + + private + + def instrumenter + __getobj__ + end + end + end + end +end diff --git a/lib/flipper/cloud/telemetry/metric.rb b/lib/flipper/cloud/telemetry/metric.rb new file mode 100644 index 000000000..46c0e2475 --- /dev/null +++ b/lib/flipper/cloud/telemetry/metric.rb @@ -0,0 +1,39 @@ +module Flipper + module Cloud + class Telemetry + class Metric + attr_reader :key, :time, :result + + def initialize(key, result, time = Time.now) + @key = key + @result = result + @time = time.to_i / 60 * 60 + end + + def as_json(options = {}) + data = { + "key" => key.to_s, + "time" => time, + "result" => result, + } + + if options[:with] + data.merge!(options[:with]) + end + + data + end + + def eql?(other) + self.class.eql?(other.class) && + @key == other.key && @time == other.time && @result == other.result + end + alias :== :eql? + + def hash + [self.class, @key, @time, @result].hash + end + end + end + end +end diff --git a/lib/flipper/cloud/telemetry/metric_storage.rb b/lib/flipper/cloud/telemetry/metric_storage.rb new file mode 100644 index 000000000..a647467a5 --- /dev/null +++ b/lib/flipper/cloud/telemetry/metric_storage.rb @@ -0,0 +1,30 @@ +require 'concurrent/map' +require 'concurrent/atomic/atomic_fixnum' + +module Flipper + module Cloud + class Telemetry + class MetricStorage + def initialize + @storage = Concurrent::Map.new { |h, k| h[k] = Concurrent::AtomicFixnum.new(0) } + end + + def increment(metric) + @storage[metric].increment + end + + def drain + metrics = {} + @storage.keys.each do |metric| + metrics[metric] = @storage.delete(metric).value + end + metrics.freeze + end + + def empty? + @storage.empty? + end + end + end + end +end diff --git a/lib/flipper/cloud/telemetry/submitter.rb b/lib/flipper/cloud/telemetry/submitter.rb new file mode 100644 index 000000000..eb6a6c449 --- /dev/null +++ b/lib/flipper/cloud/telemetry/submitter.rb @@ -0,0 +1,113 @@ +require "securerandom" +require "flipper/typecast" +require "flipper/cloud/telemetry/backoff_policy" + +module Flipper + module Cloud + class Telemetry + class Submitter + PATH = "/telemetry".freeze + SCHEMA_VERSION = "V1".freeze + GZIP_ENCODING = "gzip".freeze + + Error = Class.new(StandardError) do + attr_reader :request_id, :response + + def initialize(request_id, response) + @request_id = request_id + @response = response + super "Unexpected response code=#{response.code} request_id=#{request_id}" + end + end + + attr_reader :cloud_configuration, :request_id, :backoff_policy + + def initialize(cloud_configuration, backoff_policy: nil) + @cloud_configuration = cloud_configuration + @backoff_policy = backoff_policy || BackoffPolicy.new + reset + end + + def call(drained) + return if drained.empty? + body = to_body(drained) + retry_with_backoff(10) { submit(body) } + ensure + reset + end + + private + + def reset + @backoff_policy.reset + @request_id = SecureRandom.uuid + end + + def submit(body) + client = @cloud_configuration.http_client + client.add_header :schema_version, SCHEMA_VERSION + client.add_header :content_encoding, GZIP_ENCODING + + response = client.post PATH, body + code = response.code.to_i + + # what about redirects? + if code < 200 || code == 429 || code >= 500 + raise Error.new(request_id, response) + end + + response + end + + def retry_with_backoff(attempts, &block) + result, caught_exception = nil + should_retry = false + attempts_remaining = attempts - 1 + + begin + result, should_retry = yield + return [result, nil] unless should_retry + rescue => error + logger.error "name=flipper_telemetry action=post_to_cloud attempts_remaining=#{attempts_remaining} error=#{error.inspect}" + should_retry = true + caught_exception = error + end + + if should_retry && attempts_remaining > 0 + sleep @backoff_policy.next_interval.to_f / 1000 + retry_with_backoff attempts_remaining, &block + else + [result, caught_exception] + end + end + + def to_body(drained) + enabled_metrics = drained.map { |metric, value| + metric.as_json(with: {"value" => value}) + } + + json = Typecast.to_json({ + request_id: request_id, + enabled_metrics: enabled_metrics, + }) + + Typecast.to_gzip(json) + rescue => exception + error(exception) + end + + def debug(message) + logger.debug { "name=flipper_telemetry action=post_to_cloud #{message}" } + end + + def error(error) + logger.error "name=flipper_telemetry action=post_to_cloud request_id=#{request_id} error=#{error.inspect}" + end + + def logger + @cloud_configuration.telemetry_logger + end + end + end + end +end diff --git a/lib/flipper/exporters/json/export.rb b/lib/flipper/exporters/json/export.rb index 580b7c644..d27cc11c1 100644 --- a/lib/flipper/exporters/json/export.rb +++ b/lib/flipper/exporters/json/export.rb @@ -18,7 +18,7 @@ def initialize(contents:, version: 1) # Public: The features hash identical to calling get_all on adapter. def features @features ||= begin - features = JSON.parse(contents).fetch("features") + features = Typecast.from_json(contents).fetch("features") Typecast.features_hash(features) rescue JSON::ParserError raise JsonError diff --git a/lib/flipper/exporters/json/v1.rb b/lib/flipper/exporters/json/v1.rb index b999d7b47..52f81f773 100644 --- a/lib/flipper/exporters/json/v1.rb +++ b/lib/flipper/exporters/json/v1.rb @@ -20,7 +20,7 @@ def call(adapter) end end - json = JSON.dump({ + json = Typecast.to_json({ version: VERSION, features: features, }) diff --git a/lib/flipper/poller.rb b/lib/flipper/poller.rb index 30072190c..29bdaef44 100644 --- a/lib/flipper/poller.rb +++ b/lib/flipper/poller.rb @@ -17,7 +17,7 @@ def self.get(key, options = {}) end def self.reset - instances.each {|_,poller| poller.stop }.clear + instances.each {|_, instance| instance.stop }.clear end def initialize(options = {}) diff --git a/lib/flipper/serializers/gzip.rb b/lib/flipper/serializers/gzip.rb new file mode 100644 index 000000000..1274c4f57 --- /dev/null +++ b/lib/flipper/serializers/gzip.rb @@ -0,0 +1,24 @@ +require "zlib" +require "stringio" + +module Flipper + module Serializers + module Gzip + module_function + + def serialize(source) + return if source.nil? + output = StringIO.new + gz = Zlib::GzipWriter.new(output) + gz.write(source) + gz.close + output.string + end + + def deserialize(source) + return if source.nil? + Zlib::GzipReader.wrap(StringIO.new(source), &:read) + end + end + end +end diff --git a/lib/flipper/serializers/json.rb b/lib/flipper/serializers/json.rb new file mode 100644 index 000000000..df09b403d --- /dev/null +++ b/lib/flipper/serializers/json.rb @@ -0,0 +1,19 @@ +require "json" + +module Flipper + module Serializers + module Json + module_function + + def serialize(source) + return if source.nil? + JSON.generate(source) + end + + def deserialize(source) + return if source.nil? + JSON.parse(source) + end + end + end +end diff --git a/lib/flipper/typecast.rb b/lib/flipper/typecast.rb index 03b6da4cc..269ecf016 100644 --- a/lib/flipper/typecast.rb +++ b/lib/flipper/typecast.rb @@ -1,4 +1,6 @@ require 'set' +require "flipper/serializers/json" +require "flipper/serializers/gzip" module Flipper module Typecast @@ -89,5 +91,21 @@ def self.features_hash(source) end normalized_source end + + def self.to_json(source) + Serializers::Json.serialize(source) + end + + def self.from_json(source) + Serializers::Json.deserialize(source) + end + + def self.to_gzip(source) + Serializers::Gzip.serialize(source) + end + + def self.from_gzip(source) + Serializers::Gzip.deserialize(source) + end end end diff --git a/lib/flipper/ui/action.rb b/lib/flipper/ui/action.rb index 4d41da079..54d647dbf 100644 --- a/lib/flipper/ui/action.rb +++ b/lib/flipper/ui/action.rb @@ -176,7 +176,7 @@ def json_response(object) when String object else - JSON.dump(object) + Typecast.to_json(object) end halt [@code, @headers, [body]] end diff --git a/spec/flipper/cloud/configuration_spec.rb b/spec/flipper/cloud/configuration_spec.rb index 731a16f8c..d60a40f86 100644 --- a/spec/flipper/cloud/configuration_spec.rb +++ b/spec/flipper/cloud/configuration_spec.rb @@ -61,14 +61,14 @@ end it "can set sync_interval" do - instance = described_class.new(required_options.merge(sync_interval: 1)) - expect(instance.sync_interval).to eq(1) + instance = described_class.new(required_options.merge(sync_interval: 15)) + expect(instance.sync_interval).to eq(15) end it "can set sync_interval from ENV var" do - with_env "FLIPPER_CLOUD_SYNC_INTERVAL" => "5" do + with_env "FLIPPER_CLOUD_SYNC_INTERVAL" => "15" do instance = described_class.new(required_options.reject { |k, v| k == :sync_interval }) - expect(instance.sync_interval).to eq(5) + expect(instance.sync_interval).to eq(15) end end @@ -76,9 +76,24 @@ # The initial sync of http to local invokes this web request. stub_request(:get, /flippercloud\.io/).to_return(status: 200, body: "{}") - instance = described_class.new(required_options.merge(sync_interval: 1)) + instance = described_class.new(required_options.merge(sync_interval: 20)) poller = instance.send(:poller) - expect(poller.interval).to eq(1) + expect(poller.interval).to eq(20) + end + + it "can set telemetry_interval" do + instance = described_class.new(required_options.merge(telemetry_interval: 10)) + expect(instance.telemetry_interval).to eq(10) + end + + it "defaults telemetry_interval" do + instance = described_class.new(required_options) + expect(instance.telemetry_interval).to eq(60) + end + + it "cannot set telemetry_interval to lower than 10" do + instance = described_class.new(required_options.merge(telemetry_interval: 9)) + expect(instance.telemetry_interval).to eq(10) end it "can set debug_output" do @@ -249,21 +264,4 @@ expect(all["search"][:boolean]).to eq("true") expect(all["history"][:boolean]).to eq(nil) end - - it "can setup brow to report events to cloud" do - # skip logging brow - Brow.logger = Logger.new(File::NULL) - brow = described_class.new(required_options).brow - - stub = stub_request(:post, "https://www.flippercloud.io/adapter/events") - .with { |request| - data = JSON.parse(request.body) - data.keys == ["uuid", "messages"] && data["messages"] == [{"n" => 1}] - } - .to_return(status: 201, body: "{}", headers: {}) - - brow.push({"n" => 1}) - brow.worker.stop - expect(stub).to have_been_requested.times(1) - end end diff --git a/spec/flipper/cloud/telemetry/backoff_policy_spec.rb b/spec/flipper/cloud/telemetry/backoff_policy_spec.rb new file mode 100644 index 000000000..71cf847a6 --- /dev/null +++ b/spec/flipper/cloud/telemetry/backoff_policy_spec.rb @@ -0,0 +1,108 @@ +require 'flipper/cloud/telemetry/backoff_policy' + +RSpec.describe Flipper::Cloud::Telemetry::BackoffPolicy do + context "#initialize" do + it "with no options" do + policy = described_class.new + expect(policy.min_timeout_ms).to eq(1_000) + expect(policy.max_timeout_ms).to eq(30_000) + expect(policy.multiplier).to eq(1.5) + expect(policy.randomization_factor).to eq(0.5) + end + + it "with options" do + policy = described_class.new({ + min_timeout_ms: 1234, + max_timeout_ms: 5678, + multiplier: 24, + randomization_factor: 0.4, + }) + expect(policy.min_timeout_ms).to eq(1234) + expect(policy.max_timeout_ms).to eq(5678) + expect(policy.multiplier).to eq(24) + expect(policy.randomization_factor).to eq(0.4) + end + + it "with min higher than max" do + expect { + described_class.new({ + min_timeout_ms: 2, + max_timeout_ms: 1, + }) + }.to raise_error(ArgumentError, ":min_timeout_ms (2) must be <= :max_timeout_ms (1)") + end + + it "with invalid min_timeout_ms" do + expect { + described_class.new({ + min_timeout_ms: -1, + }) + }.to raise_error(ArgumentError, ":min_timeout_ms must be >= 0 but was -1") + end + + it "with invalid max_timeout_ms" do + expect { + described_class.new({ + max_timeout_ms: -1, + }) + }.to raise_error(ArgumentError, ":max_timeout_ms must be >= 0 but was -1") + end + + it "from env" do + env = { + "FLIPPER_BACKOFF_MIN_TIMEOUT_MS" => "1000", + "FLIPPER_BACKOFF_MAX_TIMEOUT_MS" => "2000", + "FLIPPER_BACKOFF_MULTIPLIER" => "1.9", + "FLIPPER_BACKOFF_RANDOMIZATION_FACTOR" => "0.1", + } + with_env env do + policy = described_class.new + expect(policy.min_timeout_ms).to eq(1000) + expect(policy.max_timeout_ms).to eq(2000) + expect(policy.multiplier).to eq(1.9) + expect(policy.randomization_factor).to eq(0.1) + end + end + end + + context "#next_interval" do + it "works" do + policy = described_class.new({ + min_timeout_ms: 1_000, + max_timeout_ms: 10_000, + multiplier: 2, + randomization_factor: 0.5, + }) + + expect(policy.next_interval).to be_within(500).of(1000) + expect(policy.next_interval).to be_within(1000).of(2000) + expect(policy.next_interval).to be_within(2000).of(4000) + expect(policy.next_interval).to be_within(4000).of(8000) + end + + it "caps maximum duration at max_timeout_secs" do + policy = described_class.new({ + min_timeout_ms: 1_000, + max_timeout_ms: 10_000, + multiplier: 2, + randomization_factor: 0.5, + }) + 10.times { policy.next_interval } + expect(policy.next_interval).to eq(10_000) + end + end + + it "can reset" do + policy = described_class.new({ + min_timeout_ms: 1_000, + max_timeout_ms: 10_000, + multiplier: 2, + randomization_factor: 0.5, + }) + 10.times { policy.next_interval } + + expect(policy.attempts).to eq(10) + policy.reset + expect(policy.attempts).to eq(0) + end +end diff --git a/spec/flipper/cloud/telemetry/metric_spec.rb b/spec/flipper/cloud/telemetry/metric_spec.rb new file mode 100644 index 000000000..90e4ceb93 --- /dev/null +++ b/spec/flipper/cloud/telemetry/metric_spec.rb @@ -0,0 +1,87 @@ +require 'flipper/cloud/telemetry/metric' + +RSpec.describe Flipper::Cloud::Telemetry::Metric do + it 'has key, result and time' do + metric = described_class.new(:search, true, 1696793160) + expect(metric.key).to eq(:search) + expect(metric.result).to eq(true) + expect(metric.time).to eq(1696793160) + end + + it "clamps time to minute" do + metric = described_class.new(:search, true, 1696793204) + expect(metric.time).to eq(1696793160) + end + + describe "#eql?" do + it "returns true when key, time and result are the same" do + metric = described_class.new(:search, true, 1696793204) + other = described_class.new(:search, true, 1696793204) + expect(metric.eql?(other)).to be(true) + end + + it "returns false for other class" do + metric = described_class.new(:search, true, 1696793204) + other = Object.new + expect(metric.eql?(other)).to be(false) + end + + it "returns false for sub class" do + metric = described_class.new(:search, true, 1696793204) + other = Class.new(described_class).new(:search, true, 1696793204) + expect(metric.eql?(other)).to be(false) + end + + it "returns false if key is different" do + metric = described_class.new(:search, true, 1696793204) + other = described_class.new(:other, true, 1696793204) + expect(metric.eql?(other)).to be(false) + end + + it "returns false if time is different" do + metric = described_class.new(:search, true, 1696793204) + other = described_class.new(:search, true, 1696793204 - 60 - 60) + expect(metric.eql?(other)).to be(false) + end + + it "returns true with different times if times are in same minute" do + metric = described_class.new(:search, true, 1696793204) + other = described_class.new(:search, true, 1696793206) + expect(metric.eql?(other)).to be(true) + end + + it "returns false if result is different" do + metric = described_class.new(:search, true, 1696793204) + other = described_class.new(:search, false, 1696793204) + expect(metric.eql?(other)).to be(false) + end + end + + describe "#hash" do + it "returns hash based on class, key, time and result" do + metric = described_class.new(:search, true, 1696793204) + expect(metric.hash).to eq([described_class, metric.key, metric.time, metric.result].hash) + end + end + + describe "#as_json" do + it "returns key time and result" do + metric = described_class.new(:search, true, 1696793160) + expect(metric.as_json).to eq({ + "key" => "search", + "result" => true, + "time" => 1696793160, + }) + end + + it "can include other hashes" do + metric = described_class.new(:search, true, 1696793160) + expect(metric.as_json(with: {"value" => 2})).to eq({ + "key" => "search", + "result" => true, + "time" => 1696793160, + "value" => 2, + }) + end + end +end diff --git a/spec/flipper/cloud/telemetry/metric_storage_spec.rb b/spec/flipper/cloud/telemetry/metric_storage_spec.rb new file mode 100644 index 000000000..b653f155b --- /dev/null +++ b/spec/flipper/cloud/telemetry/metric_storage_spec.rb @@ -0,0 +1,58 @@ +require 'flipper/cloud/telemetry/metric_storage' +require 'flipper/cloud/telemetry/metric' + +RSpec.describe Flipper::Cloud::Telemetry::MetricStorage do + describe "#increment" do + it "increments the counter for the metric" do + metric_storage = described_class.new + storage = metric_storage.instance_variable_get(:@storage) + metric = Flipper::Cloud::Telemetry::Metric.new(:search, true, 1696793160) + other = Flipper::Cloud::Telemetry::Metric.new(:search, false, 1696793160) + + metric_storage.increment(metric) + expect(storage[metric].value).to be(1) + + 5.times { metric_storage.increment(metric) } + expect(storage[metric].value).to be(6) + + metric_storage.increment(other) + expect(storage[other].value).to be(1) + end + end + + describe "#drain" do + it "returns clears metrics and return hash" do + metric_storage = described_class.new + storage = metric_storage.instance_variable_get(:@storage) + storage[Flipper::Cloud::Telemetry::Metric.new(:search, true, 1696793160)] = Concurrent::AtomicFixnum.new(10) + storage[Flipper::Cloud::Telemetry::Metric.new(:search, false, 1696793161)] = Concurrent::AtomicFixnum.new(15) + storage[Flipper::Cloud::Telemetry::Metric.new(:plausible, true, 1696793162)] = Concurrent::AtomicFixnum.new(25) + storage[Flipper::Cloud::Telemetry::Metric.new(:administrator, true, 1696793164)] = Concurrent::AtomicFixnum.new(1) + storage[Flipper::Cloud::Telemetry::Metric.new(:administrator, false, 1696793164)] = Concurrent::AtomicFixnum.new(24) + + drained = metric_storage.drain + expect(drained).to be_frozen + expect(drained).to eq({ + Flipper::Cloud::Telemetry::Metric.new(:search, true, 1696793160) => 10, + Flipper::Cloud::Telemetry::Metric.new(:search, false, 1696793161) => 15, + Flipper::Cloud::Telemetry::Metric.new(:plausible, true, 1696793162) => 25, + Flipper::Cloud::Telemetry::Metric.new(:administrator, true, 1696793164) => 1, + Flipper::Cloud::Telemetry::Metric.new(:administrator, false, 1696793164) => 24, + }) + expect(storage.keys).to eq([]) + end + end + + describe "#empty?" do + it "returns true if empty" do + metric_storage = described_class.new + expect(metric_storage).to be_empty + end + + it "returns false if not empty" do + metric_storage = described_class.new + metric_storage.increment Flipper::Cloud::Telemetry::Metric.new(:search, true, 1696793160) + expect(metric_storage).not_to be_empty + end + end +end diff --git a/spec/flipper/cloud/telemetry/submitter_spec.rb b/spec/flipper/cloud/telemetry/submitter_spec.rb new file mode 100644 index 000000000..3e35d342c --- /dev/null +++ b/spec/flipper/cloud/telemetry/submitter_spec.rb @@ -0,0 +1,145 @@ +require "stringio" +require 'flipper/cloud/configuration' +require 'flipper/cloud/telemetry/submitter' + +RSpec.describe Flipper::Cloud::Telemetry::Submitter do + let(:cloud_configuration) { + Flipper::Cloud::Configuration.new({token: "asdf"}) + } + let(:fake_backoff_policy) { FakeBackoffPolicy.new } + let(:subject) { described_class.new(cloud_configuration, backoff_policy: fake_backoff_policy) } + + describe "#initialize" do + it "works with cloud_configuration" do + submitter = described_class.new(cloud_configuration) + expect(submitter.cloud_configuration).to eq(cloud_configuration) + end + end + + describe "#call" do + let(:enabled_metrics) { + { + Flipper::Cloud::Telemetry::Metric.new(:search, true, 1696793160) => 10, + Flipper::Cloud::Telemetry::Metric.new(:search, false, 1696793161) => 15, + Flipper::Cloud::Telemetry::Metric.new(:plausible, true, 1696793162) => 25, + Flipper::Cloud::Telemetry::Metric.new(:administrator, true, 1696793164) => 1, + Flipper::Cloud::Telemetry::Metric.new(:administrator, false, 1696793164) => 24, + } + } + + it "does not submit blank metrics" do + expect(subject.call({})).to be(nil) + end + + it "submits present metrics" do + expected_body = { + "request_id" => subject.request_id, + "enabled_metrics" =>[ + {"key" => "search", "time" => 1696793160, "result" => true, "value" => 10}, + {"key" => "search", "time" => 1696793160, "result" => false, "value" => 15}, + {"key" => "plausible", "time" => 1696793160, "result" => true, "value" => 25}, + {"key" => "administrator", "time" => 1696793160, "result" => true, "value" => 1}, + {"key" => "administrator", "time" => 1696793160, "result" => false, "value" => 24}, + ] + } + expected_headers = { + 'Accept' => 'application/json', + 'Client-Engine' => defined?(RUBY_ENGINE) ? RUBY_ENGINE : "", + 'Client-Hostname' => Socket.gethostname, + 'Client-Language' => 'ruby', + 'Client-Language-Version' => "#{RUBY_VERSION} p#{RUBY_PATCHLEVEL} (#{RUBY_RELEASE_DATE})", + 'Client-Pid' => Process.pid.to_s, + 'Client-Platform' => RUBY_PLATFORM, + 'Client-Thread' => Thread.current.object_id.to_s, + 'Content-Encoding' => 'gzip', + 'Content-Type' => 'application/json', + 'Flipper-Cloud-Token' => 'asdf', + 'Schema-Version' => 'V1', + 'User-Agent' => "Flipper HTTP Adapter v#{Flipper::VERSION}", + } + stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + with { |request| + gunzipped = Flipper::Typecast.from_gzip(request.body) + body = Flipper::Typecast.from_json(gunzipped) + body == expected_body && request.headers == expected_headers + }.to_return(status: 200, body: "{}", headers: {}) + subject.call(enabled_metrics) + end + + it "defaults backoff_policy" do + stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 429, body: "{}", headers: {}). + to_return(status: 200, body: "{}", headers: {}) + instance = described_class.new(cloud_configuration) + expect(instance.backoff_policy.min_timeout_ms).to eq(1_000) + expect(instance.backoff_policy.max_timeout_ms).to eq(30_000) + end + + it "tries 10 times by default" do + stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 500, body: "{}", headers: {}) + subject.call(enabled_metrics) + expect(subject.backoff_policy.retries).to eq(9) # 9 retries + 1 initial attempt + end + + [ + EOFError, + Errno::ECONNABORTED, + Errno::ECONNREFUSED, + Errno::ECONNRESET, + Errno::EHOSTUNREACH, + Errno::EINVAL, + Errno::ENETUNREACH, + Errno::ENOTSOCK, + Errno::EPIPE, + Errno::ETIMEDOUT, + Net::HTTPBadResponse, + Net::HTTPHeaderSyntaxError, + Net::ProtocolError, + Net::ReadTimeout, + OpenSSL::SSL::SSLError, + SocketError, + Timeout::Error, # Also covers subclasses like Net::OpenTimeout. + ].each do |error_class| + it "retries on #{error_class}" do + stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_raise(error_class) + subject.call(enabled_metrics) + expect(subject.backoff_policy.retries).to eq(9) + end + end + + it "retries on 429" do + stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 429, body: "{}", headers: {}). + to_return(status: 429, body: "{}", headers: {}). + to_return(status: 200, body: "{}", headers: {}) + subject.call(enabled_metrics) + expect(subject.backoff_policy.retries).to eq(2) + end + + it "retries on 500" do + stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 500, body: "{}", headers: {}). + to_return(status: 503, body: "{}", headers: {}). + to_return(status: 502, body: "{}", headers: {}). + to_return(status: 200, body: "{}", headers: {}) + subject.call(enabled_metrics) + expect(subject.backoff_policy.retries).to eq(3) + end + end + + def with_telemetry_debug_logging(&block) + output = StringIO.new + original_logger = cloud_configuration.telemetry_logger + + begin + cloud_configuration.telemetry_logger = Logger.new(output) + block.call + ensure + cloud_configuration.telemetry_logger = original_logger + end + + output.string + end +end diff --git a/spec/flipper/cloud/telemetry_spec.rb b/spec/flipper/cloud/telemetry_spec.rb new file mode 100644 index 000000000..0aa8fddb2 --- /dev/null +++ b/spec/flipper/cloud/telemetry_spec.rb @@ -0,0 +1,151 @@ +require 'flipper/cloud/telemetry' +require 'flipper/cloud/configuration' + +RSpec.describe Flipper::Cloud::Telemetry do + describe '#record' do + it "phones home and does not update telemetry interval if missing" do + stub = stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 200, body: "{}", headers: {}) + + cloud_configuration = Flipper::Cloud::Configuration.new(token: "test") + + # Record some telemetry and stop the threads so we submit a response. + telemetry = described_class.new(cloud_configuration) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.stop + + expect(cloud_configuration.telemetry_interval).to eq(60) + expect(telemetry.timer.execution_interval).to eq(60) + expect(stub).to have_been_requested + end + + it "phones home and updates telemetry interval if present" do + stub = stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 200, body: "{}", headers: {"telemetry-interval" => "120"}) + + cloud_configuration = Flipper::Cloud::Configuration.new(token: "test") + + # Record some telemetry and stop the threads so we submit a response. + telemetry = described_class.new(cloud_configuration) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.stop + + expect(cloud_configuration.telemetry_interval).to eq(120) + expect(telemetry.timer.execution_interval).to eq(120) + expect(stub).to have_been_requested + end + + it "can update telemetry interval from error" do + stub = stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_return(status: 500, body: "{}", headers: {"telemetry-interval" => "120"}) + + cloud_configuration = Flipper::Cloud::Configuration.new(token: "test") + # Override the submitter to use back off policy that doesn't actually + # sleep. If we don't then the stop below kills the working thread and the + # interval is never updated. + cloud_configuration.telemetry_submitter = ->(drained) { + Flipper::Cloud::Telemetry::Submitter.new( + cloud_configuration, + backoff_policy: FakeBackoffPolicy.new + ).call(drained) + } + + # Record some telemetry and stop the threads so we submit a response. + telemetry = described_class.new(cloud_configuration) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.stop + + # Check the conig interval and the timer interval. + expect(cloud_configuration.telemetry_interval).to eq(120) + expect(telemetry.timer.execution_interval).to eq(120) + expect(stub).to have_been_requested.times(10) + end + + it "doesn't try to update telemetry interval from error if not response error" do + stub = stub_request(:post, "https://www.flippercloud.io/adapter/telemetry"). + to_raise(Net::OpenTimeout) + + cloud_configuration = Flipper::Cloud::Configuration.new(token: "test") + cloud_configuration.telemetry_submitter = ->(drained) { + Flipper::Cloud::Telemetry::Submitter.new( + cloud_configuration, + backoff_policy: FakeBackoffPolicy.new + ).call(drained) + } + + # Record some telemetry and stop the threads so we submit a response. + telemetry = described_class.new(cloud_configuration) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.stop + + expect(cloud_configuration.telemetry_interval).to eq(60) + expect(telemetry.timer.execution_interval).to eq(60) + expect(stub).to have_been_requested.times(10) + end + + it "increments in metric storage" do + begin + config = Flipper::Cloud::Configuration.new(token: "test") + telemetry = described_class.new(config) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :bar, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :baz, + result: true, + }) + telemetry.record(Flipper::Feature::InstrumentationName, { + operation: :enabled?, + feature_name: :foo, + result: false, + }) + + drained = telemetry.metric_storage.drain + metrics_by_key = drained.keys.group_by(&:key) + + foo_true, foo_false = metrics_by_key["foo"].partition { |metric| metric.result } + foo_true_sum = foo_true.map { |metric| drained[metric] }.sum + expect(foo_true_sum).to be(2) + foo_false_sum = foo_false.map { |metric| drained[metric] }.sum + expect(foo_false_sum).to be(1) + + bar_true_sum = metrics_by_key["bar"].map { |metric| drained[metric] }.sum + expect(bar_true_sum).to be(1) + + baz_true_sum = metrics_by_key["baz"].map { |metric| drained[metric] }.sum + expect(baz_true_sum).to be(1) + ensure + telemetry.stop + end + end + end +end diff --git a/spec/flipper/serializers/gzip_spec.rb b/spec/flipper/serializers/gzip_spec.rb new file mode 100644 index 000000000..cc1cc919c --- /dev/null +++ b/spec/flipper/serializers/gzip_spec.rb @@ -0,0 +1,13 @@ +require 'flipper/serializers/gzip' + +RSpec.describe Flipper::Serializers::Gzip do + it "serializes and deserializes" do + serialized = described_class.serialize("my data") + expect(described_class.deserialize(serialized)).to eq("my data") + end + + it "doesn't fail with nil" do + expect(described_class.serialize(nil)).to be(nil) + expect(described_class.deserialize(nil)).to be(nil) + end +end diff --git a/spec/flipper/serializers/json_spec.rb b/spec/flipper/serializers/json_spec.rb new file mode 100644 index 000000000..cb130c12a --- /dev/null +++ b/spec/flipper/serializers/json_spec.rb @@ -0,0 +1,13 @@ +require 'flipper/serializers/json' + +RSpec.describe Flipper::Serializers::Json do + it "serializes and deserializes" do + serialized = described_class.serialize("my data") + expect(described_class.deserialize(serialized)).to eq("my data") + end + + it "doesn't fail with nil" do + expect(described_class.serialize(nil)).to be(nil) + expect(described_class.deserialize(nil)).to be(nil) + end +end diff --git a/spec/flipper/typecast_spec.rb b/spec/flipper/typecast_spec.rb index 7a1c7179c..a3792ec71 100644 --- a/spec/flipper/typecast_spec.rb +++ b/spec/flipper/typecast_spec.rb @@ -217,4 +217,16 @@ }) end end + + it "converts to and from json" do + source = {"foo" => "bar"} + output = described_class.to_json(source) + expect(described_class.from_json(output)).to eq(source) + end + + it "converts to and from gzip" do + source = "foo bar" + output = described_class.to_gzip(source) + expect(described_class.from_gzip(output)).to eq(source) + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d765b08e5..86e8c1d67 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -20,8 +20,12 @@ Dir[FlipperRoot.join('spec/support/**/*.rb')].sort.each { |f| require f } +# Disable telemetry logging in specs. +ENV["FLIPPER_CLOUD_TELEMETRY_LOGGING"] = "0" + RSpec.configure do |config| config.before(:example) do + Flipper::Cloud::Telemetry.reset if defined?(Flipper::Cloud::Telemetry) && Flipper::Cloud::Telemetry.respond_to?(:reset) Flipper::Poller.reset if defined?(Flipper::Poller) Flipper.unregister_groups Flipper.configuration = nil diff --git a/spec/support/fake_backoff_policy.rb b/spec/support/fake_backoff_policy.rb new file mode 100644 index 000000000..e89307518 --- /dev/null +++ b/spec/support/fake_backoff_policy.rb @@ -0,0 +1,15 @@ +class FakeBackoffPolicy + def initialize + @retries = 0 + end + + attr_reader :retries + + def next_interval + @retries += 1 + 0 + end + + def reset + end +end