<?php
// Redis.php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\driver;
use Exception;
use think\queue\job\Redis as RedisJob;
/**
 * Queue driver that stores jobs in Redis through the phpredis extension.
 */
class Redis
{
    /** @var \Redis Connection handle; instantiated in the constructor. */
    protected $redis;

    /**
     * Driver options. Values passed to the constructor are merged over these
     * defaults.
     *
     * - expire:     when not null, pop() first migrates expired jobs
     *               (presumably a duration in seconds - confirm against the
     *               migration helpers).
     * - default:    queue name pop() falls back to when none is given.
     * - host, port: address handed to connect()/pconnect().
     * - password:   sent with auth() when it does not loosely equal ''.
     * - timeout:    third argument handed to connect()/pconnect().
     * - persistent: true selects pconnect(), false selects connect().
     *
     * @var array
     */
    protected $options = [
        'expire'     => 60,
        'default'    => 'default',
        'host'       => '127.0.0.1',
        'port'       => 6379,
        'password'   => '',
        'timeout'    => 0,
        'persistent' => false,
    ];
    /**
     * Merge the supplied options over the defaults and open the Redis
     * connection, authenticating when a password is configured.
     *
     * @param array $options Overrides for the default driver options; an
     *                       empty value leaves the defaults untouched.
     *
     * @throws Exception When the redis extension is not loaded, or when the
     *                   connection attempt reports failure.
     */
    public function __construct($options)
    {
        if (!extension_loaded('redis')) {
            throw new Exception('redis扩展未安装');
        }

        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        // The 'persistent' option picks which phpredis connect method is called.
        $func        = $this->options['persistent'] ? 'pconnect' : 'connect';
        $this->redis = new \Redis;

        $connected = $this->redis->$func(
            $this->options['host'],
            $this->options['port'],
            $this->options['timeout']
        );

        // Fail fast: without this check a failed connection would only show up
        // later, as an error from the first queue command.
        if (false === $connected) {
            throw new Exception(sprintf(
                'Unable to connect to Redis at %s:%s',
                $this->options['host'],
                $this->options['port']
            ));
        }

        // Loose comparison kept on purpose so a null password also skips auth.
        if ('' != $this->options['password']) {
            $this->redis->auth($this->options['password']);
        }
    }
    /**
     * Build a payload from the job and its data, then hand it to pushRaw().
     *
     * @param mixed       $job   Job identifier passed through to createPayload().
     * @param mixed       $data  Job data passed through to createPayload().
     * @param string|null $queue Queue name passed through to pushRaw().
     *
     * @return mixed Whatever pushRaw() returns.
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }
    /**
     * Schedule a job for later by adding its payload to the queue's
     * ":delayed" sorted set, scored with the current Unix time plus $delay.
     *
     * @param int         $delay Offset added to time(), i.e. seconds from now.
     * @param mixed       $job   Job identifier passed through to createPayload().
     * @param mixed       $data  Job data passed through to createPayload().
     * @param string|null $queue Queue name resolved through getQueue().
     *
     * @return void
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
    }
    public function pop($queue = null)
        $original = $queue ?: $this->options['default'];
        $queue = $this->getQueue($queue);
        if (!is_null($this->options['expire'])) {
            $this->migrateAllExpiredJobs($queue);