首先是需要做个php定时(例如5秒、3秒)执行一段php功能,完成业务需要。比如客户留言的同时,发送小程序订阅消息通知客户,同时发送客服消息 通知管理员有新的留言进来。主要的业务,在一个php请求中无法顺利完成,需要把业务插入job表,然后定时去执行job中的业务记录。
<?php /*这是一个任务类,可以序列号存储到数据表中,执行时反序列化为类执行execute方法*/ class joblist { function execute($parent){ return ; } } class Queue { public $tableName = 'ewei_shop_queue'; public $channel = 'queue'; public $deleteReleased = true; private $_workerPid; public function __construct() { spl_autoload_register(array('self', 'autoload'), true, true); } public static function autoload($className) { $classFile = str_replace('\\', '/', $className) . '.php'; if ($classFile === false || !is_file($classFile)) { return; } include $classFile; } /** * Listens queue and runs each job. * * @param bool $repeat whether to continue listening when queue is empty. * @param int $timeout number of seconds to sleep before next iteration. * @return null|int exit code. * @internal for worker command only * @since 2.0.2 */ public function run($repeat, $timeout = 0) { return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { if ($payload = $this->reserve()) { if ($this->handleMessage( $payload['id'], $payload['job'], $payload['ttr'], $payload['attempt'] )) { $this->release($payload); } } elseif (!$repeat) { break; } elseif ($timeout) { sleep($timeout); } } }); } /** * Runs worker. * * @param callable $handler * @return null|int exit code * @since 2.0.2 */ protected function runWorker(callable $handler) { $this->_workerPid = getmypid(); /** @var SignalLoop $loop */ $loop = new SignalLoop($this); file_put_contents('queue.pid',$this->_workerPid); if (function_exists('chmod')){ chmod('queue.pid',0777); } $exitCode = null; try { call_user_func($handler, function () use ($loop) { return $loop->canContinue(); }); } finally { $this->_workerPid = null; } return null; } /** * Takes one message from waiting list and reserves it for handling. * * @return array|false payload * @throws Exception in case it hasn't waited the lock */ protected function reserve() { //$payload = pdo_fetch("SELECT * FROM ".tablename($this->tableName)." WHERE `channel`='{$this->channel}' AND `reserved_at` IS NULL AND `pushed_at`<=:time - delay ORDER BY `priority` ASC, `id` ASC limit 1",array(':time'=>time())); /* if (is_array($payload)) { $payload['reserved_at'] = time(); $payload['attempt'] = (int) $payload['attempt'] + 1; pdo_update($this->tableName,array( 'reserved_at' => $payload['reserved_at'], 'attempt' => $payload['attempt'], ),array( 'id' => $payload['id'], )); // pgsql if (is_resource($payload['job'])) { $payload['job'] = stream_get_contents($payload['job']); } }*/ $payload = []; $payload['reserved_at'] = time(); $payload['attempt'] = 1; $payload['job'] = serialize( new joblist() ); $payload['ttr'] = ''; $payload['id'] = 1; return $payload; } protected function handleMessage($id, $message, $ttr, $attempt) { list($job, $error) = $this->unserializeMessage($message); if (empty($job)){ return false; } $job->execute($this); return true; } /** * @param $serialized * @return array */ public function unserializeMessage($serialized) { try { $job = unserialize($serialized); } catch (\Exception $e) { return array(null, new Exception($serialized, $e->getMessage(), 0, $e)); } return array($job, null); } /** * @param array $payload */ protected function release($payload) { if ($this->deleteReleased) { $this->writeLog('run ok end'); } else { // pdo_update($this->tableName,array('done_at' => time()), array('id' => $payload['id'])); echo 'run ok update'; } } protected function writeLog($info){ $logfile = 'queue.log' ; $fh = fopen($logfile,'ab'); //打开,追加模式 fwrite($fh,date("Y-m-d H:i:s")."#:".$info."\r\n"); fclose($fh); } public function fileGlob($path,$recursive = true){ $res = array(); if (substr($path,-1) !== '*') { $path = $path.'*'; } foreach(glob($path) as $file){ if($file != '.' && $file != '..'){ $relative_path = str_replace('/home/php73/','',$file); if(is_dir($file)){ if ($recursive) { $res = array_merge($res,$this->fileGlob($file . '/*',$recursive)); } }else{ $res[$relative_path] = $file; } } } return $res; } } /** * Signal Loop. * * @author Roman Zhuravlev <zhuravljov@gmail.com> * @since 2.0.2 */ class SignalLoop { /** * @var array of signals to exit from listening of the queue. */ public $exitSignals = array( 15, // SIGTERM 2, // SIGINT 1, // SIGHUP ); /** * @var array of signals to suspend listening of the queue. * For example: SIGTSTP */ public $suspendSignals = array(); /** * @var array of signals to resume listening of the queue. * For example: SIGCONT */ public $resumeSignals = array(); /** * @var Queue */ protected $queue; /** * @var bool status when exit signal was got. */ private static $exit = false; /** * @var bool status when suspend or resume signal was got. */ private static $pause = false; /** * @param Queue $queue * @inheritdoc */ public function __construct($queue) { $this->queue = $queue; } /** * Sets signal handlers. * * @inheritdoc */ public function init() { if (extension_loaded('pcntl')) { foreach ($this->exitSignals as $signal) { pcntl_signal($signal, function () { self::$exit = true; }); } foreach ($this->suspendSignals as $signal) { pcntl_signal($signal, function () { self::$pause = true; }); } foreach ($this->resumeSignals as $signal) { pcntl_signal($signal, function () { self::$pause = false; }); } } } /** * Checks signals state. * * @inheritdoc */ public function canContinue() { if (extension_loaded('pcntl')) { pcntl_signal_dispatch(); // Wait for resume signal until loop is suspended while (self::$pause && !self::$exit) { usleep(10000); pcntl_signal_dispatch(); } } return !self::$exit; } } $queue = new Queue(); $queue->run(true,3);
然后,在php的 cli客户端,执行此queue.php。发现它没3秒就执行一个任务。
好了,本文内容结束,感谢您的参与阅读。
实用工具: JSON字符串格式化 | js压缩代码格式化工具 | 异步XMLHttpRequests库axios.js文档 | vue-axios文档 | Go语言文档