RabbitMQ知多少

下载安装

Erlang
RabbitMQ

启动RabbitMQ管理平台插件

DOS下进入到安装目录sbin,执行以下命令

rabbitmq-plugins enable rabbitmq_management   

当出现以下结果时,重启RabbitMQ服务

set 3 plugins.
Offline change; changes will take effect at broker restart.

访问http://localhost:15672(账号密码:guest)

注意:以下为C#代码,请引用NuGet包:RabbitMQ.Client

金沙官网线上 1

参考文章

RabbitMQ快速入门

1.引言

RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:

金沙官网线上 2

RabbitMQ 内部结构

RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息。RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

下面我们就来学习下RabbitMQ。

名词解析

P(Publisher):生产者
C(Consumer):消费者
Channel:信道
Queue:队列
Exchange:信息交换机

2. 环境搭建

本文主要基于Windows下使用Vs Code 基于.net core进行demo演示。开始之前我们需要准备好以下环境。

  • 安装Erlang运行环境
    下载安装Erlang
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ
  • 启动RabbitMQ Server
    点击Windows开始按钮,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理员身份运行。
  • 依次执行以下命令启动RabbitMQ服务
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
  • 执行rabbitmqlctl status检查RabbitMQ状态
  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management即可成功安装,使用默认账号密码(guest/guest)登录金沙官网线上,http://localhost:15672/即可。

简单演示

信息发送端

static void Send()
{
    //1. 实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 声明队列
            channel.QueueDeclare(queue: "rabbitmq",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //5. 构建字节数据包
            var message = "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);

            //6. 发送数据包
            channel.BasicPublish(exchange: "",
                                 routingKey: "rabbitmq",
                                 basicProperties: null,
                                 body: body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

信息接收端

static void Receive()
{
    //1. 实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 声明队列
            channel.QueueDeclare(queue: "rabbitmq",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);

            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "rabbitmq",
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();

        }
    }
}

3. Hello RabbitMQ

在开始之前我们先来了解下消息模型:

金沙官网线上 3

消息流

消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,队列再将消息发送到监听的消费者。

下面我们我们通过demo来了解RabbitMQ的基本用法。

轮询调度

P生产的多个任务进入到队列中,多个C间可以并行处理任务。默认情况下,RabbitMQ把信息按顺序发送给每一个C。平均每个C将获得同等数量的信息。

3.1.消息的发送和接收

创建RabbitMQ文件夹,打开命令提示符,分别创建两个控制台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

我们先来添加消息发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善消息接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运行消息接收端,再运行消息发送端,结果如下图。

金沙官网线上 4

运行结果

从上面的代码中可以看出,发送端和消费端的代码前4步都是一样的。主要的区别在于发送端调用channel.BasicPublish方法发送消息;而接收端需要实例化一个EventingBasicConsumer实例来进行消息处理逻辑。另外一点需要注意的是:消息接收端和发送端的队列名称(queue)必须保持一致,这里指定的队列名称为hello。

信息确认

按照最简单的演示来说,信息一旦发送到C中,则该信息就会从队列中移除。一旦中间信息处理异常/失败,C端程序退出等,都将会导致信息未处理完成,而此时队列中已将信息移除了,那么就会导致一系列的问题。我们可以在C端设置手动确认信息,从而解决上述问题的发生。
Receive代码块

//6. 绑定消息接收后的事件委托
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine(" [x] Received {0}", message);
    Thread.Sleep(5000);//模拟耗时
    Console.WriteLine(" [x] Done");

    // 发送信息确认信号(手动信息确认)
    channel.BasicAck(ea.DeliveryTag, false);
};
//7. 启动消费者
/*
 autoAck参数属性
    true:自动信息确认,当C端接收到信息后,自动发送ack信号,不管信息是否处理完毕
    false:关闭自动信息确认,通过调用BasicAck方法手动进行信息确认
 */
channel.BasicConsume(queue: "rabbitmq",
                     autoAck: false,
                     consumer: consumer);

3.2. 循环调度

使用工作队列的好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了。我们先启动两个接收端,等待消息接收,再启动一个发送端进行消息发送。

金沙官网线上 5

消息分发

我们增加运行一个消费端后的运行结果:

金沙官网线上 6

循环调度

从图中可知,我们循环发送4条信息,两个消息接收端按顺序被循环分配。
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者将获得相同数量的消息。这种分发消息的方式叫做循环(round-robin)。

信息持久化

当RabbitMQ退出或死机时会清空队列和信息。通过将队列和信息标记为持久的,来告知RabbitMQ将信息持久化。

Send代码块

//4. 声明队列
//durable设置为true,表示此队列为持久的。
//注意:RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,所以你需要重启服务/更改队列名称
channel.QueueDeclare(queue: "rabbitmq",
                     durable: true, //标记队列持久
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
//设置IbasicProperties.SetPersistent属性值为true来标记我们的消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

//5. 构建字节数据包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

//6. 发送数据包
channel.BasicPublish(exchange: "",
                     routingKey: "rabbitmq",
                     basicProperties: properties, //指定BasicProperties
                     body: body);

3.3. 消息确认

按照我们上面的demo,一旦RabbitMQ将消息发送到消费端,消息就会立即从内存中移出,无论消费端是否处理完成。在这种情况下,消息就会丢失。

为了确保一个消息永远不会丢失,RabbitMQ支持消息确认(message acknowledgments)。当消费端接收消息并且处理完成后,会发送一个ack(消息确认)信号到RabbitMQ,RabbitMQ接收到这个信号后,就可以删除掉这条已经处理的消息任务。但如果消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会明白某个消息没有正常处理,RabbitMQ将会重新将消息入队,如果有另外一个消费端在线,就会快速的重新发送到另外一个消费端。

RabbitMQ中没有消息超时的概念,只有当消费端关闭或奔溃时,RabbitMQ才会重新分发消息。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

主要改动的是将 autoAck:true修改为autoAck:fasle,以及在消息处理完毕后手动调用BasicAck方法进行手动消息确认。

金沙官网线上 7

从图中可知,消息发送端连续发送4条消息,其中消费端1先被分配处理第一条消息,消费端2被循环分配第二条消息,第三条消息由于没有空闲消费者仍然在队列中。
在消费端2未处理完第一条消息之前,手动中断(ctrl+c)。我们可以发现RabbitMQ在下一次分发时,会优先将被中断的消息分发给消费端1处理。

公平调度

上述演示中,如果队列中存在多个信息,在开启多个C的情况下,只有一个C忙个不停,另外的却一直处于空闲状态。通过调用BasicQos,告知RabbitMQ在某个C信息处理完毕,并且已经收到信息确认之后,才可以继续发送信息到这个C。否则,将会把信息分发到另外空闲的C。

Receive代码块

//4. 声明队列
channel.QueueDeclare(queue: "rabbitmq",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
////设置prefetchCount为1来告知RabbitMQ在未收到消费端的消息确认时,不再分发消息
channel.BasicQos(prefetchSize: 0,
                 prefetchCount: 1,
                 global: false);

3.4. 消息持久化

消息确认确保了即使消费端异常,消息也不会丢失能够被重新分发处理。但是如果RabbitMQ服务端异常,消息依然会丢失。除非我们指定durable:true,否则当RabbitMQ退出或奔溃时,消息将依然会丢失。通过指定durable:true,并指定Persistent=true,来告知RabbitMQ将消息持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将消息标记为持久性不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存时​​,仍然有一个很短的时间窗口。RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要确保消息队列的持久化,可以使用publisher confirms.

本文由金沙官网线上发布于编程,转载请注明出处:RabbitMQ知多少

您可能还会对下面的文章感兴趣: