RabbitMQ基础入门篇

下载安装

Erlang
RabbitMQ

启航RabbitMQ管理平台插件

DOS下进入到安装目录\sbin,实行以下命令

rabbitmq-plugins enable rabbitmq_management   

当出现以下结果时,重启RabbitMQ服务

set 3 plugins.
Offline change; changes will take effect at broker restart.

访问(账号密码:guest)

注意:以下为C#代码,请引用NuGet包:RabbitMQ.Client

澳门葡京备用网址 1

澳门葡京备用网址 2

简介

  RabbitMQ是3个新闻代理。从本质上讲,它从音信生产者处接收消息,然后传递给音讯的顾客。它在音讯的生产者和顾客之间基于你钦赐的条条框框对消息进行路由、缓存和持久化。

RabbitMQ平常采用如下术语:

  • 生育(Producing),表示音信的发送。发送音讯的次序被称作生产者。大家画一个图来代表它,如下所示:

 

                      澳门葡京备用网址 3

  • 队列(Queue),表示邮箱的名目,它存在于RabbitMQ服务器中。固然新闻在RabbitMQ和您的应用程序间流转,但它们只可以存款和储蓄与RabbitMQ内部的连串中。队列不会有此外限制,它能够存放任意多的音讯,就好像二个Infiniti量的缓存一般。八个生产者能够向同三个队列发送音讯,多少个买主也能够品尝从同二个队列中读取音信。3个体系能够被画成上面那样在其上有二个称谓的图形:

                    澳门葡京备用网址 4

  • 开销(Consuming),表示新闻的接受。消费者表示常常都在等候接受新闻的顺序。画图的时候给它标记三个“C”:

                      澳门葡京备用网址 5

急需专注的是,生产者、消费者和代办并不必驻留于同一个服务器上边;平常情况下,他们是分局在不一致的机械下边包车型地铁。

参考小说

RabbitMQ急速入门

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但不可能单纯知道其为音信队列,信息代理更确切。RabbitMQ
是三个由 Erlang
语言开拓的AMQP(高等信息队列协议)的开源落成,其内部结构如下:

澳门葡京备用网址 6

RabbitMQ 内部结构

RabbitMQ作为四个消息代理,首要和消息交际,负担接收并转化新闻。RabbitMQ提供了保障的音讯机制、追踪机制和灵活的新闻路由,帮忙音信集群和分布式安排。适用于排队算法、秒杀活动、信息分发、异步管理、数据同步、管理耗费时间任务、CQ昂CoraS等接纳场景。

下边大家就来学学下RabbitMQ。

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但无法只是精晓其为消息队列,音信代理更贴切。RabbitMQ
是三个由 Erlang
语言开拓的AMQP(高端音信队列协议)的开源达成,其内部结构如下:

澳门葡京备用网址 7

RabbitMQ作为2个信息代理,首要和消息张罗,担任接收并转化音讯。RabbitMQ提供了牢靠的新闻机制、追踪机制和灵活的音讯路由,帮忙消息集群和布满式安顿。适用于排队算法、秒杀活动、音讯分发、异步管理、数据同步、管理耗费时间职责、CQ汉兰达S等利用场景。

下边大家就来读书下RabbitMQ。

“Hello World“

使用.NET/C#客户端

在学科的那1有的,我们将用C#澳门葡京备用网址,写五个程序;1个发送一条音讯的劳动者和3个收下并打字与印刷音信的消费者。大家会略过.NET API的部分细节,仅仅专注于如何产生那一个轻巧的主次,传输内容为“Hello World”的新闻。

下图中,“P”表示生产者,“C”表示顾客。中间的正方,表示一个行列–RabbitMQ为买主提供的三个音讯缓存。

           澳门葡京备用网址 8

.NET
客户端库

RabbitMQ遵从AMQP协议,1个关于消息发送的绽开、通用协议。现在一度有广大例外编制程序语言完成的AMQP的客户端。咱们将应用由RabbitMQ提供的.NET客户端。

下载客户端库的包,遵照描述去检查它的签字。提取包的剧情,把“RabbitMQ.Client.dll”程序集复制到您的干活目录下。

您须求承认你的种类中能找到C#的编写翻译器csc.exe,你可能须求把目录“;C:\WINDOWS\Microsoft.NET\Framework\v三.伍”(须求依照你实在的装置处境,修改目录中.NET版本)增加到你的Path中。

 

当今大家早就有了.NET客户端的二进制造进程序,能够写些代码了。

发送(Sending)

                  澳门葡京备用网址 9

