diff --git a/README.md b/README.md
index 69623d2..f9993fc 100644
--- a/README.md
+++ b/README.md
@@ -11,3 +11,4 @@ examples.
- [Docker](docker/README.md): Docker images, Dockerfiles, and container basics
- [Keycloak](keycloak/README.md): Identity management, JWT authentication,
Docker Compose setup
+- [Kafka](kafka/README.md): Producer, Consumer, Docker Compose setup
diff --git a/kafka/Kafka.slnx b/kafka/Kafka.slnx
new file mode 100644
index 0000000..353a32d
--- /dev/null
+++ b/kafka/Kafka.slnx
@@ -0,0 +1,5 @@
+<Solution>
+  <Project Path="src/Consumer/Consumer.csproj" />
+  <Project Path="src/Producer/Producer.csproj" />
+  <Project Path="src/ProducerApi/ProducerApi.csproj" />
+</Solution>
diff --git a/kafka/README.md b/kafka/README.md
new file mode 100644
index 0000000..0084b13
--- /dev/null
+++ b/kafka/README.md
@@ -0,0 +1,81 @@
+# Kafka
+
+The aim of this project is to demonstrate how to implement a `Consumer` for
+`Apache Kafka` in `.NET`. The project contains sample producer and consumer
+implementations. The `Confluent.Kafka` `.NET` library and the official
+`Apache Kafka` docker image are used.
+
+Producer and Consumer samples are implemented in a way to demonstrate a topic
+with multiple partitions, auto and manual offset committing.
+
+View [Apache Kafka][0] for more information on `Kafka`.
+
+View [confluent-kafka-dotnet][1] for more information on consumer and producer
+implementations.
+
+## Setup
+
+- Run compose file to start a working `Kafka` server
+- Create a topic using following command
+```cmd
+/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic demo-topic \
+--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
+```
+- Start `Producer` app to send messages
+- Start `Consumer` app to receive messages
+
+> [!NOTE]
+>
+> We faced an issue when all applications run in docker: the producer and
+> consumer should wait for the server to be healthy, but a reliable health
+> check could not be performed, so we decided to only run the `Kafka` server
+> in docker
+
+For more information on the docker setup, view the [kafka][2] repository.
+
+## Producer
+
+Producers are clients that send messages to Kafka topics. They create data and
+push to `Kafka` server. Producers can specify the partition a message belongs
+to, or it can be assigned by the Kafka server based on a given key or
+automatically.
+
+Producers can send messages to multiple topics, and multiple producers can send
+messages to the same topic. If topic has multiple partitions, messages with the
+same key will always be in the same partition.
+
+## Consumer
+
+Consumers are clients that read messages from `Kafka` for given topic. Consumers
+keep track of processed messages by committing an offset, which is stored in
+topic partitions.
+
+Consumers can subscribe to multiple topics and can also be grouped. Each
+consumer group will have their individual message offsets so it enables multiple
+groups to subscribe to same topic.
+
+### Notes
+
+- Consumers have a group id, and offsets are associated with that group id. If
+  the group id is randomly generated every time, the offset will be reset for
+  that group, and all persisted messages will be read from the beginning
+
+- One or more consumer groups can be subscribed to a topic, and each consumer
+ group will have separate offsets.
+
+- For a partitioned topic, each consumer will be assigned to an individual
+  partition; if a topic has 3 partitions and 5 consumers, 2 of the consumers
+  will be left idle
+
+- If the number of partitions is greater than the consumer count in a group, a
+  consumer may have more than one partition assigned
+
+- If a consumer of a group fails, the partitions will be assigned to remaining
+ consumers by `Kafka`
+
+- `Kafka` moves the offset when committed, Auto-commit is set to true by default
+ but can be turned off and committed manually.
+
+[0]: https://kafka.apache.org/
+[1]: https://github.com/confluentinc/confluent-kafka-dotnet
+[2]: https://github.com/apache/kafka/tree/trunk/docker/examples
\ No newline at end of file
diff --git a/kafka/compose.yml b/kafka/compose.yml
new file mode 100644
index 0000000..aed3b55
--- /dev/null
+++ b/kafka/compose.yml
@@ -0,0 +1,21 @@
+version: '3.8'
+
+services:
+ kafka:
+ image: apache/kafka:latest
+ container_name: kafka
+ ports:
+ - "9092:9092"
+ - "9094:9094"
+ environment:
+ KAFKA_NODE_ID: 1
+ KAFKA_PROCESS_ROLES: broker,controller
+ KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
+ KAFKA_LISTENERS: PLAINTEXT_HOST://:9092,PLAINTEXT_DOCKER://:9094,CONTROLLER://:9093
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092,PLAINTEXT_DOCKER://host.docker.internal:9094
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
+ KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_HOST
+ KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
\ No newline at end of file
diff --git a/kafka/src/Consumer/BatchConsumer.cs b/kafka/src/Consumer/BatchConsumer.cs
new file mode 100644
index 0000000..0a33d72
--- /dev/null
+++ b/kafka/src/Consumer/BatchConsumer.cs
@@ -0,0 +1,57 @@
+using Confluent.Kafka;
+using System.Diagnostics;
+
+namespace Consumer;
+
+/// <summary>
+/// Consumes messages from "demo-topic" in batches: messages are buffered until
+/// either 5 have accumulated or roughly one second has elapsed, then the whole
+/// batch is printed and the offsets are committed manually
+/// (auto-commit is disabled).
+/// </summary>
+public class BatchConsumer
+{
+    /// <summary>
+    /// Runs the batch-consume loop; never returns under normal operation.
+    /// </summary>
+    public async Task RunAsync()
+    {
+        var config = new ConsumerConfig
+        {
+            BootstrapServers = "localhost:9092",
+            GroupId = "12",
+            AutoOffsetReset = AutoOffsetReset.Earliest,
+            // Offsets are committed explicitly after each batch is processed.
+            EnableAutoCommit = false
+        };
+        var consumerBuilder = new ConsumerBuilder<string, string>(config);
+
+        using var consumer = consumerBuilder.Build();
+        consumer.Subscribe("demo-topic");
+
+        var batch = new List<ConsumeResult<string, string>>();
+
+        var stopWatch = new Stopwatch();
+        stopWatch.Start();
+
+        while (true)
+        {
+            var result = consumer.Consume(TimeSpan.FromSeconds(5));
+            if (result == null) continue;
+
+            batch.Add(result);
+            // Flush once the batch reaches 5 items or ~1 second has passed.
+            if (batch.Count < 5 && stopWatch.ElapsedMilliseconds <= TimeSpan.FromSeconds(1).TotalMilliseconds)
+            {
+                continue;
+            }
+
+            foreach (var item in batch)
+            {
+                Console.WriteLine(
+                    $"Partition: {item.Partition.Value}," +
+                    $"Offset: {item.Offset}," +
+                    $"Key: {item.Message.Key}," +
+                    $"Value: {item.Message.Value}"
+                );
+            }
+
+            batch.Clear();
+            // Commit the current offsets for the whole batch at once.
+            consumer.Commit();
+
+            Console.WriteLine("awaiting next batch");
+            await Task.Delay(1000);
+            stopWatch.Restart();
+        }
+    }
+}
diff --git a/kafka/src/Consumer/Consumer.csproj b/kafka/src/Consumer/Consumer.csproj
new file mode 100644
index 0000000..8c0e676
--- /dev/null
+++ b/kafka/src/Consumer/Consumer.csproj
@@ -0,0 +1,14 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net9.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="2.6.1" />
+  </ItemGroup>
+
+</Project>
diff --git a/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs b/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs
new file mode 100644
index 0000000..efe66f5
--- /dev/null
+++ b/kafka/src/Consumer/ExceedMaxPollIntervalConsumer.cs
@@ -0,0 +1,49 @@
+using Confluent.Kafka;
+
+namespace Consumer;
+
+/// <summary>
+/// Demonstrates what happens when a consumer exceeds MaxPollIntervalMs:
+/// simulated processing time grows with the message offset, so the consumer is
+/// eventually evicted from the group and Consume starts throwing.
+/// </summary>
+public class ExceedMaxPollIntervalConsumer
+{
+    /// <summary>
+    /// Runs the consume loop forever, delaying longer for each message so the
+    /// max poll interval is eventually exceeded. Exceptions are printed and
+    /// the loop keeps going.
+    /// </summary>
+    public async Task RunAsync()
+    {
+        var config = new ConsumerConfig
+        {
+            BootstrapServers = "localhost:9092",
+            GroupId = "99",
+            AutoOffsetReset = AutoOffsetReset.Earliest,
+            EnableAutoCommit = false,
+            // Deliberately small so the slow processing below trips it.
+            MaxPollIntervalMs = 11000,
+            SessionTimeoutMs = 10000,
+            HeartbeatIntervalMs = 3000
+        };
+        // No-op log handler silences the client's rebalance/eviction warnings.
+        var consumerBuilder = new ConsumerBuilder<string, string>(config).SetLogHandler((_, __) => { });
+
+        using var consumer = consumerBuilder.Build();
+        consumer.Subscribe("demo-topic");
+
+        while (true)
+        {
+            try
+            {
+                var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
+                if (result == null) continue;
+
+                Console.WriteLine("Consuming Message");
+                Console.WriteLine(
+                    $"Partition: {result.Partition.Value}," +
+                    $"Offset: {result.Offset}," +
+                    $"Key: {result.Message.Key}," +
+                    $"Value: {result.Message.Value}"
+                );
+
+                // Simulated work that grows with the offset; once it exceeds
+                // MaxPollIntervalMs the consumer leaves the group.
+                await Task.Delay((int)(result.Offset * 1000));
+
+                consumer.Commit(result);
+            }
+            catch (Exception ex)
+            {
+                Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
+            }
+        }
+    }
+}
diff --git a/kafka/src/Consumer/MultipleConsumers.cs b/kafka/src/Consumer/MultipleConsumers.cs
new file mode 100644
index 0000000..0f45e2b
--- /dev/null
+++ b/kafka/src/Consumer/MultipleConsumers.cs
@@ -0,0 +1,46 @@
+using Confluent.Kafka;
+
+namespace Consumer;
+
+/// <summary>
+/// Demonstrates three consumers sharing one group id ("1"): Kafka assigns each
+/// partition of "demo-topic" to at most one consumer within the group.
+/// </summary>
+public class MultipleConsumers
+{
+    /// <summary>
+    /// Starts three concurrent consumers and waits on all of them; the consume
+    /// loops never finish, so this effectively runs forever.
+    /// </summary>
+    public async Task RunAsync()
+    {
+        var config = new ConsumerConfig
+        {
+            BootstrapServers = "localhost:9092",
+            GroupId = "1",
+            AutoOffsetReset = AutoOffsetReset.Earliest
+        };
+        var consumerBuilder = new ConsumerBuilder<string, string>(config);
+
+        Task[] consumers = [
+            Consume(consumerBuilder, 0),
+            Consume(consumerBuilder, 1),
+            Consume(consumerBuilder, 2)
+        ];
+
+        await Task.WhenAll(consumers);
+    }
+
+    // Builds a consumer from the shared builder and prints every message it
+    // receives, tagged with consumerId so partition assignment is visible.
+    Task Consume(ConsumerBuilder<string, string> builder, int consumerId) =>
+        Task.Run(() =>
+        {
+            using var consumer = builder.Build();
+            consumer.Subscribe("demo-topic");
+
+            while (true)
+            {
+                var result = consumer.Consume(TimeSpan.FromSeconds(5));
+                if (result == null) continue;
+
+                Console.WriteLine(
+                    $"Consumer: {consumerId}," +
+                    $"Partition: {result.Partition.Value}," +
+                    $"Offset: {result.Offset}," +
+                    $"Key: {result.Message.Key}," +
+                    $"Value: {result.Message.Value}"
+                );
+            }
+        });
+}
diff --git a/kafka/src/Consumer/Program.cs b/kafka/src/Consumer/Program.cs
new file mode 100644
index 0000000..87a1cf5
--- /dev/null
+++ b/kafka/src/Consumer/Program.cs
@@ -0,0 +1,10 @@
+using Consumer;
+
+// Sample selector: exactly one consumer sample should be active at a time.
+// Uncomment the pair of lines for the sample you want to run.
+
+//var multipleConsumers = new MultipleConsumers();
+//await multipleConsumers.RunAsync();
+
+// Active sample: buffers messages and commits offsets manually per batch.
+var batchConsumer = new BatchConsumer();
+await batchConsumer.RunAsync();
+
+//var exceedMaxPollIntervalConsumer = new ExceedMaxPollIntervalConsumer();
+//await exceedMaxPollIntervalConsumer.RunAsync();
\ No newline at end of file
diff --git a/kafka/src/Producer/Producer.csproj b/kafka/src/Producer/Producer.csproj
new file mode 100644
index 0000000..8c0e676
--- /dev/null
+++ b/kafka/src/Producer/Producer.csproj
@@ -0,0 +1,14 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net9.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="2.6.1" />
+  </ItemGroup>
+
+</Project>
diff --git a/kafka/src/Producer/Program.cs b/kafka/src/Producer/Program.cs
new file mode 100644
index 0000000..e4c01ef
--- /dev/null
+++ b/kafka/src/Producer/Program.cs
@@ -0,0 +1,45 @@
+using Confluent.Kafka;
+
+// Sample producer: two concurrent loops publish to "demo-topic" every two
+// seconds. One derives a fresh key from the current time (messages spread
+// across partitions); the other uses the fixed key "1234" (messages always
+// land in the same partition).
+var producerBuilder = new ProducerBuilder<string, string>(new ProducerConfig()
+{
+    BootstrapServers = "localhost:9092",
+    // Wait for all in-sync replicas to acknowledge each message.
+    Acks = Acks.All,
+});
+
+using var producer = producerBuilder.Build();
+
+Task[] producers = [
+    Produce(),
+    Produce(key: "1234")
+];
+
+await Task.WhenAll(producers);
+
+// Publishes a message every 2 seconds forever. When no key is given, a new
+// timestamp-based key is generated per message. Delivery errors are printed
+// and the loop continues.
+Task Produce(
+    string? key = default
+) => Task.Run(async () =>
+    {
+        while (true)
+        {
+            try
+            {
+                var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
+                {
+                    Key = key ?? $"#{DateTime.Now:ddhhmmss}",
+                    Value = "Message"
+                });
+                Console.WriteLine(
+                    $"Partition: {result.Partition.Value}," +
+                    $"Offset: {result.Offset}," +
+                    $"Key: {result.Message.Key}," +
+                    $"Value: {result.Message.Value}"
+                );
+
+                await Task.Delay(2000);
+            }
+            catch (Exception e)
+            {
+                Console.WriteLine(e.Message);
+            }
+        }
+    });
diff --git a/kafka/src/ProducerApi/ProducerApi.csproj b/kafka/src/ProducerApi/ProducerApi.csproj
new file mode 100644
index 0000000..0afccb9
--- /dev/null
+++ b/kafka/src/ProducerApi/ProducerApi.csproj
@@ -0,0 +1,14 @@
+<Project Sdk="Microsoft.NET.Sdk.Web">
+
+  <PropertyGroup>
+    <TargetFramework>net9.0</TargetFramework>
+    <Nullable>enable</Nullable>
+    <ImplicitUsings>enable</ImplicitUsings>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="2.6.1" />
+    <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.0" />
+  </ItemGroup>
+
+</Project>
diff --git a/kafka/src/ProducerApi/Program.cs b/kafka/src/ProducerApi/Program.cs
new file mode 100644
index 0000000..99e8755
--- /dev/null
+++ b/kafka/src/ProducerApi/Program.cs
@@ -0,0 +1,37 @@
+using Confluent.Kafka;
+using Microsoft.AspNetCore.Mvc;
+using System.Text.Json;
+
+var builder = WebApplication.CreateBuilder(args);
+
+// Add services to the container.
+// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi
+builder.Services.AddOpenApi();
+
+// A Kafka producer is thread-safe and expensive to create, so register a
+// single shared instance instead of building a new one on every request.
+builder.Services.AddSingleton(_ => new ProducerBuilder<Null, string>(new ProducerConfig()
+{
+    BootstrapServers = "localhost:9092",
+    // Wait for all in-sync replicas to acknowledge each message.
+    Acks = Acks.All,
+}).Build());
+
+var app = builder.Build();
+
+// Configure the HTTP request pipeline.
+if (app.Environment.IsDevelopment())
+{
+    app.MapOpenApi();
+}
+
+app.UseHttpsRedirection();
+
+// POST /produce/{topic}: serializes the request body to JSON and publishes it
+// to the given topic, returning the broker's delivery result.
+app.MapPost("/produce/{topic}", async (
+    [FromRoute] string topic,
+    [FromBody] object message,
+    IProducer<Null, string> producer) =>
+    await producer.ProduceAsync(topic, new Message<Null, string>
+    {
+        Value = JsonSerializer.Serialize(message)
+    }));
+
+app.Run();
\ No newline at end of file
diff --git a/kafka/src/ProducerApi/Properties/launchSettings.json b/kafka/src/ProducerApi/Properties/launchSettings.json
new file mode 100644
index 0000000..281c298
--- /dev/null
+++ b/kafka/src/ProducerApi/Properties/launchSettings.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "https://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "http": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "launchBrowser": false,
+ "applicationUrl": "http://localhost:5106",
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development"
+ }
+ },
+ "https": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "launchBrowser": false,
+ "applicationUrl": "https://localhost:7201;http://localhost:5106",
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development"
+ }
+ }
+ }
+}
diff --git a/kafka/src/ProducerApi/appsettings.Development.json b/kafka/src/ProducerApi/appsettings.Development.json
new file mode 100644
index 0000000..0c208ae
--- /dev/null
+++ b/kafka/src/ProducerApi/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning"
+ }
+ }
+}
diff --git a/kafka/src/ProducerApi/appsettings.json b/kafka/src/ProducerApi/appsettings.json
new file mode 100644
index 0000000..10f68b8
--- /dev/null
+++ b/kafka/src/ProducerApi/appsettings.json
@@ -0,0 +1,9 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.AspNetCore": "Warning"
+ }
+ },
+ "AllowedHosts": "*"
+}