Skip to content

Commit

Permalink
Added latch timeout and updated Java SDK version
Browse files Browse the repository at this point in the history
  • Loading branch information
MonishkaDas committed Dec 12, 2024
1 parent b624d01 commit 92edb8f
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 117 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ repositories {
// update dependencies to bom azure-sdk-bom/1.2.24

dependencies {
implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-data:6.0.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:6.0.0'
implementation 'com.azure:azure-core-http-netty:1.15.1'
implementation 'com.azure:azure-core:1.49.1'
implementation 'com.azure:azure-data-tables:12.4.2'
Expand All @@ -52,7 +52,7 @@ dependencies {
implementation 'com.nimbusds:nimbus-jose-jwt:9.40'
implementation 'com.nimbusds:oauth2-oidc-sdk:11.13'
implementation 'com.univocity:univocity-parsers:2.9.1'
implementation 'commons-codec:commons-codec:1.16.1'
implementation 'commons-codec:commons-codec:1.17.1'
implementation 'commons-logging:commons-logging:1.3.1'
implementation 'io.github.resilience4j:resilience4j-core:1.7.1'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
Expand Down
28 changes: 15 additions & 13 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
config :database, validate: :string, required: true
# Target table name
config :table, validate: :string, required: true
# Path to store failed items when max_retries is reached, set to "nil" to disable persistence to file
config :failed_items_path, validate: :string, required: true

# Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table.
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto
# Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings
Expand Down Expand Up @@ -78,17 +75,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
config :max_interval, validate: :number, required: false , default: 10

# Maximum number of retries before the flush fails, defaults to 3
config :max_retries, validate: :number, required: false , default: 3
# Latch timeout in seconds, defaults to 60
config :latch_timeout, validate: :number, required: false, default: 60


default :codec, 'json_lines'

def register
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, @max_retries, @failed_items_path) do |events|
flush_buffer(events)
end

@io_mutex = Mutex.new

final_mapping = json_mapping
Expand All @@ -98,13 +91,22 @@ def register
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping)
kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth)
kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false)
@kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger)
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor)
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, latch_timeout, executor)

# Deprecation warning for path
if @path
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
end
sleep(30)
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end
end


Expand All @@ -121,7 +123,7 @@ def multi_receive_encoded(events_and_encoded)

def close
@logger.info("Closing Kusto output plugin")
begin
begin
@buffer.shutdown unless @buffer.nil?
@logger.info("Buffer shutdown") unless @buffer.nil?
rescue => e
Expand Down
199 changes: 130 additions & 69 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -1,55 +1,61 @@
require 'logger'
require 'thread'
require 'csv'
require 'fileutils'
require 'securerandom'
require 'net/http'
require 'uri'

module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size_mb = 10, max_interval = 10, max_retries = 3, failed_items_path = nil, &flush_callback)
def initialize(max_size_mb, max_interval, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
max_retries: max_retries,
failed_items_path: failed_items_path,
buffer_dir: './tmp/buffer_storage/',
logger: Logger.new(STDOUT)
}
@buffer_state = {
pending_items: [],
pending_size: 0,
last_flush: Time.now.to_i,
timer: Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
end
timer: nil,
network_down: false
}
@flush_callback = flush_callback
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds, max_retries: #{max_retries}, failed_items_path: #{failed_items_path}")
load_buffer_from_files
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")

# Start the timer thread
@buffer_state[:timer] = Thread.new do
loop do
sleep(@buffer_config[:max_interval])
prepare_flush(force: true)
end
end
end

def <<(event)
while buffer_full? do
prepare_flush(force: true) # Flush when buffer is full
sleep 0.1
end

@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
end

# Trigger a flush if the buffer size exceeds the maximum size
if buffer_full?
buffer_flush(force: true)
end
end

def shutdown
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
prepare_flush(final: true)
flush_buffer_files
end

private
Expand All @@ -60,84 +66,139 @@ def buffer_full?
end
end

def buffer_flush(options = {})
def prepare_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
outgoing_items = []
outgoing_size = 0

@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
end

if time_since_last_flush >= @buffer_config[:max_interval]
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
else
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
end

if @buffer_state[:network_down]
save_buffer_to_file(@buffer_state[:pending_items])
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
return 0
end

outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end

begin
outgoing_items = []
outgoing_size = 0
if Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).any?
@buffer_config[:logger].info("Flushing all buffer state files")
flush_buffer_files
end

@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
Thread.new { perform_flush(outgoing_items) }
end

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
def perform_flush(events, file_path = nil)

@flush_mutex.lock

begin
if file_path
unless ::File.exist?(file_path)
return 0
end

if force
if time_since_last_flush >= @buffer_config[:max_interval]
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
else
@buffer_config[:logger].info("Size-based flush triggered when #{@buffer_state[:pending_size]} bytes was reached")
end
begin
buffer_state = Marshal.load(::File.read(file_path))
events = buffer_state[:pending_items]
rescue => e
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
return 0
end

outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
buffer_initialize
end

retries = 0
begin
@buffer_config[:logger].info("Flushing: #{outgoing_items.size} items and #{outgoing_size} bytes to the network")
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
rescue => e
retries += 1
if retries <= @buffer_config[:max_retries]
@buffer_config[:logger].error("Flush failed: #{e.message}. \nRetrying (#{retries}/#{@buffer_config[:max_retries]})...")
sleep 1
@buffer_config[:logger].info("Flushing #{events.size} events, #{events.sum(&:bytesize)} bytes")
@flush_callback.call(events) # Pass the list of events to the callback
@buffer_state[:network_down] = false # Reset network status after successful flush
@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{events.size} events, #{events.sum(&:bytesize)} bytes")

if file_path
::File.delete(file_path)
@buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
end

rescue => e
@buffer_config[:logger].error("Flush failed: #{e.message}")
@buffer_state[:network_down] = true

while true
sleep(2) # Wait before checking network availability again
if network_available?
@buffer_config[:logger].info("Network is back up. Retrying flush.")
retry
else
@buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes")
handle_failed_flush(outgoing_items)
end
end

ensure
@flush_mutex.unlock
end
end

# Probes general internet reachability with a single HTTP GET.
# Returns true only when the request completes with a 2xx response;
# any exception (DNS failure, timeout, refused connection) yields false.
# NOTE(review): this checks reachability of www.google.com specifically —
# it may report the network as down in restricted/proxied environments
# even when the actual ingest endpoint is reachable; confirm intent.
def network_available?
  reply = Net::HTTP.get_response(URI('http://www.google.com'))
  reply.is_a?(Net::HTTPSuccess)
rescue
  false
end

def handle_failed_flush(items)
if @buffer_config[:failed_items_path].nil? || @buffer_config[:failed_items_path] == "nil"
@buffer_config[:logger].warn("No failed_items_path configured. The failed items are not persisted. Data loss may occur.")
else
# Persists the given events to disk so they survive a network outage or
# process restart. Writes a Marshal dump of { pending_items:, pending_size: }
# into a uniquely named buffer_state_*.log file under the configured
# buffer directory. I/O errors are logged and swallowed (best-effort).
def save_buffer_to_file(events)
  payload = {
    pending_items: events,
    pending_size: events.sum(&:bytesize)
  }
  begin
    # Directory may not exist on first use — create it on demand.
    ::FileUtils.mkdir_p(@buffer_config[:buffer_dir])
    # Timestamp + UUID keeps concurrent writers from colliding on a name.
    target = ::File.join(@buffer_config[:buffer_dir],
                         "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log")
    ::File.open(target, 'w') { |f| f.write(Marshal.dump(payload)) }
    @buffer_config[:logger].info("Saved #{events.size} events to file: #{target}")
  rescue => e
    @buffer_config[:logger].error("Failed to save buffer to file: #{e.message}")
  end
end

def load_buffer_from_files
Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
begin
::File.open(@buffer_config[:failed_items_path], 'a') do |file|
items.each do |item|
file.puts(item)
end
end
@buffer_config[:logger].info("Failed items stored in #{@buffer_config[:failed_items_path]}")
buffer_state = Marshal.load(::File.read(file_path))
@buffer_state[:pending_items].concat(buffer_state[:pending_items])
@buffer_state[:pending_size] += buffer_state[:pending_size]
::File.delete(file_path)
rescue => e
@buffer_config[:logger].error("Failed to store items: #{e.message}")
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
end
end
@buffer_config[:logger].info("Loaded buffer state from files")
end

def buffer_initialize
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
# Replays every persisted buffer_state_*.log file in the buffer directory,
# handing each one to perform_flush on its own thread (the file path is
# passed so perform_flush can load and then delete it on success).
# NOTE(review): threads are not joined here — completion is fire-and-forget.
def flush_buffer_files
  pattern = ::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')
  Dir.glob(pattern).each do |saved_file|
    @buffer_config[:logger].info("Flushing from buffer state file: #{saved_file}")
    Thread.new { perform_flush([], saved_file) }
  end
end
end
end
Expand Down
Loading

0 comments on commit 92edb8f

Please sign in to comment.