为什么要搭建异步任务中心
执行耗时任务
作为web开发者,经常会遇到从用户操作开始到服务响应结束耗时较长的功能,例如导出导入大批量的Excel数据,给成千上万的用户发送邮件或信息,文件打包下载等等,如果采用同步的方式,响应时间太久,服务端或客户端(浏览器)连接会断开,导致任务中止,数据不一致,影响用户体验,采用异步的方式可以避免这个问题。
统一管理异步任务,方便维护与开发
将异步任务统一管理,有利于统一做异常处理,方便将代码进行组件化,提高开发效率。
分担应用服务器的压力
异步任务中心可以独立部署到另外一台服务器,与应用服务器分开,将耗时耗资源的任务从应用服务器剥离,减轻应用服务器压力。
如何搭建异步任务中心
流程
用户操作发起任务请求,应用服务接收到任务之后投递到异步任务中心,异步任务中心异步处理完任务之后,将结果反馈到应用服务,用户可以通过任务列表页面查看任务执行的结果。
用到的技术和工具
- thinkphp5.1,用于编写接口与业务逻辑。
- think-queue2.0+redis,用于创建消息队列执行异步任务。
- supervisor,对think-queue创建的队列进程进行管理。
- MySQL,用于保存任务记录。
步骤(以Excel导出任务为例)
创建MySQL异步任务表
CREATE TABLE `async_task` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '用户ID(cor_admin_user表user_id)',
`name` varchar(100) NOT NULL DEFAULT '' COMMENT '任务名称',
`task_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务类型:0文件打包,1云盘打包下载,2Excel导出,3Excel导入',
`identification` varchar(60) NOT NULL DEFAULT '' COMMENT '任务标识:详见AsyncTaskEnum文件',
`task_params` text COMMENT '任务参数:例如导出列表的查询条件等等',
`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态:0进行中,1完成,2失败',
`out_file_name` varchar(120) NOT NULL DEFAULT '' COMMENT '输出文件名称',
`download_url` varchar(255) NOT NULL DEFAULT '' COMMENT '下载路径',
`exception_msg` varchar(255) NOT NULL DEFAULT '' COMMENT '异常信息',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '最后更新时间',
`agency_user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '经销商的用户ID',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1810 DEFAULT CHARSET=utf8 COMMENT='异步任务表'
;
定义Excel导出接口,后面需要导出的Excel服务类可以实现这个接口进行导出
<?php
namespace app\contract\excel;
use think\queue\Job;
interface ExportTaskInterface
{
/**
* 开始导出任务
* @param array $params 创建任务的参数
* @return mixed
*/
public function startExportTask(array $params) : array;
/**
* 导出Excel
* @param array $params
* @return mixed
*/
public function doExportTask(array $params) : array;
/**
* 任务失败时的处理方法
* @param \Exception $e
* @param array $params
* @param Job $job
* @return mixed
*/
public function exportTaskFailedHandle(\Exception $e, array $params, Job $job) : array;
/**
* 任务完成时的处理方法
* @param array $exportResult
* @param Job $job
* @return mixed
*/
public function taskFinishHandle(array $exportResult, Job $job) : array;
}
定义Excel导出Job
,这个Job
就会调用实现了ExportTaskInterface
接口服务类相关的导出方法
<?php
namespace app\job\jobs;
use app\common\MyLog;
use app\contract\excel\ExportTaskInterface;
use app\library\exception\ServiceLogicException;
use think\queue\Job;
class ExportExcelJob
{
public function fire(Job $job, $data)
{
if (!isset($data['handler'])){
$job->delete();
throw new ServiceLogicException('缺少处理类');
}
try {
$this->handleJob(new $data['handler'], $data, $job);
}catch (\Exception $e){
var_dump($e->getFile() . $e->getLine() . $e->getMessage());
MyLog::errorLog('error', $e->getFile() . $e->getLine() . $e->getMessage(), 'export_excel_task');
$job->delete();
}
}
protected function handleJob(ExportTaskInterface $exportTask, $data, Job $job)
{
MyLog::infoLog('export_excel_task', '接收到Excel导出任务:' . json_encode($data), 'export_excel_task');
try {
$result = $exportTask->doExportTask($data);
$exportTask->taskFinishHandle($result, $job);
MyLog::infoLog('export_excel_task_res', '执行成功:' . json_encode($result), 'export_excel_task');
}catch (\Exception $e){
MyLog::infoLog('export_excel_task_err', $e->getFile() . $e->getLine() . $e->getMessage(), 'export_excel_task');
$exportTask->exportTaskFailedHandle($e, $data, $job);
}
}
public function fail(Job $job, $data)
{
$job->delete();
}
}
编写业务逻辑类,实现ExportTaskInterface
接口,并补充导出相关的方法
<?php
namespace app\admin\service;
use app\admin\model\UserInfo;
use app\common\MyLog;
use app\models\Order;
use app\contract\excel\ExportTaskInterface;
use app\library\enum\AsyncTaskEnum;
use app\library\exception\ServiceLogicException;
use app\library\traits\QueryBuilder;
use app\models\AsyncTask;
use app\service\common\XlsWriterService;
use app\service\QueueService;
use think\queue\Job;
use third_library\LibTime;
class FirstOrderCustomerService extends BaseService implements ExportTaskInterface
{
protected $mapField = 'firstOrderCustomerList';
public function index($params)
{
$returnType = $params['return_type'] ?? 'list';
if ($returnType == 'export'){
return $this->startExportTask($params);
}
$query = $this->getQuery($params);
return $query->paginate($params['per_page'], null, ['page' => $params['page']])
->toArray();
}
protected function getQuery($params)
{
$query = Order::alias('o')
->leftJoin(UserInfo::getTable() . ' ui', 'o.user_id = ui.user_id')
->where('o.is_first_order', 1)
->append([
'order_time',
])->field([
'ui.company_name',
'o.add_time',
'o.goods_amount',
'o.order_sn',
'ui.salesman',
'ui.service_name'
]);
$this->buildQuery(function ($field, $size, $keyword, $searchType) use ($query){
if ($searchType == 2){ // 下单时间
if ($size == '='){
$startTime = $keyword . ' ' . '00:00:00';
$endTime = $keyword . ' ' . '23:59:59';
$startTime = LibTime::getInstance()->local_strtotime($startTime);
$endTime = LibTime::getInstance()->local_strtotime($endTime);
$query->where($field, '>=', $startTime);
$query->where($field, '<=', $endTime);
}else{
$keyword = LibTime::getInstance()->local_strtotime($keyword);
$query->where($field, $size, $keyword);
}
}else{
$query->where($field, $size, $keyword);
}
}, $params);
return $query;
}
public function startExportTask(array $params): array
{
$order = Order::where('is_first_order', 1)->find();
if (empty($order)){
throw new ServiceLogicException('没有可导出的数据');
}
$name = '今日下单客户' . date("YmdHis");
$data = [
'user_id' => $params['admin_id'],
'name' => $name,
'task_type' => AsyncTaskEnum::TASK_TYPE_EXCEL_EXPORT,
'identification' => AsyncTaskEnum::IDENTIFICATION_EXCEL_EXPORT,
'task_params' => json_encode($params),
'out_file_name' => $name . '.xlsx',
];
$taskId = AsyncTask::createTask($data);
$params['task_id'] = $taskId;
$params['handler'] = self::class;
QueueService::push('excel_export', $params);
return ['msg' => '已创建导出任务,请在下载中心查看', 'task_id' => $taskId];
}
public function doExportTask(array $params): array
{
$query = $this->getQuery($params);
$now = time();
$filename = "今日下单客户{$params['admin_id']}_{$now}.xlsx";
$field = [
'company_name' => ['name' =>'订单编号'],
'order_time' => ['name' =>'首次下单时间'],
'goods_amount' => ['name' =>'首次下单金额'],
'order_sn' => ['name' =>'订单号'],
'salesman' => ['name' =>'现销售对接人'],
'service_name' => ['name' =>'内勤名称']
];
$fileObj = XlsWriterService::getInstance()
->constMemory($filename, null, false)
->field($field);
$page = 1;
while (true){
$orders = $query->page($page, 500)
->order('order_id', 'desc')
->select()
->toArray();
if (empty($orders)) break;
$items = [];
foreach ($orders as $val) {
$item = [];
foreach ($field as $key => $value) {
$item[$key] = $val[$key] ?? '';
}
$items[] = array_values($item);
}
!empty($items) && $fileObj->data($items);
$page++;
}
$filePath = $fileObj->output();
return ['file_path' => $filePath, 'task_id' => $params['task_id']];
}
public function exportTaskFailedHandle(\Exception $e, array $params, Job $job): array
{
AsyncTask::where('id', $params['task_id'])->update([
'status' => AsyncTaskEnum::TASK_STATUS_FAIL,
'exception_msg' => $e->getMessage()
]);
$job->delete();
return [];
}
public function taskFinishHandle(array $exportResult, Job $job): array
{
AsyncTask::where('id', $exportResult['task_id'])->update([
'status' => AsyncTaskEnum::TASK_STATUS_FINISH,
'download_url' => $exportResult['file_path']
]);
$job->delete();
return [];
}
}
为了能让异步任务顺利的运行,需要对startExportTask
、doExportTask
、exportTaskFailedHandle
,taskFinishHandle
四个方法补充相应的逻辑
startExportTask
,定义导出文件的名称,创建导出任务保存到数据库,并将任务投递到ExportExcelJob
执行
doExportTask
,获取导出的数据,整理数据格式,调用xlswrite
插件导出Excel文件
exportTaskFailedHandle
,任务失败后,更改任务的状态为失败,并保存失败的原因
taskFinishHandle
,任务成功后,更改状态为成功,并保存下载链接
这四个方法对需要导出Excel的服务都适用,可以封装成trait
,这样就不用在每个业务逻辑类重复写这四个方法,在后面的文章再详细讲讲。
启动异步任务队列,在项目的根目录执行下面命令,也可以使用绝对路径执行,启动成功后,用户进行导出操作,接口把任务投递到当前队列,就可以执行导出任务了
php think queue:work --daemon --queue excel_export
使用supervisor
对任务队列进程进行管理,以centos
为例
安装教程可以在网上参考,安装完之后在/etc/supervisord.d
文件夹创建supervisor.ini
文件,并写入以下内容,就可以使用supervisor进行管理了。
;导出Excel
[program:excel_export_task]
command=/www/server/php/73/bin/php /www/wwwroot/devtpro/src/cron/think queue:work --daemon --queue excel_export
directory=/www/wwwroot/devtpro/src/cron
process_name=%(process_num)02d
numprocs=1
autostart=true
autorestart=true
startsecs=1
startretries=20
redirect_stderr=true
user=devops
最后,编写接口查询async_task
表记录,展示到前端页面让用户进行下载,至此,就完成了Excel异步导出功能的开发
总结
其它的异步任务与上面的Excel导出任务大同小异,参考上面的例子增加相应的服务即可。