安装 php-rabbit: RabbitMQ 的 PHP 扩展

RabbitMQ 官方提供了三种 PHP 可用的扩展,分别是:
php-amqp
http://code.google.com/p/php-amqp/
php-rabbit
http://code.google.com/p/php-rabbit/
php-amqplib
http://code.google.com/p/php-amqplib/

这里我选择使用 php-rabbit,因为这个项目的 Activity 比较高,名字也比较权威 🙂

安装步骤:

下载 RabbitMQ-C 客户端库 librabbitmq:
http://hg.rabbitmq.com/rabbitmq-c/
下载 RabbitMQ 协议代码生成工具:
http://hg.rabbitmq.com/rabbitmq-codegen/

注意 librabbitmq 的版本号,使用最新版本可能会导致下面编译 php-rabbit 时出错:

/bin/sh /work/setup/php-rabbit/libtool –mode=compile cc -I. -I/work/setup/php-rabbit -DPHP_ATOM_INC -I/work/setup/php-rabbit/include -I/work/setup/php-rabbit/main -I/work/setup/php-rabbit -I/usr/local/php/include/php -I/usr/local/php/include/php/main -I/usr/local/php/include/php/TSRM -I/usr/local/php/include/php/Zend -I/usr/local/php/include/php/ext -I/usr/local/php/include/php/ext/date/lib -I/usr/local/include -DHAVE_CONFIG_H -g -O2 -c /work/setup/php-rabbit/rabbit.c -o rabbit.lo
mkdir .libs
cc -I. -I/work/setup/php-rabbit -DPHP_ATOM_INC -I/work/setup/php-rabbit/include -I/work/setup/php-rabbit/main -I/work/setup/php-rabbit -I/usr/local/php/include/php -I/usr/local/php/include/php/main -I/usr/local/php/include/php/TSRM -I/usr/local/php/include/php/Zend -I/usr/local/php/include/php/ext -I/usr/local/php/include/php/ext/date/lib -I/usr/local/include -DHAVE_CONFIG_H -g -O2 -c /work/setup/php-rabbit/rabbit.c -fPIC -DPIC -o .libs/rabbit.o
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_class___construct’:
/work/setup/php-rabbit/rabbit.c:227: warning: ‘php_std_error_handling’ is deprecated (declared at /usr/local/php/include/php/main/php.h:295)
/work/setup/php-rabbit/rabbit.c:230: warning: ‘php_std_error_handling’ is deprecated (declared at /usr/local/php/include/php/main/php.h:295)
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_class_isConnected’:
/work/setup/php-rabbit/rabbit.c:329: warning: ‘php_std_error_handling’ is deprecated (declared at /usr/local/php/include/php/main/php.h:295)
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class___construct’:
/work/setup/php-rabbit/rabbit.c:363: warning: ‘php_std_error_handling’ is deprecated (declared at /usr/local/php/include/php/main/php.h:295)
/work/setup/php-rabbit/rabbit.c:368: warning: ‘php_std_error_handling’ is deprecated (declared at /usr/local/php/include/php/main/php.h:295)
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class_delete’:
/work/setup/php-rabbit/rabbit.c:504: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c:512: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class_purge’:
/work/setup/php-rabbit/rabbit.c:574: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c:580: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class_bind’:
/work/setup/php-rabbit/rabbit.c:646: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class_unbind’:
/work/setup/php-rabbit/rabbit.c:713: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class_consume’:
/work/setup/php-rabbit/rabbit.c:781: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_queue_class_get’:
/work/setup/php-rabbit/rabbit.c:917: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_exchange_class___construct’:
/work/setup/php-rabbit/rabbit.c:1169: warning: ‘php_std_error_handling’ is deprecated (declared at /usr/local/php/include/php/main/php.h:295)
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_exchange_class_delete’:
/work/setup/php-rabbit/rabbit.c:1318: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c:1326: error: unknown field ‘ticket’ specified in initializer
/work/setup/php-rabbit/rabbit.c: In function ‘zim_rabbit_exchange_class_bind’:
/work/setup/php-rabbit/rabbit.c:1534: error: unknown field ‘ticket’ specified in initializer
make: *** [rabbit.lo] Error 1