作者们把新闻的发送者称为Send.cs,音信的接收者称为Receive.cs。音讯发送者将一而再到RabbitMQ,发送一条音讯,然后退出。

在Send.cs中,大家要求运用一些命名空间:

1 using System;
2 using RabbitMQ.Client;
3 using System.Text;

设置类

1 class Send
2 {
3     public static void Main()
4     {
5         ...
6     }
7 }

接下来创制1个到RabbitMQ服务的链接

 1 class Send
 2 {
 3     public static void Main()
 4     {
 5         var factory = new ConnectionFactory() { HostName = "localhost" };
 6         using (var connection = factory.CreateConnection())
 7         {
 8             using (var channel = connection.CreateModel())
 9             {
10                 ...
11             }
12         }
13     }
14 }

该链接抽象了套接字连接,为大家关心协议版本协定和位置认证等。在此地,大家连年到了3个地点的代办,因此选取的是localhost。如若大家想连接到其余机器的代办,只须求轻巧在在这里钦命该机器的名称可能Ip地址就可以。

接下去我们创立信道(Channel),完结专门的职业的洋洋API都在内部。

为了发送音讯,我们亟须说澳优个队列,然后技艺将消息发表当中:

 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4 
 5 class Send
 6 {
 7     public static void Main()
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.QueueDeclare(queue: "hello",
14                                  durable: false,
15                                  exclusive: false,
16                                  autoDelete: false,
17                                  arguments: null);
18 
19             string message = "Hello World!";
20             var body = Encoding.UTF8.GetBytes(message);
21 
22             channel.BasicPublish(exchange: "",
23                                  routingKey: "hello",
24                                  basicProperties: null,
25                                  body: body);
26             Console.WriteLine(" [x] Sent {0}", message);
27         }
28 
29         Console.WriteLine(" Press [enter] to exit.");
30         Console.ReadLine();
31     }
32 }

表Bellamy(Bellamy)个系列是幂等的,它独自会在表明的队列不设有于RabbitMQ中时才被创制。音信的内容是字节数组,所以,你能够使用其余喜欢的艺术对音信实行编码。

RabbitMQ基础入门篇。当上边的代码运转甘休,信道和链接将会被释放。

Send.cs的完全代码在此间。

发送不能实践

若果那是你首先次利用RabbitMQ,而且未察看已发送的消息,你早宴会搔首猜疑是出了什么难题。恐怕是因为代理运维在未曾足够空闲硬盘空间的机械上(暗中认可情状下,代理需求至少50MB的悠闲空间)而招致拒绝接收新闻。通过查阅代理的日记文件能够确认,同时借使有至关重要下降该项限制。配备文件文书档案能够告诉你怎么着设置(disk_free_limit)。

接收

上边那多少个是关于发送者的。RabbitMQ将音信压入接收者,所以和揭发一条音讯的发送者不均等,大家要保持接收者一贯运营着去监听音讯然后打字与印刷出来。

                  澳门葡京备用网址 10

Receive.cs的代码要求引用和Send.cs中山大学多一样的命名空间:

1 using RabbitMQ.Client;
2 using RabbitMQ.Client.Events;
3 using System;
4 using System.Text;

类的设置和发送者是同1的;大家开发贰个链接和1个信道,申Bellamy(Bellamy)个大家将在消费的类别。注意队列需求和发送者中表达的队列相相配。

 1 class Receive
 2 {
 3     public static void Main()
 4     {
 5         var factory = new ConnectionFactory() { HostName = "localhost" };
 6         using (var connection = factory.CreateConnection())
 7         {
 8             using (var channel = connection.CreateModel())
 9             {
10                 channel.QueueDeclare(queue: "hello",
11                                      durable: false,
12                                      exclusive: false,
13                                      autoDelete: false,
14                                      arguments: null);
15                 ...
16             }
17         }
18     }
19 }

留目的在于此地表明的那几个行列。大家兴许在运营发送者前运营接收者,需求保险在消费这一个行列以前队列已经存在。

我们将要告诉RabbitMQ服务从哪个队列传递消息给我们。因为该队列将压入异步音讯给大家,大家提供回调。那就是伊夫ningBasicConsumer.Received事件管理器做的事务了。

 1 using RabbitMQ.Client;
 2 using RabbitMQ.Client.Events;
 3 using System;
 4 using System.Text;
 5 
 6 class Receive
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.QueueDeclare(queue: "hello",
15                                  durable: false,
16                                  exclusive: false,
17                                  autoDelete: false,
18                                  arguments: null);
19 
20             var consumer = new EventingBasicConsumer(channel);
21             consumer.Received += (model, ea) =>
22             {
23                 var body = ea.Body;
24                 var message = Encoding.UTF8.GetString(body);
25                 Console.WriteLine(" [x] Received {0}", message);
26             };
27             channel.BasicConsume(queue: "hello",
28                                  noAck: true,
29                                  consumer: consumer);
30 
31             Console.WriteLine(" Press [enter] to exit.");
32             Console.ReadLine();
33         }
34     }
35 }

