YII消息队列shmilyzxt/yii2-queue实践全记录,记录那些踩过的坑
官方介绍:https://packagist.org/packages/shmilyzxt/yii2-queue
命令行执行
php composer.phar require --prefer-dist shmilyzxt/yii2-queue "dev-master"
第一次下载失败了,更换国内镜像
composer config -g repositories.packagist composer http://packagist.phpcomposer.com
这次提示让输入一个token,点击最后一个连接,可以登录github生成token,生成后复制粘贴过来回车即可
#错误提示 Could not fetch https://api.github.com/repos/jquery/jquery-dist, please create a GitHub OAuth token to go over the API rate limit Head to https://github.com/settings/tokens/new?scopes=repo&description=Composer+on+bogon+2018-11-26+0241
这次安装成功了,可以在项目vender目录下看到多了一个文件夹shmilyzxt
介绍文档里面没有看到数据库的字段, 于是查看源代码,果然有sql文件,在shmilyzxt/yii2-queue/jobs和shmilyzxt/yii2-queue/faild分别发现了一个sql,执行即可。
消息队列数据库
jobs.sql
SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for jobs -- ---------------------------- DROP TABLE IF EXISTS `jobs`; CREATE TABLE `jobs` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `queue` varchar(50) DEFAULT NULL, `payload` text, `attempts` int(5) NOT NULL DEFAULT '0', `reserved` tinyint(1) NOT NULL DEFAULT '0', `reserved_at` int(10) DEFAULT NULL, `available_at` int(10) DEFAULT NULL, `created_at` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
存储失败日志的数据库failed_jobs.sql
SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for failed_jobs -- ---------------------------- DROP TABLE IF EXISTS `failed_jobs`; CREATE TABLE `failed_jobs` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `connector` varchar(255) DEFAULT NULL, `queue` varchar(255) DEFAULT NULL, `payload` text, `failed_at` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;
我准备采用数据库存储消息队列,如果你使用redis,配置见其官网
在common/config/main.php中,为components数组增加下列配置
'queue' => [ 'class' => 'shmilyzxt\queue\queues\DatabaseQueue', //队列使用的类 'jobEvent' => [ //队列任务事件配置,目前任务支持2个事件 'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'], 'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'], ], 'connector' => [//队列中间件链接器配置(这是因为使用数据库,所以使用yii\db\Connection作为数据库链接实例) 'class' => 'yii\db\Connection', 'dsn' => 'mysql:host=localhost;dbname=yii2advanced', 'username' => 'root', 'password' => '', 'charset' => 'utf8', ], 'table' => 'jobs', //存储队列数据表名 'queue' => 'default', //队列的名称 'expire' => 60, //任务过期时间 'maxJob' =>0, //队列允许最大任务数,0为不限制 'failed' => [//任务失败日志记录(目前只支持记录到数据库) 'logFail' => true, //开启任务失败处理 'provider' => [ //任务失败处理类 'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider', 'db' => [ //数据库链接 'class' => 'yii\db\Connection', 'dsn' => 'mysql:host=localhost;dbname=yii2advanced', 'username' => 'root', 'password' => '', 'charset' => 'utf8', ], 'table' => 'failed_jobs' //存储失败日志的表名 ], ], ]
更新:这块可以后面还要改,继续往下看吧,这么配置真正实施不行
队列需要有处理的handler,比如处理邮件发送的,处理短信发送的,新建自己的队列处理handler,继承shmilyzxt\queue\base\JobHandler,并实现任务处理方法handle和失败处理方法failed,一个发邮件的jobhandler类似:
class SendEmail extends \shmilyzxt\queue\base\JobHandler{ public function handle($job,$data){ if($job->getAttempts() > 3){ $this->failed($job); } $payload = $job->getPayload(); //$payload即任务的数据,你拿到任务数据后就可以执行发邮件了 //TODO 发邮件 } public function failed($job,$data){ die("发了3次都失败了,算了"); } }
任务入队列提供两个方法:
\Yii::$app->queue->pushOn($hander,$data,$queue='default') 即时任务入队列:这样的任务入队列后,如果队列监听在运行,那么任务会立刻进入ready状态,可以被监听进程执行。 该方法有3个参数,第一个为任务处理handler,第二个为任务数据,第三个为队列名称,默认为 default。
\Yii::$app->queue->laterOn($delay,$handler,$data,$queue='default') 延时任务入队列:这样的任务入队列后不会立刻被队列监听进程之行,需要等待 $delay秒后任务才就绪。
目前支持的handler有:
1,新建自己的队列处理handler,继承、shmilyzxt\queue\base\JobHandler,并实现任务处理方法handle()和失败处理方法failed()。
2, 一个php闭包,形如 function($job,$data){}
\Yii::$app->queue->pushOn(new SendMial(),['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email'); \Yii::$app->queue->pushOn(function($job,$data){var_dump($data)},['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email'); \Yii::$app->queue->laterOn(120,new SendMial(),['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email'); \Yii::$app->queue->pushOn(120,function($job,$data){var_dump($data)},['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');
任务入队执行成功,会在jobs表中添加一条数据
这里我遇到了很多问题,先看官网说法
启动后台队列监听进程,对任务进行处理,您可以使用yii console app来启动,你也可以使用更高级的如swoole来高效的运行队列监听, 目前提供了一个Worker类,在控制台程序使用Worker::listen(Queue $queue,$queueName='default',$attempt=10,$memory=512,$sleep=3,$delay=0)可以 启动队列监听进程,其中 $attempt是任务尝试次数,$memory是允许使用最大内存,$sleep表示每次尝试从队列中获取任务的间隔时间,$delay代表把任务重新加入队列 时是否延时(0代表不延时),一个标准yii console app 启动队列监听进程代码如下;
class WorkerController extends \yii\console\Controller { public function actionListen($queueName='default',$attempt=10,$memeory=128,$sleep=3 ,$delay=0){ Worker::listen(\Yii::$app->queue,$queueName,$attempt,$memeory,$sleep,$delay); } } yii worker/listen default 10 128 3 0
很简单是不是,可是YII的命令行启动我始终也是运行不起来啊,首先根本没有yii这个命令,如果你对yii命令行熟悉,那我恭喜你,如果不熟悉,呵呵,估计现在就搞不下去了,我足足搞了一下午啊。
没有yii命令,我们现在项目的根目录下创建一个文件,名为yii
#!/usr/bin/php <?php /** * Yii console bootstrap file. * * @link http://www.yiiframework.com/ * @copyright Copyright (c) 2008 Yii Software LLC * @license http://www.yiiframework.com/license/ */ defined('YII_DEBUG') or define('YII_DEBUG', true); defined('YII_ENV') or define('YII_ENV', 'dev'); require(__DIR__ . '/common/config/constant.php'); //引入了我定义的一些常量,根据你的需要看是否引入 require(__DIR__ . '/vendor/autoload.php'); //必须的 require(__DIR__ . '/vendor/yiisoft/yii2/Yii.php'); //必须的 $config = require(__DIR__ . '/commands/console.php'); //这个是新增的文件 $application = new yii\console\Application($config); require(__DIR__ . '/common/helpers/SendEmailMessage.php'); //引入发送邮件的类,根据你的情况引入,如果不引入,后面不能使用这个类 require(__DIR__ . '/common/helpers/EmailHelper.php'); //引入发送邮件的类,根据你的情况引入,如果不引入,后面不能使用这个类 $exitCode = $application->run(); exit($exitCode);
其中/commands/console.php是我创建的一个新文件,用来存放队列配置的,文章第二部分“配置”实际那样做是不行的
项目下新增目录结构:
+backend ...原后台代码 +frontend ...原前端代码 +commands //新增的文件夹 console.php //新增的文件,队列配置的 WorkerController.php //新增的文件,用来监听消息队列的
/commands/console.php 内容如下
<?php return [ 'id' => 'app-commands', 'basePath' =>dirname(__DIR__), 'controllerNamespace' => 'app\commands', 'components' => [ 'db' => [ 'class' => 'yii\db\Connection', 'dsn' => 'mysql:host=127.0.0.1;dbname=***', 'username' => 'root', 'password' => '****', 'charset' => 'utf8', 'tablePrefix' => '', 'emulatePrepare' => false, 'attributes' => [ PDO::ATTR_STRINGIFY_FETCHES => false, ], 'enableSchemaCache' => false, 'schemaCacheDuration' => 3600, ], 'mailer' => [ 'class' => 'yii\swiftmailer\Mailer', 'useFileTransport' => false, 'transport' => [ 'class' => 'Swift_SmtpTransport', 'host' => EMAIL_SERVICE, 'username' => EMAIL_USER, 'password' => EMAIL_PWD, 'port' => EMAIL_GR_PORT, 'encryption' => 'SSL', ], 'messageConfig'=>[ 'charset'=>'UTF-8', 'from'=>[EMAIL_USER=>EMAIL_USER_NAME] ], ], 'queue' => [ 'class' => 'shmilyzxt\queue\queues\DatabaseQueue', //队列使用的类 'jobEvent' => [ //队列任务事件配置,目前任务支持2个事件 // 'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'], // 'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'], ], 'connector' => [//队列中间件链接器配置(这是因为使用数据库,所以使用yii\db\Connection作为数据库链接实例) 'class' => 'yii\db\Connection', 'dsn' => 'mysql:host=127.0.0.1;dbname=***', 'username' => 'root', 'password' => '****', 'charset' => 'utf8', ], 'table' => 'jobs', //存储队列数据表名 'queue' => 'email', //队列的名称 'expire' => 300, //任务过期时间 'maxJob' =>0, //队列允许最大任务数,0为不限制 'failed' => [//任务失败日志记录(目前只支持记录到数据库) 'logFail' => true, //开启任务失败处理 'provider' => [ //任务失败处理类 'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider', 'db' => [ //数据库链接 'class' => 'yii\db\Connection', 'dsn' => 'mysql:host=127.0.0.1;dbname=***', 'username' => 'root', 'password' => '**', 'charset' => 'utf8', ], 'table' => 'jobs_failed' //存储失败日志的表名 ], ], ] ], ];
在components下要配置db信息,为了链接数据库的,还要配置mailer,为了发送邮件的,最后还有queue的配置
注意一点:数据库不要配置成localhost,要采用127.0.0.1,否则可能链接不上,坑~,因为这两种不一样,一个是通过socket链接数据库,一个是通过网络链接,改成127.0.0.1后可能你还是连接不上数据库,如果你用的是MAMP,修改MySQL的配置,勾选Allow network to Mysql from other computer
/commands/WorkerController.php 内容如下
<?php namespace app\commands; use yii\console\Controller; use shmilyzxt\queue\Worker; class WorkerController extends Controller{ public function actionListen($queueName='email',$attempt=3,$memeory=128,$sleep=3 ,$delay=0){ Worker::listen(\Yii::$app->queue,$queueName,$attempt,$memeory,$sleep,$delay); } }
配置完成之后在目录下输入命令 ./yii ,看看是否看到 “worker/listen”命令,如果有就可以进行监听了,如果没有,检查一下上面是否和我的一致,注意命名空间和文件夹要一致,这个问题我也足足搞了很久,坑
./yii worker/listen
好了,现在可以测试了,想办法通过接口创建一个任务,看看能不能发邮件,我在后台搞了一个接口,浏览器中输入地址访问一次添加一次任务
<?php namespace backend\controllers\systems; use app\command\WorkerController; use backend\controllers\BaseController; use common\helpers\SendEmailMessage; class TestController extends BaseController{ public function actionAddSendEmailQueue(){ $sendEmail=new SendEmailMessage(); \Yii::$app->queue->pushOn($sendEmail,['email'=>'501351981@qq.com','title'=>'test','content'=>'email test'],'email'); echo "添加成功了"; } }
我遇到的问题,刚开始发送邮件是可以的,但是命令运行一段时间之后,就收不到邮件了,通过try 发现错误是
YII Swift_TransportException: Expected response code 250 but got code "", message'',
可能的错误信息2
exception 'yii\base\ErrorException' with message 'fwrite(): SSL: Broken pipe' in
vendor/swiftmailer/swiftmailer/lib/classes/Swift/Transport/StreamBuffer.php:232
起初我猜想是php运行超时了,修改php的超时时间,无效,最后改了这个好了,添加红色部分代码
修改vendor/swiftmailer/swiftmailer/lib/classes/Swift/Mailer.php:90
public function send(Swift_Mime_Message $message, &$failedRecipients = null)
{
$failedRecipients = (array) $failedRecipients;
if (!$this->_transport->isStarted()) {
$this->_transport->start();
}
$sent = 0;
try {
$sent = $this->_transport->send($message, $failedRecipients);
} catch (Swift_RfcComplianceException $e) {
foreach ($message->getTo() as $address => $name) {
$failedRecipients[] = $address;
}
} catch (\Exception $exception) {
$this->_transport->stop();
sleep(10);
throw $exception;
} finally {
$this->_transport->stop();
sleep(1);
}
return $sent;
}
终于可以正常运行了,至此告一段落,真是一次艰难的旅途。。。。。。。
正式使用我们还有很多问题要解决
首先是如何在后台运行命令:
当我们在命令行启动 worker/listen进程后,关闭命令行,进程也跟着结束了,这肯定不行的,要改为在后台一直运行,可以采用“nohup 命令所在目录 命令 &”来后台执行命令
nohup /Applications/XAMPP/xamppfiles/htdocs/***/yii worker/listen &
进程执行后,我还是担心,万一进程挂掉了怎么办,岂不是邮件服务不能用了,因此我还想再建立一个定时任务,比如每10分钟执行一次,判断进程还在不在,如果不在了就启动一个,这需要一个shell脚本
#!/bin/sh ps -fe|grep worker/listen |grep -v grep if [ $? -ne 0 ] then nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen & echo "重新启动了1个worker/listen任务....." else echo "监听程序正在运行呢,别担心....." fi
还没有完,到这里还是有问题,就是邮件发送的太慢了,想到的一个办法就是多开几个任务,比如开3个
#!/bin/sh ps -fe|grep worker/listen |grep -v grep if [ $? -ne 0 ] then nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen & nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen & nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen & echo "启动了3个worker/listen任务....." else echo "监听程序正则进行....." fi
很棒,邮件发送速度真的快了很多,很开心,再测试一下吧,我发邮件用的是腾讯企业邮箱,建立了一个发送任务,看看发1000封邮件需要多长时间,当我开心的看着收件箱渐渐增多的时候,突然收不到邮件了....
原来腾讯邮件有发送量和频率的限制,发送量:每天整个公司对外只能发送 购买人数*50封,发送频率没有查到....
到这里真的觉得做一件事真的是太难了,跨过重重险阻,结果....看了下自己搭建邮件发送服务器,有点麻烦,本人后台功力不够,只能先选择使用腾讯邮件进行发送了,只能降低发送频率了,折腾了一圈发现,当初用队列就是为了加快发送频率,结果现在又不能随心所欲使用,有些遗憾,不过也学到了一些知识。
就是这样,一次踩了无数坑的学习经历。
点赞(1)