看了下是头文件 /usr/local/include/amqp_framing.h 中的结构体定义与源代码中冲突,因此怀疑是 librabbitmq 版本的问题。
使用 svn 上的代码也不行,很明显当前版本不适用。

看到官方下载页面上的声明:

Specification
The current RabbitMQ server and Java client library releases implement AMQP protocol version 0-8. The .NET/C# client implements AMQP protocol versions 0-8 and 0-9.

估计得用 0.8 版,我使用以下方式安装 librabbitmq 时可以编译通过:

wget http://hg.rabbitmq.com/rabbitmq-c/archive/ce1eaceaee94.tar.gz -O rabbitmq-c.tar.gz
tar zxf rabbitmq-c.tar.gz
wget http://hg.rabbitmq.com/rabbitmq-codegen/archive/c7c5876a05bb.tar.gz -O rabbitmq-codegen.tar.gz
tar zxf rabbitmq-codegen.tar.gz
mv rabbitmq-codegen-c7c5876a05bb/ rabbitmq-c-ce1eaceaee94/codegen
cd rabbitmq-c-ce1eaceaee94/
autoreconf -i && ./configure && make && make install

如果提示:

checking finding a python with simplejson installed… configure: error: could not find a python that can ‘import simplejson’

请安装 python 的 simplejson 库:

# wget http://pypi.python.org/packages/source/s/simplejson/simplejson-2.1.1.tar.gz
# tar zxf simplejson-2.1.1.tar.gz
# cd simplejson-2.1.1
# python setup.py install

下载并安装 php-rabbit:

# wget http://php-rabbit.googlecode.com/files/php-rabbit.r91.tar.gz
# tar zxf php-rabbit.r91.tar.gz
# cd php-rabbit
# /usr/local/php/bin/phpize
# ./configure –with-php-config=/usr/local/php/bin/php-config –with-rabbit
# make && make install

修改 php.ini,在尾部加上:
extension = rabbit.so

查看扩展是否安装成功:
# php -m | grep rabbit
rabbit

完工!!

— EOF —

RabbitMQ: 消息交换模块 Exchange

RabbitMQ 中的 Exchange 负责对消息进行路由。
当收到 Publisher 传递给它的消息后,Exchange 会根据路由键 routing key 决定将消息加入到哪些消息队列中。

Exchange 有三种类型:

1. 直接交换类型 Direct Exchange Type
使用一个固定字符串作为 routing key。
若 exchange 和 message queue 绑定的 key 和 routing key 一样时,消息会被路由到绑定的这个 message queue 中。

2. 扇形交换类型 Fanout Exchange Type
该类型不使用 routing key,进来什么就出去什么。
扇形结构,可用于负载均衡。

3. 主题交换类型 Topic Exchange Type
routing key 中可以包括通配符。
在与 message queue 绑定时,也可以使用通配符。
这种交换类型的起名应该和微博中的应用有关,比如 *.sometopic 代表 sometopic 主题下的所有项目。
通配符:* 表示一个词 # 表示0个或多个词

— EOF —

RabbitMQ: 批量创建节点并组成集群

多核 CPU 上创建集群的好处很明显,可以充分地利用 CPU 运算能力。

之前讲的都是在单机上用命令方式逐个创建节点,然后由这些节点组成集群。

其实如果是在同一台计算机上创建多个节点,RabbitMQ 提供了一个便捷的命令来批量操作。

这个命令是:rabbitmq-multi。

命令格式:
# rabbitmq-multi start_all count
其中 count 为要创建的节点的数量。

运行该命令,将会创建 count 个节点,这些节点监听的端口从 5672 顺序开始。第一个节点名称为环境变量 RABBITMQ_NODENAME 定义的名称,之后的节点名称则以环境变量 RABBITMQ_NODENAME 定义的节点名和节点序号组成,并以下划线 _ 连接。

如:定义的节点名称为 test,创建3个节点,则3个节点的名称分别为:test、test_1、test_2。

为了让节点创建时自动装载集群配置,先创建集群配置文件 /etc/rabbitmq/rabbitmq_cluster.config。
因为是批量创建,所以配置内容与前文所述有所差别,配置内容:
[‘test@gentoo’].

创建3个节点:
# ./rabbitmq-multi start_all 3

查看创建的节点状态:
# ./rabbitmq-multi status
Status of all running nodes…
Node ‘test_2@gentoo’ with Pid 30120: running
Node ‘test_1@gentoo’ with Pid 30073: running
Node ‘test@gentoo’ with Pid 30026: running
done.

查看集群中节点状态:
# ./rabbitmqctl -n test@gentoo status
Status of node test@gentoo …
[…,
{nodes,[test_2@gentoo,test_1@gentoo,test@gentoo]},
{running_nodes,[test_2@gentoo,test_1@gentoo,test@gentoo]}]
…done.
# ./rabbitmqctl -n test_1@gentoo status
Status of node test_1@gentoo …
[…,
{nodes,[test_2@gentoo,test_1@gentoo,test@gentoo]},
{running_nodes,[test_2@gentoo,test@gentoo,test_1@gentoo]}]
…done.
# ./rabbitmqctl -n test_2@gentoo status
Status of node test_2@gentoo …
[…,
{nodes,[test_2@gentoo,test_1@gentoo,test@gentoo]},
{running_nodes,[test@gentoo,test_1@gentoo,test_2@gentoo]}]
…done.

停止集群中的所有节点:
# ./rabbitmq-multi stop_all
Stopping all nodes…
Stopping node test_2@gentoo
Erlang has closed
….OK
Stopping node test_1@gentoo
Erlang has closed
….OK
Stopping node test@gentoo
Erlang has closed
….OK
done.

— EOF —

RabbitMQ: 从配置文件创建集群

前一篇日志简单演示了下手动创建集群的过程,可能你已经发现以命令行的方式创建集群多少有些繁琐。
RabbitMQ 提供了一种创建集群的便捷方法——从配置文件创建集群。

环境变量 RABBITMQ_CLUSTER_CONFIG_FILE 指定了集群配置文件所在路径,默认为:/etc/rabbitmq/rabbitmq_cluster.config。
如果需要,可以在启动 RabbitMQ Server 前设置它来改变配置文件路径。
只需要把要组成集群的各个节点以数组的方式写在配置文件中即可,示例:
[‘test1@gentoo’, ‘test2@gentoo’, ‘test3@gentoo’].

清除上篇文章中创建的3个节点:
# ./rabbitmqctl -n test1@gentoo stop_app
Stopping node test1@gentoo …
…done.
# ./rabbitmqctl -n test1@gentoo reset
Resetting node test1@gentoo …
…done.
# ./rabbitmqctl -n test1@gentoo stop
Stopping and halting node test1@gentoo …
…done.
# ./rabbitmqctl -n test2@gentoo stop_app
Stopping node test2@gentoo …
…done.
# ./rabbitmqctl -n test2@gentoo reset
Resetting node test2@gentoo …
…done.
# ./rabbitmqctl -n test2@gentoo stop
Stopping and halting node test2@gentoo …
…done.
# ./rabbitmqctl -n test3@gentoo stop_app
Stopping node test3@gentoo …
…done.
# ./rabbitmqctl -n test3@gentoo reset
Resetting node test3@gentoo …
…done.
# ./rabbitmqctl -n test3@gentoo stop
Stopping and halting node test3@gentoo …
…done.

然后分别启动 RabbitMQ Server 即可(启动时,以上集群配置就会被加载到节点上)。
# export RABBITMQ_NODENAME=test1
# export RABBITMQ_NODE_PORT=5672
# ./rabbitmq-server -detached

