QPID 消息队列初步
QPID 是一种高性能 Message Bus,目前因为牵扯到工具的 SOA 架构,我的项目中将会整合它,以将自身对数据库的修改提交到 Message Bus 中, 供其它程序监听调用。
目前主流的 Message Bus 主要有以下几种:
而之所以选择 QPID 是因为它有以下几个优点(引用源):
- Supports transactions
- Persistence using a pluggable layer — I believe the default is Apache Derby
- This like the other Java based product is HIGHLY configurable
- Management using JMX and an Eclipse Management Console application - http://www.lahiru.org/2008/08/what-qpid-management-console-can-do.html
- The management console is very feature rich
- Supports message Priorities
-
Automatic client failover using configurable connection properties -
- http://qpid.apache.org/cluster-design-note.html
- http://qpid.apache.org/starting-a-cluster.html
- http://qpid.apache.org/cluster-failover-modes.html
- Cluster is nothing but a set of machines have all the queues replicated
- All queue data and metadata is replicated across all nodes that make up a cluster
- All clients need to know in advance which nodes make up the cluster
- Retry logic lies in the client code
- Durable Queues/Subscriptions
- Has bindings in many languages
- For the curious: http://qpid.apache.org/current-architecture.html
而对我而言,QPID 比较有优势的地方是,一有 Python 的 bindding(Perl 的兄弟对不起了),二是源代码比较充足。
为此我写了两个基类,简单地调用了 QPID Python 中的 Receiver 和 Sender,相对于 message_transfer 方法,这种方法可以传递 Dictionary 对象,一共三个文件,其实也可以合在一起使用。
#!/usr/bin/python import qpid import qpid.messaging import logging logging.basicConfig() class QPIDBase(object): def __init__(self, host='10.66.93.193', port='5672', queue_name='tmp.testing', username='guest', password='guest'): """ Arguments: host port queue_name username password """ self.host = host self.port = port self.queue_name = queue_name self.username = username self.password = password self.connection = None self.session = None def init_connect(self, mechanism='PLAIN'): """Initial the connection""" url = 'amqp://guest/guest@%s:%s' %(self.host, self.port) self.connection = qpid.messaging.Connection( url = url, sasl_mechanisms=mechanism, reconnect=True, reconnect_interval=60, reconnect_limit=60, username=self.username, password=self.password ) self.connection.open() def init_session(self): """Initial the session""" if not self.connection: self.init_connect() self.session = self.connection.session() def close(self): """Close the connection and session""" self.session.close() self.connection.close()
#!/usr/bin/python import qpid.messaging from datetime import datetime from base import QPIDBase class QPIDSender(QPIDBase): def __init__(self, **kwargs): super(QPIDSender, self).__init__(**kwargs) self.sender = None def init_sender(self): """Initial the sender""" if not self.session: self.init_session() self.sender = self.session.sender(self.queue_name) def send(self, content, t = 'test'): """Sending the content""" if not self.sender: self.init_sender() props = {'type': t} message = qpid.messaging.Message(properties=props, content = content) self.sender.send(message) def typing(self): """Sending the contents real time with typing""" content = '' while content != 'EOF': content = raw_input('Start typing:') self.send(content) if __name__ == '__main__': s = QPIDSender() s.send('Testing at %s' % datetime.now()) s.close()
#!/usr/bin/python from pprint import pprint from base import QPIDBase class QPIDReceiver(QPIDBase): def __init__(self, **kwargs): super(QPIDReceiver, self).__init__(**kwargs) self.receiver = None def init_receiver(self): """Initial the receiver""" if not self.session: self.init_session() self.receiver = self.session.receiver(self.queue_name) def receive(self): """Listing the messages from server""" if not self.receiver: self.init_receiver() try: while True: message = self.receiver.fetch() content = message.content pprint({'props': message.properties}) pprint(content) self.session.acknowledge(message) except KeyboardInterrupt: pass self.close() # Test code if __name__ == '__main__': r = QPIDReceiver() r.receive()
代码非常简单,容易读懂,使用方法是在一台 Linux Server 上安装好 qpid-cpp-server, 并且启动后,在 Client 上安装 python-qpid,然后修改一下 base.py __init__ 方法的 host 字段,或者在代码中自行指定好服务器地址,即可直接执行测试。
需要说明的是 QPID 返回的数据结构,包含可以为 Dictionary 对象的 properties 和只能为纯文本的 content 两个属性,也就是说可以将数据结构保存到 properties,而消息名称保存成 content 中,即:
try: while True: message = self.receiver.fetch() content = message.content pprint({'props': message.properties}) pprint(content) self.session.acknowledge(message) except KeyboardInterrupt: pass
一个终端执行 receiver.py 监听消息,再开一个终端执行 sender.py,将会如以下输出:
$ python ./receiver.py {'props': {u'type': u'test', 'x-amqp-0-10.routing-key': u'tmp.testing'}} 'Testing at 2010-12-06 14:54:59.536093'
如果有兴趣试下 QPIDSender.typing() 方法,再把 Kerberos 的用户名读出来,就可以做一个 IM 啦~
问题:现在似乎 Sender 发出的消息一次只能有一个 Receiver 接收,也就是现有代码不能用于 SOA,而这理论上应该是不应该的,依然在探索。
(可以尝试打开两个 receiver.py 测试)