数据协议
proto文件
- proto文件:- management/Management.proto
方法及参数
- 注册方法定义: reportInstanceProperties
- 心跳方法定义:keepAlive
- 注册数据定义:InstanceProperties1 
 2
 3
 4
 5message InstanceProperties { 
 string service = 1;
 string serviceInstance = 2;
 repeated KeyStringValuePair properties = 3;
 }
- 心跳数据定义: InstancePingPkg1 
 2
 3
 4message InstancePingPkg { 
 string service = 1;
 string serviceInstance = 2;
 }
数据采集
gRPC客户端
ServiceManagementClient
处理逻辑
prepare()方法
- 监听注册:将自身注册到GRPCChannelManager, 监听gRPC底层网络连接状态
- 实例名生成:规则 InstanceName=UUID@ip或Config.Agent.INSTANCE_NAME配置指定定制化为 ip@hostname
boot()方法
- 定时任务:启动定时注册及心跳任务;间隔30秒,或Config.Collector.HEARTBEAT_PERIOD指定
statusChanged()方法
- 监听事件:GRPCChannelManagergRPC网络连接状态GRPCChannelStatus改变时触发
- 处理逻辑: 连接成功,重建本地存根Stub
Runnable#run()方法
- 网络未连接,不处理
- 循环次数%10==0,进行实例注册与更新
- 循环次数%10!=0,进行服务与实例的心跳检测
数据接收
gRPC服务端
ManagementServiceHandler
传输数据
注册信息demo
| 1 | service: "demo::projectA" | 
心跳信息demo
| 1 | service: "demo::projectA" | 
数据处理
数据源及转换器
- 数据源:ServiceInstanceUpdate继承Source
- 类型转换:InstanceTrafficDispatcher实现SourceDispatcher
- 原始数据类型: InstanceTraffic继承Metrics指标数据
流计算模型
- 指标计算模型:MetricsStreamProcessor
- 分布式计算,分为两个极端
- 阶段L1: 针对相同ID的实体,在当前OAP节点进行汇集计算;
- 阶段L2: 通过ID进行hash路由,在分布式环境中,使得离散数据进行二次汇集;再通过与存储中数据(内存或数据库)的汇集计算,完成入库。
流计算阶段
L1阶段职责
- 数据接收与解析,并在当前OAP节点内进行数据聚合,使用OAL或者其他聚合模式
- MetricsAggregateWorker- in()方法:数据接收
- onWorker数据解析:调用指标- combine()方法进行聚合
 
L2阶段职责
- 分布式聚合:根据一定的路由规则,将经过L1聚合后的数据路由到指定节点后,进行二次汇集。
- MetricsRemoteWorker: 数据发送Sender- in()方法:接收上一步worker数据
- send()方法:调用- RemoteSenderService发送数据到指定OAP节点
- 路由规则:HashCode(指标模型)RollingForeverFirst
- 调用发送客户端RemoteClient:GRPCRemoteClient和SelfRemoteClient
 
- MetricsPersistentWorker: 数据接收Receiver- in()方法:接收上一步worker数据
- onWokr()方法:写入缓存- ReadWriteSafeCache
- prepareBatch():二次聚合并落库
 
二次聚合规则
- 数据库现有比较:查库规则 = 接收到的数据列表 - 缓存中已有的实体列表;查库,结果入缓存
- 二次聚合:接收到的数据列表 与 缓存中的数据,进行二次聚合,具体规则
- 缓存存在但是实体不支持更新,直接跳过不处理
- 缓存存在且实体支持更新,进行二次聚合,prepareBatchUpdate落库
- 缓存不存在,直接prepareBatchInsert落库
数据存储
批量存储定时器
PersistenceTimer
- 枚举单例
- 定时任务:core模块启动完毕,启动该定时器
- 每隔3s(或persistentPeriod指定),调用extractDataAndSave批量更新或存储
- 调用具体 PersistenceWorker#prepareBatch()具体实现完成落库
批量存储
MetricsPersistentWorker#prepareBatch
- 批量落库: 单次批量大小上限2000 (硬编码)
数据存储demo
Instance (注册及心跳上送)
| 1 | { | 
Service (心跳上送)
| 1 | { | 
扫描二维码,分享此文章
