diff --git a/.rubocop.yml b/.rubocop.yml index ca78757..157be72 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -9,7 +9,7 @@ AllCops: - 'vendor/**/*' Metrics/ClassLength: - Max: 200 + Max: 300 Metrics/ModuleLength: Max: 150 diff --git a/lib/cloudtasker/batch/batch_progress.rb b/lib/cloudtasker/batch/batch_progress.rb index 4e5a3a4..19325bd 100644 --- a/lib/cloudtasker/batch/batch_progress.rb +++ b/lib/cloudtasker/batch/batch_progress.rb @@ -6,15 +6,26 @@ module Cloudtasker module Batch # Capture the progress of a batch class BatchProgress - attr_reader :batch_state + attr_reader :batches # # Build a new instance of the class. # - # @param [Hash] batch_state The batch state + # @param [Array<Cloudtasker::Batch::Job>] batches The batches to consider # - def initialize(batch_state = {}) - @batch_state = batch_state + def initialize(batches = []) + @batches = batches + end + + # + # Count the number of items in a given status + # + # @param [String] status The status to count ('all' counts every job) + # + # @return [Integer] The number of jobs in the status + # + def count(status = 'all') + batches.sum { |e| e.batch_state_count(status) } end # @@ -122,16 +133,7 @@ def percent(min_total: 0, smoothing: 0) # @return [Cloudtasker::Batch::BatchProgress] The sum of the two batch progresses. # def +(other) - self.class.new(batch_state.to_h.merge(other.batch_state.to_h)) - end - - private - - # Count the number of items in a given status - def count(status = nil) - return batch_state.to_h.keys.size unless status - - batch_state.to_h.values.count { |e| e == status } + self.class.new(batches + other.batches) end end end diff --git a/lib/cloudtasker/batch/job.rb b/lib/cloudtasker/batch/job.rb index 10f72fa..2fd56ff 100644 --- a/lib/cloudtasker/batch/job.rb +++ b/lib/cloudtasker/batch/job.rb @@ -24,6 +24,7 @@ class Job # means that the job will never succeed. There is no point in blocking # the batch forever so we proceed forward eventually.
# + BATCH_STATUSES = %w[scheduled processing completed errored dead all].freeze COMPLETION_STATUSES = %w[completed dead].freeze # These callbacks do not need to raise errors on their own @@ -182,6 +183,25 @@ def batch_state_gid key("#{STATES_NAMESPACE}/#{batch_id}") end + # + # Return the key under which the batch progress is stored + # for a specific state. + # + # @return [String] The namespaced key of the counter for the given state. + # + def batch_state_count_gid(state) + "#{batch_state_gid}/state_count/#{state}" + end + + # + # Return the number of jobs in a given state + # + # @return [Integer] The value of the counter for the given state (0 when unset). + # + def batch_state_count(state) + redis.get(batch_state_count_gid(state)).to_i + end + # # The list of jobs to be enqueued in the batch # @@ -258,6 +278,28 @@ def migrate_batch_state_to_redis_hash end end + # + # This method initializes the batch job counters if not set already + # + def migrate_progress_stats_to_redis_counters + # Abort if counters have already been set. The 'all' counter acts as a feature flag. + return if redis.exists?(batch_state_count_gid('all')) + + # Get all job states + values = batch_state.values + + # Count by value + redis.multi do |m| + # Per status + values.tally.each do |k, v| + m.set(batch_state_count_gid(k), v) + end + + # All counter + m.set(batch_state_count_gid('all'), values.size) + end + end + # # Save serialized version of the worker.
# @@ -278,8 +320,17 @@ def save def update_state(batch_id, status) migrate_batch_state_to_redis_hash + # Get current status + current_status = redis.hget(batch_state_gid, batch_id) + return if current_status == status.to_s + # Update the batch state batch_id entry with the new status - redis.hset(batch_state_gid, batch_id, status) + # and update counters (nothing to decrement for a first-seen entry) + redis.multi do |m| + m.hset(batch_state_gid, batch_id, status) + m.decr(batch_state_count_gid(current_status)) if current_status + m.incr(batch_state_count_gid(status)) + end end # @@ -385,8 +436,11 @@ def cleanup redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup } # Delete batch redis entries - redis.del(batch_gid) - redis.del(batch_state_gid) + redis.multi do |m| + m.del(batch_gid) + m.del(batch_state_gid) + BATCH_STATUSES.each { |e| m.del(batch_state_count_gid(e)) } + end end # @@ -400,17 +454,17 @@ def cleanup def progress(depth: 0) depth = depth.to_i - # Capture batch state - state = batch_state + # Initialize counters from batch state. This is only applicable to running batches + # that started before the counter-based progress was implemented/released. + migrate_progress_stats_to_redis_counters # Return immediately if we do not need to go down the tree - return BatchProgress.new(state) if depth <= 0 + return BatchProgress.new([self]) if depth <= 0 # Sum batch progress of current batch and sub-batches up to the specified # depth - state.to_h.reduce(BatchProgress.new(state)) do |memo, (child_id, child_status)| - memo + (self.class.find(child_id)&.progress(depth: depth - 1) || - BatchProgress.new(child_id => child_status)) + batch_state.to_h.reduce(BatchProgress.new([self])) do |memo, (child_id, _)| + memo + (self.class.find(child_id)&.progress(depth: depth - 1) || BatchProgress.new) end end @@ -432,7 +486,11 @@ def schedule_pending_jobs # having never-ending batches - which could occur if a batch was crashing # while enqueuing children due to a OOM error and since 'scheduled' is a # blocking status.
- redis.hsetnx(batch_state_gid, j.job_id, 'scheduled') + redis.multi do |m| + m.hsetnx(batch_state_gid, j.job_id, 'scheduled') + m.incr(batch_state_count_gid('scheduled')) + m.incr(batch_state_count_gid('all')) + end # Flag job as enqueued ret_list << j diff --git a/spec/cloudtasker/batch/batch_progress_spec.rb b/spec/cloudtasker/batch/batch_progress_spec.rb index 18b2c88..74c83b4 100644 --- a/spec/cloudtasker/batch/batch_progress_spec.rb +++ b/spec/cloudtasker/batch/batch_progress_spec.rb @@ -3,118 +3,119 @@ require 'cloudtasker/batch/middleware' RSpec.describe Cloudtasker::Batch::BatchProgress do - let(:batch_state) do - { - '1' => 'completed', - '2' => 'scheduled', - '3' => 'processing', - '4' => 'errored', - '5' => 'dead' - } - end - let(:batch_progress) { described_class.new(batch_state) } + let(:batch) { instance_double(Cloudtasker::Batch::Job) } + let(:batch_progress) { described_class.new([batch]) } describe '.new' do subject { batch_progress } - it { is_expected.to have_attributes(batch_state: batch_state) } + it { is_expected.to have_attributes(batches: [batch]) } end - describe '#total' do - subject { batch_progress.total } + describe '#count' do + subject { batch_progress.count(*args) } - it { is_expected.to eq(batch_state.keys.count) } - end + let(:args) { [] } + let(:count) { 18 } - describe '#completed' do - subject { batch_progress.completed } - - it { is_expected.to eq(batch_state.values.count { |e| e == 'completed' }) } - end + context 'with no args' do + before { allow(batch).to receive(:batch_state_count).with('all').and_return(count) } + it { is_expected.to eq(count) } + end - describe '#scheduled' do - subject { batch_progress.scheduled } + context 'with status provided' do + let(:args) { ['processing'] } - it { is_expected.to eq(batch_state.values.count { |e| e == 'scheduled' }) } + before { allow(batch).to receive(:batch_state_count).with(args[0]).and_return(count) } + it { is_expected.to eq(count) } + end end - describe '#errored' do - 
subject { batch_progress.errored } - - it { is_expected.to eq(batch_state.values.count { |e| e == 'errored' }) } - end + describe '#total' do + subject { batch_progress.total } - describe '#dead' do - subject { batch_progress.dead } + let(:count) { 18 } - it { is_expected.to eq(batch_state.values.count { |e| e == 'dead' }) } + before { allow(batch_progress).to receive(:count).and_return(count) } + it { is_expected.to eq(count) } end - describe '#processing' do - subject { batch_progress.processing } + %w[scheduled processing completed errored dead].each do |tested_status| + describe "##{tested_status}" do + subject { batch_progress.send(tested_status) } + + let(:count) { 18 } - it { is_expected.to eq(batch_state.values.count { |e| e == 'processing' }) } + before { allow(batch_progress).to receive(:count).with(tested_status).and_return(count) } + it { is_expected.to eq(count) } + end end describe '#pending' do subject { batch_progress.pending } - it { is_expected.to eq(batch_state.values.count { |e| !%w[dead completed].include?(e) }) } + let(:total) { 25 } + let(:done) { 18 } + + before do + allow(batch_progress).to receive_messages(total: total, done: done) + end + + it { is_expected.to eq(total - done) } end describe '#done' do subject { batch_progress.done } - it { is_expected.to eq(batch_state.values.count { |e| %w[dead completed].include?(e) }) } + let(:completed) { 25 } + let(:dead) { 18 } + + before do + allow(batch_progress).to receive_messages(completed: completed, dead: dead) + end + + it { is_expected.to eq(completed + dead) } end describe '#percent' do subject { batch_progress.percent(**opts) } let(:opts) { {} } + let(:total) { 25 } + let(:done) { 18 } + + before do + allow(batch_progress).to receive_messages(total: total, done: done) + end context 'with batch' do - it { is_expected.to eq((batch_progress.done.to_f / batch_progress.total) * 100) } + it { is_expected.to eq((done.to_f / total) * 100) } end context 'with min_total > total' do let(:opts) { { 
min_total: 1000 } } - it { is_expected.to eq((batch_progress.done.to_f / opts[:min_total]) * 100) } + it { is_expected.to eq((done.to_f / opts[:min_total]) * 100) } end context 'with min_total < total' do - let(:opts) { { min_total: batch_progress.total / 2 } } + let(:opts) { { min_total: total / 2 } } - it { is_expected.to eq((batch_progress.done.to_f / batch_progress.total) * 100) } + it { is_expected.to eq((done.to_f / total) * 100) } end context 'with additive smoothing' do let(:opts) { { smoothing: 10 } } - it { is_expected.to eq((batch_progress.done.to_f / (batch_progress.total + opts[:smoothing])) * 100) } - end - - context 'with empty elements' do - let(:batch_state) { {} } - - it { is_expected.to be_zero } + it { is_expected.to eq((done.to_f / (total + opts[:smoothing])) * 100) } end end describe '#+' do subject { batch_progress + other } - let(:other_state) do - { - '4' => 'completed', - '5' => 'scheduled', - '6' => 'processing' - } - end - let(:other) { described_class.new(other_state) } + let(:other) { described_class.new([instance_double(Cloudtasker::Batch::Job)]) } - it { is_expected.to be_a(described_class) } - it { is_expected.to have_attributes(batch_state: batch_state.merge(other_state)) } + it { is_expected.to have_attributes(class: described_class, batches: batch_progress.batches + other.batches) } end end diff --git a/spec/cloudtasker/batch/job_spec.rb b/spec/cloudtasker/batch/job_spec.rb index fe19f88..69dd1d0 100644 --- a/spec/cloudtasker/batch/job_spec.rb +++ b/spec/cloudtasker/batch/job_spec.rb @@ -157,6 +157,24 @@ it { is_expected.to eq(batch.key("#{described_class::STATES_NAMESPACE}/#{batch.batch_id}")) } end + describe '#batch_state_count_gid' do + subject { batch.batch_state_count_gid(state) } + + let(:state) { 'processing' } + + it { is_expected.to eq("#{batch.batch_state_gid}/state_count/#{state}") } + end + + describe '#batch_state_count' do + subject { batch.batch_state_count(state) } + + let(:state) { 'processing' } + let(:count) { 
18 } + + before { redis.set(batch.batch_state_count_gid(state), count.to_s) } + it { is_expected.to eq(count) } + end + describe '#pending_jobs' do subject { batch.pending_jobs } @@ -240,6 +258,50 @@ end end + describe '#migrate_progress_stats_to_redis_counters' do + subject do + described_class::BATCH_STATUSES.each_with_object({}) do |elem, memo| + memo[elem] = batch.batch_state_count(elem) + end + end + + let(:expected_counters) do + h = (described_class::BATCH_STATUSES - ['all']).each_with_object({}).with_index do |(elem, memo), i| + memo[elem] = i + 1 + end + h.merge('all' => h.values.sum) + end + + context 'with counters already set' do + before do + expected_counters.each { |k, v| redis.set(batch.batch_state_count_gid(k), v) } + + # Since counters are already set, we expect the migration script not to set them + expect(redis).not_to receive(:set) + + # Perform migration + batch.migrate_progress_stats_to_redis_counters + end + it { is_expected.to eq(expected_counters) } + end + + context 'with no counters set' do + let(:batch_state) do + expected_counters.except('all').each_with_object({}) do |(k, v), memo| + v.times { memo[SecureRandom.uuid] = k } + end + end + + before do + allow(batch).to receive(:batch_state).and_return(batch_state) + + # Perform migration + batch.migrate_progress_stats_to_redis_counters + end + it { is_expected.to eq(expected_counters) } + end + end + describe '#save' do let(:batch_content) { redis.fetch(batch.batch_gid) } @@ -262,6 +324,8 @@ expect(batch_state).to eq(child_worker.job_id => 'scheduled') expect(batch.pending_jobs).to be_empty expect(batch.enqueued_jobs).to eq([child_worker]) + expect(batch.batch_state_count('scheduled')).to eq(1) + expect(batch.batch_state_count('all')).to eq(1) end it { is_expected.to eq([child_worker]) } @@ -338,14 +402,20 @@ let(:child_id) { child_batch.batch_id } let(:status) { 'processing' } - let(:initial_state) { { 'foo' => 'bar' } } + let(:initial_state) { { child_id => 'scheduled' } } before do - 
redis.hset(batch.batch_state_gid, initial_state) if initial_state.present? + redis.hset(batch.batch_state_gid, initial_state) + redis.set(batch.batch_state_count_gid('scheduled'), 1) batch.update_state(child_id, status) expect(batch).to receive(:migrate_batch_state_to_redis_hash).and_call_original end + after do + expect(batch.batch_state_count('scheduled')).to eq(0) + expect(batch.batch_state_count(status)).to eq(1) + end + it { is_expected.to eq(status) } end @@ -580,7 +650,14 @@ subject { redis.keys.sort } let(:side_batch) { described_class.new(worker.new_instance) } - let(:expected_keys) { [side_batch.batch_gid, side_batch.batch_state_gid].sort } + let(:expected_keys) do + [ + side_batch.batch_gid, + side_batch.batch_state_gid, + side_batch.batch_state_count_gid('all'), + side_batch.batch_state_count_gid('scheduled') + ].sort + end before do # Do not enqueue jobs @@ -597,6 +674,9 @@ child_batch.pending_jobs.push(worker.new_instance) child_batch.setup + # Flag a child batch job as completed + child_batch.update_state(child_batch.enqueued_jobs[1].job_id, 'completed') + # Attach child batch to main batch batch.pending_jobs.push(child_worker) batch.setup