总体的Receive.cs类在此处。

名词解析

P(Publisher):生产者
C(Consumer):消费者
Channel:信道
Queue:队列
Exchange:音信交流机

二. 条件搭建

正文主要根据Windows下使用Vs Code 基于.net
core实行demo演示。初步从前大家需求安不忘虞好之下条件。

  • 安装Erlang运营条件
    下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows开端开关,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理人身份运维。
  • 逐条试行以下命令运维RabbitMQ服务

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
  • 执行rabbitmqlctl status检查RabbitMQ状态
  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management就能够成功安装,使用默许账号密码(guest/guest)登陆http://localhost:15672/即可。

2. 条件搭建

正文重要遵照Windows下选拔Vs Code 基于.net
core举行demo演示。起头此前大家必要盘算好以下条件。

  • 安装Erlang运维情形
    下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows开端开关,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理员身份运营。
  • 逐一实践以下命令运转RabbitMQ服务

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
  • 执行rabbitmqlctl status检查RabbitMQ状态

  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management就可以成功安装,使用私下认可账号密码(guest/guest)登六即可。

结缘在同步

您能够引用RabbitMQ的.NET客户端程序集,然后编写翻译Send.cs和Receive.cs。大家将动用命令行(cmd.exe和csc)编译和周转这个代码。只怕你能够应用Visual Studio。

1 $ csc /r:"RabbitMQ.Client.dll" Send.cs
2 $ csc /r:"RabbitMQ.Client.dll" Receive.cs

接下来运营可执行文件

1 $ Send.exe

跟着运维接收者

1 $ Receive.exe

收信人会打字与印刷从RabbitMQ接收到的音讯。接收者一向运转等待新音讯(使用Ctrl+C中止),所以能够品味在不一样的顶峰运营发送者。

假如您愿目的在于队列上做检讨,使用rabbitmqctl list_queues.

Hello World!

 

现行反革命是时候进入到第3部分去创制三个做事行列了。

 

初稿链接:

简易演示

消息发送端

static void Send()
{
    //1. 实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 声明队列
            channel.QueueDeclare(queue: "rabbitmq",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //5. 构建字节数据包
            var message = "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);

            //6. 发送数据包
            channel.BasicPublish(exchange: "",
                                 routingKey: "rabbitmq",
                                 basicProperties: null,
                                 body: body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

新闻接收端

static void Receive()
{
    //1. 实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 声明队列
            channel.QueueDeclare(queue: "rabbitmq",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);

            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "rabbitmq",
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();

        }
    }
}

3. Hello RabbitMQ

在伊始在此之前我们先来询问下新闻模型:

澳门葡京备用网址 11

消息流

顾客(consumer)订阅某个队列。生产者(producer)创立音信,然后公布到行列(queue)中,队列再将消息发送到监听的顾客。

上面我们大家经过demo来通晓RabbitMQ的中坚用法。

3. Hello RabbitMQ

在起首以前大家先来询问下信息模型:
澳门葡京备用网址 12
顾客(consumer)订阅有些队列。生产者(producer)创建新闻,然后揭橥到行列(queue)中,队列再将新闻发送到监听的顾客。

上边大家大家经过demo来领会RabbitMQ的着力用法。

轮询调整

P生产的八个任务进入到行列中,三个C间能够并行管理职分。暗中同意景况下,RabbitMQ把音信按顺序发送给每2个C。平均各类C将获得同样数量的音信。

叁.一.音讯的出殡和吸收

开创RabbitMQ文件夹,伸开命令提醒符,分别创设三个调整台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

大家先来添加新闻发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善音讯接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运转消息接收端,再运维音讯发送端,结果如下图。

澳门葡京备用网址 13

运维结果

从地点的代码中得以看来,发送端和消费端的代码前肆步都以均等的。首要的区分在于发送端调用channel.BasicPublish情势发送消息;而接收端要求实例化八个EventingBasicConsumer实例来拓展音信管理逻辑。其它一些索要留意的是:音信接收端和出殡和埋葬端的队列名称(queue)必须保持壹致,这里钦点的队列名为hello。

三.1.新闻的出殡和抽出

创办RabbitMQ文件夹,展开命令提醒符,分别创设多少个调控台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

作者们先来增添音讯发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善音信接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运营信息接收端,再运维新闻发送端,结果如下图。

澳门葡京备用网址 14

从地点的代码中得以看到,发送端和消费端的代码前四步都以均等的。首要的区分在于发送端调用channel.BasicPublish艺术发送音讯;而接收端要求实例化二个EventingBasicConsumer实例来开始展览音信管理逻辑。此外一些供给注意的是:音信接收端和发送端的队列名称(queue)必须保持一致,这里钦赐的行列名字为hello。

消息确认

服从最简易的示范来讲,音讯一旦发送到C中,则该音讯就能从队列中移除。一旦中间新闻管理卓殊/退步,C端程序退出等,都将会招致新闻未管理完毕,而此时队列中已将新闻移除了,那么就能够导致一名目许多的标题。大家得以在C端设置手动确认信息,从而化解上述难点的发生。
Receive代码块

//6. 绑定消息接收后的事件委托
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine(" [x] Received {0}", message);
    Thread.Sleep(5000);//模拟耗时
    Console.WriteLine(" [x] Done");

    // 发送信息确认信号(手动信息确认)
    channel.BasicAck(ea.DeliveryTag, false);
};
//7. 启动消费者
/*
 autoAck参数属性
    true:自动信息确认,当C端接收到信息后,自动发送ack信号,不管信息是否处理完毕
    false:关闭自动信息确认,通过调用BasicAck方法手动进行信息确认
 */
