Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test new configs logstash #77

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open

Conversation

monishkadas-ms
Copy link
Collaborator

No description provided.

Added the feature for size and time based flushing of buffer.
Added config options for max_interval and max_size. Once either one is
reached the events stored in the buffer will be flushed.
Added the feature for size and time based flushing of buffer.
Added config options for max_interval and max_size. Once either one is
reached the events stored in the buffer will be flushed.

Updated kusto.rb
end
end

def flush

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add error handling in the flush method. If the flush callback raises an exception, it could cause the buffer to stop working properly.

def flush
return if @buffer.empty?

@flush_callback.call(@buffer)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, during gracefully shutdown of the buffer, how do we ensure all events are flushed before the application exits? may be add a graceful shutdown

class CustomSizeBasedBuffer
def initialize(max_size, max_interval, &flush_callback)
@max_size = max_size
@max_interval = max_interval

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering the flush thread sleeps for the entire @max_interval. If the interval is large, it might introduce a considerable delay the buffer flush. Can we do something like this

def start_flusher_thread
@flusher_thread = Thread.new do
loop do
sleep @max_interval
flush_if_needed
end
end
end

def flush_if_needed
@mutex.synchronize do
if Time.now - @last_flush_time >= @max_interval
flush
end
end
end

Added the feature for size and time based flushing of buffer.
Added config options for max_interval and max_size. Once either one is
reached the events stored in the buffer will be flushed.

Updated kusto.rb

# Maximum interval (in seconds) before the buffer gets flushed, defaults to 60
config :max_interval, validate: :number, default: 60

default :codec, 'json_lines'

def register
require 'fileutils' # For mkdir_p
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it all add up ? It is just a buffer that is added.

Currently there is events written to file and then these files are flushed. With the new approach these files should go away

Right now the code changes are

a) Buffer that is taken from LA plugin
b) We write the events to the buffer and flush. How is this message encoded ?
c) can we remove the file part ? or the file buffers based on size on file ?
d) Where are the Tests ?
e) Time based flushes ?

monishkadas-ms and others added 2 commits September 23, 2024 21:32
Added the feature for size and time based flushing of buffer.
Added config options for max_interval and max_size. Once either one is
reached the events stored in the buffer will be flushed.

Updated kusto.rb and ingestor.rb with the implementation of memory based
buffer and flushing and removed file based buffer

This comment has been minimized.

This comment has been minimized.

This comment has been minimized.

** Updated the kusto_spec.rb test

This comment has been minimized.

This comment has been minimized.

This comment has been minimized.

Removed the temp file buffer and added retry to prevent data loss

This comment has been minimized.

Added exponential backoff to retries. Removed max_retries.

This comment has been minimized.

Updated upload_async and upload to raise errors during network downtime
to trigger rescue block in flush buffer so the data sent for flushing
can be restored and the flush can be attempted later.

Added file buffer to store data ONLY when flush fails due to network
issues. Once the network is back online the each file in the buffer is
flushed and deleted first and the regular in-memory buffer is used post
that.
Copy link

Unit Test Results

  1 files  ±0    1 suites  ±0   6s ⏱️ +3s
23 tests +1  23 ✔️ +1  0 💤 ±0  0 ❌ ±0 

Results for commit 64b4d34. ± Comparison against base commit 73fe9c0.

Persists the incoming data during network down time. Checks for network
availabilty before retrying.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants