An error occurred while loading the file. Please try again.
-
张亚俊 authored9c018366
Redis.php 6.33 KiB
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\driver;
use Exception;
use think\queue\job\Redis as RedisJob;
class Redis
{
/** @var \Redis */
protected $redis;
protected $options = [
'expire' => 60,
'default' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'timeout' => 0,
'persistent' => false
];
public function __construct($options)
{
if (!extension_loaded('redis')) {
throw new Exception('redis扩展未安装');
}
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}
$func = $this->options['persistent'] ? 'pconnect' : 'connect';
$this->redis = new \Redis;
$this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']);
if ('' != $this->options['password']) {
$this->redis->auth($this->options['password']);
}
}
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}
public function pop($queue = null)
{
$original = $queue ?: $this->options['default'];
$queue = $this->getQueue($queue);
if (!is_null($this->options['expire'])) {
$this->migrateAllExpiredJobs($queue);