channel.BasicConsume(queue: "rabbitmq",
                     autoAck: false,
                     consumer: consumer);

叁.2. 循环调解

选用工作行列的好处便是它亦可互为的拍卖队列。假使堆成堆了重重任务,我们只需求增添越多的劳力(workers)就足以了。大家先运营两个接收端,等待音讯接收,再开发银行叁个出殡和埋葬端举办新闻发送。

澳门葡京备用网址 15

音信分发

我们扩大运转三个消费端后的运作结果:

澳门葡京备用网址 16

巡回调整

从图中可见,我们循环境与发展送四条新闻,五个音讯接收端按梯次被循环分配。
私下认可情况下,RabbitMQ将按梯次将每条音讯发送给下二个顾客。平均每种消费者将收获同等数量的音信。那种分发音讯的诀要叫做循环(round-robin)。

3.贰. 巡回调节

利用职业行列的裨益正是它亦可互为的拍卖队列。假设堆集了众多职分,大家只要求加上越来越多的工小编(workers)就能够了。大家先运转七个接收端,等待音信接收,再开发银行一个出殡和埋葬端进行音信发送。

澳门葡京备用网址 17

大家增添运转1个消费端后的运行结果:

澳门葡京备用网址 18

从图中可见,大家循环境与发展送四条音讯,七个音讯接收端按顺序被循环分配。
默许意况下,RabbitMQ将按梯次将每条新闻发送给下三个主顾。平均每一个顾客将得到同等数量的消息。那种分发新闻的章程叫做循环(round-robin)。

音信持久化

当RabbitMQ退出或死机时会清空队列和音信。通过将队列和音信符号为持久的,来报告RabbitMQ将新闻持久化。

Send代码块

