数据协议
proto文件
proto
文件:management/Management.proto
方法及参数
- 注册方法定义:
reportInstanceProperties
- 心跳方法定义:
keepAlive
- 注册数据定义:
InstanceProperties
1
2
3
4
5message InstanceProperties {
string service = 1;
string serviceInstance = 2;
repeated KeyStringValuePair properties = 3;
} - 心跳数据定义:
InstancePingPkg
1
2
3
4message InstancePingPkg {
string service = 1;
string serviceInstance = 2;
}
数据采集
gRPC客户端
ServiceManagementClient
处理逻辑
prepare()
方法
- 监听注册:将自身注册到
GRPCChannelManager
, 监听gRPC底层网络连接状态 - 实例名生成:规则
InstanceName=UUID@ip
或Config.Agent.INSTANCE_NAME
配置指定定制化为
ip@hostname
boot()
方法
- 定时任务:启动定时注册及心跳任务;间隔30秒,或
Config.Collector.HEARTBEAT_PERIOD
指定
statusChanged()
方法
- 监听事件:
GRPCChannelManager
gRPC网络连接状态GRPCChannelStatus
改变时触发 - 处理逻辑: 连接成功,重建本地存根
Stub
Runnable#run()
方法
- 网络未连接,不处理
- 循环次数%10==0,进行实例注册与更新
- 循环次数%10!=0,进行服务与实例的心跳检测
数据接收
gRPC服务端
ManagementServiceHandler
传输数据
注册信息demo
1 | service: "demo::projectA" |
心跳信息demo
1 | service: "demo::projectA" |
数据处理
数据源及转换器
- 数据源:
ServiceInstanceUpdate
继承Source
- 类型转换:
InstanceTrafficDispatcher
实现SourceDispatcher
- 原始数据类型:
InstanceTraffic
继承Metrics
指标数据
流计算模型
- 指标计算模型:
MetricsStreamProcessor
- 分布式计算,分为两个极端
- 阶段L1: 针对相同ID的实体,在当前OAP节点进行汇集计算;
- 阶段L2: 通过ID进行hash路由,在分布式环境中,使得离散数据进行二次汇集;再通过与存储中数据(内存或数据库)的汇集计算,完成入库。
流计算阶段
L1阶段职责
- 数据接收与解析,并在当前OAP节点内进行数据聚合,使用OAL或者其他聚合模式
MetricsAggregateWorker
in()
方法:数据接收onWorker
数据解析:调用指标combine()
方法进行聚合
L2阶段职责
- 分布式聚合:根据一定的路由规则,将经过L1聚合后的数据路由到指定节点后,进行二次汇集。
MetricsRemoteWorker
: 数据发送Senderin()
方法:接收上一步worker数据send()
方法:调用RemoteSenderService
发送数据到指定OAP节点- 路由规则:
HashCode
(指标模型)Rolling
ForeverFirst
- 调用发送客户端
RemoteClient
:GRPCRemoteClient
和SelfRemoteClient
MetricsPersistentWorker
: 数据接收Receiverin()
方法:接收上一步worker数据onWokr()
方法:写入缓存ReadWriteSafeCache
prepareBatch()
:二次聚合并落库
二次聚合规则
- 数据库现有比较:查库规则 = 接收到的数据列表 - 缓存中已有的实体列表;查库,结果入缓存
- 二次聚合:接收到的数据列表 与 缓存中的数据,进行二次聚合,具体规则
- 缓存存在但是实体不支持更新,直接跳过不处理
- 缓存存在且实体支持更新,进行二次聚合,
prepareBatchUpdate
落库 - 缓存不存在,直接
prepareBatchInsert
落库
数据存储
批量存储定时器
PersistenceTimer
- 枚举单例
- 定时任务:core模块启动完毕,启动该定时器
- 每隔3s(或
persistentPeriod
指定),调用extractDataAndSave
批量更新或存储 - 调用具体
PersistenceWorker#prepareBatch()
具体实现完成落库
批量存储
MetricsPersistentWorker#prepareBatch
- 批量落库: 单次批量大小上限2000 (硬编码)
数据存储demo
Instance
(注册及心跳上送)
1 | { |
Service
(心跳上送)
1 | { |
扫描二维码,分享此文章