基于事件流处理的微服务架构可以使程序有更低的耦合度,更高的吞吐量和更好的扩展性,本文将使用Docker安装Kafka环境,并基于.Net6搭建一个WebApi的Producer和一个控制台Consumer程序。
一、安装Apache Kafka
使用Docker-Compose安装Apache Kafka只需要简单2个步骤。
- 下载
docker-compose.yml
文件,curl --silent --output docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.1-post/cp-all-in-one/docker-compose.yml
- 在docker-compose.yml所在目录运行
docker-compose up -d
,等待镜像拉取和运行。
二、使用Confluent Control Center
Confluent Control Center是一个基于Web,管理和监控Apache Kafka的工具,根据docker-compose.yml相关配置,打开http://localhost:9021/clusters。
三、创建基于WebApi的生产者
基于.Net6创建一个ASP.NET Core Web API项目和一个控制台应用项目。
安装Confluent.Kafka和Newtonsoft.Json Nuget包。
新建KafkaProducer类,注入IProducer<Null, string> _producer
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class KafkaProducer : IKafkaProducer { private readonly IProducer<Null, string> _producer;
public KafkaProducer(IProducer<Null, string> producer) { _producer = producer; }
public async Task ProduceAsync(Order order) { await _producer.ProduceAsync("order-topic", new Message<Null, string> { Value = JsonConvert.SerializeObject(order) }); } }
|
- Program.cs类中注入
IProducer
和IKafkaProducer
。
1 2 3 4 5
| var kafkaConfig = new ProducerConfig { BootstrapServers = "localhost:9092" }; builder.Services.AddSingleton<IProducer<Null, string>>(x => new ProducerBuilder<Null, string>(kafkaConfig).Build());
builder.Services.AddSingleton<IKafkaProducer, KafkaProducer>();
|
四、创建基于控制台的消费者
创建Consumer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| var config = new ConsumerConfig { GroupId = "order-consumer-group", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var consumer = new ConsumerBuilder<Null, string>(config).Build()) { consumer.Subscribe("order-topic"); CancellationTokenSource token = new CancellationTokenSource(); try { while (true) { var response = consumer.Consume(token.Token); if (response.Message != null) { var order = JsonConvert.DeserializeObject<Order>(response.Message.Value); Console.WriteLine($"OrderId: {order.OrderId} OrderCode: {order.OrderCode}"); } } } catch (Exception ex) { Console.WriteLine(ex.Message); ; } }
|
五、运行测试
运行起来2个项目,并向/Order接口Post测试数据,可以看到Consumer应用实时接收到相关消息。
Producer
Consumer
打开Confluent Control Center,可以看到相关Topic和消息。
GitHub
引用
- Docker快速入门