This project shows how to use KSQL (the Streaming SQL Engine for Apache Kafka) for stream processing.
To use this repository you will need the following software:
- Python
- Pip
- Docker
- Docker Compose
- Zookeeper
- Kafka
- KSQL Server
However, only Docker and Docker Compose need to be installed on your machine. The whole Kafka ecosystem is provided via Docker images.
- Install Python and Pip
- Install Docker and Docker Compose
- Load Images
- Create Topics
- Start Simulator
Installing Python and Pip is straightforward, so this tutorial won't cover those steps. I recommend looking for more information at www.python.org and pip.pypa.io.
After you install Python and Pip, run the command below to install all dependencies needed to execute the click_simulator.py application. This script simulates click events on a web page: it generates unbounded click events, sending a continuous flow of messages to a Kafka topic.
pip install -r requirements.txt
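For reference, here is a minimal sketch of what a click simulator like this could look like. It is an illustration only, assuming the kafka-python and Faker packages; the actual click_simulator.py in this repository may differ:

import json
import random
import time

from faker import Faker
from kafka import KafkaProducer

fake = Faker()

# Serialize each event dict as a JSON-encoded message value
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

print('Starting application')
while True:
    # Build a fake click event shaped like the messages shown below
    event = {
        'email': fake.email(),
        'timestamp': fake.date_time().isoformat(),
        'uri': fake.uri(),
        'number': random.randint(0, 999),
    }
    producer.send('com.mywebsite.streams.clickevents', event)
    print('Message: {}'.format(json.dumps(event)))
    time.sleep(1)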
This tutorial does not demonstrate the installation process for Docker and Docker Compose either. I strongly recommend visiting the official Docker installation documentation at docs.docker.com for more information.
docker-compose up
or
docker-compose up -d
The latter command runs docker-compose in the background (detached mode).
docker-compose exec kafka kafka-topics --create --topic com.mywebsite.streams.pages --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics --create --topic com.mywebsite.streams.clickevents --partitions 5 --replication-factor 1 --bootstrap-server localhost:9092
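You can verify that both topics exist by listing them:
docker-compose exec kafka kafka-topics --list --bootstrap-server localhost:9092
Then start the simulator: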
python click_simulator.py
If you executed all steps correctly, you will see output similar to the example below.
Starting application
Message: {"email": "anoble@yahoo.com", "timestamp": "1986-03-10T16:38:40", "uri": "https://mitchell.info/login.php", "number": 358}
Message: {"email": "leonardpatrick@mason-clark.info", "timestamp": "1971-04-25T10:09:26", "uri": "https://www.bailey.com/search/about/", "number": 431}
Message: {"email": "morriskatie@villarreal-villa.biz", "timestamp": "1996-11-22T00:12:20", "uri": "http://www.woodard.info/terms.php", "number": 838}
Message: {"email": "kenneth79@rogers.info", "timestamp": "2005-10-24T22:16:59", "uri": "http://www.king.com/wp-content/blog/blog/index/", "number": 793}
Message: {"email": "wbailey@wu-martinez.net", "timestamp": "1995-06-20T12:44:44", "uri": "https://www.smith-neal.com/categories/login/", "number": 509}
Message: {"email": "tkennedy@hall-wolfe.org", "timestamp": "2009-01-27T14:04:20", "uri": "https://www.marshall-holmes.info/", "number": 336}
Message: {"email": "steven15@yahoo.com", "timestamp": "2019-12-13T16:09:11", "uri": "https://www.sims.net/main.html", "number": 263}
Message: {"email": "hobbsmario@hotmail.com", "timestamp": "1990-08-16T05:09:04", "uri": "http://www.smith.com/search/tags/explore/about.jsp", "number": 61}
...
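Now connect to the KSQL CLI inside the container: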
docker-compose exec ksql ksql http://localhost:8088
After you connect to the KSQL server you will see the banner below:
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2019 Confluent Inc.
CLI v5.4.1, Server v5.4.1 located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Show all topics
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------
com.mywebsite.streams.clickevents | 5 | 1
com.mywebsite.streams.pages | 1 | 1
---------------------------------------------------------------------
Show all streams
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Format
--------------------------------------------------------
CLICKEVENTS | com.mywebsite.streams.clickevents | JSON
--------------------------------------------------------
The CLICKEVENTS stream listed above was created with the following statement:
CREATE STREAM clickevents
(email VARCHAR,
timestamp VARCHAR,
uri VARCHAR,
number INTEGER)
WITH (KAFKA_TOPIC='com.mywebsite.streams.clickevents',
VALUE_FORMAT='JSON');
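Similarly, the PAGES table is created over its topic, using the uri field as the key: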
CREATE TABLE pages
(uri VARCHAR,
description VARCHAR,
created VARCHAR)
WITH (KAFKA_TOPIC='com.mywebsite.streams.pages',
VALUE_FORMAT='JSON',
KEY='uri');
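You can also derive a new table from an existing one. For example, to keep only the pages whose uri starts with 'http://www.a':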
CREATE TABLE a_pages AS
SELECT * FROM pages WHERE uri LIKE 'http://www.a%';
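To watch click events arrive in real time, run a push query against the stream: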
SELECT * FROM clickevents EMIT CHANGES;
ksql> DESCRIBE PAGES;
Name : PAGES
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
URI | VARCHAR(STRING)
DESCRIPTION | VARCHAR(STRING)
CREATED | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Like all Kafka consumers, KSQL by default begins consumption at the latest offset. This can be a problem for some scenarios. In the following example we're going to create a pages table, but we want all the data available to us in this table. In other words, we want KSQL to start from the earliest offset. To do this, we use the SET command to set the configuration variable auto.offset.reset for our session, before we run any other commands.
SET 'auto.offset.reset' = 'earliest';
Also note that this can be set at the KSQL server level, if you'd like. Once you're done querying or creating tables or streams with this value, you can set it back to its original setting by simply running:
UNSET 'auto.offset.reset';
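Putting it together, a session that builds the pages table from the earliest offset could look like the sketch below (assuming the pages table has not been created yet, or has been dropped first):

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE pages
(uri VARCHAR,
description VARCHAR,
created VARCHAR)
WITH (KAFKA_TOPIC='com.mywebsite.streams.pages',
VALUE_FORMAT='JSON',
KEY='uri');

UNSET 'auto.offset.reset';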
KSQL provides a number of scalar functions for us to make use of.
Let's write a query that takes advantage of some of them:
SELECT UCASE(SUBSTRING(uri, 12))
FROM clickevents
WHERE number > 100
AND uri LIKE 'http://www.k%' EMIT CHANGES;
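Other scalar functions follow the same pattern. As another sketch, assuming the LCASE and LEN functions available in this KSQL version:

SELECT LCASE(email), LEN(uri)
FROM clickevents
WHERE number <= 100 EMIT CHANGES;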
Notice that as soon as you press CTRL+C, the query ends.
As with Streams, we must first find the running underlying query, and then drop the table. First, find your query:
ksql> SHOW QUERIES;
Query ID | Kafka Topic | Query String
----------------------------------------------------------------------------------------------
CTAS_A_PAGES_1 | A_PAGES | CREATE TABLE a_pages AS
SELECT * FROM pages WHERE uri LIKE 'http://www.a%';
----------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
Find your query, which in this case is CTAS_A_PAGES_1, and then TERMINATE the query and DROP the table:
TERMINATE QUERY CTAS_A_PAGES_1;
DROP TABLE A_PAGES;
In this demonstration we'll see how to create tables with windowing enabled.
Let's create a tumbling clickevents table that counts events per uri, where the window size is 30 seconds. Note that windowed queries require an aggregation, so we group by uri and count:
CREATE TABLE clickevents_tumbling AS
SELECT uri, COUNT(*) AS total FROM clickevents
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY uri;
Now we can create a table with a hopping window of 30 seconds, advancing in 5-second increments.
CREATE TABLE clickevents_hopping AS
SELECT uri, COUNT(*) AS total FROM clickevents
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS)
WHERE uri LIKE 'http://www.b%'
GROUP BY uri;
The above window is 30 seconds long and advances by 5 seconds. If you query the table you will see the associated window times!
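For example:

SELECT * FROM clickevents_hopping EMIT CHANGES;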
Finally, let's see how session windows work. We're going to define the session gap as 5 minutes in order to group many events into the same window.
CREATE TABLE clickevents_session AS
SELECT uri, COUNT(*) AS total FROM clickevents
WINDOW SESSION (5 MINUTES)
WHERE uri LIKE 'http://www.b%'
GROUP BY uri;
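Unlike tumbling and hopping windows, a session window has no fixed size: it keeps growing as long as new events for the same key arrive within 5 minutes of the previous one, and it closes after 5 minutes of inactivity.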
Create a new topic:
docker-compose exec kafka kafka-topics --create --topic <topic-name> --bootstrap-server localhost:9092
Produce messages to a topic (type a message in the console and press Enter to send it):
docker-compose exec kafka kafka-console-producer --topic <topic-name> --bootstrap-server localhost:9092
Consume messages from a topic, starting from the beginning:
docker-compose exec kafka kafka-console-consumer --topic <topic-name> --from-beginning --bootstrap-server localhost:9092
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.