//4. 声明队列
//durable设置为true,表示此队列为持久的。
//注意:RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,所以你需要重启服务/更改队列名称
channel.QueueDeclare(queue: "rabbitmq",
                     durable: true, //标记队列持久
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
//设置IbasicProperties.SetPersistent属性值为true来标记我们的消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

//5. 构建字节数据包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

//6. 发送数据包
channel.BasicPublish(exchange: "",
                     routingKey: "rabbitmq",
                     basicProperties: properties, //指定BasicProperties
                     body: body);

3.3. 音讯确认

遵从我们地点的demo,一旦RabbitMQ将音讯发送到消费端,新闻就能够登时从内存中移出,无论消费端是或不是管理到位。在那种场所下,消息就能丢掉。

为了确认保障三个消息永久不会丢掉,RabbitMQ援救音信确认(message
acknowledgments)
。当消费端接收音信还要管理落成后,会发送3个ack(信息确认)非非确定性信号到RabbitMQ,RabbitMQ接收到那个时限信号后,就能够去除掉那条已经管理的新闻职责。但万1消费端挂掉了(举个例子,通道关闭、连接丢失等)没有发送ack时域信号。RabbitMQ就可以领会有些音讯尚未健康管理,RabbitMQ将会重新将消息入队,假如有此外三个成本端在线,就能够急速的重复发送到其它贰个消费端。

RabbitMQ中未有音信超时的定义,唯有当消费端关闭或奔溃时,RabbitMQ才会再也分发新闻。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

第三退换的是将
autoAck:true修改为autoAck:fasle,以及在消息管理实现后手动调用BasicAck办法开始展览手动音讯确认。

澳门葡京备用网址 19

从图中可知,音信发送端连接发送四条消息,其中消费端1先被分配管理第3条音信,消费端2被循环分配第三条消息,第1条信息由于并没有空闲消费者依旧在队列中。
在开销端二未管理完第3条音信此前,手动中断(ctrl+c)。大家得以窥见RabbitMQ在下一回分发时,会预先将被中止的新闻分发给消费端一甩卖。

三.三. 新闻确认

依据大家地方的demo,1旦RabbitMQ将音讯发送到消费端,音讯就能够立刻从内部存款和储蓄器中移出,无论消费端是不是管理到位。在那种状态下,音信就能丢掉。

为了确认保障3个音信永久不会丢掉,RabbitMQ扶助新闻确认(message
acknowledgments)
。当消费端接收音信还要管理实现后,会发送3个ack(音信确认)功率信号到RabbitMQ,RabbitMQ接收到这么些复信号后,就足以去除掉那条已经管理的新闻职务。但假诺消费端挂掉了(举个例子,通道关闭、连接丢失等)未有发送ack实信号。RabbitMQ就能知道某些音信尚未例行处理,RabbitMQ将会重新将新闻入队,假设有其它八个开销端在线,就能神速的再度发送到其余一个消费端。

RabbitMQ中尚无消息超时的定义,只有当消费端关闭或奔溃时,RabbitMQ才会再也分发新闻。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

根本更换的是将
autoAck:true修改为autoAck:fasle,以及在消息管理实现后手动调用BasicAck方法开始展览手动音信确认。

澳门葡京备用网址 20

从图中可见,音信发送端连接发送四条消息,当中消费端一先被分配管理第二条消息,消费端二被循环分配第叁条音信,第3条新闻由于并未有空闲消费者如故在队列中。
在开支端2未管理完第二条新闻从前,手动中断(ctrl+c)。大家得以窥见RabbitMQ在下1回分发时,会事先将被暂停的消息分发给消费端1甩卖。

公平级调动度

上述示范中,倘使队列中设有两个音讯,在展开多少个C的景况下,只有一个C忙个不停,其它的却一向处在空闲状态。通过调用BasicQos,告知RabbitMQ在有个别C音讯管理落成,并且已经接到消息确认之后,才得以持续发送音讯到这么些C。不然,将会把音讯分发到此外空闲的C。

Receive代码块

//4. 声明队列
channel.QueueDeclare(queue: "rabbitmq",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
////设置prefetchCount为1来告知RabbitMQ在未收到消费端的消息确认时,不再分发消息
channel.BasicQos(prefetchSize: 0,
                 prefetchCount: 1,
                 global: false);

叁.四. 音信持久化

音讯确认确定保障了固然消费端相当,音信也不会丢掉能够被再一次分发管理。但是要是RabbitMQ服务端非常,消息依旧会丢掉。除非大家钦定durable:true,否则当RabbitMQ退出或奔溃时,消息将依旧会丢掉。通过点名durable:true,并指定Persistent=true,来告诉RabbitMQ将新闻持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将音信标志为持久性无法一心保证消息不会丢掉。固然它告诉RabbitMQ将音信保存到磁盘,可是当RabbitMQ接受新闻还要还尚未保留时​​,照旧有五个异常的短的时日窗口。RabbitMQ
大概只是将音信保存到了缓存中,并未将其写入到磁盘上。持久化是无法确定保障的,可是对于三个简单易行职务队列来讲已经足足。假若急需确认保证消息队列的持久化,能够选拔publisher
confirms.

三.四. 音信持久化

音讯确认确认保障了就算消费端万分,音讯也不会丢掉能够被再次分发管理。不过如若RabbitMQ服务端至极,音信依旧会丢掉。除非我们钦命durable:true,不然当RabbitMQ退出或奔溃时,音信将依旧会丢掉。通过点名durable:true,并指定Persistent=true,来告诉RabbitMQ将信息持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将信息标识为持久性不能一心保险新闻不会丢掉。即便它告诉RabbitMQ将信息保存到磁盘,不过当RabbitMQ接受信息还要还向来不保留时​​,照旧有三个异常的短的年月窗口。RabbitMQ
只怕只是将音讯保存到了缓存中,并从未将其写入到磁盘上。持久化是不能肯定保证的,然而对于五个简单任务队列来说早已够用。假诺急需保障音信队列的持久化,能够应用publisher
confirms.

发布/订阅

上述中的演示,P推送消息至队列中,C从队列中管理消息。但是一旦须要将P推送的音信至每种订阅的C中拍卖音讯,那么我们就足以使用Exchange。

三.伍. 公平分发

RabbitMQ的音信分发暗中认可依据消费端的多寡,按顺序循环分发。那样仅是确定保障了消费端被平均分发音讯的数目,但却忽略了消费端的闲忙情形。那就也许出现某些消费端直接管理耗费时间职务处于阻塞状态,有些消费端直接处理一般职务处于空置状态,而只是它们分配的职责数量一样。

澳门葡京备用网址 21

但大家得以经过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来报告RabbitMQ,在未收到消费端的音讯确认时,不再分发消息,也就保证了当消费端处于辛勤景色时,不再分配职分。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

此时你需求留意的是借使全数的消费端都地处费劲景色,你的类别恐怕会被塞满。你必要留意那或多或少,要么增加更加多的消费端,要么使用其余战略。

叁.伍. 正义分发

RabbitMQ的新闻分发默许依据消费端的数量,按顺序循环分发。那样仅是保障了消费端被平均分发音讯的数额,但却忽略了消费端的闲忙景况。那就或者出现有些消费端直接管理耗费时间职责处于阻塞状态,有个别消费端直接管理一般职责处于空置状态,而只是它们分配的天职位数量量一样。

澳门葡京备用网址 22

但大家得以由此channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来报告RabbitMQ,在未抽出消费端的消息确认时,不再分发新闻,也就确定保证了当消费端处于劳碌景观时,不再分配职分。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

此刻你供给小心的是只要持有的消费端都处于艰辛景观,你的队列或者会被塞满。你须求小心那或多或少,要么增多更加多的消费端,要么使用此外战术。

fanout(将音讯分发到exchange上绑定的装有队列上)

Send代码块

//1. 实例化连接工厂
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立连接
using (var connection = factory.CreateConnection())
{
    //3. 创建信道
    using (var channel = connection.CreateModel())
    {
        //4. 声明信息交换机
        channel.ExchangeDeclare(exchange: "fanoutDemo",
                                type: "fanout");

        //5. 构建字节数据包
        var message = "Hello RabbitMQ!";
        var body = Encoding.UTF8.GetBytes(message);

        //6. 发布到指定exchange,fanout类型的会忽视routingKey的值,所以无需填写
        channel.BasicPublish(exchange: "fanoutDemo",
                             routingKey: "",
                             basicProperties: null,
                             body: body);

        Console.WriteLine(" [x] Sent {0}", message);
    }
}

Receive代码块

//1. 实例化连接工厂
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立连接
using (var connection = factory.CreateConnection())
{
    //3. 创建信道
    using (var channel = connection.CreateModel())
    {
        //4. 声明信息交换机
        channel.ExchangeDeclare(exchange: "fanoutDemo",
                                type: "fanout");
        //生成随机队列名称
        var queueName = channel.QueueDeclare().QueueName;
        //绑定队列到指定fanout类型exchange
        channel.QueueBind(queue: queueName,
                          exchange: "fanoutDemo",
                          routingKey: "");

        //5. 构造消费者实例
        var consumer = new EventingBasicConsumer(channel);

        //6. 绑定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
        };

        channel.BasicConsume(queue: queueName,
                             autoAck: true,
                             consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

4. Exchange

仔细的你大概开掘上边的demo,生产者和消费者直接是经过同样队列名称举行相称衔接的。消费者订阅有些队列,生产者创立消息发布到行列中,队列再将音信转载到订阅的买主。那样就能够有三个局限性,即消费者2次只可以发送音讯到某3个系列。

那消费者哪些手艺发送音讯到三个音讯队列呢?
RabbitMQ提供了Exchange,它就好像于路由器的功效,它用于对音讯进行路由,将新闻发送到八个体系上。Exchange1方面从劳动者接收音讯,另1方面将新闻推送到行列。但exchange必须精晓怎么样处理接收到的新闻,是将其附加到一定队列依然外加到多个体系,还是一直忽略。而这几个规则由exchange
type定义,exchange的规律如下图所示。

澳门葡京备用网址 23

Exchange

大面积的exchange type 有以下两种:

  • direct(明确的路由规则:消费端绑定的行列名称必须和音信表露时钦点的路由名称壹致)
  • topic (格局相配的路由规则:援助通配符)
  • fanout (音信广播,将音信分发到exchange上绑定的有着队列上)

下面大家就来千家万户那介绍它们的用法。

4. Exchange

细心的你只怕开掘上边的demo,生产者和顾客直接是通过一样队列名称实行相称衔接的。消费者订阅有些队列,生产者创制音讯揭露到行列中,队列再将音信转载到订阅的买主。那样就能够有贰个局限性,即消费者一回只好发送音讯到某贰个行列。

那消费者如何才具发送消息到多少个音信队列呢?
RabbitMQ提供了Exchange,它就像于路由器的功效,它用来对消息进行路由,将音讯发送到多少个种类上。Exchange1方面从劳动者接收新闻,另一方面将消息推送到行列。但exchange必须清楚哪些管理接收到的新闻,是将其附加到特定队列依然增大到三个连串,依然直接忽略。而这一个规则由exchange
type定义,exchange的规律如下图所示。
澳门葡京备用网址 24

普及的exchange type 有以下两种:

  • direct(鲜明的路由规则:消费端绑定的行列名称必须和消息透露时钦赐的路由名称一样)
  • topic (格局相称的路由规则:帮忙通配符)
  • fanout (音讯广播,将信息分发到exchange上绑定的具备队列上)

下边大家就来家家户户那介绍它们的用法。

direct(C绑定的队列名称须和P发表内定的路由名称壹致)

Send代码块

//4. 声明信息交换机
channel.ExchangeDeclare(exchange: "directDemo",
                        type: "direct");

//5. 构建字节数据包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

//6. 发布到指定exchange
channel.BasicPublish(exchange: "directDemo",
                     routingKey: "a",
                     basicProperties: null,
                     body: body);

Receive代码块

//4. 声明信息交换机
channel.ExchangeDeclare(exchange: "directDemo",
                        type: "direct");
//生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//绑定队列到指定direct类型exchange
channel.QueueBind(queue: queueName,
                  exchange: "directDemo",
                  routingKey: "b");

4.1 fanout

本着安分守己的讨论,我们先来掌握下fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的新闻都会散发到独具绑定该exchange的行列上去。

澳门葡京备用网址 25

fanout 路由体制

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

顾客示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

4.1 fanout

本着由表及里的思想,我们先来掌握下fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的音讯都会散发到具有绑定该exchange的队列上去。

澳门葡京备用网址 26

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

消费者示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

topic(扶助通配符的路由规则)

通配字符:

  • *:匹配2个单词
  • #:相配0个或多个单词
  • .:仅作为分隔符

Send代码块

//4. 声明信息交换机
channel.ExchangeDeclare(exchange: "topicDemo",
                        type: "topic");

//5. 构建字节数据包
var message = "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

//6. 发布到指定exchange
channel.BasicPublish(exchange: "topicDemo",
                     routingKey: "admin.user.error", //模拟后台用户错误
                     basicProperties: null,
                     body: body);

Receive代码块

//4. 声明信息交换机
channel.ExchangeDeclare(exchange: "topicDemo",
                        type: "topic");
//生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//绑定队列到指定topic类型exchange
channel.QueueBind(queue: queueName,
                  exchange: "topicDemo",
                  routingKey: "admin.*.#"); //订阅所有后台异常错误

4.2. direct

direct相对于fanout就属于完全匹配、单播的形式,路由体制如下图,即队列名称和音讯发送时钦命的路由完全合作时,新闻才会发送到内定队列上。

澳门葡京备用网址 27

direct路由体制

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

顾客示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

4.2. direct

direct相对于fanout就属于完全协作、单播的格局,路由体制如下图,即队列名称和音信发送时内定的路由完全相称时,新闻才会发送到内定队列上。
澳门葡京备用网址 28

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

顾客示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

RPC(远程进度调用)

  1. 进展长距离调用的客户端需求钦命接收远程回调的队列,并注脚消费者监听此行列。
  2. 长途调用的服务端除了要表明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的系列中去。

客户端代码块

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        var correlationId = Guid.NewGuid().ToString();
        var replyQueue = channel.QueueDeclare().QueueName;

        var properties = channel.CreateBasicProperties();
        properties.ReplyTo = replyQueue;
        properties.CorrelationId = correlationId;

        string number = args.Length > 0 ? args[0] : "30";
        var body = Encoding.UTF8.GetBytes(number);
        //发布消息
        channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);

        Console.WriteLine($"[*] Request fib({number})");

        // //创建消费者用于消息回调
        var callbackConsumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);

        callbackConsumer.Received += (model, ea) =>
        {
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";

                Console.WriteLine($"[x]: {responseMsg}");
            }
        };

        Console.ReadLine();

    }
}

劳务端代码块

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var conection = factory.CreateConnection())
    {
        using (var channel = conection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            Console.WriteLine("[*] Waiting for message.");

            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                int n = int.Parse(message);
                Console.WriteLine($"Receive request of Fib({n})");
                int result = Fib(n);

                var properties = ea.BasicProperties;
                var replyProerties = channel.CreateBasicProperties();
                replyProerties.CorrelationId = properties.CorrelationId;

                channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
                    basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));

                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine($"Return result: Fib({n})= {result}");

            };
            channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

            Console.ReadLine();
        }
    }

}

private static int Fib(int n)
{
    if (n == 0 || n == 1)
    {
        return n;
    }
    return Fib(n - 1) + Fib(n - 2);
}

4.3. topic

topic是direct的进级换代版,是一种形式匹配的路由机制。它帮衬接纳二种通配符来开始展览形式相配:符号#和符号*。其中*同盟2个单词,
#则意味着相配0个或两个单词,单词之间用.分割。如下图所示。

澳门葡京备用网址 29

topic路由体制

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

买主示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

4.3. topic

topic是direct的晋升版,是壹种方式匹配的路由机制。它援救使用两种通配符来进展方式相配:符号#和符号*。其中*合作三个单词,
#则意味着相配0个或五个单词,单词之间用.细分。如下图所示。
澳门葡京备用网址 30

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