# export RABBITMQ_NODENAME=test2
# export RABBITMQ_NODE_PORT=5673
# ./rabbitmq-server -detached

# export RABBITMQ_NODENAME=test3
# export RABBITMQ_NODE_PORT=5674
# ./rabbitmq-server -detached

查看下3个节点的状态:
# ./rabbitmqctl -n test1@gentoo status
Status of node test1@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test2@gentoo,test1@gentoo]}]
…done.
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test1@gentoo,test2@gentoo]}]
…done.
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test2@gentoo,test1@gentoo,test3@gentoo]}]
…done.

集群创建完毕!

你可能会问,如何指定某个节点为 ram node 还是 disk node。
通过这种方式创建出来的节点,都是 disk node。
如果需要修改为 ram node,目前只能通过命令来完成了。

— EOF —

RabbitMQ 集群测试

#### 创建节点 ####

在本机上新建三个节点进行测试:

节点1:test1@gentoo
# export RABBITMQ_NODENAME=test1
# export RABBITMQ_NODE_PORT=5672
# ./rabbitmq-server -detached

节点2:test1@gentoo
# export RABBITMQ_NODENAME=test2
# export RABBITMQ_NODE_PORT=5673
# ./rabbitmq-server -detached

节点3:test1@gentoo
# export RABBITMQ_NODENAME=test3
# export RABBITMQ_NODE_PORT=5674
# ./rabbitmq-server -detached

#### 创建集群 ####

下面要将上述结点组成一个 cluster,其中 test2@gentoo 为 ram node,另两个为 disk node。
创建 ram node 和 disk node 方法请参见 RabbitMQ 帮助文档:
http://www.rabbitmq.com/admin-guide.html#cluster
——
Example:

rabbitmqctl cluster rabbit@tanto hare@elena
This command instructs the RabbitMQ node to join the cluster with nodes rabbit@tanto and

hare@elena. If the node is one of these then it becomes a disk node, otherwise a ram

node.
——

将 test2@gentoo 做为 ram node 添加到 cluster 中:
# ./rabbitmqctl -n test2@gentoo stop_app
# ./rabbitmqctl -n test2@gentoo reset
# ./rabbitmqctl -n test2@gentoo cluster test1@gentoo
# ./rabbitmqctl -n test2@gentoo start_app
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test2@gentoo,test1@gentoo]},
{running_nodes,[test1@gentoo,test2@gentoo]}]
…done.

将 test3@gentoo 做为 disk node 添加到 cluster 中:
# ./rabbitmqctl -n test3@gentoo stop_app
# ./rabbitmqctl -n test3@gentoo reset
# ./rabbitmqctl -n test3@gentoo cluster test1@gentoo test3@gentoo
# ./rabbitmqctl -n test3@gentoo start_app
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{running_nodes,[test1@gentoo,test2@gentoo,test3@gentoo]}]
…done.

再来看看三个节点的汇总状态:
# ./rabbitmqctl -n test1@gentoo status
Status of node test1@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test2@gentoo,test1@gentoo]}]
…done.
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test1@gentoo,test2@gentoo]}]
…done.
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test1@gentoo,test2@gentoo,test3@gentoo]}]
…done.

#### 修改节点类型 ####

下面演示修改节点的类型:
* 将 test2@gentoo 修改为 disk node
* 将 test3@gentoo 修改为 ram node

修改节点类型,只需要 stop_app,不需要 reset。

修改 test2@gentoo:
# ./rabbitmqctl -n test2@gentoo stop_app
# ./rabbitmqctl -n test2@gentoo cluster test1@gentoo test2@gentoo
# ./rabbitmqctl -n test2@gentoo start_app

修改 test3@gentoo:
# ./rabbitmqctl -n test3@gentoo stop_app
# ./rabbitmqctl -n test3@gentoo cluster test1@gentoo test2@gentoo
# ./rabbitmqctl -n test3@gentoo start_app

