本文共 3788 字,大约阅读时间需要 12 分钟。
上节我们提到交换机,而且我们也使用了rabbitmq内置的一种交换机类型——扇形交换机,是将消息发送给所有可知的消息队列,而本节我们使用rabbitmq内置的另外一种交换机——直连交换机,本节我们基于上节的日志系统加以改造。在上节我们是将所有消息都发给队列,所有的消息都在worker中保存到硬盘上,而我们本节是要精简,现实生活中我们可能会把一些重要的日志保存到硬盘上作为后续的分析,所以这一节我们是要对日志内容进行分为info|warning|error三级,并且将warning|error记录到硬盘上,而info只输出到屏幕上。
绑定(bindings)
前面我们已经使用过绑定,绑定(binding)是指交换机(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫做绑定键(binding key)。接着上面的形容,我们可以理解为,这个队列最这个交换机的哪些消息感兴趣。
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')绑定键的意义取决于交换机(exchange)的类型。我们之前使用过的扇型交换机(fanout exchanges)会忽略这个值。
直连交换机(direct exchange)
我们本节要的做内容,是对日志内容进行区分,error及warning写入磁盘,我们上节使用的扇形交换机不够灵活,她只能把消息广播,所以我们要使用直连交换机代替扇形交换机。 路由的算法很简单 —— 交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。此场景可以看下图:在这个场景中,我们可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。 这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。
多个绑定(multiple bindings)
看到这个图,是不是感觉很眼熟,没错,和扇形交换机基本实现了同样的功能,将消息广播给所有队列。但是区别是,扇形交换机是将所有的消息都广播给队列,而这里的直连交换机,我们只是将带有black路由键的消息广播给队列Q1和Q2,这也是直连交换机的一种灵活使用方法
总结
我们从整个流程中来总结一下生产者
创建连接创建交换机(直连交换机)
发送消息到指定交换机,并将消息中指定routing_key消费者
创建连接创建交换机(直连交换机) 创建匿名队列(每个消费者都要创建的独立的队列,用于接收路由后消息)
创建绑定(通过路由键绑定交换机队列关系,确定队列处理交换机中的哪些消息)在指定交换机处理消息
代码整合
emit_logs_direct.py#!/usr/bin/env python# -*- coding: utf-8 -*-# @Date : 2016-02-28 21:28:17# @Author : mx (mx472756841@gmail.com)# @Link : http://www.shujutiyu.com/# @Version : $Id$import osimport pikaimport sysconn = Noneseverity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'try: # 获取连接 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 获取通道 channel = conn.channel() # 创建直连交换机 channel.exchange_declare(exchange='direct_logs', type='direct') # 在RabbitMQ中发送消息,指定交换机(exchange),指定routing ret = channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message,) print " [x] Sent '{0}'".format(message) print retexcept Exception, e: raise efinally: if conn: conn.close()recv_logs_direct.py#!/usr/bin/env python# -*- coding: utf-8 -*-# @Date : 2016-02-29 16:30:21# @Author : mx (mx472756841@gmail.com)# @Link : http://www.shujutiyu.com/# @Version : $Id$import pikaimport sysconn = Noneseverities = sys.argv[1:] # 指定routing_key >> info|warning|errorif not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \ (sys.argv[0],) sys.exit(1)def callback(ch, method, properties, body): """ @ch: channel 通道,是由外部调用时上送 out body 读取队列内容做不同的操作 """ print " [x] method.routing_key {0}".format(method.routing_key) print " [x] Done %r" % (body, )try: # get connection conn = pika.BlockingConnection(pika.ConnectionParameters( 'localhost') ) # get channel channel = conn.channel() # 声明直连交换机 channel.exchange_declare(exchange='direct_logs', type='direct') # 声明临时队列 , param exclusive 互斥 tmp_queue = channel.queue_declare(exclusive=True) queue_name = tmp_queue.method.queue # 根据输入routing_key绑定交换机与队列 for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity ) # 在队列中读取信息 channel.basic_consume(callback, queue=queue_name, no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()except Exception, e: raise efinally: if conn: conn.close()
官网资料:中文资料: