Rebus.Kafka

install from nuget

Provides an Apache Kafka transport implementation for Rebus.

Getting started with Rebus.Kafka:

  1. Follow the "Getting started" guide for Rebus
  2. Add the transport with UseKafka:
builder.RegisterRebus((configurer, context) => configurer
	.Transport(t => t.UseKafka("localhost:9092", "InputQueueName", "groupName"))
);

All parameters for the producer and the consumer can be specified in detail. See this example.

Rebus can also be configured to use Apache Kafka as a one-way client, i.e. a client that sends messages but cannot receive any. See this example.

builder.RegisterRebus((configurer, context) => configurer
	.Transport(t => t.UseKafkaAsOneWayClient("localhost:9092"))
);

See examples and tests for other usage examples.

This provider supports the following Rebus bus functions:

Many others are probably supported too, but I haven't checked.

Additional features:

  • When using the Rebus.ServiceProvider package, subscribe to events (await bus.Subscribe<TestMessage>()) only after the bus has started, and NOT in the Rebus "onCreate" event. Otherwise a deadlock can occur!
  • At the request of the transport users, the Rebus.Kafka transport automatically creates topics by default. However, I do not recommend using allow.auto.create.topics=true for production! To disable allow.auto.create.topics, pass the transport your ConsumerConfig or ConsumerAndBehaviorConfig configuration with AllowAutoCreateTopics = false.
  • The Rebus.Kafka transport has a new overload that takes a ConsumerAndBehaviorConfig parameter instead of ConsumerConfig. This new configuration type contains transport behavior settings. So far, it has a single CommitPeriod parameter, which defines the period after which the offset is committed to Apache Kafka. Here is an example of using it.
  • UseAttributeOrTypeFullNameForTopicNames simplifies naming event topics: the topic name is taken from TopicAttribute({TopicName}) or, failing that, is set to "---Topic---.{Spacename}.{TypeName}".
  • KafkaAdmin allows you to programmatically create a topic with many partitions and to delete topics.
  • In multithreaded message processing, the order in which messages are processed may differ from the order of messages in the queue. If the order is very important, then use single-threaded processing: .Options(o => o.SetMaxParallelism(1))
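
The two configuration points above (disabling topic auto-creation and forcing single-threaded processing) might be combined roughly as follows. This is only a sketch: it assumes a UseKafka overload that accepts a ProducerConfig and a ConsumerConfig after the broker list and queue name, as the detailed-parameters example suggests, so check the exact signature in your version of Rebus.Kafka. The broker address, queue name and group name are placeholders.

```csharp
using Confluent.Kafka;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Kafka;

// Consumer settings: do not let the consumer auto-create missing topics.
var consumerConfig = new ConsumerConfig
{
    GroupId = "groupName",            // placeholder group name
    AllowAutoCreateTopics = false
};

// Producer settings are left at their defaults in this sketch.
var producerConfig = new ProducerConfig();

using var activator = new BuiltinHandlerActivator();

Configure.With(activator)
    // Assumed overload: (brokerList, inputQueueName, producerConfig, consumerConfig).
    .Transport(t => t.UseKafka("localhost:9092", "InputQueueName", producerConfig, consumerConfig))
    // Process one message at a time so the processing order follows the queue order.
    .Options(o => o.SetMaxParallelism(1))
    .Start();
```

With auto-creation disabled, the topics have to exist before the bus starts, for example created by an administrator or through KafkaAdmin.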

Note:

  • Interacting with Apache Kafka requires the unmanaged "librdkafka" library, so you need to install the appropriate version of the "librdkafka.redist" package. If the unmanaged "librdkafka" is not found automatically, you must load it before using Rebus.Kafka for the first time, as follows:
if (!Confluent.Kafka.Library.IsLoaded)
	Confluent.Kafka.Library.Load(pathToLibrd);
  • Because of how Apache Kafka works, subscribing to or unsubscribing from messages triggers a rebalancing of the clients in the group, which is very slow and can last several seconds or more. You should therefore avoid the scenario of dynamically subscribing to a single reply message, sending a single message to the recipient, and unsubscribing after receiving the single reply, since this scenario will be very slow. I recommend that you subscribe to all your messages only when the application starts and that you do not change subscriptions at runtime; the transport will then be fast.
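
A minimal sketch of the recommended pattern, subscribing once at startup and only after the bus has started, could look like this. The TestMessage record and its Text property are hypothetical, and the broker address, queue name and group name are placeholders. Running it requires a reachable Kafka broker.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Kafka;

using var activator = new BuiltinHandlerActivator();

// Register the handler before the bus starts.
activator.Handle<TestMessage>(message =>
{
    Console.WriteLine($"Received: {message.Text}");
    return Task.CompletedTask;
});

var bus = Configure.With(activator)
    .Transport(t => t.UseKafka("localhost:9092", "InputQueueName", "groupName"))
    .Start();

// Subscribe once, after the bus has started, and keep the subscription
// for the lifetime of the application to avoid repeated rebalancing.
await bus.Subscribe<TestMessage>();

await bus.Publish(new TestMessage("hello"));

Console.WriteLine("Press Enter to exit");
Console.ReadLine();

// Hypothetical message type used only for this illustration.
public record TestMessage(string Text);
```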

Log of important changes:

V 4.0.0 (29.11.2024)

  1. Updated to .NET 9 and to Rebus version 8.7.1, which supports .NET 9
  2. Removed the legacy code and the extra dependency

V 3.0.1 (12.01.2024)

  1. Refactored for Rebus version 8 with the corresponding API change;
  2. Implemented RetryStrategy: automatic retries and error handling. Receipt of a message is now acknowledged not as soon as it is received, but only after it has been processed successfully or sent to the error topic;
  3. Added "transaction" support. More precisely, these are not true transactions, because the transport does not rely on Apache Kafka transactions. Instead, all messages sent inside the transaction block are held back until await scope.CompleteAsync() is called, and they are discarded if the block ends without calling await scope.CompleteAsync(). This convenience halves the maximum send throughput for all messages, even those sent without transactions.
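
The delayed-sending behaviour described in item 3 can be illustrated with Rebus's RebusTransactionScope. This is a sketch only: the TestMessage record is hypothetical, the connection values are placeholders, and a running Kafka broker is assumed.

```csharp
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Kafka;
using Rebus.Transport;

using var activator = new BuiltinHandlerActivator();

var bus = Configure.With(activator)
    .Transport(t => t.UseKafka("localhost:9092", "InputQueueName", "groupName"))
    .Start();

using (var scope = new RebusTransactionScope())
{
    // Nothing is sent to Kafka yet: both messages are held in the scope.
    await bus.Publish(new TestMessage("first"));
    await bus.Publish(new TestMessage("second"));

    // Both messages are sent here. If this call is skipped
    // (for example because an exception was thrown above),
    // neither message is sent.
    await scope.CompleteAsync();
}

// Hypothetical message type used only for this illustration.
public record TestMessage(string Text);
```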

V 2.0.0 (18.08.2023)

  1. Improving data transfer efficiency;
  2. The format of transport messages has changed. The message key is now a string instead of Null. The messages are incompatible with previous versions of the transport!
  3. Message headers are now supported;
  4. Refactoring for the current version of Apache Kafka "confluentinc/cp-kafka:7.0.1";
  5. The transport forcibly creates missing topics if Consumer.Config.AllowAutoCreateTopics == true. However, I do not recommend using allow.auto.create.topics=true for production!

V 1.6.3 (1.04.2021)

  1. The Rebus.Kafka transport has a new overload that takes a ConsumerAndBehaviorConfig parameter instead of ConsumerConfig. This new configuration type contains transport behavior settings. So far, it has a single CommitPeriod parameter, which defines the period after which the offset is committed to Apache Kafka. Here is an example of using it.

  2. In the summer of 2020, librdkafka was updated to v1.5.0, which brought a change that many users of the Rebus.Kafka transport did not expect:

    Consumer will no longer trigger auto creation of topics, allow.auto.create.topics=true may be used to re-enable the old deprecated functionality.

    At the request of the transport users, I enabled the previous transport behavior by default. The Rebus.Kafka transport now automatically creates topics by default, as before. However, I do not recommend using allow.auto.create.topics=true for production! To disable allow.auto.create.topics, pass the transport your ConsumerConfig or ConsumerAndBehaviorConfig configuration with AllowAutoCreateTopics = false.

ToDo:

  • Schema Registry support in Kafka: Avro, JSON and Protobuf
  • In the future, the value from the message header "kafka-key" or, maybe, from the message property marked with the KafkaKey attribute will be inserted into the Apache Kafka message key. This will be useful for partitioning.
  • Start the transport from user-defined offsets for topics and partitions.

If you have any recommendations or comments, I will be glad to hear them.
