kkbida - 开源消息投递中间件详细解析

项目简介

kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 开源地址

快速开始

从gitee拉取代码

git clone https://gitee.com/kekingcn/kkbida

准备一个mysql数据库和redis,修改kk-callcenter-main模块中的 application.properties 配置文件的连接信息

使用maven构建

cd kkbida
mvn clean install -DskipTests

运行

cd kkbida/kk-callcenter-main/target
java -jar kk-callcenter-main-1.0.0-SNAPSHOT.jar

浏览器访问 http://127.0.0.1:8080 即可 用户名/密码:admin/admin

功能介绍

kkbida目前提供首页统计图表历史任务待处理任务失败通知管理四个功能模块

  • 首页统计图表可查看消息总数、成功、失败数和最近一周的每天的统计数据
  • 历史任务提供查看历史任务记录和执行情况功能,对于执行失败的任务,提供手动补偿执行功能
  • 待处理任务提供查看队列中待处理的任务记录情况(包括距下次执行还剩多少时间),同时提供直接手动提前执行功能
  • 失败通知管理即webhook通知,当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知,支持包括钉钉、企业微信等各种带webhook功能的平台,详细配置见:配置

原理解析

系统架构详见:架构 的说明和架构图

下面从代码总体结构sdk任务队列处理webhook四个方面详细解析

1. 代码总体结构

从gitee拉取代码使用idea导入后,看到代码结构如下

项目分为三个模块kk-callcenter-mainkk-callcenter-sdkthird-party

  • kk-callcenter-main为主服务,向外提供dubbo和http接口服务,并提供web管理页面功能
  • kk-callcenter-sdk为提供给接入系统用的sdk包,通过sdk包能方便快速的向主服务发送消息任务,详细使用说明见 使用说明
  • third-partyklock(同为凯京科技开源的分布式锁组件,详见:https://gitee.com/kekingcn/spring-boot-klock-starter )依赖

2. sdk

如下图所示,kk-callcenter-sdk提供一个CallBackService接口和一个CallBackServiceHttpImpl http 实现类(另外服务端默认提供了dubbo实现,接入时,只需要使用和服务端同一个redis注册中心,@Reference CallBackService就可自动注入),同时提供一个 CallBackTask model类作为任务参数,并提供在任务失败的时候抛出的CallBackException异常类

使用详情见 获取callbackservice对象调用callbackservice发起回调

同时,如果需要使用其他方式接入,只需要自行实现CallBackService接口就可以

3. 任务队列处理

查看kk-callcenter-sdk模块下TaskService任务处理类源码TaskDueueHadler队列处理类源码

TaskService 类提供如下两个方法

/**
* 添加延迟任务
* @param task
* @param delay
* @param timeUnit
*/
public void addDelayTask(CallBackTask task, Long delay, TimeUnit timeUnit){
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
    delayedQueue.offer(task,delay,timeUnit);
}

/**
* 添加即时任务
* @param task
*/
public void addTask(CallBackTask task){
    task.setTaskId(UUID.randomUUID().toString());
    task.setCreateDate(new Timestamp(System.currentTimeMillis()));
    task.setLastModifiedDate(new Timestamp(System.currentTimeMillis()));
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    blockingQueue.addAsync(task);
}

分别为添加延迟任务和添加即时任务

  • 当收到任务消息传入时,会调用addTask方法向redis队列添加一条即时任务
  • 当任务处理失败时,会调用addDelayTask方法向reids队列添加一条延时任务

TaskDueueHadler类通过两个线程不停地从redis队列取数据,并调用执行任务方法,如果执行失败会调用TaskServiceaddDelayTask,将任务重新放入队列,延时执行,直到成功或到达最大失败次数。如果到达最大失败次数会调用Webhook执行组件向配置的所有失败提醒地址推送提醒消息

4. webhook

当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知

查看kk-callcenter-sdk模块下TaskService任务处理类源码

/**
 * @auther: chenjh
 * @time: 2019/3/6 14:44
 * @description
 */
@Component
public class WebHookClientComponent {

    private HttpClient httpclient = HttpClients.createDefault();

    private static final String DINGDING_WEBHOOK_URL_PREFIX = "https://oapi.dingtalk.com";

    public SendResult send(WebHookPO webhook, String msgContent) throws IOException, WebHookException {
        if (webhook == null || msgContent.trim().isEmpty()) {
            throw new WebHookException("webhook对象或消息对象为空");
        }
        if (!RequestMethodEnum.POST.equals(webhook.getRequestMethod())) {
            throw new WebHookException("暂只支持POST请求");
        }
        HttpPost httppost = new HttpPost(webhook.getUrl());
        httppost.addHeader("Content-Type", webhook.getContentType());
        StringEntity se = new StringEntity(msgContent, "utf-8");
        httppost.setEntity(se);
        SendResult sendResult = new SendResult();
        HttpResponse response = httpclient.execute(httppost);
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            //钉钉消息,判断返回码
            if (webhook.getUrl().toLowerCase().startsWith(DINGDING_WEBHOOK_URL_PREFIX)) {
                String result = EntityUtils.toString(response.getEntity());
                JSONObject obj = JSONObject.parseObject(result);
                Integer errcode = obj.getInteger("errcode");
                sendResult.setErrorCode(errcode);
                sendResult.setErrorMsg(obj.getString("errmsg"));
                sendResult.setIsSuccess(errcode.equals(0));
            } else {
                sendResult.setIsSuccess(true);
            }
        } else {
            sendResult.setIsSuccess(false);
        }
        return sendResult;
    }
}

通知发送组件会按失败通知(webhook)配置的参数,封装http请求,发送请求,完成webhook通知成功与否判定

结语

读完本文,相信您已经了解kkbida应该如何使用,并了解其主要的内部设计与实现。希望kkbida能帮助您简化系统间消息通知,助力您提升开发效率。如果您对kkbida项目有好的建议与意见,欢迎到issue区讨论,同时欢迎开发者提交pr参与项目贡献

作者简介

陈精华,2018年8月加入凯京科技。kkbida项目开发者,任职于凯京研发中心架构组,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。
欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践

关于架构&运维部

凯京研发中心架构&运维部的工作主要分两大部分,架构组主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维组主要负责devops系统研发以及k8s容器环境的维护等工作。

架构组招聘

目前架构组还有两个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn