本文讨论的是 RabbitMQ,它是 OpenStack 推荐用于云部署的消息代理中间件。它符合 AMQP 标准并在 Erlang 中开发。代码示例是使用 Python 和 PIKA 库开发的。
1.消息代理
消息代理是一个软件组件,可以在企业应用程序集群中实现跨应用程序的通信。它在面向服务的体系结构(SOA)中也称为面向消息的中间件(MOM)。企业集群中的应用程序使用邮件交换或邮局等消息代理向其他应用程序发送消息。
RabbitMQ 符合 AMQP 标准,这是应用程序和组织之间的业务消息的开放标准。它是一个二进制协议而不是一个接口规范。 AMQP 标准支持消息传递作为云服务、高级发布-订阅模式、基于自定义标头的路由和独立于编程语言。
2.RabbitMQ模型
RabbitMQ 模型由各种组件组成。它们是: Producer (发送者)、 Consumer (接收者)、 Exchange 、 Bindings 和 Message queues 。这些组件协同工作,如下所述:
- 生产者向交易所发送消息
- Exchange 根据绑定将消息转发到队列
- 绑定由队列设置以附加到交换
- 消费者(接收者)从各自的消息队列中接收消息。
RabbitMQ生态系统
消息的生产者和消费者是外部实体。交换器、队列和绑定是消息代理的内部实体。
2.1 交易所
交易所是 RabbitMQ 的核心。生产者将消息发送到交换器以将其转发给正确的消费者。 Exchange根据exchange类型、消息中的routing_key、消息中的customheader字段、queue向exchange注册的routing_key进行转发决策。
Exchange 可以配置为持久的,这样它们就可以在 RabbitMQ 服务器重启后继续存在。交易所可以是服务器“内部”的,因此它可以由服务器内的其他交易所发布。交换器可以配置为自动删除,而不再绑定更多队列。
2.2 队列
队列用于将消息转发给目标消费者并确保有序的消息传递。消费者将自己绑定到一个带有回调函数的队列,这样消费者就会在收到消息时得到通知。
队列可以配置为持久的,这样它们就可以在 RabbitMQ 服务器重启后继续存在。它还可以配置为“独占”连接,一旦连接关闭,队列将被删除。当没有消费者订阅队列时,自动删除队列将被删除。
2.3 绑定
交换使用绑定来为消息做出路由决策。基本上绑定将交换器与队列连接起来,它使用交换器类型、队列设置的路由键、自定义消息头和生产者发送的消息中存在的路由键。如果没有足够的绑定来为消息做出转发决定,则可以根据消息中设置的属性丢弃该消息或将其发送回生产者。
3.生产者与消费者
在典型的银行系统中,当客户从 ATM 取款或在商店刷信用卡时,客户会收到短信或电子邮件通知,以防止欺诈。在此用例中,核心银行系统将向消息传递子系统发送消息并继续处理其他客户请求。监听队列的消息传递子系统将收到通知,并根据订阅者的偏好采取适当的行动。在此示例中,核心银行系统是消息的生产者,消息传递子系统是消息的消费者。
3.1 制作人
根据 RabbitMQ 设计,生产者将消息发送到交换器。由消息队列的交换类型、route_key 和绑定定义的交换行为。让我们看看如何使用 Python pika 库创建生产者。如果您没有安装 RabbitMQ,请参阅 附录 A 部分来安装和配置 RabbitMQ。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
在上面的代码段中,首先我们建立与托管 RabbitMQ 的服务器的连接。 RabbitMQ 允许在现有连接中建立多个通道,并且所有通信操作都与该通道相关联。第三行,我们创建一个交换器,命名为“SMS-Exchange”,类型为“direct”。如果交换已经创建,则此语句不会产生错误,它只会返回它,除非交换类型没有冲突。
“channel.basic_publish”函数用于向交易所发送消息。生产者必须在实际消息中提及交换名称和 routing_key。在“直接”交换中,“routing_key”将用于做出消息转发决策。如果不再需要,最后关闭连接。
3.2 消费者
根据 RabbitMQ 设计,消费者是消息的目标应用程序。消费者必须将自己注册到队列并将其与交换绑定。如果有多个消费者注册到一个队列,那么 RabbitMQ 会以循环方式将消息发送给消费者。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
生产者和消费者的连接建立完全相同。但是消费者必须声明一个队列并将队列与 routing_key 一起绑定到交换器。在上面的代码中,“SMS-Queue”被创建并与“SMS-Exchange”绑定,routing_key“SMS-Alert”。这指示 SMS-Exchange 将带有 routing_key“SMS-Alert”的消息转发到“SMS-Queue”。
一旦在队列上收到消息,将调用在队列中注册的回调函数。列表语句“channel.start_consuming()”是一个阻塞调用,消费者在其中等待已注册队列中的任何消息。
3.3 远程连接
在上面的示例中,我们已经连接到本地主机中存在的 RabbitMQ 服务器。让我们看看代码片段,如何连接到远程 RabbitMQ 服务器:
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
上面的代码首先通过设置用户凭据创建一个 credits 对象。然后我们通过设置 IP 地址、端口号、虚拟主机路径和凭据对象来创建参数对象。此参数只是传递给 BlockingConnection 方法以建立与给定参数的连接。建立连接后,其余代码与创建生产者和消费者完全相同。
在本练习中,我们讨论了使用阻塞连接开发生产者和消费者。 RabbitMQ 还支持异步连接。请在 http://pika.readthedocs.org/en/latest/examples/asynchronous_publisher_example.html 找到示例。
4. 交易所类型
在本节中,我们将学习如何开发各种类型的交易所。
4.1 直接兑换
直接交换根据消息路由键将消息传送到队列。队列使用路由键绑定到交换器。当具有该路由键的新消息到达直接交换器时,该消息将被路由到该队列。直接交换用于以循环方式在多个工作人员之间分配任务。当多个消费者侦听同一个队列时,RabbitMQ 负载平衡消费者。直接交换非常适合消息的单播路由。
生产者: 下面的代码段创建一个名为“Direct-X”的交换,并将交换类型设置为“direct”。 postMsg 方法将消息发送到此交换器,并将 routing_key 设置为“Key1”
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
消费者:
下面的代码片段创建名为“Direct-Q1”的队列,并将其注册到交换“Direct-X”,以获取路由键为“Key1”的消息。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
注意:如果多个消费者监听同一个队列,RabbitMQ 会以循环方式在消费者之间负载均衡消息。
4.2 扇出交换
扇出交换将消息路由到所有有界队列,而不管路由键如何。对广播最合适。
生产商:
创建一个向交换“Fanout-X”发送消息的生产者。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
消费者 1:
创建队列“Faount-Q1”并绑定到交换“Fanout-X”,它是扇出交换类型。即使消费者注册了一个routing_key,也不会对exchange的转发决定产生任何影响,因为它是一个fanout exchange。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
消费者 2: 创建另一个队列“Fanout-Q2”并将其绑定到同一个交换器“Fanout-X”
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
现在您将看到发布者发送的消息将被两个消费者接收。
4.3 话题交流
主题交换根据用于将队列绑定到交换器的消息路由键和键模式将消息路由到一个或多个队列。这是交换用于实现 发布/订阅 模式。主题交换适用于多播。
生产者 创建一个名为“Topic-X”的“topic”类型的交换,并发送具有不同键值的各种消息。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
消费者 1: 创建一个名为“Topic-Q1”的队列,并为所有“post”消息与交换“Topic-X”绑定。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
消费者 2: 创建另一个名为“Topic-Q2”的队列,并为来自“rnd”的所有消息绑定相同的交换“Topic-X”。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
4.4 标题交换
消息头根据消息头中的属性交换路由消息,并忽略路由键。如果消息标头属性与队列绑定参数匹配,则消息将转发到那些队列。
生产者 可以在发送的消息头中设置键值对(字典)。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
上面的生产者创建了一个名为“Header-X”的交换并将类型设置为“headers”。然后它发送一条消息,标题键设置为“Source”,相应的值设置为“Core-Banking”。消息头基本上与消息的发起者捆绑在一起,在这个例子中是“Core-Banking”。
下面的 Consumer 创建了一个名为“ Header-Q1 ”的队列,并将其与“ Header-X ”以及它感兴趣的头信息绑定在一起。如果您参考帖子方法,它
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
请注意,queue_bind 语句采用额外的参数,名为“arguments”,它采用键值对。如果存在多个键值对,x-match:any 将确保即使单个条目匹配,消息也会被传递到该队列。
5.总结
我们已经了解了 RabbitMQ 的工作原理,了解了各种组件和消息传递模型。 RabbitMQ 支持高可用性集群环境,这将确保零停机时间、更高的吞吐量和增加的容量,这使其适合基于云的安装。
6. 参考资料
1. 消息代理 –
https://msdn.microsoft.com/en-us/library/ff648849.aspx
2. AMQP –
https://www.rabbitmq.com/tutorials/amqp-concepts.html
3. RabbitMQCtl命令参考:
https://www.rabbitmq.com/man/rabbitmqctl.1.man.html
4. PIKA 文档 –
http://pika.readthedocs.org/en/latest/modules/index.html
5. AMQP 标准 –
https://www.amqp.org/
附录-A
A. 安装与配置
rabbitMQ基础设施安装在CentOS6.5
A.1 安装
请安装 RabbitMQ、Erlang(RabbitMQ 在 Erlang 上运行)、Python 和 Pika。
- 要安装 RabbitMQ (v3.5.3), 请参考 https://www.rabbitmq.com/download.html 并根据您的目标操作系统按照说明进行操作。我已经在 CentOS 机器上安装了 rabbitMQ。
- Erlang (v18.0) 可以从 http://www.erlang.org/doc/installation_guide/INSTALL.html 安装
- Python 2.7.10 可用 https://www.python.org/downloads/ 。请安装 2.7.10 版本,因为 3.x 尚不支持 pika,目前正在开发中。
-
Pika
(v0.9.13)
可以通过 Python pip 使用。
-
import pika
""" Establish connection and declare the exchange""" connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """ Msg = "Balance as of today is 200.00$" channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """ connection.close()
-
A.2 安装 RabbitMQ 管理插件
RabbitMQ 带有一个基于 Web 的管理控制台。该软件与我们安装的 RabbitMQ 安装软件捆绑在一起。要启用管理控制台,请在您的 Linux 机器上执行以下命令:
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
RabbitMQ管理控制台使用15672端口,所以必须打开这个端口。有关管理插件的更多信息,请参阅 https://www.rabbitmq.com/management.html 。
启用管理插件后,在浏览器中打开 http://ipaddress:15672/ 即可看到RabbitMQ的Web管理界面。
A.3 配置 RabbitMQ(虚拟主机/用户/权限)
安装 RabbitMQ 后,它就可以投入生产了。但是我们必须做一些强制配置才能允许远程客户端,因为默认配置和用户权限不允许删除客户端访问服务器。
这里的想法是使用 RabbitMQ 服务器创建一个虚拟环境并为用户提供访问权限,他可以在其中创建消息队列、交换、读取和写入这些资源。因此,用户被限制在虚拟环境中,资源包含在环境中。
A.3.1 创建虚拟主机
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
注意:RabbitMQ 将有一个名为“/”的默认虚拟主机。
A.3.2 使用密码创建新用户
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
注意:RabbitMQ 将有一个名为“guest”的默认用户,但它不能用于外部客户端的远程访问。但它允许在本地访问服务器。
A.3.3 为用户分配访问虚拟主机的权限
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
有关详细信息,请参阅 https://www.rabbitmq.com/access-control.html 。
A.4 防火墙设置
我们必须打开 Linux 防火墙的必要端口以允许远程客户端。对IP表进行如下操作,打开5672&15672端口。
import pika
""" Establish connection and declare the exchange"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange="SMS-Exchange",exchange_type="direct")
""" Publish a Message on the Channel """
Msg = "Balance as of today is 200.00$"
channel.basic_publish(exchange = "SMS-Exchange",routing_key="Sms-Alert",body=Msg)
"""Close the communication channel """
connection.close()
要使更改永久生效,请在文件 /etc/sysconfig/iptables 中进行配置更改并重新启动 iptables 服务器: