From ad1e3c9f17f5df954fb4be5fdc2fe031a1a49e15 Mon Sep 17 00:00:00 2001 From: dncsvr Date: Wed, 11 Feb 2026 17:26:29 +0300 Subject: [PATCH 1/8] init `issue/hello-kafka` --- README.md | 1 + kafka/README.md | 1 + 2 files changed, 2 insertions(+) create mode 100644 kafka/README.md diff --git a/README.md b/README.md index 69623d2..f9993fc 100644 --- a/README.md +++ b/README.md @@ -11,3 +11,4 @@ examples. - [Docker](docker/README.md): Docker images, Dockerfiles, and container basics - [Keycloak](keycloak/README.md): Identity management, JWT authentication, Docker Compose setup +- [Kafka](kafka/README.md): Producer, Consumer, Docker Compose setup diff --git a/kafka/README.md b/kafka/README.md new file mode 100644 index 0000000..2eb9d10 --- /dev/null +++ b/kafka/README.md @@ -0,0 +1 @@ +# Kafka From f6b4427a6e181a3e81e2622d3f17057b890302fe Mon Sep 17 00:00:00 2001 From: dncsvr Date: Thu, 12 Feb 2026 01:50:12 +0300 Subject: [PATCH 2/8] add sample Consumer and Producer - add compose file for kafka --- kafka/Kafka.slnx | 4 ++++ kafka/compose.yml | 19 +++++++++++++++++++ kafka/src/Consumer/Consumer.csproj | 14 ++++++++++++++ kafka/src/Consumer/Program.cs | 22 ++++++++++++++++++++++ kafka/src/Producer/Producer.csproj | 14 ++++++++++++++ kafka/src/Producer/Program.cs | 27 +++++++++++++++++++++++++++ 6 files changed, 100 insertions(+) create mode 100644 kafka/Kafka.slnx create mode 100644 kafka/compose.yml create mode 100644 kafka/src/Consumer/Consumer.csproj create mode 100644 kafka/src/Consumer/Program.cs create mode 100644 kafka/src/Producer/Producer.csproj create mode 100644 kafka/src/Producer/Program.cs diff --git a/kafka/Kafka.slnx b/kafka/Kafka.slnx new file mode 100644 index 0000000..05e440f --- /dev/null +++ b/kafka/Kafka.slnx @@ -0,0 +1,4 @@ + + + + diff --git a/kafka/compose.yml b/kafka/compose.yml new file mode 100644 index 0000000..eaebccd --- /dev/null +++ b/kafka/compose.yml @@ -0,0 +1,19 @@ +version: '3.8' + +services: + kafka: + image: apache/kafka:latest + container_name: kafka + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 diff --git a/kafka/src/Consumer/Consumer.csproj b/kafka/src/Consumer/Consumer.csproj new file mode 100644 index 0000000..8c0e676 --- /dev/null +++ b/kafka/src/Consumer/Consumer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs new file mode 100644 index 0000000..9494b50 --- /dev/null +++ b/kafka/src/Consumer/Program.cs @@ -0,0 +1,22 @@ +using Confluent.Kafka; + +using var consumer = new ConsumerBuilder(new ConsumerConfig +{ + BootstrapServers = "localhost:9092", + GroupId = "1", + AutoOffsetReset = AutoOffsetReset.Earliest +}).Build(); + +consumer.Subscribe("demo-topic"); + +while (true) +{ + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) + { + Console.WriteLine("Awaiting Message"); + continue; + } + + Console.WriteLine($"Key: {result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); +} diff --git a/kafka/src/Producer/Producer.csproj b/kafka/src/Producer/Producer.csproj new file mode 100644 index 0000000..8c0e676 --- /dev/null +++ b/kafka/src/Producer/Producer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs new file mode 100644 index 0000000..6310ec0 --- /dev/null +++ b/kafka/src/Producer/Program.cs @@ -0,0 +1,27 @@ +using Confluent.Kafka; + +using var producer = new ProducerBuilder(new ProducerConfig() +{ + BootstrapServers = "localhost:9092", + Acks = Acks.All, +}).Build(); + +while (true) +{ + try + { + var result = await producer.ProduceAsync("demo-topic", new Message + { + Key = Guid.NewGuid().ToString(), + Value = $"Message #{DateTime.Now:yyyyMMddhhmmss}" + }); + + Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); + + await Task.Delay(2000); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } +} \ No newline at end of file From bad72414177020a125400235b98151ec6fa274d4 Mon Sep 17 00:00:00 2001 From: dncsvr Date: Thu, 12 Feb 2026 17:25:30 +0300 Subject: [PATCH 3/8] improve sample to use topic with multiple partitions and multiple consumer --- kafka/README.md | 7 +++++++ kafka/src/Consumer/Program.cs | 33 +++++++++++++++++++++------------ kafka/src/Producer/Program.cs | 22 ++++++++++++++++------ 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/kafka/README.md b/kafka/README.md index 2eb9d10..3aa36c4 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -1 +1,8 @@ # Kafka + + +Create a topic using following command + +```cmd +/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +``` \ No newline at end of file diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs index 9494b50..bcb49b1 100644 --- a/kafka/src/Consumer/Program.cs +++ b/kafka/src/Consumer/Program.cs @@ -1,22 +1,31 @@ using Confluent.Kafka; -using var consumer = new ConsumerBuilder(new ConsumerConfig +var consumerBuilder = new ConsumerBuilder(new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "1", AutoOffsetReset = AutoOffsetReset.Earliest -}).Build(); +}); -consumer.Subscribe("demo-topic"); +var consumers = Enumerable.Repeat(0, 3).Select((_, index) => Execute(consumerBuilder, index)); -while (true) -{ - var result = consumer.Consume(TimeSpan.FromSeconds(5)); - if (result == null) +await Task.WhenAll(consumers); + +Task Execute(ConsumerBuilder builder, int consumerId) => + Task.Run(() => { - Console.WriteLine("Awaiting Message"); - continue; - } + using var consumer = builder.Build(); + consumer.Subscribe("demo-topic"); + + while (true) + { + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) continue; - Console.WriteLine($"Key: {result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); -} + Console.Write($"Consumer: {consumerId},"); + Console.Write($"Partition: {result.Partition.Value},"); + Console.Write($"Offset: {result.Offset},"); + Console.Write($"Key: {result.Message.Key},"); + Console.WriteLine($"Value: {result.Message.Value}"); + } + }); \ No newline at end of file diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs index 6310ec0..1e2e97f 100644 --- a/kafka/src/Producer/Program.cs +++ b/kafka/src/Producer/Program.cs @@ -1,24 +1,34 @@ using Confluent.Kafka; -using var producer = new ProducerBuilder(new ProducerConfig() +var producerBuilder = new ProducerBuilder(new ProducerConfig() { BootstrapServers = "localhost:9092", Acks = Acks.All, -}).Build(); +}); + +var producer = producerBuilder.Build(); while (true) { try { - var result = await producer.ProduceAsync("demo-topic", new Message + var result = await producer.ProduceAsync("demo-topic" , new Message { - Key = Guid.NewGuid().ToString(), - Value = $"Message #{DateTime.Now:yyyyMMddhhmmss}" + Key = $"#{DateTime.Now:yyyyMMddhhmmss}", + Value = "Message" }); + Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); + + await Task.Delay(500); + result = await producer.ProduceAsync("demo-topic", new Message + { + Key = "1234", + Value = $"Message" + }); Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); - await Task.Delay(2000); + await Task.Delay(500); } catch (Exception e) { From 2f8837f1c294819c1e3041de9ed8ea21f8e4864d Mon Sep 17 00:00:00 2001 From: dncsvr Date: Thu, 12 Feb 2026 18:13:41 +0300 Subject: [PATCH 4/8] begin writing documentation - refactor producer and consumer sample --- kafka/README.md | 33 ++++++++++++++++++++- kafka/src/Consumer/Program.cs | 33 +++++++++++++++------ kafka/src/Producer/Program.cs | 56 ++++++++++++++++++++--------------- 3 files changed, 88 insertions(+), 34 deletions(-) diff --git a/kafka/README.md b/kafka/README.md index 3aa36c4..cdd14de 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -1,8 +1,39 @@ # Kafka +We used `Confluent.Kafka` `.NET` library and `apache/kafka` docker image in this +demo project + +## Setup Create a topic using following command ```cmd /opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 -``` \ No newline at end of file +``` + +## Consumer + + +### Notes + +- Consumers have group id and offset is associated with the group id, if group + id is randomly generated everytime, the offset will be reset for that group, + and all persisted messages will be read from the beginning + +- One ore more consumer groups can be subscribed to a topic, and each conmsumer + group will have seperate offsets. + +- For a partioned topic, each consumer will be assigned to an individual + partition if a topic has 3 partitions and 5 consumers, 2 of the consumers will + be left idle + +- If number of partitions are more than consumer count in a group, a consumer may + have more than one partitions assigned + +- If a consumer of a group fails, the partitions will be assigned to remaining + consumers by kafka + +- Messages with same key will exists in the same partition + +- Kafka moves the offset when commited, Auto commit is set true by default, can + be turned of an manually committed. diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs index bcb49b1..524d8bb 100644 --- a/kafka/src/Consumer/Program.cs +++ b/kafka/src/Consumer/Program.cs @@ -1,17 +1,25 @@ using Confluent.Kafka; -var consumerBuilder = new ConsumerBuilder(new ConsumerConfig +var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "1", AutoOffsetReset = AutoOffsetReset.Earliest -}); +}; +var consumerBuilder = new ConsumerBuilder(config); +var consumerWithNoCommitBuilder = new ConsumerBuilder(new ConsumerConfig(config) { EnableAutoCommit = false }); -var consumers = Enumerable.Repeat(0, 3).Select((_, index) => Execute(consumerBuilder, index)); +Task[] consumers = [ + Consume(consumerBuilder, 0), + Consume(consumerWithNoCommitBuilder, 1, manualCommit: true), + Consume(consumerWithNoCommitBuilder, 2) +]; await Task.WhenAll(consumers); -Task Execute(ConsumerBuilder builder, int consumerId) => +Task Consume(ConsumerBuilder builder, int consumerId, + bool manualCommit = false +) => Task.Run(() => { using var consumer = builder.Build(); @@ -22,10 +30,17 @@ Task Execute(ConsumerBuilder builder, int consumerId) => var result = consumer.Consume(TimeSpan.FromSeconds(5)); if (result == null) continue; - Console.Write($"Consumer: {consumerId},"); - Console.Write($"Partition: {result.Partition.Value},"); - Console.Write($"Offset: {result.Offset},"); - Console.Write($"Key: {result.Message.Key},"); - Console.WriteLine($"Value: {result.Message.Value}"); + Console.WriteLine( + $"Consumer: {consumerId}," + + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + + if (manualCommit) + { + consumer.Commit(result); + } } }); \ No newline at end of file diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs index 1e2e97f..e4c01ef 100644 --- a/kafka/src/Producer/Program.cs +++ b/kafka/src/Producer/Program.cs @@ -6,32 +6,40 @@ Acks = Acks.All, }); -var producer = producerBuilder.Build(); +using var producer = producerBuilder.Build(); -while (true) -{ - try - { - var result = await producer.ProduceAsync("demo-topic" , new Message - { - Key = $"#{DateTime.Now:yyyyMMddhhmmss}", - Value = "Message" - }); - Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); +Task[] producers = [ + Produce(), + Produce(key: "1234") +]; - await Task.Delay(500); +await Task.WhenAll(producers); - result = await producer.ProduceAsync("demo-topic", new Message +Task Produce( + string? key = default +) => Task.Run(async () => + { + while (true) { - Key = "1234", - Value = $"Message" - }); - Console.WriteLine($"Key:{result.Message.Key}, Value: {result.Message.Value}, Offset: {result.Offset}"); + try + { + var result = await producer.ProduceAsync("demo-topic", new Message + { + Key = key ?? $"#{DateTime.Now:ddhhmmss}", + Value = "Message" + }); + Console.WriteLine( + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); - await Task.Delay(500); - } - catch (Exception e) - { - Console.WriteLine(e.Message); - } -} \ No newline at end of file + await Task.Delay(2000); + } + catch (Exception e) + { + Console.WriteLine(e.Message); + } + } + }); From d78bbfb689c031b8c4eb3c5114e8704f559d3c52 Mon Sep 17 00:00:00 2001 From: dncsvr Date: Thu, 12 Feb 2026 23:19:06 +0300 Subject: [PATCH 5/8] update documentation --- kafka/README.md | 74 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/kafka/README.md b/kafka/README.md index cdd14de..0084b13 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -1,39 +1,81 @@ # Kafka -We used `Confluent.Kafka` `.NET` library and `apache/kafka` docker image in this -demo project +The aim of this project is to demonstrate how to implement a `Consumer` for +`Apache Kafka` in `.NET`. Project contains a sample producer and consumer +implementations. `Confluent.Kafka` `.NET` library and official `Apache Kafka` +docker image is used. -## Setup +Producer and Consumer samples are implemented in a way to demonstrate a topic +with multiple partitions, auto and manual offset committing. + +View [Apache Kafka](0) for more information on `Kafka` -Create a topic using following command +View [confluent-kafka-dotnet](1) for more information for consumer and producer +implementations. +## Setup + +- Run compose file to start a working `Kafka` server +- Create a topic using following command ```cmd -/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic +--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 ``` +- Start `Producer` app to send messages +- Start `Consumer` app to receive messages + +> [!NOTE] +> +> We currently faced an issue when all applications run in docker, since +> producer and consumer should wait for the server to be healthy but currently +> a valid health check could not be performed so we decided to only run `Kafka` +> server in docker + +For more information for docker setup, view [kafka](2) repository + +## Producer + +Producers are clients that send messages to Kafka topics. They create data and +push to `Kafka` server. Producers can specify the partition a message belongs +to, or it can be assigned by the Kafka server based on a given key or +automatically. + +Producers can send messages to multiple topics, and multiple producers can send +messages to the same topic. If topic has multiple partitions, messages with the +same key will always be in the same partition. ## Consumer +Consumers are clients that read messages from `Kafka` for given topic. Consumers +keep track of processed messages by committing an offset, which is stored in +topic partitions. + +Consumers can subscribe to multiple topics and can also be grouped. Each +consumer group will have their individual message offsets so it enables multiple +groups to subscribe to same topic. ### Notes -- Consumers have group id and offset is associated with the group id, if group - id is randomly generated everytime, the offset will be reset for that group, +- Consumers have group id and offset is associated with that group id, if group + id is randomly generated every time, the offset will be reset for that group, and all persisted messages will be read from the beginning -- One ore more consumer groups can be subscribed to a topic, and each conmsumer - group will have seperate offsets. +- One or more consumer groups can be subscribed to a topic, and each consumer + group will have separate offsets. -- For a partioned topic, each consumer will be assigned to an individual +- For a partitioned topic, each consumer will be assigned to an individual partition if a topic has 3 partitions and 5 consumers, 2 of the consumers will be left idle -- If number of partitions are more than consumer count in a group, a consumer may - have more than one partitions assigned +- If number of partitions are more than consumer count in a group, a consumer + may have more than one partition assigned - If a consumer of a group fails, the partitions will be assigned to remaining - consumers by kafka + consumers by `Kafka` -- Messages with same key will exists in the same partition +- `Kafka` moves the offset when committed, Auto-commit is set to true by default + but can be turned off and committed manually. -- Kafka moves the offset when commited, Auto commit is set true by default, can - be turned of an manually committed. +[0]: https://kafka.apache.org/ +[1]: https://github.com/confluentinc/confluent-kafka-dotnet +[2]: https://github.com/apache/kafka/tree/trunk/docker/examples \ No newline at end of file From 47a60a8e839eb908968f11723c2b39e1be20457b Mon Sep 17 00:00:00 2001 From: dncsvr Date: Mon, 23 Feb 2026 23:24:10 +0300 Subject: [PATCH 6/8] improve kafka sample consumers - move use cases to their own files --- kafka/src/Consumer/BatchConsumer.cs | 57 +++++++++++++++++++ .../Consumer/ExceedMaxPollIntervalConsumer.cs | 49 ++++++++++++++++ kafka/src/Consumer/MultipleConsumers.cs | 46 +++++++++++++++ kafka/src/Consumer/Program.cs | 50 +++------------- 4 files changed, 159 insertions(+), 43 deletions(-) create mode 100644 kafka/src/Consumer/BatchConsumer.cs create mode 100644 kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs create mode 100644 kafka/src/Consumer/MultipleConsumers.cs diff --git a/kafka/src/Consumer/BatchConsumer.cs b/kafka/src/Consumer/BatchConsumer.cs new file mode 100644 index 0000000..0a33d72 --- /dev/null +++ b/kafka/src/Consumer/BatchConsumer.cs @@ -0,0 +1,57 @@ +using Confluent.Kafka; +using System.Diagnostics; + +namespace Consumer; + +public class BatchConsumer +{ + public async Task RunAsync() + { + var config = new ConsumerConfig + { + BootstrapServers = "localhost:9092", + GroupId = "12", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false + }; + var consumerBuilder = new ConsumerBuilder(config); + + using var consumer = consumerBuilder.Build(); + consumer.Subscribe("demo-topic"); + + var batch = new List>(); + + var stopWatch = new Stopwatch(); + stopWatch.Start(); + + while (true) + { + + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) continue; + + batch.Add(result); + if (batch.Count < 5 && stopWatch.ElapsedMilliseconds <= TimeSpan.FromSeconds(1).TotalMilliseconds) + { + continue; + } + + foreach (var item in batch) + { + Console.WriteLine( + $"Partition: {item.Partition.Value}," + + $"Offset: {item.Offset}," + + $"Key: {item.Message.Key}," + + $"Value: {item.Message.Value}" + ); + } + + batch.Clear(); + consumer.Commit(); + + Console.WriteLine("awating next batch"); + await Task.Delay(1000); + stopWatch.Restart(); + } + } +} diff --git a/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs b/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs new file mode 100644 index 0000000..efe66f5 --- /dev/null +++ b/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs @@ -0,0 +1,49 @@ +using Confluent.Kafka; + +namespace Consumer; + +public class ExceedMaxPollIntervalConsumer +{ + public async Task RunAsync() + { + var config = new ConsumerConfig + { + BootstrapServers = "localhost:9092", + GroupId = "99", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, + MaxPollIntervalMs = 11000, + SessionTimeoutMs = 10000, + HeartbeatIntervalMs = 3000 + }; + var consumerBuilder = new ConsumerBuilder(config).SetLogHandler((_, __) => { }); + + using var consumer = consumerBuilder.Build(); + consumer.Subscribe("demo-topic"); + + while (true) + { + try + { + var result = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (result == null) continue; + + Console.WriteLine("Consuming Message"); + Console.WriteLine( + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + + await Task.Delay((int)(result.Offset * 1000)); + + consumer.Commit(result); + } + catch (Exception ex) + { + Console.WriteLine($"{ex.GetType().Name}: {ex.Message}"); + } + } + } +} diff --git a/kafka/src/Consumer/MultipleConsumers.cs b/kafka/src/Consumer/MultipleConsumers.cs new file mode 100644 index 0000000..0f45e2b --- /dev/null +++ b/kafka/src/Consumer/MultipleConsumers.cs @@ -0,0 +1,46 @@ +using Confluent.Kafka; + +namespace Consumer; + +public class MultipleConsumers +{ + public async Task RunAsync() + { + var config = new ConsumerConfig + { + BootstrapServers = "localhost:9092", + GroupId = "1", + AutoOffsetReset = AutoOffsetReset.Earliest + }; + var consumerBuilder = new ConsumerBuilder(config); + + Task[] consumers = [ + Consume(consumerBuilder, 0), + Consume(consumerBuilder, 1), + Consume(consumerBuilder, 2) + ]; + + await Task.WhenAll(consumers); + } + + Task Consume(ConsumerBuilder builder, int consumerId) => + Task.Run(() => + { + using var consumer = builder.Build(); + consumer.Subscribe("demo-topic"); + + while (true) + { + var result = consumer.Consume(TimeSpan.FromSeconds(5)); + if (result == null) continue; + + Console.WriteLine( + $"Consumer: {consumerId}," + + $"Partition: {result.Partition.Value}," + + $"Offset: {result.Offset}," + + $"Key: {result.Message.Key}," + + $"Value: {result.Message.Value}" + ); + } + }); +} diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs index 524d8bb..87a1cf5 100644 --- a/kafka/src/Consumer/Program.cs +++ b/kafka/src/Consumer/Program.cs @@ -1,46 +1,10 @@ -using Confluent.Kafka; +using Consumer; -var config = new ConsumerConfig -{ - BootstrapServers = "localhost:9092", - GroupId = "1", - AutoOffsetReset = AutoOffsetReset.Earliest -}; -var consumerBuilder = new ConsumerBuilder(config); -var consumerWithNoCommitBuilder = new ConsumerBuilder(new ConsumerConfig(config) { EnableAutoCommit = false }); +//var multipleConsumers = new MultipleConsumers(); +//await multipleConsumers.RunAsync(); -Task[] consumers = [ - Consume(consumerBuilder, 0), - Consume(consumerWithNoCommitBuilder, 1, manualCommit: true), - Consume(consumerWithNoCommitBuilder, 2) -]; +var batchConsumer = new BatchConsumer(); +await batchConsumer.RunAsync(); -await Task.WhenAll(consumers); - -Task Consume(ConsumerBuilder builder, int consumerId, - bool manualCommit = false -) => - Task.Run(() => - { - using var consumer = builder.Build(); - consumer.Subscribe("demo-topic"); - - while (true) - { - var result = consumer.Consume(TimeSpan.FromSeconds(5)); - if (result == null) continue; - - Console.WriteLine( - $"Consumer: {consumerId}," + - $"Partition: {result.Partition.Value}," + - $"Offset: {result.Offset}," + - $"Key: {result.Message.Key}," + - $"Value: {result.Message.Value}" - ); - - if (manualCommit) - { - consumer.Commit(result); - } - } - }); \ No newline at end of file +//var exceedMaxPollIntervalConsumer = new ExceedMaxPollIntervalConsumer(); +//await exceedMaxPollIntervalConsumer.RunAsync(); \ No newline at end of file From e244768cdf097507624f77aaaf18b957736385eb Mon Sep 17 00:00:00 2001 From: dncsvr Date: Thu, 26 Feb 2026 23:37:52 +0300 Subject: [PATCH 7/8] add an api project to produce test messages --- kafka/Kafka.slnx | 1 + kafka/src/ProducerApi/ProducerApi.csproj | 14 +++++++ kafka/src/ProducerApi/Program.cs | 37 +++++++++++++++++++ .../Properties/launchSettings.json | 23 ++++++++++++ .../ProducerApi/appsettings.Development.json | 8 ++++ kafka/src/ProducerApi/appsettings.json | 9 +++++ 6 files changed, 92 insertions(+) create mode 100644 kafka/src/ProducerApi/ProducerApi.csproj create mode 100644 kafka/src/ProducerApi/Program.cs create mode 100644 kafka/src/ProducerApi/Properties/launchSettings.json create mode 100644 kafka/src/ProducerApi/appsettings.Development.json create mode 100644 kafka/src/ProducerApi/appsettings.json diff --git a/kafka/Kafka.slnx b/kafka/Kafka.slnx index 05e440f..353a32d 100644 --- a/kafka/Kafka.slnx +++ b/kafka/Kafka.slnx @@ -1,4 +1,5 @@ + diff --git a/kafka/src/ProducerApi/ProducerApi.csproj b/kafka/src/ProducerApi/ProducerApi.csproj new file mode 100644 index 0000000..0afccb9 --- /dev/null +++ b/kafka/src/ProducerApi/ProducerApi.csproj @@ -0,0 +1,14 @@ + + + + net9.0 + enable + enable + + + + + + + + diff --git a/kafka/src/ProducerApi/Program.cs b/kafka/src/ProducerApi/Program.cs new file mode 100644 index 0000000..99e8755 --- /dev/null +++ b/kafka/src/ProducerApi/Program.cs @@ -0,0 +1,37 @@ +using Confluent.Kafka; +using Microsoft.AspNetCore.Mvc; +using System.Text.Json; + +var builder = WebApplication.CreateBuilder(args); + +// Add services to the container. +// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi +builder.Services.AddOpenApi(); + +var app = builder.Build(); + +// Configure the HTTP request pipeline. +if (app.Environment.IsDevelopment()) +{ + app.MapOpenApi(); +} + +app.UseHttpsRedirection(); + +app.MapPost("/produce/{topic}", async ([FromRoute] string topic, [FromBody] object message) => +{ + var producerBuilder = new ProducerBuilder(new ProducerConfig() + { + BootstrapServers = "localhost:9092", + Acks = Acks.All, + }); + + using var producer = producerBuilder.Build(); + + return await producer.ProduceAsync(topic, new Message + { + Value = JsonSerializer.Serialize(message) + }); +}); + +app.Run(); \ No newline at end of file diff --git a/kafka/src/ProducerApi/Properties/launchSettings.json b/kafka/src/ProducerApi/Properties/launchSettings.json new file mode 100644 index 0000000..281c298 --- /dev/null +++ b/kafka/src/ProducerApi/Properties/launchSettings.json @@ -0,0 +1,23 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5106", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "https://localhost:7201;http://localhost:5106", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/kafka/src/ProducerApi/appsettings.Development.json b/kafka/src/ProducerApi/appsettings.Development.json new file mode 100644 index 0000000..0c208ae --- /dev/null +++ b/kafka/src/ProducerApi/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/kafka/src/ProducerApi/appsettings.json b/kafka/src/ProducerApi/appsettings.json new file mode 100644 index 0000000..10f68b8 --- /dev/null +++ b/kafka/src/ProducerApi/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} From 1a8c205347234f704bfd670ca9b107dc398c029a Mon Sep 17 00:00:00 2001 From: dncsvr Date: Fri, 27 Feb 2026 01:50:22 +0300 Subject: [PATCH 8/8] update kafka configuration for both internal and external access --- kafka/compose.yml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/compose.yml b/kafka/compose.yml index eaebccd..aed3b55 100644 --- a/kafka/compose.yml +++ b/kafka/compose.yml @@ -6,14 +6,16 @@ services: container_name: kafka ports: - "9092:9092" + - "9094:9094" environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 - KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT_HOST://:9092,PLAINTEXT_DOCKER://:9094,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092,PLAINTEXT_DOCKER://host.docker.internal:9094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_HOST KAFKA_LOG_DIRS: /tmp/kraft-combined-logs KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file