标签搜索

使用thinkphp、think-queue、redis、supervisor搭建异步任务中心

basil
2023-06-29 / 187 阅读

为什么要搭建异步任务中心

执行耗时任务

作为web开发者,经常会遇到从用户操作开始到服务响应结束耗时较长的功能,例如导出导入大批量的Excel数据,给成千上万的用户发送邮件或信息,文件打包下载等等,如果采用同步的方式,响应时间太久,服务端或客户端(浏览器)连接会断开,导致任务中止,数据不一致,影响用户体验,采用异步的方式可以避免这个问题。

统一管理异步任务,方便维护与开发

将异步任务统一管理,有利于统一做异常处理,方便将代码进行组件化,提高开发效率。

分担应用服务器的压力

异步任务中心可以独立部署到另外一台服务器,与应用服务器分开,将耗时耗资源的任务从应用服务器剥离,减轻应用服务器压力。

如何搭建异步任务中心

流程

异步任务中心时序图
用户操作发起任务请求,应用服务接收到任务之后投递到异步任务中心,异步任务中心异步处理完任务之后,将结果反馈到应用服务,用户可以通过任务列表页面查看任务执行的结果。

用到的技术和工具

  • thinkphp5.1,用于编写接口与业务逻辑。
  • think-queue2.0+redis,用于创建消息队列执行异步任务。
  • supervisor,对think-queue创建的队列进程进行管理。
  • MySQL,用于保存任务记录。

步骤(以Excel导出任务为例)

创建MySQL异步任务表
CREATE TABLE `async_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '用户ID(cor_admin_user表user_id)',
  `name` varchar(100) NOT NULL DEFAULT '' COMMENT '任务名称',
  `task_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务类型:0文件打包,1云盘打包下载,2Excel导出,3Excel导入',
  `identification` varchar(60) NOT NULL DEFAULT '' COMMENT '任务标识:详见AsyncTaskEnum文件',
  `task_params` text COMMENT '任务参数:例如导出列表的查询条件等等',
  `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态:0进行中,1完成,2失败',
  `out_file_name` varchar(120) NOT NULL DEFAULT '' COMMENT '输出文件名称',
  `download_url` varchar(255) NOT NULL DEFAULT '' COMMENT '下载路径',
  `exception_msg` varchar(255) NOT NULL DEFAULT '' COMMENT '异常信息',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '最后更新时间',
  `agency_user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '经销商的用户ID',
  PRIMARY KEY (`id`),
  KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1810 DEFAULT CHARSET=utf8 COMMENT='异步任务表'
;
定义Excel导出接口,后面需要导出的Excel服务类可以实现这个接口进行导出
<?php


namespace app\contract\excel;


use think\queue\Job;

interface ExportTaskInterface
{
    /**
     * 开始导出任务
     * @param array $params 创建任务的参数
     * @return mixed
     */
    public function startExportTask(array $params) : array;

    /**
     * 导出Excel
     * @param array $params
     * @return mixed
     */
    public function doExportTask(array $params) : array;

    /**
     * 任务失败时的处理方法
     * @param \Exception $e
     * @param array $params
     * @param Job $job
     * @return mixed
     */
    public function exportTaskFailedHandle(\Exception $e, array $params, Job $job) : array;

    /**
     * 任务完成时的处理方法
     * @param array $exportResult
     * @param Job $job
     * @return mixed
     */
    public function taskFinishHandle(array $exportResult, Job $job) : array;
}
定义Excel导出Job,这个Job就会调用实现了ExportTaskInterface接口服务类相关的导出方法
<?php


namespace app\job\jobs;


use app\common\MyLog;
use app\contract\excel\ExportTaskInterface;
use app\library\exception\ServiceLogicException;
use think\queue\Job;

