Skip to content

Commit

Permalink
Updated custom_size_based_buffer.rb
Browse files Browse the repository at this point in the history
Removed the temp file buffer and added retry to prevent data loss
  • Loading branch information
MonishkaDas committed Oct 17, 2024
1 parent b7aba1a commit ca981ba
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 106 deletions.
9 changes: 2 additions & 7 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
default :codec, 'json_lines'

def register
# Set buffer_file to a valid file path
buffer_file = File.join(Dir.pwd, 'buffer', 'kusto_buffer.dat')

# Ensure the buffer directory exists
FileUtils.mkdir_p(File.dirname(buffer_file))
# Initialize the custom buffer with size, interval, and buffer file
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, buffer_file) do |events|
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end

Expand Down Expand Up @@ -146,9 +141,9 @@ def flush_buffer(events)
begin
@ingestor.upload_async(events.join)
rescue => e
# Log the error and continue
@logger.error("Error during flush: #{e.message}")
@logger.error(e.backtrace.join("\n"))
raise e # Exception is raised to trigger the rescue block in buffer_flush
end
end

Expand Down
99 changes: 13 additions & 86 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
require 'logger'
require 'thread'
require 'fileutils'

module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size_mb, max_interval, buffer_file, &flush_callback)
raise ArgumentError, "buffer_file cannot be nil" if buffer_file.nil?

def initialize(max_size_mb, max_interval, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
buffer_file: buffer_file,
logger: Logger.new(STDOUT)
}
@buffer_state = {
Expand All @@ -29,7 +25,6 @@ def initialize(max_size_mb, max_interval, buffer_file, &flush_callback)
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
load_buffer_from_file
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
end

Expand All @@ -51,7 +46,6 @@ def shutdown
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
clear_file_buffer
end

private
Expand All @@ -73,6 +67,8 @@ def buffer_flush(options = {})
end

items_flushed = 0
max_retries = 5
retries = 0

begin
outgoing_items = []
Expand Down Expand Up @@ -101,17 +97,20 @@ def buffer_flush(options = {})
end

begin
@buffer_config[:logger].info("Attempting to flush #{outgoing_items.size} items to the network")
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
clear_flushed_buffer_states(outgoing_items) unless ::File.zero?(@buffer_config[:buffer_file]) # Clear the flushed items from the file
@buffer_config[:logger].info("Successfully flushed #{outgoing_items.size} items to the network")
rescue => e
@buffer_config[:logger].error("Flush failed: #{e.message}")
# Save the items to the file buffer in case of failure
@pending_mutex.synchronize do
@buffer_state[:pending_items] = outgoing_items + @buffer_state[:pending_items]
@buffer_state[:pending_size] += outgoing_size
save_buffer_to_file
@buffer_config[:logger].error(e.backtrace.join("\n"))
retries += 1
if retries <= max_retries
sleep 1
retry
else
@buffer_config[:logger].error("Max retries reached. Data loss may occur.")
raise e
end
raise e
end

@buffer_state[:last_flush] = Time.now.to_i
Expand All @@ -129,78 +128,6 @@ def buffer_initialize
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end

def clear_flushed_buffer_states(flushed_items)
remaining_buffer_states = []
::File.foreach(@buffer_config[:buffer_file]) do |line|
begin
buffer_state = Marshal.load(line)
buffer_state[:pending_items] -= flushed_items
buffer_state[:pending_size] = buffer_state[:pending_items].sum(&:bytesize)
remaining_buffer_states << buffer_state unless buffer_state[:pending_items].empty?
rescue ArgumentError => e
@buffer_config[:logger].error("Failed to load buffer state: #{e.message}")
next
end
end

::File.open(@buffer_config[:buffer_file], 'w') do |file|
remaining_buffer_states.each do |state|
file.write(Marshal.dump(state) + "\n")
end
end
end

def save_buffer_to_file
buffer_state_copy = @buffer_state.dup
buffer_state_copy.delete(:timer) # Exclude the Thread object from serialization

::FileUtils.mkdir_p(::File.dirname(@buffer_config[:buffer_file])) # Ensure directory exists
::File.open(@buffer_config[:buffer_file], 'a') do |file|
file.write(Marshal.dump(buffer_state_copy) + "\n")
end
@buffer_config[:logger].info("Saved buffer state to file")
end

def load_buffer_from_file
::FileUtils.mkdir_p(::File.dirname(@buffer_config[:buffer_file])) # Ensure directory exists
::File.open(@buffer_config[:buffer_file], 'a') {} # Create the file if it doesn't exist

if ::File.file?(@buffer_config[:buffer_file]) && !::File.zero?(@buffer_config[:buffer_file])
begin
@pending_mutex.synchronize do
buffer_states = []
::File.foreach(@buffer_config[:buffer_file]) do |line|
buffer_states << Marshal.load(line)
end
@buffer_state = buffer_states.reduce do |acc, state|
acc[:pending_items].concat(state[:pending_items])
acc[:pending_size] += state[:pending_size]
acc
end
@buffer_state[:timer] = Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
end
# Ensure the buffer does not flush immediately upon loading
@buffer_state[:last_flush] = Time.now.to_i
end
@buffer_config[:logger].info("Loaded buffer state from file")
rescue => e
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
buffer_initialize
end
else
buffer_initialize
end
end

def clear_file_buffer
::File.open(@buffer_config[:buffer_file], 'w') {} # Truncate the file
@buffer_config[:logger].info("File buffer cleared on shutdown")
end
end
end
end
13 changes: 0 additions & 13 deletions spec/outputs/kusto_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,6 @@
end

describe '#flush_buffer' do
it 'handles errors during buffer flushing' do
RSpec.configuration.reporter.message("Running test: handles errors during buffer flushing")
kusto = described_class.new(options)
kusto.register

allow(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).and_raise(StandardError.new("Test error"))
events = [LogStash::Event.new("message" => "test1")]
encoded_events = events.map { |e| [e, e.to_json] }
kusto.multi_receive_encoded(encoded_events)

expect { kusto.flush_buffer(encoded_events) }.not_to raise_error
RSpec.configuration.reporter.message("Completed test: handles errors during buffer flushing")
end

it 'flushes the buffer when max_size is reached' do
RSpec.configuration.reporter.message("Running test: flushes the buffer when max_size is reached")
Expand Down

0 comments on commit ca981ba

Please sign in to comment.