YII消息队列shmilyzxt/yii2-queue实践全记录,记录那些踩过的坑

蛰伏已久 蛰伏已久 2018-11-26

官方介绍:https://packagist.org/packages/shmilyzxt/yii2-queue

安装

命令行执行

php composer.phar require --prefer-dist shmilyzxt/yii2-queue "dev-master"

第一次下载失败了,更换国内镜像

 composer config -g repositories.packagist composer http://packagist.phpcomposer.com

这次提示让输入一个token,点击最后一个连接,可以登录github生成token,生成后复制粘贴过来回车即可

#错误提示
Could not fetch https://api.github.com/repos/jquery/jquery-dist, 
please create a GitHub OAuth token to go over the API rate limit
Head to https://github.com/settings/tokens/new?scopes=repo&description=Composer+on+bogon+2018-11-26+0241

这次安装成功了,可以在项目vender目录下看到多了一个文件夹shmilyzxt


安装数据库

介绍文档里面没有看到数据库的字段, 于是查看源代码,果然有sql文件,在shmilyzxt/yii2-queue/jobs和shmilyzxt/yii2-queue/faild分别发现了一个sql,执行即可。

消息队列数据库

jobs.sql

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for jobs
-- ----------------------------
DROP TABLE IF EXISTS `jobs`;
CREATE TABLE `jobs` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(50) DEFAULT NULL,
  `payload` text,
  `attempts` int(5) NOT NULL DEFAULT '0',
  `reserved` tinyint(1) NOT NULL DEFAULT '0',
  `reserved_at` int(10) DEFAULT NULL,
  `available_at` int(10) DEFAULT NULL,
  `created_at` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

存储失败日志的数据库failed_jobs.sql

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for failed_jobs
-- ----------------------------
DROP TABLE IF EXISTS `failed_jobs`;
CREATE TABLE `failed_jobs` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `connector` varchar(255) DEFAULT NULL,
  `queue` varchar(255) DEFAULT NULL,
  `payload` text,
  `failed_at` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;

配置

我准备采用数据库存储消息队列,如果你使用redis,配置见其官网

在common/config/main.php中,为components数组增加下列配置

'queue' => [
            'class' => 'shmilyzxt\queue\queues\DatabaseQueue', //队列使用的类
            'jobEvent' => [ //队列任务事件配置,目前任务支持2个事件
                'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'],
                'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'],
            ],
            'connector' => [//队列中间件链接器配置(这是因为使用数据库,所以使用yii\db\Connection作为数据库链接实例)
                'class' => 'yii\db\Connection',
                'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                'username' => 'root',
                'password' => '',
                'charset' => 'utf8',
            ],
            'table' => 'jobs', //存储队列数据表名
            'queue' => 'default', //队列的名称
            'expire' => 60, //任务过期时间
            'maxJob' =>0, //队列允许最大任务数,0为不限制
            'failed' => [//任务失败日志记录(目前只支持记录到数据库)
                'logFail' => true, //开启任务失败处理
                'provider' => [ //任务失败处理类
                    'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider',
                    'db' => [ //数据库链接
                        'class' => 'yii\db\Connection',
                        'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                        'username' => 'root',
                        'password' => '',
                        'charset' => 'utf8',
                    ],
                    'table' => 'failed_jobs' //存储失败日志的表名
                ],
            ],
        ]

更新:这块可以后面还要改,继续往下看吧,这么配置真正实施不行

新建队列处理handler

队列需要有处理的handler,比如处理邮件发送的,处理短信发送的,新建自己的队列处理handler,继承shmilyzxt\queue\base\JobHandler,并实现任务处理方法handle和失败处理方法failed,一个发邮件的jobhandler类似:

class SendEmail extends \shmilyzxt\queue\base\JobHandler{

    public function handle($job,$data){
        if($job->getAttempts() > 3){
            $this->failed($job);
        }

        $payload = $job->getPayload();

        
        //$payload即任务的数据,你拿到任务数据后就可以执行发邮件了
        //TODO 发邮件
    }

