QPID 消息队列初步

QPID 是一种高性能 Message Bus,目前因为牵扯到工具的 SOA 架构,我的项目中将会整合它,以将自身对数据库的修改提交到 Message Bus 中, 供其它程序监听调用。

目前主流的 Message Bus 主要有以下几种:

  1. RabbitMQ
  2. Apache ActiveMQ
  3. Apache qpid

而之所以选择 QPID 是因为它有以下几个优点(引用源):

  • Supports transactions
  • Persistence using a pluggable layer — I believe the default is Apache Derby
  • This like the other Java based product is HIGHLY configurable
  • Management using JMX and an Eclipse Management Console application - http://www.lahiru.org/2008/08/what-qpid-management-console-can-do.html
  • The management console is very feature rich
  • Supports message Priorities
  • Automatic client failover using configurable connection properties -
    • http://qpid.apache.org/cluster-design-note.html
    • http://qpid.apache.org/starting-a-cluster.html
    • http://qpid.apache.org/cluster-failover-modes.html
  • Cluster is nothing but a set of machines have all the queues replicated
  • All queue data and metadata is replicated across all nodes that make up a cluster
  • All clients need to know in advance which nodes make up the cluster
  • Retry logic lies in the client code
  • Durable Queues/Subscriptions
  • Has bindings in many languages
  • For the curious: http://qpid.apache.org/current-architecture.html

而对我而言,QPID 比较有优势的地方是,一有 Python 的 bindding(Perl 的兄弟对不起了),二是源代码比较充足。

为此我写了两个基类,简单地调用了 QPID Python 中的 Receiver 和 Sender,相对于 message_transfer 方法,这种方法可以传递 Dictionary 对象,一共三个文件,其实也可以合在一起使用。

#!/usr/bin/python

import qpid
import qpid.messaging
import logging
logging.basicConfig()

class QPIDBase(object):
    def __init__(self, host='10.66.93.193', port='5672', queue_name='tmp.testing', username='guest', password='guest'):
        """
        Arguments:
            host
            port
            queue_name
            username
            password
        """
        self.host = host
        self.port = port
        self.queue_name = queue_name
        self.username = username
        self.password = password
        self.connection = None
        self.session = None
    
    def init_connect(self, mechanism='PLAIN'):
        """Initial the connection"""
        url = 'amqp://guest/guest@%s:%s' %(self.host, self.port)
        self.connection = qpid.messaging.Connection(
            url = url, sasl_mechanisms=mechanism,
            reconnect=True, reconnect_interval=60, reconnect_limit=60,
            username=self.username, password=self.password
        )
        self.connection.open()
        
    def init_session(self):
        """Initial the session"""
        if not self.connection:
            self.init_connect()
        self.session = self.connection.session()
    
    def close(self):
        """Close the connection and session"""
        self.session.close()
        self.connection.close()
#!/usr/bin/python

import qpid.messaging
from datetime import datetime
from base import QPIDBase

class QPIDSender(QPIDBase):
    def __init__(self, **kwargs):
        super(QPIDSender, self).__init__(**kwargs)
        self.sender = None
    
    def init_sender(self):
        """Initial the sender"""
        if not self.session:
            self.init_session()
        self.sender = self.session.sender(self.queue_name)
    
    def send(self, content, t = 'test'):
        """Sending the content"""
        if not self.sender:
            self.init_sender()
        props = {'type': t}
        message = qpid.messaging.Message(properties=props, content = content)
        self.sender.send(message)
    
    def typing(self):
        """Sending the contents real time with typing"""
        content = ''
        while content != 'EOF':
            content = raw_input('Start typing:')
            self.send(content)

if __name__ == '__main__':
    s = QPIDSender()
    s.send('Testing at %s' % datetime.now())
    s.close()
#!/usr/bin/python

from pprint import pprint
from base import QPIDBase

class QPIDReceiver(QPIDBase):
    def __init__(self, **kwargs):
        super(QPIDReceiver, self).__init__(**kwargs)
        self.receiver = None
    
    def init_receiver(self):
        """Initial the receiver"""
        if not self.session:
            self.init_session()
        self.receiver = self.session.receiver(self.queue_name)
        
    def receive(self):
        """Listing the messages from server"""
        if not self.receiver:
            self.init_receiver()
            
        try:
            while True:
                message = self.receiver.fetch()
                content = message.content
                pprint({'props': message.properties})
                pprint(content)
                self.session.acknowledge(message)
        except KeyboardInterrupt:
            pass
        
        self.close()

# Test code
if __name__ == '__main__':
    r = QPIDReceiver()
    r.receive()

代码非常简单,容易读懂,使用方法是在一台 Linux Server 上安装好 qpid-cpp-server, 并且启动后,在 Client 上安装 python-qpid,然后修改一下 base.py  __init__ 方法的 host 字段,或者在代码中自行指定好服务器地址,即可直接执行测试。

需要说明的是 QPID 返回的数据结构,包含可以为 Dictionary 对象的 properties 和只能为纯文本的 content 两个属性,也就是说可以将数据结构保存到 properties,而消息名称保存成 content 中,即:

        try:
            while True:
                message = self.receiver.fetch()
                content = message.content
                pprint({'props': message.properties})
                pprint(content)
                self.session.acknowledge(message)
        except KeyboardInterrupt:
            pass

一个终端执行 receiver.py 监听消息,再开一个终端执行 sender.py,将会如以下输出:

$ python ./receiver.py
{'props': {u'type': u'test', 'x-amqp-0-10.routing-key': u'tmp.testing'}}
'Testing at 2010-12-06 14:54:59.536093'

如果有兴趣试下 QPIDSender.typing() 方法,再把 Kerberos 的用户名读出来,就可以做一个 IM 啦~

问题:现在似乎 Sender 发出的消息一次只能有一个 Receiver 接收,也就是现有代码不能用于 SOA,而这理论上应该是不应该的,依然在探索。

(可以尝试打开两个 receiver.py 测试)

Posted by K*K Mon, 06 Dec 2010 23:03:21 +0800