Added temporary file buffer for persistent storage
Updated upload_async and upload to raise errors during network downtime
so that they trigger the rescue block in flush_buffer, letting the data
sent for flushing be restored and the flush be attempted again later.

Added a file buffer that stores data ONLY when a flush fails due to
network issues. Once the network is back online, each file in the buffer
is flushed and deleted first, and then the regular in-memory buffer is
used again.
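
In outline, the recovery flow works like the following minimal Ruby sketch (an illustration only, not the plugin code: FallbackBuffer and its method names are hypothetical, while the directory and file-naming scheme mirror the diff below).

require 'fileutils'
require 'securerandom'

# Hypothetical condensation of the commit's strategy: try the normal
# flush; on failure, persist the batch to disk; once a flush succeeds
# again, drain the persisted batches before relying on memory alone.
class FallbackBuffer
  BUFFER_DIR = './tmp/buffer_storage/'

  def initialize(&flush_callback)
    @flush_callback = flush_callback
  end

  def flush(events)
    @flush_callback.call(events) # raises while the network is down
  rescue StandardError
    persist(events)              # keep the failed batch for later
  else
    replay_files                 # back online: drain files first
  end

  private

  def persist(events)
    FileUtils.mkdir_p(BUFFER_DIR)
    name = "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log"
    File.binwrite(File.join(BUFFER_DIR, name), Marshal.dump(events))
  end

  def replay_files
    Dir.glob(File.join(BUFFER_DIR, 'buffer_state_*.log')).sort.each do |path|
      @flush_callback.call(Marshal.load(File.binread(path)))
      File.delete(path) # delete only after a successful replay
    end
  rescue StandardError
    nil # a replay failed; leave the remaining files for the next attempt
  end
end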
MonishkaDas committed Oct 24, 2024
1 parent 07fc531 commit 64b4d34
Showing 3 changed files with 117 additions and 58 deletions.
2 changes: 1 addition & 1 deletion lib/logstash/outputs/kusto.rb
@@ -77,7 +77,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
default :codec, 'json_lines'

def register
- # Initialize the custom buffer with size, interval, and buffer file
+ # Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end
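
For context, this is the buffer's entire public surface as the plugin uses it (a usage sketch: the 10 and 60 are placeholder values for @max_size in MB and @max_interval in seconds, and flush_buffer is the plugin method shown above).

buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(10, 60) do |events|
  flush_buffer(events) # raises while the network is down, per this commit
end

buffer << '{"message":"hello"}' # one serialized event; a flush fires on size/interval
buffer.shutdown                 # final flush when the plugin closes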
91 changes: 72 additions & 19 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
@@ -1,5 +1,7 @@
require 'logger'
require 'thread'
+ require 'fileutils'
+ require 'securerandom'

module LogStash
module Outputs
@@ -8,24 +10,33 @@ def initialize(max_size_mb, max_interval, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
+   buffer_dir: './tmp/buffer_storage/',
logger: Logger.new(STDOUT)
}
@buffer_state = {
pending_items: [],
pending_size: 0,
last_flush: Time.now.to_i,
-   timer: Thread.new do
-     loop do
-       sleep(@buffer_config[:max_interval])
-       buffer_flush(force: true)
-     end
-   end
+   timer: nil,
+   network_down: false
}
@flush_callback = flush_callback
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
+ load_buffer_from_files
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")

+ # Start the timer thread after a delay to ensure initializations are completed
+ Thread.new do
+   sleep(10)
+   @buffer_state[:timer] = Thread.new do
+     loop do
+       sleep(@buffer_config[:max_interval])
+       buffer_flush(force: true)
+     end
+   end
+ end
end

def <<(event)
@@ -37,15 +48,14 @@ def <<(event)
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
end

buffer_flush
end

def shutdown
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
+ clear_buffer_files
end

private
@@ -67,7 +77,6 @@ def buffer_flush(options = {})
end

items_flushed = 0
- backoff_interval = 1

begin
outgoing_items = []
@@ -92,19 +101,18 @@ def buffer_flush(options = {})

outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
- buffer_initialize
+ @buffer_state[:pending_items] = []
+ @buffer_state[:pending_size] = 0
end

begin
@buffer_config[:logger].info("Attempting to flush #{outgoing_items.size} items to the network")
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
@buffer_config[:logger].info("Successfully flushed #{outgoing_items.size} items to the network")
+ @buffer_state[:network_down] = false # Reset network status after successful flush
+ flush_buffer_files # Flush buffer files if any exist
rescue => e
@buffer_config[:logger].error("Flush failed: #{e.message}")
- @buffer_config[:logger].error(e.backtrace.join("\n"))
- sleep backoff_interval
- backoff_interval = [backoff_interval * 2, 60].min # Exponential backoff with a max interval of 60 seconds
- retry
+ @buffer_state[:network_down] = true
+ save_buffer_to_file(outgoing_items)
end

@buffer_state[:last_flush] = Time.now.to_i
@@ -118,9 +126,54 @@ def buffer_flush(options = {})
items_flushed
end

- def buffer_initialize
-   @buffer_state[:pending_items] = []
-   @buffer_state[:pending_size] = 0
+ def save_buffer_to_file(events)
+   buffer_state_copy = {
+     pending_items: events,
+     pending_size: events.sum(&:bytesize)
+   }
+
+   ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists
+   file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log")
+   ::File.open(file_path, 'w') do |file|
+     file.write(Marshal.dump(buffer_state_copy))
+   end
+   @buffer_config[:logger].info("Saved buffer state to file: #{file_path}")
+ end
+
+ def load_buffer_from_files
+   Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
+     begin
+       buffer_state = Marshal.load(::File.read(file_path))
+       @buffer_state[:pending_items].concat(buffer_state[:pending_items])
+       @buffer_state[:pending_size] += buffer_state[:pending_size]
+       ::File.delete(file_path)
+     rescue => e
+       @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
+     end
+   end
+   @buffer_config[:logger].info("Loaded buffer state from files")
+ end
+
+ def flush_buffer_files
+   Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
+     begin
+       buffer_state = Marshal.load(::File.read(file_path))
+       @buffer_config[:logger].info("Flushed from file: #{file_path}")
+       @flush_callback.call(buffer_state[:pending_items])
+       ::File.delete(file_path)
+       @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
+     rescue => e
+       @buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}")
+       break
+     end
+   end
+ end
+
+ def clear_buffer_files
+   Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
+     ::File.delete(file_path)
+   end
+   @buffer_config[:logger].info("Cleared all buffer state files")
+ end
end
end
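
A standalone look at the Marshal round-trip these helpers rely on (a sketch, not plugin code). One caveat worth noting: Marshal output is raw bytes, not text, so binary-mode I/O (binwrite/binread) is the safe way to do what the 'w'/File.read calls above are doing.

require 'fileutils'
require 'securerandom'

dir = './tmp/buffer_storage/'
FileUtils.mkdir_p(dir)

events = ['{"a":1}', '{"b":2}']
state  = { pending_items: events, pending_size: events.sum(&:bytesize) }

path = ::File.join(dir, "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log")
File.binwrite(path, Marshal.dump(state))    # binary write: Marshal emits raw bytes

restored = Marshal.load(File.binread(path)) # binary read for the same reason
restored[:pending_items]                    # => ["{\"a\":1}", "{\"b\":2}"]
File.delete(path)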
82 changes: 44 additions & 38 deletions lib/logstash/outputs/kusto/ingestor.rb
@@ -54,7 +54,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
end
end
- #
@logger.debug(Gem.loaded_specs.to_s)
# Unfortunately there's no way to avoid using the gem/plugin name directly...
name_for_tracing = "logstash-output-kusto:#{Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"}"
@@ -123,67 +122,74 @@ def upload_async(data)
if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
@logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.")
end

+ exception = nil
@workers_pool.post do
LogStash::Util.set_thread_name("Kusto to ingest data")
begin
upload(data)
rescue => e
@logger.error('Error during async upload.', exception: e.class, message: e.message, backtrace: e.backtrace)
- raise e
+ exception = e
end
end

+ # Wait for the task to complete and check for exceptions
+ @workers_pool.shutdown
+ @workers_pool.wait_for_termination
+
+ if exception
+   @logger.error('StandardError in upload_async.', exception: exception.class, message: exception.message, backtrace: exception.backtrace)
+   raise exception
+ end
rescue Exception => e
@logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e
end

def upload(data)
@logger.debug("Sending data to Kusto")

# TODO: dynamic routing
# file_metadata = path.partition('.kusto.').last
# file_metadata_parts = file_metadata.split('.')

# if file_metadata_parts.length == 3
# # this is the number we expect - database, table, json_mapping
# database = file_metadata_parts[0]
# table = file_metadata_parts[1]
# json_mapping = file_metadata_parts[2]

# local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
# local_ingestion_properties.addJsonMappingName(json_mapping)
# end

@logger.info("Sending data to Kusto")

if data.size > 0
- data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
- @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
+ ingestionLatch = java.util.concurrent.CountDownLatch.new(1)
+
+ Thread.new do
+   begin
+     data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
+     ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
+
+     # Check the ingestion status
+     status = ingestion_result.getIngestionStatusCollection.get(0)
+     if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued
+       raise "Failed upload: #{status.errorCodeString}"
+     end
+     @logger.info("Final ingestion status: #{status.status}")
+   rescue => e
+     @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace)
+     if e.message.include?("network")
+       raise e
+     end
+   ensure
+     ingestionLatch.countDown()
+   end
+ end
+
+ # Wait for the ingestion to complete with a timeout
+ if !ingestionLatch.await(30, java.util.concurrent.TimeUnit::SECONDS)
+   @logger.error('Ingestion timed out, possible network issue.')
+   raise 'Ingestion timed out, possible network issue.'
+ end
else
@logger.warn("Data is empty and is not ingested.")
end
@logger.debug("Data sent to Kusto.")
@logger.info("Data sent to Kusto.")
rescue => e
- # When the retry limit is reached or another error happens we will wait and retry.
- #
- # Thread might be stuck here, but I think it's better than losing anything
- # it's either a transient error or something bad really happened.
- @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, backtrace: e.backtrace)
- retry_count = 0
- max_retries = 5
- begin
-   sleep (2 ** retry_count) * RETRY_DELAY_SECONDS
-   retry_count += 1
-   retry if retry_count <= max_retries
- rescue => retry_error
-   @logger.error('Retrying failed.', exception: retry_error.class, message: retry_error.message, backtrace: retry_error.backtrace)
-   raise retry_error
- end
+ @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
+ raise e # Raise the original error if ingestion fails
end

def stop
@workers_pool.shutdown
@workers_pool.wait_for_termination(nil) # block until its done
end
end
end
end
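
The upload path bounds a blocking Java SDK call with a CountDownLatch. Isolated from the plugin, the pattern looks like this (a JRuby-only sketch; the sleep stands in for the blocking ingestFromStream call).

require 'java'

latch   = java.util.concurrent.CountDownLatch.new(1)
failure = nil

worker = Thread.new do
  begin
    sleep(1) # stand-in for the blocking ingest call
  rescue => e
    failure = e
  ensure
    latch.count_down # always release the waiter, even on error
  end
end

# Bound the wait: a hung network call becomes a raised timeout
# instead of a permanently blocked worker.
unless latch.await(30, java.util.concurrent.TimeUnit::SECONDS)
  raise 'Ingestion timed out, possible network issue.'
end
raise failure if failure
worker.join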
