™技术博客

skywalking | 指标流式处理(注册心跳)

2021年2月23日

数据协议

proto文件

  • proto文件:management/Management.proto

方法及参数

  • 注册方法定义: reportInstanceProperties
  • 心跳方法定义:keepAlive
  • 注册数据定义:InstanceProperties
    1
    2
    3
    4
    5
    message InstanceProperties {
    string service = 1;
    string serviceInstance = 2;
    repeated KeyStringValuePair properties = 3;
    }
  • 心跳数据定义: InstancePingPkg
    1
    2
    3
    4
    message InstancePingPkg {
    string service = 1;
    string serviceInstance = 2;
    }

数据采集

gRPC客户端

ServiceManagementClient

处理逻辑

prepare()方法

  • 监听注册:将自身注册到GRPCChannelManager, 监听gRPC底层网络连接状态
  • 实例名生成:规则 InstanceName=UUID@ipConfig.Agent.INSTANCE_NAME 配置指定

    定制化为 ip@hostname

boot()方法

  • 定时任务:启动定时注册及心跳任务;间隔30秒,或Config.Collector.HEARTBEAT_PERIOD指定

statusChanged()方法

  • 监听事件:GRPCChannelManager gRPC网络连接状态GRPCChannelStatus改变时触发
  • 处理逻辑: 连接成功,重建本地存根Stub

Runnable#run()方法

  • 网络未连接,不处理
  • 循环次数%10==0,进行实例注册与更新
  • 循环次数%10!=0,进行服务与实例的心跳检测

数据接收

gRPC服务端

ManagementServiceHandler

传输数据

注册信息demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
service: "demo::projectA"
serviceInstance: "7ff3e127186d45848e7371ffae1f6f73@192.168.31.36"
properties {
key: "OS Name"
value: "Mac OS X"
}
properties {
key: "hostname"
value: "chenrqmac"
}
properties {
key: "ipv4"
value: "192.168.31.36"
}
properties {
key: "Process No."
value: "76357"
}
properties {
key: "language"
value: "java"
}

心跳信息demo

1
2
service: "demo::projectA"
serviceInstance: "121454622972403eb6490293f4e51f86@192.168.31.36"

数据处理

数据源及转换器

  • 数据源:ServiceInstanceUpdate 继承 Source
  • 类型转换:InstanceTrafficDispatcher 实现 SourceDispatcher
  • 原始数据类型: InstanceTraffic 继承 Metrics 指标数据

流计算模型

  • 指标计算模型:MetricsStreamProcessor
  • 分布式计算,分为两个极端
  1. 阶段L1: 针对相同ID的实体,在当前OAP节点进行汇集计算;
  2. 阶段L2: 通过ID进行hash路由,在分布式环境中,使得离散数据进行二次汇集;再通过与存储中数据(内存或数据库)的汇集计算,完成入库。

流计算阶段

L1阶段职责

  • 数据接收与解析,并在当前OAP节点内进行数据聚合,使用OAL或者其他聚合模式
  • MetricsAggregateWorker
    • in()方法:数据接收
    • onWorker数据解析:调用指标 combine() 方法进行聚合

L2阶段职责

  • 分布式聚合:根据一定的路由规则,将经过L1聚合后的数据路由到指定节点后,进行二次汇集。
  • MetricsRemoteWorker: 数据发送Sender
    • in()方法:接收上一步worker数据
    • send()方法:调用 RemoteSenderService 发送数据到指定OAP节点
    • 路由规则:HashCode(指标模型) Rolling ForeverFirst
    • 调用发送客户端RemoteClientGRPCRemoteClientSelfRemoteClient
  • MetricsPersistentWorker: 数据接收Receiver
    • in()方法:接收上一步worker数据
    • onWokr()方法:写入缓存 ReadWriteSafeCache
    • prepareBatch():二次聚合并落库

二次聚合规则

  1. 数据库现有比较:查库规则 = 接收到的数据列表 - 缓存中已有的实体列表;查库,结果入缓存
  2. 二次聚合:接收到的数据列表 与 缓存中的数据,进行二次聚合,具体规则
  • 缓存存在但是实体不支持更新,直接跳过不处理
  • 缓存存在且实体支持更新,进行二次聚合,prepareBatchUpdate 落库
  • 缓存不存在,直接prepareBatchInsert落库

数据存储

批量存储定时器

PersistenceTimer

  • 枚举单例
  • 定时任务:core模块启动完毕,启动该定时器
  • 每隔3s(或persistentPeriod指定),调用extractDataAndSave批量更新或存储
  • 调用具体 PersistenceWorker#prepareBatch() 具体实现完成落库

批量存储

MetricsPersistentWorker#prepareBatch

  • 批量落库: 单次批量大小上限2000 (硬编码)

数据存储demo

Instance (注册及心跳上送)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"_index" : "ali02_instance_traffic-20210219",
"_type" : "_doc",
"_id" : "ZGVtbzo6cHJvamVjdEE=.1_NjE1MTIxN2EyZjdiNGU2NWIxZDIyOTZmMjg5YTU3NTNAMTkyLjE2OC4zMS4zNg==",
"_score" : 1.0,
"_source" : {
"last_ping" : 202102191528,
"service_id" : "ZGVtbzo6cHJvamVjdEE=.1",
"name" : "6151217a2f7b4e65b1d2296f289a5753@192.168.31.36",
"time_bucket" : 202102191527,
"properties" : """{"OS Name":"Mac OS X","hostname":"chenrqmac","Process No.":"86312","language":"java","ipv4s":"192.168.31.36"}"""
}
}

Service (心跳上送)

1
2
3
4
5
6
7
8
9
10
11
{
"_index" : "ali02_service_traffic-20210219",
"_type" : "_doc",
"_id" : "ZGVtbzo6cHJvamVjdEE=.1",
"_score" : 1.0,
"_source" : {
"node_type" : 0,
"name" : "demo::projectA",
"service_group" : "demo"
}
}

扫描二维码,分享此文章