#### 重启节点 ####

下面通过重启 test1@gentoo 和 test3@gentoo 节点来观察集群的状态:

停止 test1@gentoo:
# ./rabbitmqctl -n test1@gentoo stop
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test2@gentoo]}]
…done.
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test2@gentoo,test3@gentoo]}]
…done.

停止 test3@gentoo:
# ./rabbitmqctl -n test3@gentoo stop
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test2@gentoo]}]
…done.

启动 test1@gentoo:
# export RABBITMQ_NODENAME=test1
# export RABBITMQ_NODE_PORT=5672
# ./rabbitmq-server -detached
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test1@gentoo,test2@gentoo]}]
…done.

启动 test3@gentoo:
# export RABBITMQ_NODENAME=test3
# export RABBITMQ_NODE_PORT=5674
# ./rabbitmq-server -detached

再检查3个节点的状态:
# ./rabbitmqctl -n test1@gentoo status
Status of node test1@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test2@gentoo,test1@gentoo]}]
…done.
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test3@gentoo,test1@gentoo,test2@gentoo]}]
…done.
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{nodes,[test3@gentoo,test2@gentoo,test1@gentoo]},
{running_nodes,[test1@gentoo,test2@gentoo,test3@gentoo]}]
…done.

一切正常!

#### 移除节点 ####

很简单,只需要在 stop_app 后做一次 reset 操作即可。

如移除 test3@gentoo,使其成为独立节点:
# ./rabbitmqctl -n test3@gentoo stop_app
# ./rabbitmqctl -n test3@gentoo reset
# ./rabbitmqctl -n test3@gentoo start_app

查看3个节点的状态:
# ./rabbitmqctl -n test1@gentoo status
Status of node test1@gentoo …
[…,
{nodes,[test2@gentoo,test1@gentoo]},
{running_nodes,[test2@gentoo,test1@gentoo]}]
…done.
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test2@gentoo,test1@gentoo]},
{running_nodes,[test1@gentoo,test2@gentoo]}]
…done.
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{nodes,[test3@gentoo]},
{running_nodes,[test3@gentoo]}]
…done.

可以发现,test3@gentoo 已从 cluster 中移除。

接着移除 test2@gentoo:
# ./rabbitmqctl -n test2@gentoo stop_app
# ./rabbitmqctl -n test2@gentoo reset
# ./rabbitmqctl -n test2@gentoo start_app

查看3个节点的状态:
# ./rabbitmqctl -n test1@gentoo status
Status of node test1@gentoo …
[…,
{nodes,[test1@gentoo]},
{running_nodes,[test1@gentoo]}]
…done.
# ./rabbitmqctl -n test2@gentoo status
Status of node test2@gentoo …
[…,
{nodes,[test2@gentoo]},
{running_nodes,[test2@gentoo]}]
…done.
# ./rabbitmqctl -n test3@gentoo status
Status of node test3@gentoo …
[…,
{nodes,[test3@gentoo]},
{running_nodes,[test3@gentoo]}]
…done.

就剩下 test1@gentoo 了,这个节点中还残留着 cluster 的信息。

帮助里说到,最后一个节点使用 reset 会出错,要使用 force_reset:

——
Note that we used force_reset here. The reason is that removing a node from a cluster updates only the node-local configuration of the cluster, and that calling reset gets the node to connect to any of the other nodes that it believes are in the cluster, to perform some house-keeping that is necessary when leaving a cluster. However, at this point, there are no other nodes in the cluster, but rabbit@rabbit2 doesn’t know this. As a result, calling reset would fail, as it can’t connect to rabbit@rabbit1 or rabbit@rabbit3, hence the use of force_reset, in which rabbit@rabbit2 does not attempt to contact any other nodes in the cluster. This situation only arises when resetting the last remaining node of a cluster.
——

但是我用 reset 也是正常的!可能是版本不一样吧。

##############

到此为止,先下班回家!

— EOF —