Zmq(ZeroMQ)是什么?
ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等. ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。
ZMQ的安装(Linux)
//软件编译
git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install
#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib
sudo ldconfig
ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装
git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install
sudo vim /etc/php.ini
//添加扩展
[zeromq]
extension = zmq.so
:wq
查看扩展是否加载成功:php -m | grep php
。
先写一个简单请求-应答,首先是服务端reply.php(应答者)
<?php
$pContext = new ZMQContext();
$pServer = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
$strMessage = $pServer->recv();
echo $strMessage."\r\n";
$pServer->send("From Server1:".$strMessage);
}
然后是客户端requst.php(请求者)
<?php
$pContext = new ZMQContext();
$pClient = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());
分别在不同终端预先一下程序
#请求端可以先启动等待通信返回(即使对方没有启动或返回)
php requset.php
#启动服务
php reply.php
使用ZeroMQ进行通信的步骤
-
使用ZMQContext创建一个上下文
-
使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
- PUB,SUB (推送和订阅)
- REQ,REP (请求和应答)
- REQ,ROUTER (take care, REQ inserts an extra null frame)
- DEALER,REP (take care, REP assumes a null frame)
- DEALER,ROUTER
- DEALER,DEALER
- ROUTER,ROUTER
- PUSH,PULL
- PAIR,PAIR
分类包括
- 轮询,REQ,PUSH,DEALER
- 多播,PUB
- 公平排队,REP,SUB,PULL,DEALER
- 明确寻址,ROUTER
- 单播,PAIR
-
如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
- 进程内部通信,inproc://
- 进程间通信,ipc://
- 网络间通信,tcp://
- 多播,pgm://
-
使用send/recv发送/接收消息
使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳.
ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。
再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合
,比如下面这个
这里使用ROUTER
和DEALER
作为中介,转发请求,客户端可以异步发送求
,不用等待服务端响应。
<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);
$arrRead = $arrWrite = array();
while(true){
$nEvent = $pPoll->poll($arrRead, $arrWrite);
if ($nEvent > 0) {
foreach($arrRead as $pSocket){
if($pSocket === $pFrontend){
while (true){
$strMessage = $pSocket->recv();
$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
$pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
if(!$nMore){
break;
}
}
}
else if ($pSocket === $pBackend){
while (true){
$strMessage = $pSocket->recv();
$nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
$pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
if(!$nMore){
break;
}
}
}
}
}
}
然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上
<?php
$pContext = new ZMQContext();
$pServer = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
$strMessage = $pServer->recv();
echo $strMessage."\r\n";
$pServer->send("From Server1:".$strMessage);
}
这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来
<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();
ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者
<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");
while (true) {
$pPublisher->send("A", ZMQ::MODE_SNDMORE);
$pPublisher->send("1:We don't want to see this");
$pPublisher->send("B", ZMQ::MODE_SNDMORE);
$pPublisher->send("1:We would like to see this");
sleep (1);
}
然后是订阅者
$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");
while (true) {
// Read envelope with address
$address = $pSubscriber->recv();
// Read message contents
$contents = $pSubscriber->recv();
printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}
Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。
ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php
<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);
$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");
$pPush->send("Hello Client 1");
客户端Pull.php
<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);
//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");
var_dump($pPull->recv());
如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接
<?php
$pContext = new ZMQContext();
$pServer = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
$message = $pServer->recv();
echo $message . PHP_EOL;
$pServer->send("Hello from server1:".$message);
}
客户端可以选择走TCP或者IPC进行消息通信
<?php
$pContext = new ZMQContext();
$pClient = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;
使用ZeroMQ的进程内部消息通信也很简单
$pServer = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");
$pClient = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());
var_dump($pServer->recv());
ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。
更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展。
参考链接: