Karafka 2.0 is a major rewrite that brings many new things to the table but removes some concepts that I could have designed better when I created them.
In this upgrade document, I will describe the most noticeable changes that require manual intervention to handle the upgrade process. This document does not cover new functionalities but aims to guide the upgrade process.
Before reading this article, please ensure you've read the Karafka framework 2.0 announcement blog post so you are familiar with the overall scope of changes.
Please note that many aspects of a Karafka upgrade are specific to your application. While we may not have covered all of the cases here, do not hesitate to ask questions either via Slack or by creating a GitHub issue.
These upgrade notes will be extended whenever someone points out any missing content.
Karafka 2.0 is multi-threaded. This means that all of your code needs to be thread-safe.
Please keep in mind that Karafka does not guarantee that the same consumer will always run in the same Ruby thread, hence code like this:
def consume
  Thread.current[:accumulator] ||= []
  Thread.current[:accumulator] += messages.payloads
end
may cause severe problems and is not recommended.
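One possible alternative is to keep such state on the consumer instance instead of the thread. The sketch below is plain Ruby and does not load Karafka: `FakeBatch` and `AccumulatingConsumer` are hypothetical stand-ins, and it assumes the same consumer instance keeps handling a given topic partition, which you should verify for your setup.

```ruby
# Stand-in for the batch that Karafka exposes as `messages`; not the real class
FakeBatch = Struct.new(:payloads)

# Plain Ruby sketch. In an application this class would inherit from
# Karafka::BaseConsumer, which provides `messages`.
class AccumulatingConsumer
  attr_accessor :messages
  attr_reader :accumulator

  def initialize
    @accumulator = []
  end

  def consume
    # State lives on the consumer instance, not on the current thread
    @accumulator.concat(messages.payloads)
  end
end

consumer = AccumulatingConsumer.new

# Run two batches on two different threads to mimic the lack of thread affinity
[[1, 2], [3]].each do |payloads|
  Thread.new do
    consumer.messages = FakeBatch.new(payloads)
    consumer.consume
  end.join
end
```

Because the accumulator belongs to the consumer, it ends up holding the payloads from both batches no matter which thread executed each call.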
To safely upgrade Karafka from `1.4` to `2.0`, you need to completely shut down all the consumers of the application you are upgrading.
- Update your `karafka` gem version reference:
# Replace
gem 'karafka', '~> 1.4'
# with
gem 'karafka', '~> 2.0'
- Remove the `sidekiq-backend` reference:
# This needs to be removed
gem 'karafka-sidekiq-backend', '~> 1.4'
- Update the `karafka-testing` version reference:
# Replace
gem 'karafka-testing', '~> 1.4'
# with
gem 'karafka-testing', '~> 2.0'
- Run `bundle install`
- Remove WaterDrop setup code from your `karafka.rb`:
# This can be safely removed
monitor.subscribe('app.initialized') do
  WaterDrop.setup { |config| config.deliver = !Karafka.env.test? }
end
- Remove direct WaterDrop listener references from your `karafka.rb`:
# This can be safely removed
Karafka.monitor.subscribe(WaterDrop::Instrumentation::LoggerListener.new)
Karafka 2.0 is powered by librdkafka. Because of that, we've decided to split the settings into two sections:
- `karafka` options - options directly related to the Karafka framework and its components.
- `librdkafka` options - options related to librdkafka.
All `karafka` options should be set at the root level, while all the librdkafka options need to go under the `kafka` scope:
class KarafkaApp < Karafka::App
  setup do |config|
    config.client_id = 'my_application'
    # librdkafka configuration options need to be set with symbol keys
    config.kafka = {
      'bootstrap.servers': '127.0.0.1:9092'
    }
  end
end
Below you can find some of the most significant naming changes in the configuration options:
Root options:
- `start_from_beginning` is now `initial_offset` and accepts either `'earliest'` or `'latest'`
- `ssl_ca_certs_from_system` is no longer needed, but `kafka` `security.protocol` needs to be set to `ssl`
- `batch_fetching` is no longer needed
- `batch_consuming` is no longer needed
- `serializer` is no longer needed because `Responders` have been removed from Karafka
- `topic_mapper` is no longer needed as the concept of mapping topic names has been removed from Karafka
- `backend` is no longer needed because Karafka is now multi-threaded
- `manual_offset_management` now needs to be set on a per-topic basis
Kafka options:
- `kafka.seed_brokers` is now `bootstrap.servers` without the protocol definition
- `kafka.heartbeat_interval` is no longer needed.
- `SASL` and `SSL` options changes are described in their own section.
- Check out the configuration details of librdkafka for all the remaining options.
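To illustrate the `seed_brokers` change, here is a minimal sketch of converting a 1.4-style broker list into the string expected by `bootstrap.servers`. The `to_bootstrap_servers` helper and the host names are hypothetical and not part of Karafka; the code is plain Ruby.

```ruby
# Hypothetical helper: turns a 1.4-style seed_brokers list into a
# bootstrap.servers string by dropping the protocol and joining with commas
def to_bootstrap_servers(seed_brokers)
  Array(seed_brokers)
    .map { |broker| broker.sub(%r{\A[a-z][a-z0-9+.-]*://}i, '') }
    .join(',')
end

legacy_seed_brokers = ['kafka://my.kafka.host1:9092', 'kafka://my.kafka.host2:9092']

kafka_settings = {
  'bootstrap.servers': to_bootstrap_servers(legacy_seed_brokers)
}
```

The resulting hash could then be assigned to `config.kafka` in your `karafka.rb`.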
Sending heartbeats is no longer needed. Both `#trigger_heartbeat` and `#trigger_heartbeat!` can be safely removed.
Please read the Deployment documentation to see the appropriate configuration for a given environment.
You can optionally check the `librdkafka` configuration documentation as well.
If you still struggle, feel free to reach out to us either via Slack or by creating a GitHub issue.
In Karafka 1.4 you could set `config.manual_offset_management = true` to make all the topics work with manual offset management.
This option is no longer available in `2.0` and needs to be set per topic as follows:
class KarafkaApp < Karafka::App
  routes.draw do
    consumer_group :events do
      topic :user_events do
        consumer EventsConsumer
        manual_offset_management true
      end
    end
  end
end
Karafka 2.0 introduces seamless Ruby on Rails integration via `Rails::Railtie` without needing extra configuration.
Your `karafka.rb` should contain only Karafka-specific code, and the rest will be done automatically. This means you need to reverse the manual setup steps that were needed for Karafka `1.4` to work with Ruby on Rails.
- Remove any changes in the `config/environment.rb` of your Rails application related to Karafka:
# environment.rb - this needs to be removed
require Rails.root.join(Karafka.boot_file)
- Remove these lines from your `karafka.rb`:
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!
if Rails.env.development?
  Rails.logger.extend(
    ActiveSupport::Logger.broadcast(
      ActiveSupport::Logger.new($stdout)
    )
  )
end
- Remove the Rails logger assignment from your `karafka.rb`:
class KarafkaApp < Karafka::App
  setup do |config|
    # ...
    # Remove this line
    config.logger = Rails.logger
  end
end
- Remove the code reloader (if used):
Karafka.monitor.subscribe(
  Karafka::CodeReloader.new(
    *Rails.application.reloaders
  )
)
- Remove the `KarafkaApp.boot!` from the end of `karafka.rb`:
# Remove this
KarafkaApp.boot!
All of the consumer callbacks were removed. They were replaced with the following lifecycle consumer methods:
- `#revoked` - runs the code you want to execute when a given topic partition has been revoked from a given consumer instance.
- `#shutdown` - runs the code you want to execute when the Karafka process is being shut down.
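The lifecycle methods above can be sketched as follows. This is a minimal illustration rather than a recommended design: the stub `Karafka::BaseConsumer` exists only so the snippet runs without the gem, and `release_resources` and `released_because` are hypothetical names.

```ruby
# Stand-in so the sketch runs without the karafka gem; not needed in a real application
unless defined?(Karafka::BaseConsumer)
  module Karafka
    class BaseConsumer; end
  end
end

class EventsConsumer < Karafka::BaseConsumer
  attr_reader :released_because

  def consume
    # Regular message processing goes here
  end

  # Runs when the topic partition is revoked from this consumer instance
  def revoked
    release_resources(:revoked)
  end

  # Runs when the Karafka process is being shut down
  def shutdown
    release_resources(:shutdown)
  end

  private

  # Hypothetical cleanup hook: close connections, flush buffers, and so on
  def release_resources(reason)
    @released_because = reason
  end
end
```

Routing both methods to one private helper keeps the cleanup logic in a single place, since the two situations often need the same handling.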
You can still use instrumentation hooks if you need to perform any actions that are not related to a consumer, but please be aware that this code will not run in the same thread as the consumption.
`Karafka 1.4` published errors under several instrumentation keys. `Karafka 2.0` publishes all the errors under the same instrumentation event name: `error.occurred`.
The payload there always contains a `type` field that can be used to understand the origin of the issue.
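A small sketch of how the `type` field might be used when reporting errors. The `describe_error` helper is hypothetical, the runnable part works on a plain hash, and the subscription is shown only as a comment because it needs the `karafka` gem. It assumes the event also exposes the exception under `:error`, and the `'consumer.consume.error'` value is just an example type.

```ruby
# Hypothetical helper: builds a log line from an error event payload.
# It only relies on `event[:type]` and `event[:error]` being readable.
def describe_error(event)
  error = event[:error]
  "#{event[:type]}: #{error.class} - #{error.message}"
end

# In karafka.rb (requires the karafka gem, hence left as a comment):
#
# Karafka.monitor.subscribe('error.occurred') do |event|
#   Karafka.logger.error(describe_error(event))
# end

sample_event = {
  type: 'consumer.consume.error',
  error: StandardError.new('boom')
}

puts describe_error(sample_event)
```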
Simple routing style now creates a single consumer group for all the topics defined on the root level.
This is a major change that can heavily impact your system.
If you used the "non consumer group" based routing where all the topics would be defined on the root level of the routing:
class KarafkaApp < Karafka::App
  routes.draw do
    topic :user_events do
      consumer UsersEventsConsumer
    end
    topic :system_events do
      consumer SystemEventsConsumer
    end
    topic :payment_events do
      consumer PaymentEventsConsumer
    end
  end
end
`Karafka 1.4` would create three separate consumer groups for you, while `Karafka 2.0` will create one:
# Karafka 1.4
Karafka::App.consumer_groups.count #=> 3
# Karafka 2.0
Karafka::App.consumer_groups.count #=> 1
To mitigate this you need to:
- List all the group names while on `1.4`:
Karafka::App.consumer_groups.map(&:name)
#=> ['user_events', 'system_events', 'payment_events']
- Replicate this setup in Karafka `2.0` by creating three separate consumer groups directly:
class KarafkaApp < Karafka::App
  routes.draw do
    consumer_group :user_events do
      topic :user_events do
        consumer UsersEventsConsumer
      end
    end
    consumer_group :system_events do
      topic :system_events do
        consumer SystemEventsConsumer
      end
    end
    consumer_group :payment_events do
      topic :payment_events do
        consumer PaymentEventsConsumer
      end
    end
  end
end
- After applying the above changes, validate that the following command gives you the same results under `1.4` and `2.0`:
Karafka::App.consumer_groups.map(&:id)
#=> ['example_app_user_events', 'example_app_system_events', 'example_app_payment_events']
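One way to make the comparison from the last step less error-prone is to diff the two lists rather than compare them by eye. The sketch below is plain Ruby and assumes both arrays were copied by hand from the console output of each version; the variable names are illustrative.

```ruby
# Assumption: both lists were pasted manually from the output of
# Karafka::App.consumer_groups.map(&:id) in each version's console
ids_from_1_4 = %w[example_app_user_events example_app_system_events example_app_payment_events]
ids_from_2_0 = %w[example_app_user_events example_app_system_events example_app_payment_events]

missing_in_2_0 = ids_from_1_4 - ids_from_2_0
unexpected_in_2_0 = ids_from_2_0 - ids_from_1_4

if missing_in_2_0.empty? && unexpected_in_2_0.empty?
  puts 'Consumer group ids match'
else
  puts "Missing in 2.0: #{missing_in_2_0.inspect}"
  puts "Unexpected in 2.0: #{unexpected_in_2_0.inspect}"
end
```

Any id reported as missing or unexpected points at a consumer group that would start under a different identity after the upgrade.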
!!! note ""
Unless you used Heroku, you can probably skip this section.
Topic mapping is no longer supported. Please prefix all of your topic names with the value of the `KAFKA_PREFIX` environment variable.
You can read more about integrating Karafka 2.0 with Heroku here.
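A minimal sketch of building prefixed topic names. The `prefixed_topic` helper is hypothetical, and it assumes the `KAFKA_PREFIX` value already ends with whatever separator your provider uses; the sample prefix below is made up.

```ruby
# Hypothetical helper: prepends the KAFKA_PREFIX value (if any) to a topic name.
# The `env` argument defaults to ENV and can be replaced with a hash in tests.
def prefixed_topic(name, env = ENV)
  "#{env.fetch('KAFKA_PREFIX', '')}#{name}"
end

sample_env = { 'KAFKA_PREFIX' => 'acme-1234.' }

puts prefixed_topic('user_events', sample_env)
```

In routing, the result of such a helper could be passed to `topic` instead of a hard-coded name.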
Quote from Mike:
- Don't daemonize, start Karafka with a process supervisor like systemd, upstart, foreman, etc.
- Log only to STDOUT, the entity starting Karafka can control where STDOUT redirects to, if any.
- PID files are a legacy of double forking and have no reason to exist anymore.
Karafka 2.0 is multi-threaded.
If you use `sidekiq-backend`, you have two options:
- Pipe the jobs to Sidekiq yourself
- Leverage Karafka's multi-threading capabilities
Responders were a dead end.
Please use WaterDrop to produce messages via `Karafka.producer`:
- Replace direct responders usage with `Karafka.producer` usage
class ExampleResponder < ApplicationResponder
  topic :users_notified
  def respond(user)
    respond_to :users_notified, user
  end
end
ExampleResponder.call(User.last)
With:
Karafka.producer.produce_async(
  topic: 'users_notified',
  payload: user.to_json,
  partition_key: user.id.to_s
)
- Replace all the `#respond_with` consumer responder invocations with direct `Karafka.producer` code
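As a sketch of what code replacing `#respond_with` might look like, the example below wraps the producer call in a small object that receives the producer as a dependency. `UserNotifiedPublisher` and `RecordingProducer` are hypothetical names; in an application you would pass `Karafka.producer`, while the recording double lets the snippet run without Kafka.

```ruby
require 'json'

# Hypothetical service object; `producer` is anything that responds to
# #produce_async(topic:, payload:), such as Karafka.producer
class UserNotifiedPublisher
  def initialize(producer)
    @producer = producer
  end

  def call(user_id)
    @producer.produce_async(
      topic: 'users_notified',
      payload: JSON.generate({ user_id: user_id })
    )
  end
end

# Test double that records messages instead of sending them to Kafka
class RecordingProducer
  attr_reader :messages

  def initialize
    @messages = []
  end

  def produce_async(**message)
    @messages << message
  end
end

producer = RecordingProducer.new
UserNotifiedPublisher.new(producer).call(42)
```

Injecting the producer is only one option; calling `Karafka.producer` directly inside the consumer works as well and is closer to the snippet shown earlier.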
You can still process messages one at a time by slightly altering your consumer:
class SingleMessageBaseConsumer < Karafka::BaseConsumer
  attr_reader :message
  def consume
    messages.each do |message|
      @message = message
      consume_one
      mark_as_consumed(message)
    end
  end
end
class Consumer < SingleMessageBaseConsumer
  def consume_one
    puts "I received following message: #{message.payload}"
  end
end
Here you can find more details about this.
`Karafka 2.0` no longer uses any of the `dry-rb` ecosystem libraries. If you've relied on them indirectly via Karafka, you will have to define them in your `Gemfile`.
`#params_batch` is now `#messages`:
def consume
  # params_batch.each do |message|
  messages.each do |message|
    @message = message
    consume_one
    mark_as_consumed(message)
  end
end
Instrumentation and monitoring are often application specific.
The recommendation here is to revisit your current integration, align it with the events published by Karafka, and follow the `2.0` instrumentation guidelines document.
`Karafka::Instrumentation::StdoutListener` is now `Karafka::Instrumentation::LoggerListener`
Alongside the recent updates to Karafka, the `karafka-testing` gem has also been updated. The comprehensive guides for the testing gem can be accessed here.
- The `bootstrap.servers` setting under `kafka` should not have a protocol, and it needs to be a string with comma-separated hosts.
BAD:
# protocol should not be here
config.kafka = { 'bootstrap.servers': 'kafka://my.kafka.host1:9092' }
BAD:
# it should be a comma-separated string, not an array
config.kafka = { 'bootstrap.servers': ['my.kafka.host1:9092', 'my.kafka.host2:9092'] }
GOOD:
config.kafka = { 'bootstrap.servers': 'my.kafka.host1:9092,my.kafka.host2:9092' }
- Make sure you do not overwrite your settings, as happens in the BAD example below.
BAD:
config.kafka = {
  # BROKERS is expected to already be a comma-separated string
  'bootstrap.servers': ENV.fetch('BROKERS')
}
# This section will FULLY overwrite the `bootstrap.servers`.
# You want to merge those sections and NOT overwrite.
if Rails.env.production?
  config.kafka = {
    'ssl.ca.pem': ENV.fetch('CA_CERT'),
    'ssl.certificate.pem': ENV.fetch('CLIENT_CERT'),
    'ssl.key.pem': ENV.fetch('CLIENT_CERT_KEY'),
    'security.protocol': 'ssl'
  }
end
GOOD:
config.kafka = {
  # BROKERS is expected to already be a comma-separated string
  'bootstrap.servers': ENV.fetch('BROKERS')
}
if Rails.env.production?
  config.kafka.merge!({
    'ssl.ca.pem': ENV.fetch('CA_CERT'),
    'ssl.certificate.pem': ENV.fetch('CLIENT_CERT'),
    'ssl.key.pem': ENV.fetch('CLIENT_CERT_KEY'),
    'security.protocol': 'ssl'
  })
end
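The difference between the two variants can be reproduced with plain Ruby hashes, without Karafka. This is only an illustration of the hash semantics involved; the host names and variable names are made up.

```ruby
base_settings = {
  'bootstrap.servers': 'my.kafka.host1:9092,my.kafka.host2:9092'
}

ssl_settings = {
  'security.protocol': 'ssl'
}

# Reassignment: the previous hash is discarded, so bootstrap.servers is gone
overwritten = ssl_settings

# Merge: keys from both hashes are kept
merged = base_settings.merge(ssl_settings)

puts overwritten.key?(:'bootstrap.servers')
puts merged.key?(:'bootstrap.servers')
```

The first check reports that the key is missing after reassignment, while the merged hash still carries the broker list.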