买主示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

5. RPC

RPC——Remote Procedure Call,远程进程调用。
这RabbitMQ怎么着进行远程调用呢?暗指图如下:

澳门葡京备用网址 31

RPC机制

先是步,首借使拓展远程调用的客户端必要钦赐接收远程回调的系列,并声明消费者监听此行列。
其次步,远程调用的服务端除了要阐明消费端接收远程调用请求外,还要将结果发送到客户端用来监听回调结果的队列中去。

长距离调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

长距离调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

5. RPC

RPC——Remote Procedure Call,远程进程调用。
那RabbitMQ怎样进展长途调用呢?暗暗提示图如下:
澳门葡京备用网址 32
率先步,首若是打开长途调用的客户端须求内定接收远程回调的行列,并表达消费者监听此行列。
第一步,远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的体系中去。

远程调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

远程调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

6. 总结

听大人讲上边包车型地铁demo和对两种差别exchange路由体制的求学,我们开掘RabbitMQ首假如关系到以下几个基本概念:

  1. Publisher:生产者,音信的发送方。
  2. Connection:互连网连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:调换器(路由器),担任信息的路由到对应队列。
  5. Binding:队列与沟通器间的涉嫌绑定。消费者将关心的种类绑定到钦命交流器上,以便Exchange能准确分发音信到钦点队列。
  6. Queue:队列,音讯的缓冲存储区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供能源的逻辑分组和分手。包含连接,沟通,队列,绑定,用户权限,战术等。
  8. Broker:音讯队列的服务器实体。
  9. Consumer:消费者,音信的接收方。

此次作为入门就讲到这里,下次我们来教学下EventBus +
RabbitMQ
怎么样促成事件的分发。

参考资料:
RabbitMQ
Tutorials
Demo路径——RabbitMQ

6. 总结

根据上面包车型地铁demo和对三种不一样exchange路由体制的求学,大家开采RabbitMQ首假诺事关到以下几个核心概念:

  1. Publisher:生产者,音信的发送方。
  2. Connection:网络连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:调换器(路由器),担负音讯的路由到对应队列。
  5. Binding:队列与交流器间的涉及绑定。消费者将关爱的队列绑定到钦定交流器上,以便Exchange能准确分发音讯到钦点队列。
  6. Queue:队列,音讯的缓冲存款和储蓄区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供能源的逻辑分组和分手。包括连接,交流,队列,绑定,用户权限,计谋等。
  8. Broker:音信队列的服务器实体。
  9. Consumer:消费者,音讯的接收方。

这一次作为入门就讲到这里,下次大家来教学下EventBus +
RabbitMQ
何以得以达成事件的散发。

参考资料:
RabbitMQ Tutorials
Demo路径——RabbitMQ

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*
*
Website