Introduction to the concepts related to the use of Kafka
Kafka is a distributed system with a resilient, fault-tolerant architecture, capable of handling a very high throughput of messages per second.
- Messaging system
- Activity tracking
- Gather metrics from many different locations
- Application logs
- Stream processing
- Decoupling of system dependencies and microservices (Spark, Flink, Hadoop)
- etc...
- A particular stream of data; it's like a table in a database (without all the constraints)
- You can have as many topics as you want
- A topic is identified by its name
- Kafka topics are immutable: once data is written to a partition, it cannot be changed. You can keep writing to the partition, but you cannot update or delete existing records
- data kept only for a limited time (default is one week - configurable)
- Order is guaranteed only within partition (not across partitions)
- Producers write data to topics (which are made of partitions)
- In case of kafka broker failures, producers will automatically recover
- Producers can choose to send a key with the message (string, number, binary, etc...)
- if key = null, data is sent round robin (partition 0, then 1, then 2,...)
- if key != null, then all messages for that key will always go to the same partition (hashing)
- A key is typically sent if you need message ordering for a specific field
- In the default Kafka partitioner, keys are hashed using the murmur2 algorithm; for the curious, the formula is roughly:
targetPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
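A minimal producer sketch in Java, assuming a local broker at localhost:9092 and a hypothetical topic demo_topic, showing how a key is attached so that all messages sharing a key land on the same partition:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KeyedProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Same key => same partition (murmur2 hash of the key), so per-key ordering is preserved
        for (int i = 0; i < 10; i++) {
            String key = "truck_" + (i % 3);        // hypothetical key
            String value = "position update " + i;  // hypothetical value
            producer.send(new ProducerRecord<>("demo_topic", key, value));
        }

        producer.flush();
        producer.close();
    }
}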
- Consumers read data from a topic (identified by name) - Kafka does not push data to consumers automatically; consumers have to request it (pull model)
- Consumers automatically know which broker to read from
- In case of broker failures, consumers know how to recover
- Data is read in order from low to high offset within each partition
- If a consumer dies, it will be able to read back from where it left off, thanks to the committed consumer offsets!
- A deserializer indicates how to transform bytes into objects/data
- They are used on the value and the key of the message
- Common deserializers (JSON, Avro, protobuf, etc...)
- the serialization/deserialization type must not change during a topic lifecycle (create a new topic instead)
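A minimal consumer sketch, assuming localhost:9092, a hypothetical topic demo_topic and a hypothetical group demo_group, showing where the key/value deserializers are configured and the pull-based poll loop:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo_group"); // hypothetical group
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(List.of("demo_topic")); // hypothetical topic
            while (true) {
                // Pull model: the consumer asks the broker for data
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key=" + record.key() + " value=" + record.value()
                            + " partition=" + record.partition() + " offset=" + record.offset());
                }
            }
        }
    }
}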
- A Kafka cluster is composed of multiple brokers (servers) - they receive and send data
- Each broker is identified with its ID (integer)
- Each broker contains certain topic partitions
- You don't need to know all the brokers in the cluster; it's enough to know how to connect to one broker (a bootstrap broker), and the clients will automatically connect to the others.
- Each broker knows about all brokers, topics and partitions (metadata)
Producers can choose to receive acknowledgement of data writes :
- acks = 0 : producer won't wait for acknowledgement (possible data loss)
- acks = 1 : producer will wait for the leader's acknowledgement (limited data loss)
- acks = all : leader + replicas acknowledgement (no data loss)
As a rule, for a replication factor of N, you can permanently lose up to N-1 brokers and still recover your data
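A short sketch of how acks is set through the Java producer configuration (reusing the properties object from the producer sketch above; min.insync.replicas is a broker/topic-side setting, not a producer one):

// acks=0 : no wait, acks=1 : leader only, acks=all (or -1) : leader + in-sync replicas
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
// Typically paired with the topic/broker setting min.insync.replicas=2 so that
// "all" means at least two replicas actually have the data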
Producer default partitioner when key = null
The Sticky Partitioner improves producer performance, especially at high throughput, when the key is null
- Zookeeper manages brokers (keeps a list of them)
- Zookeeper helps in performing leader election for partitions
- Zookeeper sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, topic deleted, etc...)
- Zookeeper by design operates with an odd number of servers (1, 3, 5, 7)
- Zookeeper has a leader (handles writes); the rest of the servers are followers (handle reads)
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Start Zookeeper and kafka server
zookeeper-server-start.sh ~/kafka_<version>/config/zookeeper.properties
kafka-server-start.sh ~/kafka_<version>/config/server.properties
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic [topic_name] \
--partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic [topic_name]
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic [topic_name] \
--producer-property acks=all --property parse.key=true --property key.separator=:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --from-beginning
[base] --formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true --property print.key=true \
--property print.value=true --from-beginning
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group [group_name]
[base] --group [group_name] --reset-offsets --execute --to-earliest \
[--all-topics | --topic {topic_name}]
[base] --group [group_name] --reset-offsets --execute --shift-by -N \
[--all-topics | --topic {topic_name}]
retry.backoff.ms (default 100 ms) : how long to wait before retrying
delivery.timeout.ms (default 120 000 ms) : records will fail if they cannot be acknowledged within this time
max.in.flight.requests.per.connection (default 5) : controls how many produce requests can be made in parallel; set it to 1 if you need to ensure strict ordering (may impact throughput)
Since Kafka 3.0, the producer is "safe" by default : acks=all, enable.idempotence=true, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection=5, delivery.timeout.ms=120000
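A sketch of setting those "safe producer" values explicitly with the Java client constants (redundant on Kafka 3.0+ clients, but useful for older clients or to make the intent visible):

properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");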
compression.type=producer (default) : the broker takes the compressed batch from the producer client and writes it directly to the topic's log file without recompressing the data.
compression.type=none : all batches are decompressed by the broker
compression.type=lz4 (for example) :
- if it matches the producer's compression setting, data is stored on disk as is
- if it is a different compression setting, batches are decompressed by the broker and then recompressed using the specified compression algorithm
linger.ms (default 0) : how long to wait before sending a batch. Setting this to a small number (for example 5 ms) helps add more messages to the batch, at the expense of latency.
batch.size (default 16KB) : if batches fill up before linger.ms expires, consider increasing the batch size. Increasing it to something like 32KB or 64KB can help improve compression, throughput, and the efficiency of requests.
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
max.block.ms=60000 : the time .send() will block until throwing an exception
Exceptions are thrown when :
- the producer has filled up its buffer
- the broker is not accepting any new data
- 60 seconds (max.block.ms) have elapsed
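A sketch of where those errors surface, assuming the producer from the sketch above and a hypothetical topic: send() itself throws when it blocks longer than max.block.ms, while delivery failures are reported through the callback.

try {
    // send() throws synchronously (e.g. TimeoutException) if it blocks longer than max.block.ms,
    // typically because the producer buffer is full
    producer.send(new ProducerRecord<>("demo_topic", "some value"), (metadata, exception) -> {
        // delivery failures (e.g. delivery.timeout.ms exceeded) are reported asynchronously here
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        }
    });
} catch (Exception e) {
    System.err.println("send() blocked too long: " + e.getMessage());
}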
- At most once : offsets are committed as soon as the message batch is received. If the processing goes wrong, the message will be lost (it won't be read again)
- At least once : offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing of messages, so make sure your processing is idempotent.
- Exactly once : can be achieved for kafka => kafka workflows using the transactional API (easy with kafka streams API). For kafka => sink workflows, use an idempotent consumer.
For most applications you should use at-least-once processing (we'll see in practice how to do it) and ensure your transformations / processing are idempotent.
enable.auto.commit=true & synchronous processing of batches
while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    doSomethingSynchronous(batch);
}
enable.auto.commit=false & manual commit of offsets
while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    if (isReady(batch)) {
        doSomethingSynchronous(batch);
        consumer.commitAsync();
    }
}
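A variant of the same loop for the at-least-once pattern, reusing the consumer from the earlier sketch: auto-commit is disabled and offsets are committed synchronously only after the (hypothetical) processing step succeeds.

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    doSomethingSynchronous(batch); // process first (hypothetical processing step)...
    consumer.commitSync();         // ...then commit: a crash means re-reading, never losing
}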
heartbeat.interval.ms (default 3 seconds) : how often to send heartbeats.
Usually set to 1/3rd of session.timeout.ms
session.timeout.ms (default 45 seconds kafka 3.0+) : Heartbeats are sent periodically to the broker; If no heartbeat is sent during that period, the consumer is considered dead
max.poll.interval.ms (default 5 minutes) : maximum amount of time between two .poll() calls before declaring the consumer dead.
max.poll.records (default 500) : controls how many records to receive per poll request; increase it if your messages are very small and you have a lot of available RAM; lower it if it takes too much time to process your records.
fetch.min.bytes (default 1) : controls how much data you want to pull at least on each request; helps improve throughput and decrease the number of requests, at the cost of latency.
fetch.max.wait.ms (default 500) : the maximum amount of time the kafka broker will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
This means that until the requirement of fetch.min.bytes is satisfied, you will have up to 500 ms of latency before the fetch returns data to the consumer (i.e. introducing a potential delay in order to be more efficient in requests).
max.partition.fetch.bytes (default 1MB) : The maximum amount of data per partition the server will return.
If you read from 100 partitions, you'll need a lot of memory (RAM)
fetch.max.bytes (default 55MB) : Maximum data returned for each fetch request.
If you have available memory, try increasing fetch.max.bytes to allow the consumer to read more data in each request.
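A hedged sketch of tuning these consumer settings through the Java client constants (the values shown are just the defaults listed above, not recommendations):

properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(1024 * 1024));
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.toString(55 * 1024 * 1024));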
log.segment.bytes (default 1GB) : the max size of a single segment in bytes.
log.segment.ms (default 1 week) : the time kafka will wait before committing the segment if not full
Policy 1 : log.cleanup.policy=delete (Kafka default for all user topics)
- Delete based on age of data (default is one week)
- Delete based on max size of log (default is -1 == infinite)
Policy 2 : log.cleanup.policy=compact (kafka default for topic __consumer_offsets)
log.cleanup.policy=compact is driven by :
- segment.ms (default 7 days) : max amount of time to wait before closing the active segment
- segment.bytes (default 1GB) : max size of a segment
- min.compaction.lag.ms (default 0) : how long to wait before a message can be compacted
- delete.retention.ms (default 24 hours) : how long to wait before deleting data marked for compaction
- min.cleanable.dirty.ratio (default 0.5) : higher => less frequent but more efficient cleaning; lower => the opposite
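A sketch of creating a compacted topic with these settings from Java via the AdminClient (the topic name and values are illustrative; the same can be done with kafka-topics.sh --config flags):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker

        try (AdminClient admin = AdminClient.create(properties)) {
            NewTopic topic = new NewTopic("demo_compacted_topic", 3, (short) 1) // hypothetical topic
                    .configs(Map.of(
                            "cleanup.policy", "compact",
                            "segment.ms", "604800000",         // 7 days
                            "min.compaction.lag.ms", "0",
                            "delete.retention.ms", "86400000", // 24 hours
                            "min.cleanable.dirty.ratio", "0.5"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}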
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type [topics | users] \
--entity-name [topic_name] --alter --add-config min.insync.replicas=2