redis用来消息队列的模型

定义公共载入文件

<?php
//redis_connectionner.php
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
return $redis;
  • 利用redis的生产者消费模式进行消息队列 (不建议使用), 有些时候可能会出现程序莫名其妙的卡住,问题不好复现

    • 利用redis的生产者和消费者模型进行消息队列实现

    创建生产者(发布消息 | 入队)文件

    <?php
    //enqueue.php
    $redis = require "../redis_connectionner.php";
    fwrite(STDOUT, "生产者 -> 消息队列服务器(redis)过程演示:" . PHP_EOL);
    $str = "qwertyuiopasdfghjklzxcvbnm1234567890";
    while (true) {
    	$char = $str[rand(0, strlen($str) - 1)];
    	sleep(1);
    	$redis->rpush("tasks",$char);
    	fwrite(STDOUT, "入队:$char" . PHP_EOL);
    }
    
    

    创建消费者(处理消息 | 出队)文件

    <?php
    //dequeue.php
    $redis = require "../redis_connectionner.php";
    fwrite(STDOUT, "消费者 <- 消息队列服务器(redis)过程演示:" . PHP_EOL);
    
    while (true) {
    	sleep(1);
    	$value = $redis->lpop('tasks');
    	if ($value) {
    		fwrite(STDOUT, "出队:" . $value . PHP_EOL);
    	} else {
    		fwrite(STDOUT, "队列为空" . PHP_EOL);
    	}
    }
    
    

    启动入队文件

    root@haochen /mnt/e/queue/demo_redis_queue
      # php enqueue.php                                                                                                     !654
    生产者 -> 消息队列服务器(redis)过程演示:
    入队:r
    入队:3
    入队:n
    入队:u
    入队:g
    入队:m
    入队:m
    入队:r
    

    执行出队文件

        root@haochen /mnt/e/queue/demo_redis_queue
          # php dequeue.php                                                                                                     !655
        消费者 <- 消息队列服务器(redis)过程演示:
        出队:r
        出队:3
        出队:n
        出队:u
        出队:g
        出队:m
        出队:m
        出队:r
        队列为空
        队列为空
    
    

** 利用redis生产者消费者模式可以实现简单的队列, 但是据说在某些时候队列可能会卡住,导致对接无法进行.无法复现,只能说一下 **

  • 利用redis发布订阅模式实现消息队列

    创建发布文件

    <?php
    $redis = require "../redis_connectionner.php";
    fwrite(STDOUT, "发布 -> 消息队列服务器(redis)过程演示:" . PHP_EOL);
    $str = "qwertyuiopasdfghjklzxcvbnm1234567890";
    $char = $str[rand(0, strlen($str) - 1)];
    $redis->publish("tasks",$char);
    fwrite(STDOUT, "入队:$char" . PHP_EOL);
    

    创建订阅文件

    <?php
    $redis = require "../redis_connectionner.php";
    fwrite(STDOUT, "订阅 <- 消息队列服务器(redis)过程演示:" . PHP_EOL);
    
    while (true) {
    	sleep(1);
    	$value = $redis->subscribe(['tasks'],'callback');	//无法获取到任务的时候会阻塞进程,直到发布者发布新的消息
    }
    
    function callback($instance,$channelName,$message){
    	fwrite(STDOUT,"任务模型:" . $channelName.",");
    	if ($message) {
    		fwrite(STDOUT, "出队:" . $message . PHP_EOL);
    	} else {
    		fwrite(STDOUT, "队列为空" . PHP_EOL);
    	}
    }
    
    

    先运行订阅.php

    root@haochen /mnt/e/queue/demo_redis_push_sub_queue
    # php subscribe.php                                                                                                   !658
    订阅 <- 消息队列服务器(redis)过程演示:
    #处于阻塞状态, 等待发布出来的消息队列
    

    再启动另一个窗口运行发布者

    root@haochen:/mnt/e/queue/demo_redis_push_sub_queue# php publish.php
    发布 -> 消息队列服务器(redis)过程演示:
    入队:y
    

    订阅窗口则会立即执行发布的任务

    # php subscribe.php    
    订阅 <- 消息队列服务器(redis)过程演示:
    任务模型:tasks,出队:y
    

** 在练习代码的时候出现了订阅端抛出异常 PHP Fatal error: Uncaught RedisException: read error on connection **

解决办法

由于redis扩展也是基于php 的socket方式实现,因此该参数值同样会起作用。找到了问题就比较好解决了:

1. 修改php.ini设置default_socket_timeout = 600
2.ini_set('default_socket_timeout', -1);
3.修改redis.conf, timeout 设为0

参考资料