class ExportExcelJob
{
    public function fire(Job $job, $data)
    {
        if (!isset($data['handler'])){
            $job->delete();
            throw new ServiceLogicException('缺少处理类');
        }
        try {
            $this->handleJob(new $data['handler'], $data, $job);
        }catch (\Exception $e){
            var_dump($e->getFile() . $e->getLine() . $e->getMessage());
            MyLog::errorLog('error', $e->getFile() . $e->getLine() . $e->getMessage(), 'export_excel_task');
            $job->delete();
        }

    }

    protected function handleJob(ExportTaskInterface $exportTask, $data, Job $job)
    {
        MyLog::infoLog('export_excel_task', '接收到Excel导出任务:' . json_encode($data), 'export_excel_task');
        try {
            $result = $exportTask->doExportTask($data);
            $exportTask->taskFinishHandle($result, $job);
            MyLog::infoLog('export_excel_task_res', '执行成功:' . json_encode($result), 'export_excel_task');
        }catch (\Exception $e){
            MyLog::infoLog('export_excel_task_err', $e->getFile() . $e->getLine() . $e->getMessage(), 'export_excel_task');

            $exportTask->exportTaskFailedHandle($e, $data, $job);
        }
    }

    public function fail(Job $job, $data)
    {
        $job->delete();
    }
}
编写业务逻辑类,实现ExportTaskInterface接口,并补充导出相关的方法
<?php


namespace app\admin\service;


use app\admin\model\UserInfo;
use app\common\MyLog;
use app\models\Order;
use app\contract\excel\ExportTaskInterface;
use app\library\enum\AsyncTaskEnum;
use app\library\exception\ServiceLogicException;
use app\library\traits\QueryBuilder;
use app\models\AsyncTask;
use app\service\common\XlsWriterService;
use app\service\QueueService;
use think\queue\Job;
use third_library\LibTime;

class FirstOrderCustomerService extends BaseService implements ExportTaskInterface
{
    protected $mapField = 'firstOrderCustomerList';

    public function index($params)
    {
        $returnType = $params['return_type'] ?? 'list';
        if ($returnType == 'export'){
            return $this->startExportTask($params);

        }
        $query = $this->getQuery($params);
        return $query->paginate($params['per_page'], null, ['page' => $params['page']])
            ->toArray();

    }

    protected function getQuery($params)
    {
        $query = Order::alias('o')
            ->leftJoin(UserInfo::getTable() . ' ui', 'o.user_id = ui.user_id')
            ->where('o.is_first_order', 1)
            ->append([
                'order_time',
            ])->field([
                'ui.company_name',
                'o.add_time',
                'o.goods_amount',
                'o.order_sn',
                'ui.salesman',
                'ui.service_name'
            ]);
        $this->buildQuery(function ($field, $size, $keyword, $searchType) use ($query){
            if ($searchType == 2){ // 下单时间
                if ($size == '='){
                    $startTime = $keyword . ' ' . '00:00:00';
                    $endTime = $keyword . ' ' . '23:59:59';
                    $startTime = LibTime::getInstance()->local_strtotime($startTime);
                    $endTime = LibTime::getInstance()->local_strtotime($endTime);
                    $query->where($field, '>=', $startTime);
                    $query->where($field, '<=', $endTime);
                }else{
                    $keyword = LibTime::getInstance()->local_strtotime($keyword);
                    $query->where($field, $size, $keyword);
                }
            }else{
                $query->where($field, $size, $keyword);
            }
        }, $params);
        return $query;
    }

    public function startExportTask(array $params): array
    {
        $order = Order::where('is_first_order', 1)->find();
        if (empty($order)){
            throw new ServiceLogicException('没有可导出的数据');
        }
        $name = '今日下单客户' . date("YmdHis");
        $data = [
            'user_id' => $params['admin_id'],
            'name' => $name,
            'task_type' => AsyncTaskEnum::TASK_TYPE_EXCEL_EXPORT,
            'identification' => AsyncTaskEnum::IDENTIFICATION_EXCEL_EXPORT,
            'task_params' => json_encode($params),
            'out_file_name' => $name . '.xlsx',
        ];
        $taskId = AsyncTask::createTask($data);
        $params['task_id'] = $taskId;
        $params['handler'] = self::class;
        QueueService::push('excel_export', $params);
        return ['msg' => '已创建导出任务,请在下载中心查看', 'task_id' => $taskId];
    }

