【澳门葡京备用网址】新闻队列

RabbitMQ队列

先是大家在讲rabbitMQ在此以前我们要说一下python里的queue:二者干的事务是一律的,都以队列,用于传递新闻

在python的queue中有多少个三个是线程queue,八个是进度queue(multiprocessing中的queue)。线程queue不能跨进程,用于五个线程之间展开数据同步交互;进度queue只是用来父进度与子进度,或然同属于同意父进程下的多少个子进程举办交互。也便是说假设是多少个精光独立的主次,固然是python程序,也依然不能用这么些进程queue来通讯。那如果我们有五个单身的python程序,分属于八个进程,恐怕是python和其余语言

安装:windows下

首先供给安装 Erlang环境

官网: 

Windows版下载地址:

Linux版:     使用yum安装

 

接下来安装RabbitMQ了 

率先下载RabbitMQ 的Windows版本

下载地址:

安装pika:

从前设置过了pip,直接打开cmd,运行pip install pika

安装完成之后,达成贰个最简便的队列通讯:

澳门葡京备用网址 1

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创立三当中坚的socket,然后建立贰个管道,在管道中发消息,然后声雅培(Abbott)个queue,起个连串的名字,之后真正的发音信(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要一启动就径直运转下去,他不停收一条,永远在此处卡住。

在上边不管是produce依旧consume,里面都宣称了七个queue,这么些是怎么吧?因为大家不明了是顾客先开首运营依旧生产者先运维,那样假诺没有评释的话就会报错。

上面大家来看一下一对多,即1个劳动者对应多个买主:

第叁大家运维二个买主,然后不断的用produce去发送数据,大家能够观望顾客是通过一种轮询的艺术展开连发的承受多少,各类顾客消费叁个。

那就是说只要大家顾客接受了音讯,然后处理那些音讯供给30分钟,在处理的经过中,消费者断电了宕机了,那消费者还未曾处理完,大家设那一个任务大家无法不处理完,那大家应有有三个承认的音信,说那一个任务成功了也许是从未有过成功,所以本人的生产者要认可消费者是或不是把这几个职务处理完了,消费者处理完之后要给那几个生产者服务器端发送多个承认音信,生产者才会把这些职分从音信队列中剔除。若是没有处理完,消费者宕机了,没有给劳动者发送确认新闻,那就代表尚未处理完,那咱们看看rabbitMQ是怎么处理的

我们得以在顾客的callback中添加一个time.sleep()进行效仿宕机。callback是八个回调函数,只要事件一触发就会调用那些函数。函数执行完了就表示音信处理完了,如若函数没有处理完,那就认证。。。。

大家能够见到在顾客代码中的basic_consume()中有3个参数叫no_ack=True,这么些意思是那条音信是不是被拍卖完都不会发送确认新闻,一般大家不加这些参数,rabbitMQ私下认可就会给你设置成音讯处理完了就活动发送确认,大家明日把那几个参数去掉,并且在callback中添加一句话运营:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

澳门葡京备用网址 2澳门葡京备用网址 3澳门葡京备用网址 4

运维的结果就是,笔者先运维叁次生产者,数据被消费者1吸收到了,不过本身把消费者1宕机,结束运转,那么消费者2就接受了新闻,即倘使消费者没有发送确认音信,生产者就不会把音讯删除。

RabbitMQ音讯持久化:

大家能够变更好多的音讯队列,那大家怎么查看音信队列的动静呢:rabbitmqctl.bat
list_queues

澳门葡京备用网址 5

现行反革命的图景是,新闻队列中还有音信,不过服务器宕机了,那那几个音讯就丢了,那笔者就必要那个消息强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在历次评释队列的时候添加贰个durable参数(客户端和劳务器端都要足够那几个参数),

澳门葡京备用网址 6

在这些意况下,大家把rabbitMQ服务珍视启,发现只有队列名留下了,不过队列中的新闻并未了,那样大家还亟需在劳动者basic_publish中添加叁个参数:properties

澳门葡京备用网址 ,producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

那样就能够使得新闻持久化

昨天是2个劳动者对应多少个顾客,很公道的收发收发,不过事实上的气象是,大家机器的配备是不等同的,有的配置是单核的一部分配置是多核的,大概i7处理器处理4条消息的时候和其余的处理器处理1条音讯的小运基本上,那差的总结机那里就会堆积音信,而好的电脑那里就会形成闲置,在具体中做运营的,大家会在负载均衡中装置权重,什么人的安顿高权重高,任务就多一些,可是在rabbitMQ中,我们只做了三个简便的处理就足以兑现公道的信息分发,你有多大的能力就处理多少音讯

即:server端给客户端发送音讯的时候,先反省未来还有多少音讯,假若当前消息尚未处理实现,就不会发送给那几个消费者音讯。假使当前的消费者绝非音讯就发送

以此只须要在顾客端进行修改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在云谲风诡1个consume2,在callback中sleep20秒来效仿

澳门葡京备用网址 7澳门葡京备用网址 8澳门葡京备用网址 9

笔者先运营八个produce,被consume接受,然后在起步1个,就被consumer2接受,不过因为consumer第22中学sleep20秒,处理慢,所以此时在开发银行produce,就又给了consume实行拍卖

 

python学习之RabbitMQ—–新闻队列,

RabbitMQ队列

前言:本次整治写一篇有关rabbitMQ的博客,相比较上一篇redis,感觉rabbitMQ难度是增高不少。那篇博客会插入一些英文讲解,不过不难精通的。rabbitMQ的下载与安装,请参考redis&rabbitMQ安装。

Publish\Subscrible(信息公布\订阅)

前边都以1对1的出殡和埋葬接收数据,那小编想1对多,想广播一样,生产者发送三个新闻,全数顾客都接受音信。那大家咋办啊?那一个时候我们将要用到exchange了

exchange在一端收消息,在另一端就把消息放进queue,exchange必须精确的明白收到的音信要干什么,是否合宜发到三个一定的queue依旧发给许多queue,可能说把她放弃,这几个都被exchange的门类所定义

exchange在概念的时候是有档次的,以决定到底是那多少个queue符合条件,能够承受音信:

fanout:全数bind到此exchange的queue都足以承受信息

direct:通过rounroutingKey和exchange决定的可怜唯一的queue能够收起音信

topic:全数符合routingKey的routingKey所bind的queue能够承受消息

headers:通过headers来控制把新闻发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此处的exchange此前是空的,现在赋值log;在那里也远非注明queue,广播不供给写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在顾客那里我们有定义了一个queue,注意一下评释中的内容。可是大家在发送端没有评释queue,为啥发送端不供给接收端必要吗?在consume里有1个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还索要1个queue_name

运行结果:

澳门葡京备用网址 10澳门葡京备用网址 11澳门葡京备用网址 12澳门葡京备用网址 13

就一定于收音机一样,实时播报,打开四个顾客,生产者发送一条数据,然后二个买主同时收到到

RabbitMQ队列

首先大家在讲rabbitMQ此前大家要说一下python里的queue:二者干的业务是如出一辙的,都是队列,用于传递消息

在python的queue中有五个一个是线程queue,二个是进程queue(multiprocessing中的queue)。线程queue不能够跨进度,用于三个线程之间开始展览多少同步交互;进度queue只是用来父进度与子进度,可能同属于同意父进度下的五个子进程举办相互。也正是说假设是多少个完全独立的先后,固然是python程序,也依旧不可见用这几个历程queue来通讯。那如若我们有五个独立的python程序,分属于多少个经过,大概是python和其它语言

安装:windows下

率先需求设置 Erlang环境 官网: 
Windows版下载地址:
Linux版:     使用yum安装   然后安装RabbitMQ了  首先下载RabbitMQ
的Windows版本 下载地址:

安装pika:

事先设置过了pip,直接打开cmd,运行pip install pika

设置收尾之后,完成一个最简易的队列通讯:

澳门葡京备用网址 14

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创制3个为主的socket,然后建立二个管道,在管道中发音讯,然后声Bellamy个queue,起个类别的名字,之后真正的发信息(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要一运行就径直运转下去,他连连收一条,永远在此间卡住。

在上边不管是produce照旧consume,里面都宣称了一个queue,那么些是为啥呢?因为大家不领会是消费者先先河运维依旧生产者先运转,那样假若没有证明的话就会报错。

上面大家来看一下一对多,即贰个劳动者对应多少个顾客:

率先我们运维3个买主,然后不断的用produce去发送数据,我们能够看出顾客是由此一种轮询的不二法门开始展览连发的承受多少,各个顾客消费三个。

那么一旦大家顾客收到了音信,然后处理那些信息须要30分钟,在拍卖的进度中,消费者断电了宕机了,那消费者还一贯不处理完,大家设那几个职分我们必须处理完,那我们应有有三个认可的新闻,说那么些职分成功了或许是不曾形成,所以小编的生产者要肯定消费者是或不是把那几个职分处理完了,消费者处理完之后要给那个生产者服务器端发送二个认可音讯,生产者才会把那几个职分从音讯队列中删除。若是没有处理完,消费者宕机了,没有给劳动者发送确认音信,那就象征不曾处理完,这大家看看rabbitMQ是怎么处理的

大家能够在顾客的callback中添加一个time.sleep()举办模拟宕机。callback是二个回调函数,只要事件一触发就会调用这些函数。函数执行完了就意味着新闻处理完了,若是函数没有拍卖完,那就表达。。。。

咱俩得以看来在消费者代码中的basic_consume()中有3个参数叫no_ack=True,这几个意思是那条消息是不是被拍卖完都不会发送确认新闻,一般大家不加那个参数,rabbitMQ私下认可就会给你设置成信息处理完了就机关发送确认,大家今后把那些参数去掉,并且在callback中添加一句话运转:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

澳门葡京备用网址 15澳门葡京备用网址 16澳门葡京备用网址 17

运转的结果便是,小编先运维三遍生产者,数据被消费者1接受到了,可是本身把消费者1宕机,甘休运转,那么消费者2就收到了信息,即只要消费者绝非发送确认音信,生产者就不会把音信删除。

RabbitMQ消息持久化:

笔者们得以生成好多的音讯队列,那我们怎么查看新闻队列的景色吗:rabbitmqctl.bat
list_queues

澳门葡京备用网址 18

未来的境况是,音信队列中还有新闻,但是服务器宕机了,那那么些新闻就丢了,那自个儿就必要这一个音信强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在历次注脚队列的时候添加一个durable参数(客户端和服务器端都要增进那几个参数),

澳门葡京备用网址 19

在这一个情形下,大家把rabbitMQ服务注重启,发现只有队列名留下了,可是队列中的消息尚未了,那样大家还索要在劳动者basic_publish中添加1个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

如此就足以使得音信持久化

现行反革命是二个劳动者对应八个买主,很公道的收发收发,可是实际上的意况是,大家机器的铺排是区别的,有的配置是单核的局地配置是多核的,大概i7处理器处理4条音讯的时候和其余的微处理器处理1条新闻的年月大多,那差的处理器那里就会堆积新闻,而好的总括机这里就会形成闲置,在实际中做运行的,大家会在负载均衡中装置权重,何人的布局高权重高,义务就多一点,然而在rabbitMQ中,大家只做了3个简单易行的处理就足以兑现公正的音信分发,你有多大的能力就处理多少音信

即:server端给客户端发送消息的时候,先反省今后还有多少音信,假设当前音信没有处理完成,就不会发送给这一个消费者音讯。假诺当前的主顾绝非音讯就发送

【澳门葡京备用网址】新闻队列。这些只供给在消费者端实行改动加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在转变1个consume2,在callback中sleep20秒来效仿

澳门葡京备用网址 20澳门葡京备用网址 21澳门葡京备用网址 22

自家先运转多少个produce,被consume接受,然后在开发银行一个,就被consumer2接受,可是因为consumer第22中学sleep20秒,处理慢,所以此时在运转produce,就又给了consume进行处理

 

rabbitMQ是新闻队列;想想从前的大家学过队列queue:threading
queue(线程queue,三个线程之间实行数量交互)、进度queue(父进程与子进度展开互动可能同属于同一父进程下的多少个子进度展开互相);假若多少个独立的次序,那么之间是不能够经过queue进行交互的,那时候大家就要求五当中级代理即rabbitMQ

rabbitMQ是新闻队列;想想从前的大家学过队列queue:threading
queue(线程queue,八个线程之间举办数量交互)、进程Queue(父进度与子进度展开互动大概同属于同一父进程下的多少个子进度展开互相);假如三个独立的次序,那么之间是不可能经过queue进行交互的,那时候大家就须求四个中级代理即rabbitMQ.

有取舍的选拔音信(exchange_type = direct)

RabbitMQ还帮衬依照首要字发送,即:队列绑定关键字,发送者将数据依照重庆大学字发送到信息exchange,exchange依照重庆大学字判定应该将数据发送到钦定的系列

澳门葡京备用网址 23

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

一发密切的过滤(exchange_type=topic)

澳门葡京备用网址 24

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

上述都以服务器端发音信,客户端收音讯,信息流是单向的,那假若我们想要发一条命令给长途的客户端去实践,然后想让客户端执行的结果重临,则这种形式叫做rpc

RabbitMQ RPC

澳门葡京备用网址 25

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是跻身一个不通形式,没有音讯就等候新闻,有信息就收过来

self.connection.process_data_events()是三个非阻塞版的start_consuming,正是说发了3个事物给客户端,每过一点光阴去反省有没有音讯,假如没有音讯,能够去干其他业务

reply_to = self.callback_queue是用来采纳反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第③在客户端会通过uuid4生成,第③在劳务器端再次来到执行结果的时候也会传过来四个,所以说假设服务器端发过来的correlation_id与和睦的id相同
,那么服务器端发出来的结果就肯定是自个儿正要客户端发过去的通令的执行结果。未来就三个劳动器端二个客户端,无所谓缺人不确认。未来客户端是非阻塞版的,大家能够不让它打印没有消息,而是举办新的下令,那样就两条新闻,不必然按顺序达成,那大家就要求去肯定各类再次回到的结果是哪些命令的执行结果。

总体的格局是那般的:生产者发了3个限令给顾客,不掌握客户端几时回来,依旧要去收结果的,可是它又不想进入阻塞形式,想每过一段时间看那个音讯收回来没有,假使音讯收回来了,就表示收完了。 

运行结果:

澳门葡京备用网址 26澳门葡京备用网址 27

劳动器端开启,然后在开发银行客户端,客户端先是等待音讯的出殡和埋葬,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

Publish\Subscrible(新闻发表\订阅)

前边都以1对1的出殡和埋葬接收数据,那作者想1对多,想广播一样,生产者发送贰个音信,全部顾客都接受音讯。那大家怎么办啊?这些时候大家将要用到exchange了

exchange在一端收新闻,在另一端就把音信放进queue,exchange必须精确的掌握收到的音讯要干什么,是不是合宜发到1个一定的queue依旧发给许多queue,只怕说把她放任,这几个都被exchange的花色所定义

exchange在概念的时候是有档次的,以决定到底是那多少个queue符合条件,能够承受消息:

fanout:全部bind到此exchange的queue都足以承受音信

direct:通过rounroutingKey和exchange决定的可怜唯一的queue能够接收音讯

topic:全数符合routingKey的routingKey所bind的queue还不错消息

headers:通过headers来控制把新闻发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

那边的exchange以前是空的,今后赋值log;在此地也从没注解queue,广播不须要写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在顾客这里大家有定义了1个queue,注意一下注脚中的内容。不过大家在发送端没有注明queue,为什么发送端不要求接收端须求吗?在consume里有1个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还索要四个queue_name

运维结果:

澳门葡京备用网址 28澳门葡京备用网址 29澳门葡京备用网址 30澳门葡京备用网址 31

就一定于收音机一样,实时播报,打开多少个顾客,生产者发送一条数据,然后贰个买主同时收到到

音信队列:

 

有取舍的收纳音信(exchange_type = direct)

RabbitMQ还帮衬依照主要字发送,即:队列绑定关键字,发送者将数据依照重庆大学字发送到新闻exchange,exchange依据重点字判定应该将数据发送到钦定的行列

澳门葡京备用网址 32

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

进一步缜密的过滤(exchange_type=topic)

澳门葡京备用网址 33

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

上述都是服务器端发新闻,客户端收新闻,音讯流是单向的,那假使大家想要发一条命令给长途的客户端去实践,然后想让客户端执行的结果重临,则那种方式叫做rpc

RabbitMQ RPC

澳门葡京备用网址 34

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是跻身二个围堵情势,没有新闻就等候信息,有消息就收过来

self.connection.process_data_events()是1个非阻塞版的start_consuming,正是说发了1个东西给客户端,每过一点时刻去检查有没有音讯,若是没有新闻,能够去干别的政工

reply_to = self.callback_queue是用来收纳反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第三在客户端会通过uuid4生成,第叁在劳务器端再次来到执行结果的时候也会传过来七个,所以说要是服务器端发过来的correlation_id与自身的id相同
,那么服务器端发出来的结果就自然是自己刚刚客户端发过去的下令的推行结果。以后就四个劳动器端三个客户端,无所谓缺人不认可。以往客户端是非阻塞版的,大家得以不让它打字与印刷没有信息,而是进行新的授命,那样就两条音信,不自然按梯次完成,那我们就供给去确认种种再次回到的结果是哪个命令的进行结果。

全体的格局是那般的:生产者发了3个发令给买主,不知情客户端哪天回来,依然要去收结果的,可是它又不想进入阻塞格局,想每过一段时间看这些音信收回来没有,要是音讯收回来了,就表示收完了。 

运维结果:

澳门葡京备用网址 35澳门葡京备用网址 36

服务器端开启,然后在起步客户端,客户端先是等待新闻的发送,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

RabbitMQ队列
首先大家在讲rabbitMQ以前我们要说一下python里的queue:二者干的作业是平等的,都以队列,用于…

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • ………..

壹 、不难的rabbitMQ队列通讯

澳门葡京备用网址 37

由上海教室能够,数据是头阵给exchange调换器,exchage再发给相应队列。pika模块是python对rabbitMQ的API接口。接收端有一个回调函数,一接收到多少就调用该函数。一条新闻被二个买主接受后,该消息就从队列删除。OK,领会上边的知识后,先来探望二个总结的rabbitMQ列队通信。

send端:

 1 import pika
 2 #连上rabbitMQ
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()       #生成管道,在管道里跑不同的队列
 5 
 6 #声明queue
 7 channel.queue_declare(queue='hello1')
 8 
 9 #n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.
10 #向队列里发数据
11 channel.basic_publish(exchange='',      #先把数据发给exchange交换器,exchage再发给相应队列
12                       routing_key='hello1', #向"hello'队列发数据
13                       body='HelloWorld!!')  #发的消息
14 print("[x]Sent'HelloWorld!'")
15 connection.close()

receive端:

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()
 5 
 6 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 7 # We could avoid that if we were sure that the queue already exists. For example if send.py program
 8 # was run before. But we're not yet sure which program to run first. In such cases it's a good
 9 # practice to repeat declaring the queue in both programs.
10 channel.queue_declare(queue='hello1')#声明队列,保证程序不出错
11 
12 
13 def callback(ch,method,properties,body):
14     print("-->ch",ch)
15     print("-->method",method)
16     print("-->properties",properties)
17     print("[x] Received %r" % body)         #一条消息被一个消费者接收后,该消息就从队列删除
18 
19 
20 channel.basic_consume(callback,              #回调函数,一接收到消息就调用回调函数
21                       queue='hello1',
22                       no_ack=False)    #消费完毕后向服务端发送一个确认,默认为False
23 
24 print('[*] Waiting for messages.To exit press CTRL+C')
25 channel.start_consuming()

运作结果:(上边的代码对应自个儿写的注脚相信是看得懂的~)

澳门葡京备用网址 38澳门葡京备用网址 39

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'

View Code

通过深切的测试,有以下七个意识:

  1. 先运行rabbitMQ_1_send.py发送数据,rabbitMQ_2_receive.py未运营。发现当receive运营时还可以接收数据。
  2. 运作八个(eg:三个)接收数据的客户端,再运营发送端,客户端1收到数额,再运维发送端,客户端2收到数额,再运营发送端,客户端3收下多少。

RabbitMQ会暗中认可把p发的音信依次分发给各样消费者(c),跟负载均衡差不离。

 

原理:

二、全英文ack

在看上边的例证,你会发觉有一句代码no_ack=False(消费达成后向服务端发送贰个承认,默许为False),以本身韩文四级飘过的水准,看完上面关于ack的上课感觉写得很牛啊!!于是分享一下:

Doing a task can take a few seconds. You
may wonder what happens if one of the consumers starts a long task and
dies with it only partly done. With our current code once RabbitMQ
delivers message to the customer it immediately removes it from memory.
In this case, if you kill a worker we will lose the message it was just
processing. We’ll also lose all the messages that were dispatched to
this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a
worker dies, we’d like the task to be delivered to another
worker.

In order to make sure a message is never
lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is
sent back from the consumer to tell RabbitMQ that a particular message
had been received, processed and that RabbitMQ is free to delete
it.

If a consumer dies (its channel is
closed, connection is closed, or TCP connection is lost) without sending
an ack, RabbitMQ will understand that a message wasn’t processed fully
and will re-queue it. If there are other consumers online at the same
time, it will then quickly redeliver it to another consumer. That way
you can be sure that no message is lost, even if the workers
occasionally die.

There aren’t any message timeouts;
RabbitMQ will redeliver the message when the consumer dies. It’s fine
even if processing a message takes a very, very long time.

Message
acknowledgments are turned on by default. In previous examples we
explicitly turned them off via the no_ack=True flag. It’s time to
remove this flag and send a proper acknowledgment from the worker, once
we’re done with a task.

Using this code we can be sure that even
if you kill a worker using CTRL+C while it was processing a message,
nothing will be lost. Soon after the worker dies all unacknowledged
messages will be redelivered.

本身把发送端和接收端分别比作生产者与顾客。生产者发送职责A,消费者收到任务A并拍卖,处理完后生产者将音信队列中的职分A删除。未来我们相遇了一个题材:如若买主接受任务A,但在拍卖的进度中忽然宕机了。而那时候生产者将新闻队列中的任务A删除。实际上任务A并未得逞拍卖完,也便是丢失了任务/音讯。为搞定这么些难题,应使顾客收到职分并打响拍卖完后发送几个ack到生产者!生产者收到ack后就知道职分A已被成功拍卖,那时才从新闻队列旅长职责A删除,假如没有收到ack,就须要把职责A发送给下三个买主,直到职分A被成功拍卖。

 

澳门葡京备用网址 40

三 、信息持久化

眼下早已知晓,生产者生产数量,消费者再开发银行是能够接收数据的。

可是,生产者生产数据,然后重启rabbitMQ,消费者是无力回天接收数据。

eg:音讯在传输进程中rabbitMQ服务器宕机了,会发觉后面的新闻队列就不设有了,这时大家就要用到音信持久化,新闻持久化会让队列不随着服务器宕机而化为乌有,会永远的保存下去。下边看下关于音信持久化的英文讲解:

We have learned how to make sure that
even if the consumer dies, the task isn’t lost(by default, if wanna
disable  use no_ack=True). But our tasks will still be lost if RabbitMQ
server stops.

When RabbitMQ quits or crashes it will forget the
queues and messages unless you tell it not to. Two things are
required to make sure that messages aren’t lost: we need to mark both
the queue and messages as durable.

First, we
need to make sure that RabbitMQ will never lose our queue. In order to
do so, we need to declare it as durable:

      1 channel.queue_declare(queue=’hello’,
durable=True)

Although this command is correct by
itself, it won’t work in our setup. That’s because we’ve already defined
a queue called hello which is not durable. RabbitMQ doesn’t allow you to redefine an
existing queue with different parameters and will return an
error(会曝错) to any program that tries to do that. But there is
a quick workaround – let’s declare a queue with different name, for
exampletask_queue:

      1
channel.queue_declare(queue=’task_queue’, durable=True)

This queue_declare change needs to be
applied to both the producer and consumer code.

At that point we’re sure that
the task_queue queue won’t be lost even if RabbitMQ restarts. Now we
need to mark our messages as persistent –
by supplying a delivery_mode property with a value 2.

      1
channel.basic_publish(exchange=”,
      2
                      routing_key=”task_queue”,
      3
                      body=message,
      4
                      properties=pika.BasicProperties(
      5
                         delivery_mode = 2,      # make message
persistent
      6
                      ))

地方的英文对消息持久化讲得很好。音信持久化分为两步:

  • 持久化队列。通过代码达成持久化hello队列:channel.queue_declare(queue=’hello’,
    durable=True)
  • 持久化队列中的音讯。通过代码完结:properties=pika.BasicProperties( delivery_mode = 2, )

此地有个点要小心下:

假定你在代码中已落成持久化hello队列与队列中的音信。那么您重启rabbitMQ后再一次运转代码也许会爆错!

因为: RabbitMQ doesn’t allow you to
redefine an existing queue with different parameters and will return an
error.

为了化解那几个难题,可以声美素佳儿个与重启rabbitMQ此前不相同的行列名(queue_name).

 

一 、安装和骨干使用

四 、音信公平分发

固然Rabbit只管按顺序把音讯发到各种消费者身上,不考虑消费者负载的话,很只怕出现,3个机器配置不高的顾客这里堆积了累累新闻处理不完,同时配备高的主顾却直接很轻松。为化解此题材,能够在一一消费者端,配置perfetch=1,意思正是报告RabbitMQ在本身这一个消费者当前音讯还没处理完的时候就绝不再给作者发新音信了。

澳门葡京备用网址 41

 

带新闻持久化+公平分发的完好代码

劳动者端:

澳门葡京备用网址 42澳门葡京备用网址 43

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)  #队列持久化
 9  
10 message = ' '.join(sys.argv[1:]) or"Hello World!"
11 channel.basic_publish(exchange='',
12                       routing_key='task_queue',
13                       body=message,
14                       properties=pika.BasicProperties(
15                          delivery_mode = 2, # make message persistent消息持久化
16                       ))
17 print(" [x] Sent %r" % message)
18 connection.close()

View Code

顾客端:

澳门葡京备用网址 44澳门葡京备用网址 45

 1 #!/usr/bin/env python
 2 import pika
 3 import time
 4  
 5 connection =pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8  
 9 channel.queue_declare(queue='task_queue', durable=True)
10 print(' [*] Waiting for messages. To exit press CTRL+C')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     time.sleep(body.count(b'.'))
15     print(" [x] Done")
16     ch.basic_ack(delivery_tag =method.delivery_tag)   
17  
18 channel.basic_qos(prefetch_count=1)
19 channel.basic_consume(callback,
20                       queue='task_queue')
21  
22 channel.start_consuming()

View Code

自个儿在运作方面程序时对顾客端里回调函数的一句代码(ch.basic_ack(delivery_tag
=method.delivery_tag))10分质疑。那句代码去掉消费者端也能一如既往收到新闻啊。这句代码有毛线用处??

劳动者端消息持久后,必要在消费者端加上(ch.basic_ack(delivery_tag
=method.delivery_tag)): 保障音讯被消费后,消费端发送三个ack,然后服务端从队列删除该新闻.

 

安装RabbitMQ服务
 

⑤ 、音信公布与订阅

在此之前的事例都基本都是1对1的音信发送和吸收,即音信只可以发送到钦命的queue里,但稍事时候你想让你的信息被全数的queue收到,类似广播的效果,那时候就要用到exchange了。PS:有趣味的摸底redis的宣布与订阅,能够看看本人写的博客python之redis。

An exchange is a very simple thing. On
one side it receives messages from producers and the other side it
pushes them to queues. The exchange must know exactly what to do with a
message it receives. Should it be appended to a particular queue? Should
it be appended to many queues? Or should it get discarded(丢弃). The
rules for that are defined by the exchange type.

Exchange在概念的时候是有项指标,以控制到底是哪些Queue符合条件,可以接到消息

 

fanout: 全部bind到此exchange的queue都得以接受音讯

direct: 通过routingKey和exchange决定的丰硕唯一的queue能够吸收音讯

topic:全体符合routingKey(此时得以是1个表达式)的routingKey所bind的queue可以接过音讯

 

表达式符号表明: #表示3个或三个字符,*意味着任何字符
     
    例:#.a会匹配a.a,aa.a,aaa.a等
               
*.a会匹配a.a,b.a,c.a等
          
 注:使用RoutingKey为#,Exchange
Type为topic的时候一定于接纳fanout

 

下边笔者分别讲下fanout,direct,topic:

1、fanout

fanout: 全部bind到此exchange的queue都可以吸收接纳消息

澳门葡京备用网址 46

send端:

澳门葡京备用网址 47澳门葡京备用网址 48

 1 import pika
 2 import sys
 3 
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel=connection.channel()
 6 
 7 channel.exchange_declare(exchange='logs',
 8                       type='fanout')
 9 
10 message=''.join(sys.argv[1:])or"info:HelloWorld!"
11 channel.basic_publish(exchange='logs',
12                       routing_key='',  #fanout的话为空(默认)
13                       body=message)
14 print("[x]Sent%r"%message)
15 connection.close()

View Code

receive端:

澳门葡京备用网址 49澳门葡京备用网址 50

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel=connection.channel()
 5 
 6 channel.exchange_declare(exchange='logs',type='fanout')
 7 
 8 #不指定queue名字(为了收广播),rabbit会随机分配一个queue名字,
 9 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
10 result=channel.queue_declare(exclusive=True)
11 queue_name=result.method.queue
12 
13 #把声明的queue绑定到交换器exchange上
14 channel.queue_bind(exchange='logs',
15                 queue=queue_name)
16 
17 print('[*]Waitingforlogs.ToexitpressCTRL+C')
18 
19 def callback(ch,method,properties,body):
20     print("[x]%r"%body)
21 
22 
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26 
27 channel.start_consuming()

View Code

有八个点要留意下:

  • fanout-广播,send端的routing_key=”, #fanout的话为空(暗中认可)

  • receive端有一句代码:result=channel.queue_declare(exclusive=True),成效:不钦赐queue名字(为了收广播),rabbitMQ会随机分配2个queue名字,exclusive=True会在应用此queue的主顾断开后,自动将queue删除。

 

贰 、有选拔的收到音讯(exchange
type=direct)

RabbitMQ还帮忙依据重要字发送,即:队列绑定关键字,发送者将数据依照重庆大学字发送到音信exchange,exchange依照 关键字
判定应该将数据发送至内定队列。

澳门葡京备用网址 51

send端:

澳门葡京备用网址 52澳门葡京备用网址 53

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localh'))ost
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 severity = sys.argv[1] iflen(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity, #关键字不为空,告知消息发送到哪里(info,error~)
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

View Code

receive端:

澳门葡京备用网址 54澳门葡京备用网址 55

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result =channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" %(method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

实际上最伊始笔者看代码是一脸懵逼的~
上面是自个儿在cmd实行测试的截图(协作着截图看会不难通晓些),叁个send端,两个receive端(先起receive端,再起receive端):

send端:

澳门葡京备用网址 56

receive端-1:

澳门葡京备用网址 57

receive端-2:

澳门葡京备用网址 58

 

叁 、更细致的音讯过滤topic(供参考)

Although using the direct exchange
improved our system, it still has limitations – it can’t do routing
based on multiple criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix tool, which routes logs based on both severity
(info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’.

感觉自个儿英文水准不高啊~,作者相比较着垃圾有道翻译,加上自身的敞亮,大约知道地点在讲哪些。

比喻:
若是是系统的荒谬,就把音信发送到A,假如是MySQL的一无所能,就把消息发送到B。可是对B来说,想达成接收MySQL的错误新闻,能够用有取舍的选用音信(exchange type=direct),让重要字为error就贯彻了啊!未来B有个要求:不是兼备的错误新闻都接到,只接受钦点的失实。在某种音信再进行过滤,那正是更细致的新闻过滤topic。

 

send端:

澳门葡京备用网址 59澳门葡京备用网址 60

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')  #类型为topic
10  
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

View Code

receive端:

澳门葡京备用网址 61澳门葡京备用网址 62

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

 

 

python安装RabbitMQ模块

六、RPC(Remote Procedure Call)

TucsonPC的概念可看我百度的(其实就恍如笔者事先做的FTP,笔者从客户端发一个下令,服务端再次回到相关音讯):

澳门葡京备用网址 63澳门葡京备用网址 64

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

View Code

下边重点讲下奥德赛PC通讯,小编刚起首学挺难的,学完现在感觉SportagePC通信的合计很有启发性,代码的例子写得也很牛!!

澳门葡京备用网址 65

client端发的新闻被server端接收后,server端会调用callback函数,执行任务后,还索要把相应的音讯发送到client,但是server怎样将新闻发还给client?如若有三个client连接server,server又怎么通晓是要发给哪个client??

EnclavePC-server暗中认可监听rpc_queue.肯定无法把要发放client端的新闻发到rpc_queue吧(rpc_queue是监听client端发到server端的数目)。

理所当然的方案是server端另起3个queue,通过queue将音讯重回给对应client。但难点又来了,queue是server端起的,故client端肯定不理解queue_name,连queue_name都不了解,client端接收毛线的数码??

化解措施:

客户端在发送指令的还要报告服务端:职责执行完后,数据通过某队列再次来到结果。客户端监听该队列就OK了。

client端:

 1 import pika
 2 import uuid
 3 
 4 
 5 class FibonacciRpcClient(object):
 6     def __init__(self):
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 8 
 9         self.channel = self.connection.channel()
10         #随机建立一个queue,为了监听返回的结果
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue   ##队列名
13 
14         self.channel.basic_consume(self.on_response,  #一接收客户端发来的指令就调用回调函数on_response
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17 
18     def on_response(self, ch, method, props, body):  #回调
19         #每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
20         #此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果
21         if self.corr_id == props.correlation_id:
22             self.response = body
23 
24     def call(self, n):
25         self.response = None    ##指令执行后返回的消息
26         self.corr_id = str(uuid.uuid4())   ##可用来标识指令(顺序)
27         self.channel.basic_publish(exchange='',
28                                    routing_key='rpc_queue', #client发送指令,发到rpc_queue
29                                    properties=pika.BasicProperties(
30                                        reply_to=self.callback_queue, #将指令执行结果返回到reply_to队列
31                                        correlation_id=self.corr_id,
32                                    ),
33                                    body=str(n))
34         while self.response is None:
35             self.connection.process_data_events() #去queue接收数据(不阻塞)
36         return int(self.response)
37 
38 
39 fibonacci_rpc = FibonacciRpcClient()
40 
41 print(" [x] Requesting fib(30)")
42 response = fibonacci_rpc.call(30)
43 print(" [.] Got %r" % response)

server端:

 1 import pika
 2 import time
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     host='localhost'))
 6 
 7 channel = connection.channel()
 8 
 9 channel.queue_declare(queue='rpc_queue')
10 
11 
12 def fib(n):
13     if n == 0:
14         return 0
15     elif n == 1:
16         return 1
17     else:
18         return fib(n - 1) + fib(n - 2)
19 
20 
21 def on_request(ch, method, props, body):
22     n = int(body)
23 
24     print(" [.] fib(%s)" % n)
25     response = fib(n)  #从客户端收到的消息
26 
27     ch.basic_publish(exchange='',   ##服务端发送返回的数据到props.reply_to队列(客户端发送指令时声明)
28                      routing_key=props.reply_to,  #correlation_id (随机数)每条指令都有随机独立的标识符
29                      properties=pika.BasicProperties(correlation_id= \
30                                                          props.correlation_id),
31                      body=str(response))
32     ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端持久化
33 
34 
35 channel.basic_qos(prefetch_count=1)  #公平分发
36 channel.basic_consume(on_request,    #一接收到消息就调用on_request
37                       queue='rpc_queue')
38 
39 print(" [x] Awaiting RPC requests")
40 channel.start_consuming()

 

转载注脚出处: 

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

② 、完毕最不难易行的连串通讯

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)   #接受到消息后不返回ack,无论本地是否处理完消息都会在队列中消失
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

注:windows连linux上的rabbitMQ会冒出报错,必要提供用户名密码

叁 、RabbitMQ新闻分发轮询

先运维音信生产者,然后再分别运营三个顾客,通过生产者多发送几条音讯,你会发觉,这几条音讯会被逐一分配到各种消费者身上

澳门葡京备用网址 66

 

在这种情势下,RabbitMQ会暗中同意把p发的音讯公平的顺序分发给各样消费者(c),跟负载均衡大致

澳门葡京备用网址 67澳门葡京备用网址 68

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

澳门葡京备用网址 69澳门葡京备用网址 70

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

透过推行pubulish.py和consume.py能够兑现地点的新闻公平分发,那假设c1接收新闻之后宕机了,会出现什么处境吗?rabbitMQ是怎么处理的?今后大家模拟一下

澳门葡京备用网址 71澳门葡京备用网址 72

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

澳门葡京备用网址 73澳门葡京备用网址 74

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

在consume.py的callback函数里增添了time.sleep模拟函数处理,通过上面程序实行效仿发现,c1收取到音讯后不曾拍卖完突然宕机,消息就从队列上海消防失了,rabbitMQ把音信删除掉了;如果程序需求音信必须求拍卖完才能从队列里删除,那我们就须求对先后开始展览处理一下

澳门葡京备用网址 75澳门葡京备用网址 76

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

澳门葡京备用网址 77澳门葡京备用网址 78

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    #time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

通过把consume.py接收端里的no_ack``=``True去掉之后并在callback函数里面添加ch.basic_ack(delivery_tag ``= method.delivery_tag,就能够兑现消息不被处理完无法在队列里清除

查看新闻队列数:

澳门葡京备用网址 79

四 、新闻持久化

若是新闻在传输进程中rabbitMQ服务器宕机了,会意识后边的消息队列就不存在了,那时我们即将用到音信持久化,音信持久化会让队列不趁着服务器宕机而熄灭,会永远的保存下去

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

五 、音讯公平分发

一旦Rabbit只管按顺序把新闻发到各种消费者身上,不考虑消费者负载的话,很也许出现,一个机器配置不高的顾客那里堆积了成都百货上千音信处理不完,同时配备高的主顾却直接很轻松。为解决此题材,能够在依次消费者端,配置perfetch=1,意思正是报告RabbitMQ在本身那么些消费者当前消息还没处理完的时候就无须再给作者发新音信了

澳门葡京备用网址 80

channel.basic_qos(prefetch_count=1)

带新闻持久化+公平分发

澳门葡京备用网址 81澳门葡京备用网址 82

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

澳门葡京备用网址 83澳门葡京备用网址 84

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

6、Publish\Subscribe(新闻公布\订阅) 

从前的例证都基本都以1对1的音讯发送和接到,即新闻只可以发送到内定的queue里,但有点时候你想让你的信息被抱有的Queue收到,类似广播的效果,那时候就要用到exchange了,

An exchange is a very simple thing. On one
side it receives messages from producers and the other side it pushes
them to queues. The exchange must know exactly what to do with a message
it receives. Should it be appended to a particular queue? Should it be
appended to many queues? Or should it get discarded. The rules for that
are defined by the exchange type.

Exchange在概念的时候是有项目标,以控制到底是怎么着Queue符合条件,能够接到音信

fanout: 全体bind到此exchange的queue都还行新闻
direct: 通过routingKey和exchange决定的十三分唯一的queue能够吸收音讯
topic:全数符合routingKey(此时得以是四个表达式)的routingKey所bind的queue可以接过新闻

headers: 通过headers
来决定把音信发给哪些queue

表达式符号表明:#意味着一个或四个字符,*代表任何字符

     
 例:#.a会匹配a.a,aa.a,aaa.a等
           
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange
Type为topic的时候一定于选择fanout 


fanout接到全部广播:广播表示如今消息是实时的,要是没有3个主顾在收受音讯,消息就会放弃,在那边消费者的no_ack已经无用,因为fanout不会管你处理音信甘休没有,发过的音讯不会重发,记住广播是实时的

澳门葡京备用网址 85

 

澳门葡京备用网址 86澳门葡京备用网址 87

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',   #广播不用声明queue
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

publish.py

澳门葡京备用网址 88澳门葡京备用网址 89

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,
                                                # exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',         # 绑定转发器,收转发器上面的数据
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

consume.py


有取舍的吸收接纳音信 direct:
 同fanout一样,
no_ack在此要安装为True,不然队列里多少不会清空(即便也不会重发)**

RabbitMQ还协助遵照重点字发送,即:队列绑定关键字,发送者将数据依据首要字发送到音信exchange,exchange依据关键字 判定应该将数据发送至钦赐队列

澳门葡京备用网址 90

 

澳门葡京备用网址 91澳门葡京备用网址 92

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

publish.py

澳门葡京备用网址 93澳门葡京备用网址 94

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py


更全面包车型大巴新闻过滤 topic:

Although using the direct exchange improved our system, it still has
limitations – it can’t do routing based on multiple
criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix
tool, which routes logs based on both severity (info/warn/crit…) and
facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’

澳门葡京备用网址 95

澳门葡京备用网址 96澳门葡京备用网址 97

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

publish.py

澳门葡京备用网址 98澳门葡京备用网址 99

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

 

CRUISERPC(Remote procedure
call )双向通信

To illustrate how an RPC service could be
used we’re going to create a simple client class. It’s going to expose a
method named call which sends an RPC request and
blocks until the answer is received:

澳门葡京备用网址 100

rpc client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import uuid,time


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, #只要收到消息就执行on_response
                                   no_ack=True,     #不用ack确认
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    #验证码核对
            self.response = body


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    #发送返回信息的队列name
                                       correlation_id=self.corr_id,     #发送uuid 相当于验证码
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming
            print("no messages")
            time.sleep(0.5)     #测试
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()    #实例化
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)       #执行call方法
print(" [.] Got %r" % response)

rpc server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    #回信息队列名
                     properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*
*
Website