w4n9hu1's Blog

使用Docker/Kafka/.Net6搭建流处理系统

基于事件流处理的微服务架构可以使程序有更低的耦合度,更高的吞吐量和更好的扩展性,本文将使用Docker安装Kafka环境,并基于.Net6搭建一个WebApi的Producer和一个控制台Consumer程序。

一、安装Apache Kafka

使用Docker-Compose安装Apache Kafka只需要简单2个步骤。

  1. 下载docker-compose.yml文件,curl --silent --output docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.1-post/cp-all-in-one/docker-compose.yml
  2. 在docker-compose.yml所在目录运行docker-compose up -d,等待镜像拉取和运行。

docker-compose

二、使用Confluent Control Center

Confluent Control Center是一个基于Web,管理和监控Apache Kafka的工具,根据docker-compose.yml相关配置,打开http://localhost:9021/clusters。

control-center

三、创建基于WebApi的生产者

  1. 基于.Net6创建一个ASP.NET Core Web API项目和一个控制台应用项目。 projects

  2. 安装Confluent.Kafka和Newtonsoft.Json Nuget包。 nuget

  3. 新建KafkaProducer类,注入IProducer<Null, string> _producer

 public class KafkaProducer : IKafkaProducer
    {
        private readonly IProducer<Null, string> _producer;

        public KafkaProducer(IProducer<Null, string> producer)
        {
            _producer = producer;
        }

        public async Task ProduceAsync(Order order)
        {
            await _producer.ProduceAsync("order-topic",
               new Message<Null, string>
               {
                   Value = JsonConvert.SerializeObject(order)
               });
        }
    }
  1. Program.cs类中注入IProducerIKafkaProducer
var kafkaConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
builder.Services.AddSingleton<IProducer<Null, string>>(x =>
new ProducerBuilder<Null, string>(kafkaConfig).Build());

builder.Services.AddSingleton<IKafkaProducer, KafkaProducer>();

四、创建基于控制台的消费者

创建Consumer。

var config = new ConsumerConfig
{
    GroupId = "order-consumer-group",
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Null, string>(config).Build())
{
    consumer.Subscribe("order-topic");
    CancellationTokenSource token = new CancellationTokenSource();
    try
    {
        while (true)
        {
            var response = consumer.Consume(token.Token);
            if (response.Message != null)
            {
                var order = JsonConvert.DeserializeObject<Order>(response.Message.Value);
                Console.WriteLine($"OrderId: {order.OrderId} OrderCode: {order.OrderCode}");
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message); ;
    }
}

五、运行测试

运行起来2个项目,并向/Order接口Post测试数据,可以看到Consumer应用实时接收到相关消息。 Producer producer Consumer consumer

打开Confluent Control Center,可以看到相关Topic和消息。

topic

GitHub

引用

  1. Docker快速入门