QPID 消息队列初步
QPID 是一种高性能 Message Bus,目前因为牵扯到工具的 SOA 架构,我的项目中将会整合它,以将自身对数据库的修改提交到 Message Bus 中, 供其它程序监听调用。
目前主流的 Message Bus 主要有以下几种:
而之所以选择 QPID 是因为它有以下几个优点(引用源):
- Supports transactions
- Persistence using a pluggable layer — I believe the default is Apache Derby
- This like the other Java based product is HIGHLY configurable
- Management using JMX and an Eclipse Management Console application - http://www.lahiru.org/2008/08/what-qpid-management-console-can-do.html
- The management console is very feature rich
- Supports message Priorities
-
Automatic client failover using configurable connection properties -
- http://qpid.apache.org/cluster-design-note.html
- http://qpid.apache.org/starting-a-cluster.html
- http://qpid.apache.org/cluster-failover-modes.html
- Cluster is nothing but a set of machines have all the queues replicated
- All queue data and metadata is replicated across all nodes that make up a cluster
- All clients need to know in advance which nodes make up the cluster
- Retry logic lies in the client code
- Durable Queues/Subscriptions
- Has bindings in many languages
- For the curious: http://qpid.apache.org/current-architecture.html
而对我而言,QPID 比较有优势的地方是,一有 Python 的 bindding(Perl 的兄弟对不起了),二是源代码比较充足。
为此我写了两个基类,简单地调用了 QPID Python 中的 Receiver 和 Sender,相对于 message_transfer 方法,这种方法可以传递 Dictionary 对象,一共三个文件,其实也可以合在一起使用。
#!/usr/bin/python
import qpid
import qpid.messaging
import logging
logging.basicConfig()
class QPIDBase(object):
def __init__(self, host='10.66.93.193', port='5672', queue_name='tmp.testing', username='guest', password='guest'):
"""
Arguments:
host
port
queue_name
username
password
"""
self.host = host
self.port = port
self.queue_name = queue_name
self.username = username
self.password = password
self.connection = None
self.session = None
def init_connect(self, mechanism='PLAIN'):
"""Initial the connection"""
url = 'amqp://guest/guest@%s:%s' %(self.host, self.port)
self.connection = qpid.messaging.Connection(
url = url, sasl_mechanisms=mechanism,
reconnect=True, reconnect_interval=60, reconnect_limit=60,
username=self.username, password=self.password
)
self.connection.open()
def init_session(self):
"""Initial the session"""
if not self.connection:
self.init_connect()
self.session = self.connection.session()
def close(self):
"""Close the connection and session"""
self.session.close()
self.connection.close()
#!/usr/bin/python
import qpid.messaging
from datetime import datetime
from base import QPIDBase
class QPIDSender(QPIDBase):
def __init__(self, **kwargs):
super(QPIDSender, self).__init__(**kwargs)
self.sender = None
def init_sender(self):
"""Initial the sender"""
if not self.session:
self.init_session()
self.sender = self.session.sender(self.queue_name)
def send(self, content, t = 'test'):
"""Sending the content"""
if not self.sender:
self.init_sender()
props = {'type': t}
message = qpid.messaging.Message(properties=props, content = content)
self.sender.send(message)
def typing(self):
"""Sending the contents real time with typing"""
content = ''
while content != 'EOF':
content = raw_input('Start typing:')
self.send(content)
if __name__ == '__main__':
s = QPIDSender()
s.send('Testing at %s' % datetime.now())
s.close()
#!/usr/bin/python
from pprint import pprint
from base import QPIDBase
class QPIDReceiver(QPIDBase):
def __init__(self, **kwargs):
super(QPIDReceiver, self).__init__(**kwargs)
self.receiver = None
def init_receiver(self):
"""Initial the receiver"""
if not self.session:
self.init_session()
self.receiver = self.session.receiver(self.queue_name)
def receive(self):
"""Listing the messages from server"""
if not self.receiver:
self.init_receiver()
try:
while True:
message = self.receiver.fetch()
content = message.content
pprint({'props': message.properties})
pprint(content)
self.session.acknowledge(message)
except KeyboardInterrupt:
pass
self.close()
# Test code
if __name__ == '__main__':
r = QPIDReceiver()
r.receive()
代码非常简单,容易读懂,使用方法是在一台 Linux Server 上安装好 qpid-cpp-server, 并且启动后,在 Client 上安装 python-qpid,然后修改一下 base.py __init__ 方法的 host 字段,或者在代码中自行指定好服务器地址,即可直接执行测试。
需要说明的是 QPID 返回的数据结构,包含可以为 Dictionary 对象的 properties 和只能为纯文本的 content 两个属性,也就是说可以将数据结构保存到 properties,而消息名称保存成 content 中,即:
try:
while True:
message = self.receiver.fetch()
content = message.content
pprint({'props': message.properties})
pprint(content)
self.session.acknowledge(message)
except KeyboardInterrupt:
pass
一个终端执行 receiver.py 监听消息,再开一个终端执行 sender.py,将会如以下输出:
$ python ./receiver.py
{'props': {u'type': u'test', 'x-amqp-0-10.routing-key': u'tmp.testing'}}
'Testing at 2010-12-06 14:54:59.536093'
如果有兴趣试下 QPIDSender.typing() 方法,再把 Kerberos 的用户名读出来,就可以做一个 IM 啦~
问题:现在似乎 Sender 发出的消息一次只能有一个 Receiver 接收,也就是现有代码不能用于 SOA,而这理论上应该是不应该的,依然在探索。
(可以尝试打开两个 receiver.py 测试)