Kafka Flow (Producer Consumer arch.)
I'm having a lil issue with registering consumers on Kafka (using kafka flow).








c#
using KafkaFlow;
using Payroll.Shared;
namespace Payroll.API;
public class EmployeeCreatedHandler(IServiceProvider serviceProvider, ILogger<EmployeeCreatedHandler> logger)
: IMessageHandler<EmployeeCreatedEvent>
{
public Task Handle(IMessageContext context, EmployeeCreatedEvent message)
{
logger.LogInformation("EmployeeCreatedHandler: EmployeeId={EmployeeId}", message.EmployeeId);
return Task.CompletedTask;
}
}c#
// Configure KafkaFlow
const string topicName = "employee-topic";
const string groupName = "payroll-consumer-group";
builder.Services.AddKafkaFlowHostedService(
kafka => kafka
.UseMicrosoftLog()
.AddCluster(cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.AddConsumer(consumer =>
consumer
.Topic(topicName)
.WithGroupId(groupName)
.WithBufferSize(100)
.WithWorkersCount(4)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()
.AddTypedHandlers(handlers => handlers
.AddHandler<EmployeeCreatedHandler>()
)
)
)
)
);c#
// Configure KafkaFlow
const string topicName = "employee-topic";
const string producerName = "employee-producer";
builder.Services.AddKafka(
kafka => kafka
.UseMicrosoftLog()
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.CreateTopicIfNotExists(topicName, 1, 1)
.AddProducer(
producerName,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(m =>
m.AddSerializer<JsonCoreSerializer>()
)
)
)
);