这里是文章模块栏目内容页
php实现多线程定时任务执行功能

首先是需要做个php定时(例如5秒、3秒)执行一段php功能,完成业务需要。比如客户留言的同时,发送小程序订阅消息通知客户,同时发送客服消息 通知管理员有新的留言进来。主要的业务,在一个php请求中无法顺利完成,需要把业务插入job表,然后定时去执行job中的业务记录。

<?php
/*这是一个任务类,可以序列号存储到数据表中,执行时反序列化为类执行execute方法*/
class joblist
{
   function execute($parent){
       return ;
   } 
}

class Queue
{

    public $tableName = 'ewei_shop_queue';
    public $channel = 'queue';

    public $deleteReleased = true;


    private $_workerPid;

    public function __construct()
    {
        spl_autoload_register(array('self', 'autoload'), true, true);
    }


    public static function autoload($className)
    {

        $classFile =  str_replace('\\', '/', $className) . '.php';

        if ($classFile === false || !is_file($classFile)) {
            return;
        }

        include $classFile;
    }

    /**
     * Listens queue and runs each job.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to sleep before next iteration.
     * @return null|int exit code.
     * @internal for worker command only
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                if ($payload = $this->reserve()) {
                    if ($this->handleMessage(
                        $payload['id'],
                        $payload['job'],
                        $payload['ttr'],
                        $payload['attempt']
                    )) {
                        $this->release($payload);
                    }
                } elseif (!$repeat) {
                    break;
                } elseif ($timeout) {
                    sleep($timeout);
                }
            }
        });
    }



    /**
     * Runs worker.
     *
     * @param callable $handler
     * @return null|int exit code
     * @since 2.0.2
     */
    protected function runWorker(callable $handler)
    {
        $this->_workerPid = getmypid();

        /** @var SignalLoop $loop */
        $loop = new SignalLoop($this);

        file_put_contents('queue.pid',$this->_workerPid);
        if (function_exists('chmod')){
            chmod('queue.pid',0777);
        }

        $exitCode = null;
        try {
            call_user_func($handler, function () use ($loop) {
                return $loop->canContinue();
            });
        } finally {
            $this->_workerPid = null;
        }

        return null;
    }



    /**
     * Takes one message from waiting list and reserves it for handling.
     *
     * @return array|false payload
     * @throws Exception in case it hasn't waited the lock
     */
    protected function reserve()
    {
        //$payload = pdo_fetch("SELECT * FROM ".tablename($this->tableName)." WHERE `channel`='{$this->channel}' AND `reserved_at` IS NULL AND `pushed_at`<=:time - delay ORDER BY `priority` ASC, `id` ASC limit 1",array(':time'=>time()));
      /*  if (is_array($payload)) {
            $payload['reserved_at'] = time();
            $payload['attempt'] = (int) $payload['attempt'] + 1;

            pdo_update($this->tableName,array(
                'reserved_at' => $payload['reserved_at'],
                'attempt' => $payload['attempt'],
            ),array(
                'id' => $payload['id'],
            ));

            // pgsql
            if (is_resource($payload['job'])) {
                $payload['job'] = stream_get_contents($payload['job']);
            }
        }*/
        $payload = [];
        $payload['reserved_at'] = time();
        $payload['attempt'] = 1;
        $payload['job'] = serialize( new joblist() );
        $payload['ttr'] = '';
        $payload['id'] = 1;
        return $payload;
    }


    protected function handleMessage($id, $message, $ttr, $attempt)
    {

        list($job, $error) = $this->unserializeMessage($message);

        if (empty($job)){
            return false;
        }
        $job->execute($this);
        return true;
    }

    /**
     * @param $serialized
     * @return array
     */
    public function unserializeMessage($serialized)
    {

        try {
            $job = unserialize($serialized);
        } catch (\Exception $e) {
            return array(null, new Exception($serialized, $e->getMessage(), 0, $e));
        }
        return array($job, null);
    }


    /**
     * @param array $payload
     */
    protected function release($payload)
    {
        if ($this->deleteReleased) {
            $this->writeLog('run ok end');
        } else {
          // pdo_update($this->tableName,array('done_at' => time()), array('id' => $payload['id']));
            echo 'run ok update';
        }
    }
   
    protected  function writeLog($info){
     
            $logfile = 'queue.log' ;
            $fh = fopen($logfile,'ab'); //打开,追加模式
            fwrite($fh,date("Y-m-d H:i:s")."#:".$info."\r\n");
            fclose($fh);

 
    }

    public function fileGlob($path,$recursive = true){
        $res = array();
        if (substr($path,-1) !== '*')
        {
            $path = $path.'*';
        }
        foreach(glob($path) as $file){
            if($file != '.' && $file != '..'){
                $relative_path = str_replace('/home/php73/','',$file);
                if(is_dir($file)){
                    if ($recursive)
                    {
                        $res = array_merge($res,$this->fileGlob($file . '/*',$recursive));
                    }
                }else{
                    $res[$relative_path] = $file;
                }
            }
        }
        return $res;
    }
}


/**
 * Signal Loop.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 * @since 2.0.2
 */
class SignalLoop
{
    /**
     * @var array of signals to exit from listening of the queue.
     */
    public $exitSignals = array(
        15, // SIGTERM
        2,  // SIGINT
        1,  // SIGHUP
    );
    /**
     * @var array of signals to suspend listening of the queue.
     * For example: SIGTSTP
     */
    public $suspendSignals = array();
    /**
     * @var array of signals to resume listening of the queue.
     * For example: SIGCONT
     */
    public $resumeSignals = array();

    /**
     * @var Queue
     */
    protected $queue;

    /**
     * @var bool status when exit signal was got.
     */
    private static $exit = false;
    /**
     * @var bool status when suspend or resume signal was got.
     */
    private static $pause = false;


    /**
     * @param Queue $queue
     * @inheritdoc
     */
    public function __construct($queue)
    {
        $this->queue = $queue;
    }

    /**
     * Sets signal handlers.
     *
     * @inheritdoc
     */
    public function init()
    {
        if (extension_loaded('pcntl')) {
            foreach ($this->exitSignals as $signal) {
                pcntl_signal($signal, function () {
                    self::$exit = true;
                });
            }
            foreach ($this->suspendSignals as $signal) {
                pcntl_signal($signal, function () {
                    self::$pause = true;
                });
            }
            foreach ($this->resumeSignals as $signal) {
                pcntl_signal($signal, function () {
                    self::$pause = false;
                });
            }
        }
    }

    /**
     * Checks signals state.
     *
     * @inheritdoc
     */
    public function canContinue()
    {
        if (extension_loaded('pcntl')) {
            pcntl_signal_dispatch();
            // Wait for resume signal until loop is suspended
            while (self::$pause && !self::$exit) {
                usleep(10000);
                pcntl_signal_dispatch();
            }
        }

        return !self::$exit;
    }
}
$queue = new Queue();
$queue->run(true,3);

然后,在php的 cli客户端,执行此queue.php。发现它没3秒就执行一个任务。


好了,本文内容结束,感谢您的参与阅读。