RabbitMQ 是 VMware 一个非常强大的开源消息代理软件,通常用于处理背景任务,延迟消息和长时间运行的任务.它支持多种消息队列协议(AMQP,MQTT,STOMP 等),可以在分布式和联网环境中使用,提供了高可用性和可靠性.
至于其他的一些优点也不必多说,这也不是我的风格.我喜欢的就是好用,稳,简单.??
其他的更多的内容这里不再多说,以及 RabbitMQ 的 7 种工作模式.可以参考工良大佬的博客,和查看官网的例子来学习.
工良大佬的博客地址: https://www.whuanle.cn/archives/21430
官网链接: https://www.rabbitmq.com/tutorials
官网的例子包含各种语言的实现.需要的可以查看一下,我主要用 C#,所以这里我仅使用 C#作为本文内容.
部署 RabbitMQ 服务
要使用 RabbitMQ 肯定是需要我们部署一个服务端的.这里也有必要简要的快速说明一下,安装方法.
当然我们可以根据官网的教程,进行手动安装服务.但是略显麻烦.所以针对在 Ubuntu/Debian 操作系统的小伙伴我写了一个脚本.当然也有一些注意事项.
第一个就是系统版本.推荐使用 Ubuntu 系统并且系统版本为: 22.04, 23.04, 23.10
虽然 Debian 也可以,但是更推荐 Ubuntu,并且使用较新的版本,这样不容易出现依赖问题.
这几个版本是经过我测试的.可以通过一键安装脚本进行安装.
curl -s https://raw.githubusercontent.com/joesdu/rabbitmq-server-install-bash/main/install.sh | sudo bash
脚本源码所在仓库: https://github.com/joesdu/rabbitmq-server-install-bash
有兴趣的可以去看看,并且若是能提交新的 PR 就更好了.
当然事情也不必如此麻烦,因为现在开发我们有更快速和方便的工具 Docker.利用 Docker 我们可以快速的构建一个服务用于测试,生产环境也可以使用 Docker 进行服务部署.简直不要太舒服.
所以我也制作了 Docker 镜像用于快速部署服务端.可以使用如下命令运行一个容器.
# 启动RabbitMQ服务,该镜像包含并启用 delayed_message_exchange 插件
docker run --name rabbitmq -p 5672:5672 -p 15672:15672 -d --rm -it -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest dygood/rabbitmq:3.13-management-dlx
镜像源码仓库: https://github.com/EasilyNET/rabbit-with-dlx-image
创建项目开始写代码了
服务端准备好后,我们便可以开始写代码了,目前代码刚开始创建.我写文档都是写到什么地方就开始什么操作,所以有可能前后文字风格会发生变化,希望见谅.
打开命令行创建一个解决方案文件,名字随你们自己取.并加入一些我们需要的东西.
# 创建解决方案文件
dotnet new sln -n easilynet.sample.rabbit
# 创建生产者Webapi项目,方便使用Swagger进行消息发布
dotnet new apicontroller -n Producer.Api
# 将生产者项目添加到解决方案文件中
dotnet sln add ./Producer.Api
# 创建消费者项目
dotnet new apicontroller -n Consumer
# 将消费者添加到解决方案
dotnet sln add ./Consumer
# 这里我们再创建一个类库,用于构建消息实体.方便管理和引用.并将其加入解决方案中
dotnet new classlib -n RabbitEvents
dotnet sln add ./RabbitEvents
# 接下来两句由AI生成,真是牛啊,意思就是将RabbitEvents库分别添加到消费者和生产者项目中
# Add the RabbitEvents library as a reference to the Producer.Api project
dotnet add ./Producer.Api/Producer.Api.csproj reference ./RabbitEvents/RabbitEvents.csproj
# Add the RabbitEvents library as a reference to the Consumer project
dotnet add ./Consumer/Consumer.csproj reference ./RabbitEvents/RabbitEvents.csproj
# 接下来为各个项目引入依赖包.
# Add the necessary NuGet packages to the RabbitEvents project
dotnet add ./RabbitEvents/RabbitEvents.csproj package EasilyNET.RabbitBus.Core
dotnet add ./Producer.Api/Producer.Api.csproj package EasilyNET.RabbitBus.AspNetCore
dotnet add ./Consumer/Consumer.csproj package EasilyNET.RabbitBus.AspNetCore
到这里,我们便可以用 VS 或者 VS Code 打开项目了.当然,我们使用 VS 直接可视化创建项目并添加引用也是非常可以的.为了不截图,这里就使用命令的方式进行创建了.
接下来,先为生产者和消费者添加服务注册.并删除一些模板的代码.详细代码,稍后我会上传到 GitHub,所以这里我简要写一个例子.
首先在消费者和生产者项目中,添加服务.
// 配置服务(亦可使用集群模式或者使用配置文件)
builder.Services.AddRabbitBus(c =>
{
c.Host = "localhost";
c.Port = 5672;
c.UserName = "guest";
c.PassWord = "guest";
});
// 或者使用配置文件
// builder.Services.AddRabbitBus(builder.Configuration, poolCount: (uint)Environment.ProcessorCount);
若是使用 Configuration 注册,则按照如下格式书写连接字符串.
{
"ConnectionStrings": {
"Rabbit": "amqp://guest:guest@localhost:5672/%2f"
}
}
然后我们在 RabbitEvents 项目中添加一个消息实体.这里以 DelayedEvent 为例,顺便介绍如何使用延时消息.
[Exchange(EModel.Delayed, "xdl.hello", queue: "xdl.hello.world", isDlx: true), QueueArg("x-message-ttl", 5000)]
public class DelayedEvent : IEvent
{
/// <summary>
/// 摘要
/// </summary>
public string Summary { get; set; } = "delayed_msg";
/// <summary>
/// 消息ID
/// </summary>
public string EventId => SnowId.GenerateNewId().ToString();
}
从上面的代码中需要消息实体继承 IEvent 接口,其中消息 ID 可以自动生成,若是不希望使用默认的格式,可以自己替换成其他生成方式.
然后添加上交换机信息的特性即可.
消息实体完成后,接着去 Consumer 项目添加消费者的函数.
public class DelayedEventHandler : IEventHandler<DelayedEvent>
{
/// <inheritdoc />
public Task HandleAsync(DelayedEvent @event)
{
Console.WriteLine($"[消息处理自:{nameof(DelayedEventHandler)}]-{JsonSerializer.Serialize(@event)}");
return Task.CompletedTask;
}
}
同样我们的消费者仅需继承 IEventHandler<>接口即可.
然后在 HandleAsync 函数中实现自己的业务逻辑即可.
最后我们去生产者项目中写一个消息发送的接口,并通过 Swagger 调用.等待 3 秒后即可在消费者窗口看到消息输出.
/// <summary>
/// 发送延时消息,使用延时插件
/// </summary>
[HttpPost]
public async Task Delayed()
{
var rand = new Random();
await ibus.Publish(new DelayedEvent(), priority: (byte)rand.Next(0, 9), ttl: 3000).ConfigureAwait(false);
}
消费者输出结果
[消息处理自:DelayedEventHandler]-{"Summary":"delayed_msg","EventId":"65f42a8ff73e109da5ec0d19"}
至此我们的消息发送,处理就完成了.该库发送消息仅一个接口,两个重载.
处理消息也仅有一个函数.可谓是极简 ??.
更详细的使用可以参考本文例子源码.当然可以去 EasilyNET 组织提供 PR 或者提出问题联系我.
本文仅为一个简单的例子,还有一些细节尚未详细写出来,一边创建项目一边写文档确实有点费时间,一两个小时就做了这一件事情
本文例子 GitHub 仓库地址: https://github.com/joesdu/easilynet.sample.rabbit
本文暂时没有评论,来添加一个吧(●'◡'●)