diff --git a/README.md b/README.md index 69623d2..f9993fc 100644 --- a/README.md +++ b/README.md @@ -11,3 +11,4 @@ examples. - [Docker](docker/README.md): Docker images, Dockerfiles, and container basics - [Keycloak](keycloak/README.md): Identity management, JWT authentication, Docker Compose setup +- [Kafka](kafka/README.md): Producer, Consumer, Docker Compose setup diff --git a/kafka/Kafka.slnx b/kafka/Kafka.slnx new file mode 100644 index 0000000..353a32d --- /dev/null +++ b/kafka/Kafka.slnx @@ -0,0 +1,5 @@ + + + + + diff --git a/kafka/README.md b/kafka/README.md new file mode 100644 index 0000000..0084b13 --- /dev/null +++ b/kafka/README.md @@ -0,0 +1,81 @@ +# Kafka + +The aim of this project is to demonstrate how to implement a `Consumer` for +`Apache Kafka` in `.NET`. Project contains a sample producer and consumer +implementations. `Confluent.Kafka` `.NET` library and official `Apache Kafka` +docker image is used. + +Producer and Consumer samples are implemented in a way to demonstrate a topic +with multiple partitions, auto and manual offset committing. + +View [Apache Kafka](0) for more information on `Kafka` + +View [confluent-kafka-dotnet](1) for more information for consumer and producer +implementations. + +## Setup + +- Run compose file to start a working `Kafka` server +- Create a topic using following command +```cmd +/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic +--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +``` +- Start `Producer` app to send messages +- Start `Consumer` app to receive messages + +> [!NOTE] +> +> We currently faced an issue when all applications run in docker, since +> producer and consumer should wait for the server to be healthy but currently +> a valid health check could not be performed so we decided to only run `Kafka` +> server in docker + +For more information for docker setup, view [kafka](2) repository + +## Producer + +Producers are clients that send messages to Kafka topics. They create data and +push to `Kafka` server. Producers can specify the partition a message belongs +to, or it can be assigned by the Kafka server based on a given key or +automatically. + +Producers can send messages to multiple topics, and multiple producers can send +messages to the same topic. If topic has multiple partitions, messages with the +same key will always be in the same partition. + +## Consumer + +Consumers are clients that read messages from `Kafka` for given topic. Consumers +keep track of processed messages by committing an offset, which is stored in +topic partitions. + +Consumers can subscribe to multiple topics and can also be grouped. Each +consumer group will have their individual message offsets so it enables multiple +groups to subscribe to same topic. + +### Notes + +- Consumers have group id and offset is associated with that group id, if group + id is randomly generated every time, the offset will be reset for that group, + and all persisted messages will be read from the beginning + +- One or more consumer groups can be subscribed to a topic, and each consumer + group will have separate offsets. + +- For a partitioned topic, each consumer will be assigned to an individual + partition if a topic has 3 partitions and 5 consumers, 2 of the consumers will + be left idle + +- If number of partitions are more than consumer count in a group, a consumer + may have more than one partition assigned + +- If a consumer of a group fails, the partitions will be assigned to remaining + consumers by `Kafka` + +- `Kafka` moves the offset when committed, Auto-commit is set to true by default + but can be turned off and committed manually. + +[0]: https://kafka.apache.org/ +[1]: https://github.com/confluentinc/confluent-kafka-dotnet +[2]: https://github.com/apache/kafka/tree/trunk/docker/examples \ No newline at end of file diff --git a/kafka/compose.yml b/kafka/compose.yml new file mode 100644 index 0000000..aed3b55 --- /dev/null +++ b/kafka/compose.yml @@ -0,0 +1,21 @@ +version: '3.8' + +services: + kafka: + image: apache/kafka:latest + container_name: kafka + ports: + - "9092:9092" + - "9094:9094" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_LISTENERS: PLAINTEXT_HOST://:9092,PLAINTEXT_DOCKER://:9094,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092,PLAINTEXT_DOCKER://host.docker.internal:9094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_HOST + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/kafka/src/Consumer/BatchConsumer.cs b/kafka/src/Consumer/BatchConsumer.cs new file mode 100644 index 0000000..0a33d72 --- /dev/null +++ b/kafka/src/Consumer/BatchConsumer.cs @@ -0,0 +1,57 @@ +using Confluent.Kafka; +using System.Diagnostics; + +namespace Consumer; + +public class BatchConsumer +{ + public async Task RunAsync() + { + var config = new ConsumerConfig + { + BootstrapServers = "localhost:9092", + GroupId = "12", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false + }; + var consumerBuilder = new ConsumerBuilder(config); + + using var consumer = consumerBuilder.Build(); + consumer.Subscribe("demo-topic"); + + var batch = new List>(); + + var stopWatch = new Stopwatch(); + stopWatch.Start(); + + while (true) + { + + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) continue; + + batch.Add(result); + if (batch.Count < 5 && stopWatch.ElapsedMilliseconds <= TimeSpan.FromSeconds(1).TotalMilliseconds) + { + continue; + } + + foreach (var item in batch) + { + Console.WriteLine( + $"Partition: {item.Partition.Value}," + + $"Offset: {item.Offset}," + + $"Key: {item.Message.Key}," + + $"Value: {item.Message.Value}" + ); + } + + batch.Clear(); + consumer.Commit(); + + Console.WriteLine("awating next batch"); + await Task.Delay(1000); + stopWatch.Restart(); + } + } +} diff --git a/kafka/src/Consumer/Consumer.csproj b/kafka/src/Consumer/Consumer.csproj new file mode 100644 index 0000000..8c0e676 --- /dev/null +++ b/kafka/src/Consumer/Consumer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs b/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs new file mode 100644 index 0000000..efe66f5 --- /dev/null +++ b/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs @@ -0,0 +1,49 @@ +using Confluent.Kafka; + +namespace Consumer; + +public class ExceedMaxPollIntervalConsumer +{ + public async Task RunAsync() + { + var config = new ConsumerConfig + { + BootstrapServers = "localhost:9092", + GroupId = "99", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, + MaxPollIntervalMs = 11000, + SessionTimeoutMs = 10000, + HeartbeatIntervalMs = 3000 + }; + var consumerBuilder = new ConsumerBuilder(config).SetLogHandler((_, __) => { }); + + using var consumer = consumerBuilder.Build(); + consumer.Subscribe("demo-topic"); + + while (true) + { + try + { + var result = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (result == null) continue; + + Console.WriteLine("Consuming Message"); + Console.WriteLine( + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + + await Task.Delay((int)(result.Offset * 1000)); + + consumer.Commit(result); + } + catch (Exception ex) + { + Console.WriteLine($"{ex.GetType().Name}: {ex.Message}"); + } + } + } +} diff --git a/kafka/src/Consumer/MultipleConsumers.cs b/kafka/src/Consumer/MultipleConsumers.cs new file mode 100644 index 0000000..0f45e2b --- /dev/null +++ b/kafka/src/Consumer/MultipleConsumers.cs @@ -0,0 +1,46 @@ +using Confluent.Kafka; + +namespace Consumer; + +public class MultipleConsumers +{ + public async Task RunAsync() + { + var config = new ConsumerConfig + { + BootstrapServers = "localhost:9092", + GroupId = "1", + AutoOffsetReset = AutoOffsetReset.Earliest + }; + var consumerBuilder = new ConsumerBuilder(config); + + Task[] consumers = [ + Consume(consumerBuilder, 0), + Consume(consumerBuilder, 1), + Consume(consumerBuilder, 2) + ]; + + await Task.WhenAll(consumers); + } + + Task Consume(ConsumerBuilder builder, int consumerId) => + Task.Run(() => + { + using var consumer = builder.Build(); + consumer.Subscribe("demo-topic"); + + while (true) + { + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) continue; + + Console.WriteLine( + $"Consumer: {consumerId}," + + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + } + }); +} diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs new file mode 100644 index 0000000..87a1cf5 --- /dev/null +++ b/kafka/src/Consumer/Program.cs @@ -0,0 +1,10 @@ +using Consumer; + +//var multipleConsumers = new MultipleConsumers(); +//await multipleConsumers.RunAsync(); + +var batchConsumer = new BatchConsumer(); +await batchConsumer.RunAsync(); + +//var exceedMaxPollIntervalConsumer = new ExceedMaxPollIntervalConsumer(); +//await exceedMaxPollIntervalConsumer.RunAsync(); \ No newline at end of file diff --git a/kafka/src/Producer/Producer.csproj b/kafka/src/Producer/Producer.csproj new file mode 100644 index 0000000..8c0e676 --- /dev/null +++ b/kafka/src/Producer/Producer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs new file mode 100644 index 0000000..e4c01ef --- /dev/null +++ b/kafka/src/Producer/Program.cs @@ -0,0 +1,45 @@ +using Confluent.Kafka; + +var producerBuilder = new ProducerBuilder(new ProducerConfig() +{ + BootstrapServers = "localhost:9092", + Acks = Acks.All, +}); + +using var producer = producerBuilder.Build(); + +Task[] producers = [ + Produce(), + Produce(key: "1234") +]; + +await Task.WhenAll(producers); + +Task Produce( + string? key = default +) => Task.Run(async () => + { + while (true) + { + try + { + var result = await producer.ProduceAsync("demo-topic", new Message + { + Key = key ?? $"#{DateTime.Now:ddhhmmss}", + Value = "Message" + }); + Console.WriteLine( + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + + await Task.Delay(2000); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } + } + }); diff --git a/kafka/src/ProducerApi/ProducerApi.csproj b/kafka/src/ProducerApi/ProducerApi.csproj new file mode 100644 index 0000000..0afccb9 --- /dev/null +++ b/kafka/src/ProducerApi/ProducerApi.csproj @@ -0,0 +1,14 @@ + + + + net9.0 + enable + enable + + + + + + + + diff --git a/kafka/src/ProducerApi/Program.cs b/kafka/src/ProducerApi/Program.cs new file mode 100644 index 0000000..99e8755 --- /dev/null +++ b/kafka/src/ProducerApi/Program.cs @@ -0,0 +1,37 @@ +using Confluent.Kafka; +using Microsoft.AspNetCore.Mvc; +using System.Text.Json; + +var builder = WebApplication.CreateBuilder(args); + +// Add services to the container. +// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi +builder.Services.AddOpenApi(); + +var app = builder.Build(); + +// Configure the HTTP request pipeline. +if (app.Environment.IsDevelopment()) +{ + app.MapOpenApi(); +} + +app.UseHttpsRedirection(); + +app.MapPost("/produce/{topic}", async ([FromRoute] string topic, [FromBody] object message) => +{ + var producerBuilder = new ProducerBuilder(new ProducerConfig() + { + BootstrapServers = "localhost:9092", + Acks = Acks.All, + }); + + using var producer = producerBuilder.Build(); + + return await producer.ProduceAsync(topic, new Message + { + Value = JsonSerializer.Serialize(message) + }); +}); + +app.Run(); \ No newline at end of file diff --git a/kafka/src/ProducerApi/Properties/launchSettings.json b/kafka/src/ProducerApi/Properties/launchSettings.json new file mode 100644 index 0000000..281c298 --- /dev/null +++ b/kafka/src/ProducerApi/Properties/launchSettings.json @@ -0,0 +1,23 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5106", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "https://localhost:7201;http://localhost:5106", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/kafka/src/ProducerApi/appsettings.Development.json b/kafka/src/ProducerApi/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/kafka/src/ProducerApi/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/kafka/src/ProducerApi/appsettings.json b/kafka/src/ProducerApi/appsettings.json new file mode 100644 index 0000000..10f68b8 --- /dev/null +++ b/kafka/src/ProducerApi/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +}