egg.js

NodeJS初探 - egg.js

简单用egg.js搭建一个消息系统,可实现异步消息队列、延迟消息队列
以及替代corntab的定时任务

1、egg.js

  • 1.1、定时任务
1
2
3
4
5
6
7
8
9
10
cron-parser 支持可选的秒(linux crontab 不支持)
* * * * * *
┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ |
│ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
│ │ │ │ └───── month (1 - 12)
│ │ │ └────────── day of month (1 - 31)
│ │ └─────────────── hour (0 - 23)
│ └──────────────────── minute (0 - 59)
└───────────────────────── second (0 - 59, optional)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
module.exports = {
schedule: {
// cron: '0 0 */3 * * *', // corntab风格的定时任务

interval: '1m', // 1 分钟间隔
type: 'all', // all-指定所有的worker都需要执行 worker-随机某个worker执行
},
async task(ctx) {
const res = await ctx.curl('http://www.api.com/cache', {
dataType: 'json',
});
ctx.app.cache = res.data;
},
};
  • 1.2、自定义定时任务

在 agent.js 中继承 agent.ScheduleStrategy,然后通过 agent.schedule.use() 注册即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
const config = { redis: agent.redis };
Object.assign(config, agent.config.beeQueue, { isWorker: true });
// 创建异步队列对象
agent.redisQueue = new Queue('queue', config);

// bee-queue异步队列
class QueueStrategy extends agent.ScheduleStrategy {
start() {
agent.redisQueue.process(async job => this.sendOne(job.data));
}
}
// 输出自定义定时任务
agent.schedule.use('queue', QueueStrategy);

ScheduleStrategy 基类提供了:

  • schedule - 定时任务的属性,disable 是默认统一支持的,其他配置可以自行解析。
  • this.sendOne(…args) - 随机通知一个 worker 执行 task,args 会传递给 subscribe(…args) 或 task(ctx, …args)。
  • `this.sendAll(…args) - 通知所有的 worker 执行 task

2、bee-queue

采用redis作为驱动的消息队列中间件,支持异步队列、延迟队列

  • 2.1、创建队列实例

    创建队列需要一个beeQueue的实例,为了避免redis连接数爆炸,所以这里要用到单例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// app.js
const config = {
// 任务名前缀
prefix: 'monitor',
// 心跳时间间隔 ms
stallInterval: 5000,
// 如果所有延迟的作业都在该时间之外,队列将再次检查经过该时间后是否没有错过任何作业 ms
nearTermWindow: 1200000,
// 任务允许延迟的时间 ms
delayedDebounce: 1000,
// 此队列是否要作业
isWorker: false,
// 接收作业事件
getEvents: false,
// 发送作业事件
sendEvents: false,
// 启用延迟作业
activateDelayedJobs: true,
// 自动删除成功的作业
removeOnSuccess: false,
// 自动删除失败的作业
removeOnFailure: false,
// 最大查询作业数量
redisScanCount: 100,
// redis实例
redis: app.redis
};
// 创建异步队列实例
this.app.redisQueue = new Queue('queue', config);
  • 2.2、队列入队
1
2
3
4
5
6
7
8
9
const job = this.app.redisQueue.createJob({name: 'test', data: {}});

// 延迟时间 (ms)
job.delayUntil(delay)

// 保存队列
job.save().then((job) => {
// job enqueued, job.id populated
});

3、实现异步队列

  • 3.1、定义一个控制器用于接收data创建异步队列
  • 3.2、在agent.js中自定义定时任务
  • 3.3、根据队列name创建schedule
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// queue.js
'use strict';

/**
* bee-queue异步队列消费
* User: Srako
* Date: 2020/05/12 18:10
* @email srakor@163.com
* @link http://srako.github.io
*/

const moment = require('moment');

module.exports = app => {
return {
schedule: {
type: 'queue',
},

async task(ctx, args) {
if (!args.hasOwnProperty('name')) {
ctx.logger.error('异步队列名称不存在:', args.name);
return;
}
// 服务名称
const serviceName = ctx.helper.toCamel(args.name);

// 不存在处理服务
if (!ctx.service.hasOwnProperty(serviceName)) {
ctx.logger.error('异步队列不存在消费者:', args.name);
return;
}

try {
const result = await ctx.service[serviceName].consume(args.data);

if (result !== true) {
ctx.logger.error('异步队列消费失败:', args, ':', result);
return;
}
ctx.logger.info('异步队列消费成功:', args);
} catch (e) {
ctx.logger.error('异步队列消费失败:catch:', args, ':', e.res);

// 失败任务,两分钟后重试
const job = app.redisQueue.createJob({ name: args.name, data: args.data });
job.delayUntil(moment()
.add(2, 'm')
.format('x'))
.save()
.then(job => {
ctx.logger.info('异步队列重新创建成功:', job.data);
});
}
},
};
};
  • 3.4、根据name创建对应service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// queueService.js
'use strict';

/**
* 企业微信消息
* User: Srako
* Date: 2020/09/08 18:27
* @email srakor@163.com
* @link http://srako.github.io
*/

const Service = require('egg').Service;

class templateMessageService extends Service {

/**
* @param {object} data 对象
* @param {int} data.user_id 后台admin_id
* @param {string} data.scene 模版场景
* @return {Promise<*>} promise
*/
async consume(data) {
const { ctx, app } = this;

// 调用对应接口消费队列
const result = await ctx.curl(app.config.apiUrl + 'work/send_message', {
method: 'POST',
dataType: 'json',
data,
});
return result.status === 200 ? true : result;
}
}

module.exports = templateMessageService;