    public function doExportTask(array $params): array
    {
        $query = $this->getQuery($params);
        $now = time();
        $filename = "今日下单客户{$params['admin_id']}_{$now}.xlsx";
        $field = [
            'company_name'              =>  ['name' =>'订单编号'],
            'order_time'                =>  ['name' =>'首次下单时间'],
            'goods_amount'             =>  ['name' =>'首次下单金额'],
            'order_sn'            =>  ['name' =>'订单号'],
            'salesman'          =>  ['name' =>'现销售对接人'],
            'service_name'            =>  ['name' =>'内勤名称']
        ];
        $fileObj = XlsWriterService::getInstance()
            ->constMemory($filename, null, false)
            ->field($field);
        $page = 1;
        while (true){
            $orders = $query->page($page, 500)
                ->order('order_id', 'desc')
                ->select()
                ->toArray();
            if (empty($orders)) break;
            $items = [];
            foreach ($orders as $val) {
                $item = [];
                foreach ($field as $key => $value) {
                    $item[$key] = $val[$key] ?? '';
                }
                $items[] = array_values($item);
            }
            !empty($items) && $fileObj->data($items);
            $page++;
        }
        $filePath = $fileObj->output();
        return ['file_path' => $filePath, 'task_id' => $params['task_id']];
    }

    public function exportTaskFailedHandle(\Exception $e, array $params, Job $job): array
    {
        AsyncTask::where('id', $params['task_id'])->update([
            'status' => AsyncTaskEnum::TASK_STATUS_FAIL,
            'exception_msg' => $e->getMessage()
        ]);
        $job->delete();
        return [];
    }

    public function taskFinishHandle(array $exportResult, Job $job): array
    {
        AsyncTask::where('id', $exportResult['task_id'])->update([
            'status' => AsyncTaskEnum::TASK_STATUS_FINISH,
            'download_url' => $exportResult['file_path']
        ]);
        $job->delete();
        return [];
    }
}

为了能让异步任务顺利的运行,需要对startExportTaskdoExportTaskexportTaskFailedHandletaskFinishHandle四个方法补充相应的逻辑

startExportTask,定义导出文件的名称,创建导出任务保存到数据库,并将任务投递到ExportExcelJob执行
doExportTask,获取导出的数据,整理数据格式,调用xlswrite插件导出Excel文件
exportTaskFailedHandle,任务失败后,更改任务的状态为失败,并保存失败的原因
taskFinishHandle,任务成功后,更改状态为成功,并保存下载链接

这四个方法对需要导出Excel的服务都适用,可以封装成trait,这样就不用在每个业务逻辑类重复写这四个方法,在后面的文章再详细讲讲。

启动异步任务队列,在项目的根目录执行下面命令,也可以使用绝对路径执行,启动成功后,用户进行导出操作,接口把任务投递到当前队列,就可以执行导出任务了
php think queue:work --daemon --queue excel_export
使用supervisor对任务队列进程进行管理,以centos为例

安装教程可以在网上参考,安装完之后在/etc/supervisord.d文件夹创建supervisor.ini文件,并写入以下内容,就可以使用supervisor进行管理了。

;导出Excel
[program:excel_export_task]
command=/www/server/php/73/bin/php /www/wwwroot/devtpro/src/cron/think queue:work --daemon --queue excel_export
directory=/www/wwwroot/devtpro/src/cron
process_name=%(process_num)02d
numprocs=1
autostart=true
autorestart=true
startsecs=1
startretries=20
redirect_stderr=true
user=devops
最后,编写接口查询async_task表记录,展示到前端页面让用户进行下载,至此,就完成了Excel异步导出功能的开发

总结

其它的异步任务与上面的Excel导出任务大同小异,参考上面的例子增加相应的服务即可。

0