    public function failed($job,$data){
        die("发了3次都失败了,算了");
    }
}

任务入队列

任务入队列提供两个方法: 

  • \Yii::$app->queue->pushOn($hander,$data,$queue='default') 即时任务入队列:这样的任务入队列后,如果队列监听在运行,那么任务会立刻进入ready状态,可以被监听进程执行。 该方法有3个参数,第一个为任务处理handler,第二个为任务数据,第三个为队列名称,默认为 default。

  • \Yii::$app->queue->laterOn($delay,$handler,$data,$queue='default') 延时任务入队列:这样的任务入队列后不会立刻被队列监听进程之行,需要等待 $delay秒后任务才就绪。

目前支持的handler有: 

  • 1,新建自己的队列处理handler,继承、shmilyzxt\queue\base\JobHandler,并实现任务处理方法handle()和失败处理方法failed()。

  •  2, 一个php闭包,形如 function($job,$data){}

\Yii::$app->queue->pushOn(new SendMial(),['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');
\Yii::$app->queue->pushOn(function($job,$data){var_dump($data)},['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');

\Yii::$app->queue->laterOn(120,new SendMial(),['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');
\Yii::$app->queue->pushOn(120,function($job,$data){var_dump($data)},['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');

任务入队执行成功,会在jobs表中添加一条数据

监听任务

这里我遇到了很多问题,先看官网说法

启动后台队列监听进程,对任务进行处理,您可以使用yii console app来启动,你也可以使用更高级的如swoole来高效的运行队列监听, 目前提供了一个Worker类,在控制台程序使用Worker::listen(Queue $queue,$queueName='default',$attempt=10,$memory=512,$sleep=3,$delay=0)可以 启动队列监听进程,其中 $attempt是任务尝试次数,$memory是允许使用最大内存,$sleep表示每次尝试从队列中获取任务的间隔时间,$delay代表把任务重新加入队列 时是否延时(0代表不延时),一个标准yii console app 启动队列监听进程代码如下;

class WorkerController extends \yii\console\Controller
{
    public function actionListen($queueName='default',$attempt=10,$memeory=128,$sleep=3 ,$delay=0){
        Worker::listen(\Yii::$app->queue,$queueName,$attempt,$memeory,$sleep,$delay);
    }
}
yii worker/listen default 10 128 3 0

很简单是不是,可是YII的命令行启动我始终也是运行不起来啊,首先根本没有yii这个命令,如果你对yii命令行熟悉,那我恭喜你,如果不熟悉,呵呵,估计现在就搞不下去了,我足足搞了一下午啊。

没有yii命令,我们现在项目的根目录下创建一个文件,名为yii

#!/usr/bin/php

<?php
/**
 * Yii console bootstrap file.
 *
 * @link http://www.yiiframework.com/
 * @copyright Copyright (c) 2008 Yii Software LLC
 * @license http://www.yiiframework.com/license/
 */

defined('YII_DEBUG') or define('YII_DEBUG', true);
defined('YII_ENV') or define('YII_ENV', 'dev');


require(__DIR__ . '/common/config/constant.php');    //引入了我定义的一些常量,根据你的需要看是否引入

require(__DIR__ . '/vendor/autoload.php');           //必须的
require(__DIR__ . '/vendor/yiisoft/yii2/Yii.php');   //必须的


$config = require(__DIR__ . '/commands/console.php');    //这个是新增的文件
$application = new yii\console\Application($config);

require(__DIR__ . '/common/helpers/SendEmailMessage.php');    //引入发送邮件的类,根据你的情况引入,如果不引入,后面不能使用这个类
require(__DIR__ . '/common/helpers/EmailHelper.php');         //引入发送邮件的类,根据你的情况引入,如果不引入,后面不能使用这个类

$exitCode = $application->run();
exit($exitCode);

其中/commands/console.php是我创建的一个新文件,用来存放队列配置的,文章第二部分“配置”实际那样做是不行的

项目下新增目录结构:

+backend
   ...原后台代码
+frontend
   ...原前端代码
+commands    //新增的文件夹
   console.php  //新增的文件,队列配置的
   WorkerController.php  //新增的文件,用来监听消息队列的

/commands/console.php 内容如下

<?php

return [
    'id' => 'app-commands',
    'basePath' =>dirname(__DIR__),
    'controllerNamespace' => 'app\commands',
    'components' => [
        'db' => [
            'class' => 'yii\db\Connection',
            'dsn' => 'mysql:host=127.0.0.1;dbname=***',
            'username' => 'root',
            'password' => '****',
            'charset' => 'utf8',

            'tablePrefix' => '',
            'emulatePrepare' => false,
            'attributes' => [
                PDO::ATTR_STRINGIFY_FETCHES => false,
            ],
            'enableSchemaCache' => false,
            'schemaCacheDuration' => 3600,
        ],
        'mailer' => [
            'class' => 'yii\swiftmailer\Mailer',
            'useFileTransport' => false,
            'transport' => [
                'class' => 'Swift_SmtpTransport',
                'host' => EMAIL_SERVICE,
                'username' => EMAIL_USER,
                'password' => EMAIL_PWD,
                'port' => EMAIL_GR_PORT,
                'encryption' => 'SSL',
            ],

            'messageConfig'=>[
                'charset'=>'UTF-8',
                'from'=>[EMAIL_USER=>EMAIL_USER_NAME]
            ],
        ],
        
        'queue' => [
            'class' => 'shmilyzxt\queue\queues\DatabaseQueue', //队列使用的类
            'jobEvent' => [ //队列任务事件配置,目前任务支持2个事件
//                'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'],
//                'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'],
            ],
            'connector' => [//队列中间件链接器配置(这是因为使用数据库,所以使用yii\db\Connection作为数据库链接实例)
                'class' => 'yii\db\Connection',
                'dsn' => 'mysql:host=127.0.0.1;dbname=***',
                'username' => 'root',
                'password' => '****',
                'charset' => 'utf8',
            ],
            'table' => 'jobs', //存储队列数据表名
            'queue' => 'email', //队列的名称
            'expire' => 300, //任务过期时间
            'maxJob' =>0, //队列允许最大任务数,0为不限制
            'failed' => [//任务失败日志记录(目前只支持记录到数据库)
                'logFail' => true, //开启任务失败处理
                'provider' => [ //任务失败处理类
                    'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider',
                    'db' => [ //数据库链接
                        'class' => 'yii\db\Connection',
                        'dsn' => 'mysql:host=127.0.0.1;dbname=***',
                        'username' => 'root',
                        'password' => '**',
                        'charset' => 'utf8',
                    ],
                    'table' => 'jobs_failed' //存储失败日志的表名
                ],
            ],
        ]
    ],

];

在components下要配置db信息,为了链接数据库的,还要配置mailer,为了发送邮件的,最后还有queue的配置

注意一点:数据库不要配置成localhost,要采用127.0.0.1,否则可能链接不上,坑~,因为这两种不一样,一个是通过socket链接数据库,一个是通过网络链接,改成127.0.0.1后可能你还是连接不上数据库,如果你用的是MAMP,修改MySQL的配置,勾选Allow network to Mysql    from other computer

/commands/WorkerController.php 内容如下

<?php

namespace app\commands;

use yii\console\Controller;
use shmilyzxt\queue\Worker;

class WorkerController extends Controller{

    public function actionListen($queueName='email',$attempt=3,$memeory=128,$sleep=3 ,$delay=0){
        Worker::listen(\Yii::$app->queue,$queueName,$attempt,$memeory,$sleep,$delay);
    }
}

配置完成之后在目录下输入命令 ./yii ,看看是否看到 “worker/listen”命令,如果有就可以进行监听了,如果没有,检查一下上面是否和我的一致,注意命名空间和文件夹要一致,这个问题我也足足搞了很久,坑

./yii worker/listen

测试

好了,现在可以测试了,想办法通过接口创建一个任务,看看能不能发邮件,我在后台搞了一个接口,浏览器中输入地址访问一次添加一次任务

<?php

namespace backend\controllers\systems;
use app\command\WorkerController;
use backend\controllers\BaseController;
use common\helpers\SendEmailMessage;


class TestController extends BaseController{
    public function actionAddSendEmailQueue(){
        $sendEmail=new SendEmailMessage();
        \Yii::$app->queue->pushOn($sendEmail,['email'=>'501351981@qq.com','title'=>'test','content'=>'email test'],'email');
        echo "添加成功了";
    }


}

我遇到的问题,刚开始发送邮件是可以的,但是命令运行一段时间之后,就收不到邮件了,通过try 发现错误是

YII Swift_TransportException: Expected response code 250 but got code "", message'',

可能的错误信息2

exception 'yii\base\ErrorException' with message 'fwrite(): SSL: Broken pipe' in  

vendor/swiftmailer/swiftmailer/lib/classes/Swift/Transport/StreamBuffer.php:232


起初我猜想是php运行超时了,修改php的超时时间,无效,最后改了这个好了,添加红色部分代码

修改vendor/swiftmailer/swiftmailer/lib/classes/Swift/Mailer.php:90

public function send(Swift_Mime_Message $message, &$failedRecipients = null)

    {

        $failedRecipients = (array) $failedRecipients;


        if (!$this->_transport->isStarted()) {

            $this->_transport->start();

        }


        $sent = 0;


        try {

            $sent = $this->_transport->send($message, $failedRecipients);

        } catch (Swift_RfcComplianceException $e) {

            foreach ($message->getTo() as $address => $name) {

                $failedRecipients[] = $address;

            }

        } catch (\Exception $exception) {

            $this->_transport->stop();

            sleep(10);

            throw $exception;

        } finally {

            $this->_transport->stop();

            sleep(1);

        }


        return $sent;

    }

终于可以正常运行了,至此告一段落,真是一次艰难的旅途。。。。。。。

后记

正式使用我们还有很多问题要解决

首先是如何在后台运行命令:

    当我们在命令行启动 worker/listen进程后,关闭命令行,进程也跟着结束了,这肯定不行的,要改为在后台一直运行,可以采用“nohup 命令所在目录 命令 &”来后台执行命令

nohup   /Applications/XAMPP/xamppfiles/htdocs/***/yii worker/listen  &

进程执行后,我还是担心,万一进程挂掉了怎么办,岂不是邮件服务不能用了,因此我还想再建立一个定时任务,比如每10分钟执行一次,判断进程还在不在,如果不在了就启动一个,这需要一个shell脚本

#!/bin/sh
ps -fe|grep worker/listen |grep -v grep
if [ $? -ne 0 ]
then

nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen &

echo "重新启动了1个worker/listen任务....."
else
echo "监听程序正在运行呢,别担心....."
fi

还没有完,到这里还是有问题,就是邮件发送的太慢了,想到的一个办法就是多开几个任务,比如开3个

#!/bin/sh
ps -fe|grep worker/listen |grep -v grep
if [ $? -ne 0 ]
then
nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen &
nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen &
nohup /Applications/XAMPP/xamppfiles/htdocs/www_beta/yii worker/listen &
echo "启动了3个worker/listen任务....."
else
echo "监听程序正则进行....."
fi

很棒,邮件发送速度真的快了很多,很开心,再测试一下吧,我发邮件用的是腾讯企业邮箱,建立了一个发送任务,看看发1000封邮件需要多长时间,当我开心的看着收件箱渐渐增多的时候,突然收不到邮件了....

原来腾讯邮件有发送量和频率的限制,发送量:每天整个公司对外只能发送 购买人数*50封,发送频率没有查到....

到这里真的觉得做一件事真的是太难了,跨过重重险阻,结果....看了下自己搭建邮件发送服务器,有点麻烦,本人后台功力不够,只能先选择使用腾讯邮件进行发送了,只能降低发送频率了,折腾了一圈发现,当初用队列就是为了加快发送频率,结果现在又不能随心所欲使用,有些遗憾,不过也学到了一些知识。

就是这样,一次踩了无数坑的学习经历。



分享到

点赞(1)