帖子
帖子
用户
博客
课程

一文带你理解YonBIP事件中心

YonBuilder应用构建 2024-1-16 21:58 245人浏览 0人回复
摘要

通过事件中心可以实现服务解耦,降低系统之间耦合性,提升系统扩展性。 比如:审批过程中通过事件中心发送待办、已办消息。处理过程中,审批中心只需要在发生对应事件后把事件发送到事件中心即可,无需关注具体发送 ...

1、事件中心的使用场景

事件中心作为YonBIP的底层支撑服务,承载着重要使命,下面是部分功能和对应的使用场景:

  • 解耦:通过事件中心可以实现服务解耦,降低系统之间耦合性,提升系统扩展性。

比如:审批过程中通过事件中心发送待办、已办消息。处理过程中,审批中心只需要在发生对应事件后把事件发送到事件中心即可,无需关注具体发送消息过程

  • 削峰:通过事件中心可削除系统流量峰值降低系统压力。

比如:B2C订单中心,电商下单的时候,需要生成销售出库单,这时的流量可能很大,实际处理过程中在订单中心下单后,订单中心会通过事件中心发送事件,事件中心再推销售出库单,当瞬间量很大的时候,事件中心可以作为缓存,减轻销售出库单所在服务的压力

  • 异步处理:使用事件中心是异步处理,可以加快系统响应时间。

比如:当配置有会员邀请有礼,并且邀请注册会员后,需要给被邀请人发放奖励,发放奖励的过程是通过事件中心实现,负责注册的接口只需要完成注册本身,并把事件发送给事件中心即可返回,无需等待发放奖励处理,从而加快处理速度

  • 事件分发:事件中心支持一个事件类型下面多个监听订阅

比如:多语内容更新事件,订阅有600+个监听

2、事件中心的技术实现

事件中心包含2个服务:iuap-apcom-event、iuap-apcom-eventeos,iuap-apcom-event是主服务,iuap-apcom-eventeos负责读取EOS事件
从功能上来说,整体分为两部分:上游生产事件、下游消费事件,上下游都支持多种接入模式,最常用的是:上游EOS模式 + 下游同步/异步的推送模式,本文只介绍这种模式。

2.1EOS生产同步/异步消费模式整体流程

  • EOS生产结合同步消费模式处理流程如下


1、 上游服务调用事件中心接口,把事件写入本地库
2、 Eventeos扫描事件并修改记录状态
3、 Eventeos服务把事件发送到kafka
4、 Eventeos服务把事件保存到event库
5、 事件中心服务消费kafka消息
6、 事件中心服务给订阅的服务推送事件
7、 事件中心服务把返回的状态保存到event库
8、 如果事件消费失败,事件中心服务把失败的事件发送到kafka的重试topic
9、 事件中心服务消费重试消息
10、 事件中心服务通过时间轮定时重试消息,重复第6步推送事件

- EOS生产结合异步消费模式处理流程如下

1、 上游服务调用事件中心接口,把事件写入本地库
2、 Eventeos扫描事件并修改记录状态
3、 Eventeos服务把事件发送到kafka
4、 Eventeos服务把事件保存到event库
5、 事件中心服务消费kafka消息
6、 事件中心服务给订阅的服务推送事件
7、 下游服务给事件中心上报消费状态
8、 事件中心服务把返回的状态保存到event库
9、 如果事件消费失败,事件中心服务把失败的事件发送到kafka的重试topic
10、 事件中心服务消费重试消息
11、 事件中心服务通过时间轮定时重试消息,重复第6步推送事件

2.2 业务服务落库

2.2.1 落库方法

在上游服务引入eos包后,调用eos的接口(com.yonyou.iuap.event.service.EventService#fireLocalEvent)就行,eos包会自动保存到对应的表里边
事件表的关键字段如下:

2.2.2 问题排查方法

首先查看事件表里边是否保存有记录

如果事件没有记录,说明事件没有保存进去,首先检查程序是不是调用了接口(比如调用的函数里边有没有逻辑判断),如果调用了接口,但是数据库里边没记录,需要查看上游服务日志进行排查

2.3 EOS扫描事件

2.3.1 扫描逻辑和配置方法

EOS事件写到库里边之后,eventeos服务会定时检查是否有新事件,如果有的话,会读取后发往kafka,并修改本地事件记录的状态
要想让EOS服务做扫描,需要先添加配置,配置包括2部分

  • 事件库配置逻辑数据源名称
  • Yms里边把逻辑数据源共享给eventeos(事件中心-消息发送服务)

事件库配置的关键字段如下

2.3.2 问题排查方法

1、如果在本地事件库里边事件的状态都是waitsend,需要考虑是不是没有配置EOS扫描,首先在数据库中查询是否有配置,并且active是1
比如查看iuap-apcom-workflow_ubpm数据源

2、然后查看yms里边有没有把数据源共享给eventeos(事件中心-消息发送服务)


如果数据源没有共享给eventeos服务,需要把数据源共享给eventeos服务,并在yms里边发布配置,然后重启eventeos服务
3、如果前2步都没问题,查看eventeos服务的日志,看有没有异常日志
4、如果还没找到问题,可以跟踪一下代码,处理方法为
com.yonyou.tx.mq.rabbitmq.EosSendTask#timeToQueryMQInMQSend

2.4 EOS发送到kafka并落库

2.4.1 处理流程

Eventeos服务读取到事件后,会把事件发送到kafka,并落库到event库

事件中心里边,的事件是根据事件类型和事件源2个维度组合来确定一类事件的,比如: UPDATE_AFTER事件类型表示数据更新之后的事件,结合USER事件源之后(事件类型:UPDATE_AFTER + 事件源:USER)表示用户信息更新之后的事件。

2.4.2 查询topic方法

发送到kafka时,查询topic方法的逻辑如下:
整体规则:基础topic+环境编码,比如:基础topic是common_type_topic,生产环境上完整topic就是common_type_topic_online
基础topic规则
1、如果事件类型配置表的ext6不为空,使用ext6作为topic
2、如果事件类型配置表的ext1不为空,使用sourceId + "_" + eventType作为topic
3、其他情况使用默认topic:common_type_topic
代码在:com.yonyou.iuap.event.util.TopicNameUtils#createTopicNameByEventDefine
Topic在事件中心服务启动时创建,如果日志中报错没找到topic,重启事件中心服务即可解决

2.4.3Topic列表

基础topic编码列表如下(实际环境中需要在基础topic后面添加环境编码)

注意:topic可能会有变动,以项目实际情况为准

2.4.4 保存到event库

事件发送到kafka之后,会保存到事件服务的库里边,主要有三张表:事件记录、事件体、事件状态
事件记录表关键字段

事件体表关键字段

事件状态表关键字段

代码位置在com.yonyou.iuap.event.service.impl.EventRecordServiceSimplified#batchInsertRecord

2.4.5 问题排查方法

首先到事件中心控制台里边查看事件状态(事件中心控制台使用方法见附录一),如果可以找到记录,根据记录状态进行分析,如果没有找到记录,查看eventeos服务日志,找到异常日志,找到出现问题的位置并处理

2.5 事件中心消费kafka消息并分发事件

2.5.1 处理流程

整体处理流程如下:event服务消费kafka消息,根据订阅配置表发送到下游服务,返回后把结果保存到事件库中并更新状态(如果是异步事件,是下游服务处理完之后,回调event的接口上报状态)

消费kafka消息的方法是:com.yonyou.iuap.event.util.KafkaEventUtils#consumeMsg

2.5.2 查询订阅监听

接受到kafka消息之后,要根据事件类型在订阅的配置中查找到所有对事件的订阅,然后按订阅的配置信息分发事件。
订阅信息保存在事件的数据库中,主要字段如下

事件的订阅也可以在事件中心控制台里边的“事件监听管理(new)”里边查看

2.5.3 更新事件状态

下游返回事件消费状态后,事件中心会保存到事件状态表里边(主要字段见2.4.4)

2.5.4 问题排查方法

这步如果出现问题,首先在事件中心控制台中查看事件状态
1、如果事件状态是“进入”,说明事件中心还没有消费kafka消息,或者消费后处理失败,首先检查eventeos和event服务日志是否有异常日志,如果有异常日志就根据异常日志排查,如果没有异常日志,可能是kafka消息积压导致,可以使用下面的语句检查积压情况
bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip:port> --describe --group <group-name>
结果里边LAG表示积压的消息量,如果LAG值很大,说明有消息积压,可以通过增加对应topic的分区数和事件中心服务的实例数来加快处理速度。另外,还需要加速下游消费者的消费速度,特别是对于同步订阅的消费者。
2、如果状态是“失败”或者“推送失败”,说明事件中心已经接收kafka消息,并且给订阅者推送了事件,在事件控制台中查看返回信息、错误信息,并结合日志具体确认问题
3、如果状态是“异步发送中”、“推送成功”和“拉式未回调”,说明事件正在处理中,如果长时间状态没变化,可能是下游处理慢,或者回调失败,可以检查下游服务和数据的状态来判断是哪种情况,如果是没消费完,说明是下游处理慢,需要优化效率,如果实际已经消费完,查看日志确定具体问题
4、如果状态是“成功”,说明事件已经处理成功

2.6 重试

2.6.1 重试模式

事件中心支持如下重试模式:

  • 不重试
  • 默认策略,则一共重试5 次,每次间隔5 分钟或1分钟(可以配置)。
  • 循环策略,则间隔5 分钟或1分钟(可以配置),在固定时间(可在页面配置)内重试。

2.6.2 处理流程

重试的处理流程如下:消费失败后,事件中心把事件发送到kafka,事件中心再消费kafka的重试消息,然后加入时间轮,时间轮到了之后,会按前面的流程再次给对应的订阅者推送事件。

给Kafka发送消息时候重试的topic规则如下
如果事件类型的ext2值是1,而且监听配置的ext18也是1,使用【common_retry_1m + 环境编码】,否则使用【common_retry_5m + 环境编码】,topic会在事件中心服务启动的时候自动创建
发送重试事件代码在:com.yonyou.iuap.event.service.impl.KafkaSendService#sendToRetry2

2.6.3 问题排查方法

这步的问题可以在事件中心控制台的事件查看功能中查看,如果事件的状态是失败,最后执行时间距离查看时间超过了重试间隔,而且重试次数小于最大重试次数,重试就可能有问题了,这种情况可以在控制台选择对应事件,点击重新发送按钮进行手动试重。

3.附录

附录一 事件中心控制台使用方法

如果遇到事件相关问题,可以登录事件中心控制台进行排查。
登录方法:
1、 使用有事件中心控制台权限的账号登录业务中台
2、 在浏览器地址栏中输入下面地址,进入事件中心控制台https://业务中台域名/iuap-apcom-event/ucf-wh/eventCenter/index.html#/

  • 事件查询

点击事件查询进入事件查询页面。遇到事件相关问题,事件工作台的事件查询是非常重要的工具,遇到事件问题之后优先打开事件查询功能查看

可以按照事件源、事件类型等信息查询,也可以根据事件状态过滤,比如要查询失败的事件,可以在事件状态中选择“失败”,再点查询即可。
查询结果中可以看到事件类型、事件源、发送地址、事件内容、返回信息等关键信息,对于失败的事件,还可以看错误信息,并且可以做重发和查看回调记录

  • 应用事件源管理

事件源模块可以按应用查看和维护事件源,比如在左侧点击人力资源->人力公共服务后,可以看到对应的事件源是CORE_HR

  • 事件类型管理(new)

事件类型管理模块可以查看和维护事件类型,比如按事件源CORE_HR,事件类型CONFIRM_ENTRY(确认入职),可以查到下面结果,在排查问题的时候,可以在该模块查询对应的事件

  • 事件监听管理(new)

事件监听管理模块可以查看和维护事件监听,比如还是按上面的事件源CORE_HR,事件类型CONFIRM_ENTRY(确认入职),可以查到3条记录,说明有对该事件有3个订阅

可以看到3个订阅都是rest请求的方式,具体请求地址可以在“事件接收地址”里边查看。对应具体某个订阅,还可以点击浮动的编辑按钮查看具体信息

点击编辑后可以看到

4 参考文献

1、《iuap-事件中心开发红皮书.pdf》https://ikm.yonyou.com/share?shareid=b30fada124fc73a8fbb1a186c426a2b5

本文暂无评论,快来抢沙发!