Tech Blog

skywalking | Metrics Stream Processing (Endpoints and Addresses)

February 23, 2021

Data Protocol

Proto file

  • Proto file: language-agent/Tracing.proto

Method and parameters

  • TraceSegment RPC method: collect
  • TraceSegment message: SegmentObject
message SegmentObject {
  // A string id represents the whole trace.
  string traceId = 1;
  // A unique id represents this segment.
  string traceSegmentId = 2;
  // Span collections included in this segment.
  repeated SpanObject spans = 3;
  // The logic name represents the service.
  string service = 4;
  // The logic name represents the service instance.
  string serviceInstance = 5;
  // Whether the segment includes all tracked spans.
  bool isSizeLimited = 6;
}
message SpanObject {
  // The number id of the span.
  int32 spanId = 1;
  // The number id of the parent span in the whole segment.
  int32 parentSpanId = 2;
  // Start timestamp in milliseconds of this span.
  int64 startTime = 3;
  // End timestamp in milliseconds of this span.
  int64 endTime = 4;
  // In the cross-thread and cross-process cases, these references target the parent segments.
  repeated SegmentReference refs = 5;
  // A logic name represents this span.
  string operationName = 6;
  // Remote address of the peer in the RPC/MQ case. This is required when spanType = Exit.
  string peer = 7;
  // Span type represents the role in the RPC context.
  SpanType spanType = 8;
  // Span layer represents the component tech stack, related to the network tech.
  SpanLayer spanLayer = 9;
  // Component id is a predefined number id in SkyWalking.
  int32 componentId = 10;
  // The status of the span.
  bool isError = 11;
  // String key, String value pair.
  repeated KeyStringValuePair tags = 12;
  // String key, String value pair with an accurate timestamp.
  repeated Log logs = 13;
  // Forces the backend to skip analysis if the value is TRUE.
  bool skipAnalysis = 14;
}
message SegmentReference {
  // Represents the reference type.
  // Typically, refType == CrossProcess means SpanObject#spanType = Entry.
  RefType refType = 1;
  // A string id represents the whole trace.
  string traceId = 2;
  // Another segment id as the parent.
  string parentTraceSegmentId = 3;
  // The span id in the parent trace segment.
  int32 parentSpanId = 4;
  // The service logic name of the parent segment.
  string parentService = 5;
  // The service instance logic name of the parent segment.
  string parentServiceInstance = 6;
  // The endpoint name of the parent segment.
  string parentEndpoint = 7;
  // The network address, including ip/hostname and port, which is used on the client side.
  // For example, Client --> uses 127.0.11.8:913 -> Server;
  // then, in the reference of the entry span reported by Server, the value of this field is 127.0.11.8:913.
  string networkAddressUsedAtPeer = 8;
}
message Log {
  // The timestamp in milliseconds of this event.
  int64 time = 1;
  // String key, String value pair.
  repeated KeyStringValuePair data = 2;
}

enum SpanType {
  // Server side of RPC. Consumer side of MQ.
  Entry = 0;
  // Client side of RPC. Producer side of MQ.
  Exit = 1;
  // A common local code execution.
  Local = 2;
}

// An ID could be represented by multiple string sections.
message ID {
  repeated string id = 1;
}

// Type of the reference
enum RefType {
  // Maps to the reference targeting the segment in another OS process.
  CrossProcess = 0;
  // Maps to the reference targeting the segment in the same process as the current one, just across threads.
  CrossThread = 1;
}

// Maps to the layer of the span
enum SpanLayer {
  // Unknown layer. Could be anything.
  Unknown = 0;
  // A database layer, used in tracing the database client component.
  Database = 1;
  // An RPC layer, used in both client and server sides of the RPC component.
  RPCFramework = 2;
  // HTTP is a more specific RPCFramework.
  Http = 3;
  // An MQ layer, used in both producer and consumer sides of the MQ component.
  MQ = 4;
  // A cache layer, used in tracing the cache client component.
  Cache = 5;
}

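Data Collection

gRPC client

TraceSegmentServiceClient

Processing logic

prepare() method

  • Listener registration: registers itself with GRPCChannelManager to listen for the state of the underlying gRPC network connection

boot() method

  • Creates the queue core, a DataCarrier: CHANNEL=3, BUFFER=300, BufferStrategy=IF_POSSIBLE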
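The boot() step above creates a DataCarrier with 3 channels of 300 slots each and the IF_POSSIBLE strategy. The Python sketch below is not SkyWalking's Java DataCarrier; it only illustrates the idea, assuming round-robin partitioning across channels and modelling IF_POSSIBLE as "drop the item if its channel is full, never block". The class `MiniDataCarrier` and its methods are hypothetical.

```python
from collections import deque
from itertools import count
from typing import Any, Deque, List


class MiniDataCarrier:
    """Hypothetical stand-in for DataCarrier: `channel_size` bounded buffers of `buffer_size` slots."""

    def __init__(self, channel_size: int = 3, buffer_size: int = 300) -> None:
        self.buffer_size = buffer_size
        self.channels: List[Deque[Any]] = [deque() for _ in range(channel_size)]
        self._rolling = count()  # round-robin partitioning across channels (assumption)
        self.dropped = 0

    def produce(self, item: Any) -> bool:
        """IF_POSSIBLE-style write: store the item if its channel has room, otherwise drop it."""
        channel = self.channels[next(self._rolling) % len(self.channels)]
        if len(channel) >= self.buffer_size:
            self.dropped += 1  # never block the producing (traced) thread
            return False
        channel.append(item)
        return True

    def drain(self) -> List[Any]:
        """Empty every channel in order, as one pass of a consumer thread would."""
        batch: List[Any] = []
        for channel in self.channels:
            while channel:
                batch.append(channel.popleft())
        return batch
```

For example, `MiniDataCarrier(channel_size=3, buffer_size=2)` accepts six items and rejects the seventh, because round-robin routes it back to the already full first channel. Dropping instead of blocking keeps application threads from stalling when the backend is slow, at the cost of losing some segments.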

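statusChanged() method

  • Event: triggered when the gRPC connection status (GRPCChannelStatus) managed by GRPCChannelManager changes
  • Logic: once the connection is established, rebuilds the local stub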
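prepare() and statusChanged() together form a simple observer pattern: the client registers with the channel manager and rebuilds its stub whenever a connection is (re)established. A minimal Python sketch under that reading; `ChannelManagerSketch`, `SegmentClientSketch` and `Stub` are hypothetical, and the two enum members only mirror the GRPCChannelStatus values as we understand them.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class GRPCChannelStatus(Enum):
    CONNECTED = "CONNECTED"
    DISCONNECT = "DISCONNECT"


@dataclass
class Stub:
    """Placeholder for a generated gRPC stub bound to one channel."""
    channel: Optional[str]


class ChannelManagerSketch:
    """Hypothetical stand-in for GRPCChannelManager: owns the channel and notifies listeners."""

    def __init__(self) -> None:
        self.listeners: List["SegmentClientSketch"] = []
        self.channel: Optional[str] = None

    def add_channel_listener(self, listener: "SegmentClientSketch") -> None:
        self.listeners.append(listener)

    def set_status(self, status: GRPCChannelStatus, channel: Optional[str] = None) -> None:
        self.channel = channel if status is GRPCChannelStatus.CONNECTED else None
        for listener in self.listeners:
            listener.status_changed(status)


class SegmentClientSketch:
    def __init__(self, manager: ChannelManagerSketch) -> None:
        self.manager = manager
        self.status = GRPCChannelStatus.DISCONNECT
        self.stub: Optional[Stub] = None

    def prepare(self) -> None:
        # prepare(): register as a listener of connection-state changes
        self.manager.add_channel_listener(self)

    def status_changed(self, status: GRPCChannelStatus) -> None:
        # statusChanged(): on (re)connection, rebuild the stub on the new channel
        if status is GRPCChannelStatus.CONNECTED:
            self.stub = Stub(self.manager.channel)
        self.status = status
```

After a disconnect the old stub is simply left in place; in this sketch the client relies on `status` to decide whether sending is allowed, which is the check consume() performs.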

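onComplete() method

  • Registers itself with TracingContext to listen for the TracingContext#finish event

afterFinished()

  • Event: when TracingContext#stopSpan() closes the last span, it calls finish() to close the corresponding TraceSegment, which triggers this method
  • Writes the TraceSegment into the DataCarrier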
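onComplete() and afterFinished() link the tracing context to the sender: closing the last open span finishes the segment, which notifies every registered listener, and the client hands the segment to its queue. The sketch below models that flow in Python, assuming spans close in LIFO order; `TracingContextSketch`, `FinishAwareClient` and the plain list used in place of DataCarrier are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TraceSegment:
    segment_id: str
    finished_spans: List[str] = field(default_factory=list)


class TracingContextSketch:
    """Hypothetical model of TracingContext: spans nest, closing the last one finishes the segment."""

    finish_listeners: List["FinishAwareClient"] = []  # shared registry of finish listeners

    def __init__(self, segment_id: str) -> None:
        self.segment = TraceSegment(segment_id)
        self.active_spans: List[str] = []

    def create_span(self, name: str) -> None:
        self.active_spans.append(name)

    def stop_span(self, name: str) -> None:
        if not self.active_spans or self.active_spans[-1] != name:
            raise ValueError(f"span {name!r} is not the active span")
        self.segment.finished_spans.append(self.active_spans.pop())
        if not self.active_spans:  # the last span was closed
            self.finish()

    def finish(self) -> None:
        for listener in TracingContextSketch.finish_listeners:
            listener.after_finished(self.segment)


class FinishAwareClient:
    def __init__(self) -> None:
        self.carrier: List[TraceSegment] = []  # stands in for DataCarrier.produce

    def on_complete(self) -> None:
        TracingContextSketch.finish_listeners.append(self)

    def after_finished(self, segment: TraceSegment) -> None:
        self.carrier.append(segment)
```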

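consume()

  • Decides whether to send based on the network connection status (GRPCChannelStatus)
  • Creates a GRPCStreamServiceStatus object
  • Creates a StreamObserver that provides the onNext/onError/onCompleted callbacks
  • Converts each TraceSegment into a SegmentObject
  • Sends (uploads) the data
  • Counts the number of segments sent in segmentUplinkedCounter
  • Prints an upload log every 30 s: printUplinkStatus()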
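The consume() steps above can be sketched as follows. This is a hedged Python illustration, not the agent's code: the transport is an injected `send` callable standing in for the gRPC stream, `StreamStatus` and `ResponseObserver` only imitate GRPCStreamServiceStatus and StreamObserver, `to_segment_object` maps a small subset of fields, and the abandoned-segment counter is our own addition. The clock is injectable so the 30 s logging window can be tested.

```python
import time
from typing import Callable, Dict, List, Optional


class StreamStatus:
    """Hypothetical stand-in for GRPCStreamServiceStatus: flips to finished when the stream closes."""

    def __init__(self) -> None:
        self.finished = False


class ResponseObserver:
    """Hypothetical observer with onNext/onError/onCompleted-style callbacks."""

    def __init__(self, status: StreamStatus) -> None:
        self.status = status
        self.errors: List[Exception] = []

    def on_next(self, response: object) -> None:
        pass  # a server reply (e.g. commands) would be handled here

    def on_error(self, err: Exception) -> None:
        self.errors.append(err)
        self.status.finished = True

    def on_completed(self) -> None:
        self.status.finished = True


def to_segment_object(segment: Dict) -> Dict:
    """Map an in-memory segment onto a subset of the SegmentObject fields."""
    return {
        "traceId": segment["trace_id"],
        "traceSegmentId": segment["segment_id"],
        "spans": list(segment["spans"]),
        "service": segment["service"],
        "serviceInstance": segment["instance"],
    }


class UplinkConsumer:
    def __init__(self, send: Callable[[Dict], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.send = send                    # hypothetical transport: one call per SegmentObject
        self.clock = clock
        self.connected = True               # mirrors GRPCChannelStatus == CONNECTED
        self.segment_uplinked_counter = 0
        self.segment_abandoned_counter = 0  # assumption: segments skipped while offline
        self.last_log_time = clock()
        self.logs: List[str] = []

    def consume(self, batch: List[Dict]) -> Optional[ResponseObserver]:
        observer = None
        if not self.connected:
            self.segment_abandoned_counter += len(batch)
        else:
            observer = ResponseObserver(StreamStatus())
            try:
                for segment in batch:
                    self.send(to_segment_object(segment))
                    self.segment_uplinked_counter += 1
            except Exception as err:  # a failed send ends the stream with an error
                observer.on_error(err)
            else:
                observer.on_completed()  # with real gRPC, the server side triggers this
        self.print_uplink_status()
        return observer

    def print_uplink_status(self) -> None:
        now = self.clock()
        if now - self.last_log_time >= 30:  # log at most once every 30 s
            self.last_log_time = now
            self.logs.append(f"uplinked={self.segment_uplinked_counter} "
                             f"abandoned={self.segment_abandoned_counter}")
            self.segment_uplinked_counter = 0
            self.segment_abandoned_counter = 0
```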

TraceSegmentServiceClient functionality

Data Reception

gRPC server

TraceSegmentReportServiceHandler

Transmitted data

Uploaded data: SegmentObject

traceId: "44c236493f14470fbac8ed17b379f1db.80.16138398133700001"
traceSegmentId: "e712b5d6854e4c52b3763977255c3b52.68.16138398146810000"
spans {
  spanId: 2
  parentSpanId: 1
  startTime: 1613839816016
  endTime: 1613839816466
  operationName: "H2/JDBI/PreparedStatement/execute"
  peer: "localhost:-1"
  spanType: Exit
  spanLayer: Database
  componentId: 32
  tags {
    key: "db.type"
    value: "sql"
  }
  tags {
    key: "db.instance"
    value: "test"
  }
  tags {
    key: "db.statement"
    value: "INSERT INTO user(name) VALUES(?)"
  }
}
spans {
  spanId: 1
  startTime: 1613839815952
  endTime: 1613839816467
  operationName: "test.skywalking.springcloud.test.projectb.dao.DatabaseOperateDao.saveUser(java.lang.String)"
  spanType: Local
  tags {
    key: "user.name"
    value: "test"
  }
}
spans {
  spanId: 4
  parentSpanId: 3
  startTime: 1613839816471
  endTime: 1613839816899
  operationName: "H2/JDBI/PreparedStatement/execute"
  peer: "localhost:-1"
  spanType: Exit
  spanLayer: Database
  componentId: 32
  tags {
    key: "db.type"
    value: "sql"
  }
  tags {
    key: "db.instance"
    value: "test"
  }
  tags {
    key: "db.statement"
    value: "SELECT * FROM user WHERE name =?"
  }
}
spans {
  spanId: 3
  startTime: 1613839816471
  endTime: 1613839816900
  operationName: "selectUser"
  spanType: Local
  tags {
    key: "user.name"
    value: "test"
  }
}
spans {
  parentSpanId: -1
  startTime: 1613839814682
  endTime: 1613839816935
  refs {
    traceId: "44c236493f14470fbac8ed17b379f1db.80.16138398133700001"
    parentTraceSegmentId: "44c236493f14470fbac8ed17b379f1db.80.16138398133700000"
    parentSpanId: 1
    parentService: "demo::projectA"
    parentServiceInstance: "795dfc767ab144cd9add2964fa657b2c@192.168.31.36"
    parentEndpoint: "/projectA/{name}"
    networkAddressUsedAtPeer: "PROJECTB:80"
  }
  operationName: "/projectB/{value}"
  spanLayer: Http
  componentId: 14
  tags {
    key: "url"
    value: "http://192.168.31.36:8762/projectB/test"
  }
  tags {
    key: "http.method"
    value: "GET"
  }
}
service: "demo::projectB"
serviceInstance: "3d312f128bc94f2ea70b310037ed9eec@192.168.31.36"
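In the dump above, the last span prints neither spanId nor spanType. That is expected: proto3 text output omits fields that hold their default value, so that span has spanId 0 and spanType Entry (enum value 0), and it is the EntrySpan of this segment; likewise spans 1 and 3 have parentSpanId 0. A small Python sketch, using a field subset of the sample (with the long saveUser class name shortened), that restores the defaults and rebuilds the span tree:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# proto3 text output omits fields holding their default value, so a missing
# spanId/parentSpanId means 0 and a missing spanType means Entry (enum value 0).
PROTO3_DEFAULTS = {"spanId": 0, "parentSpanId": 0, "spanType": "Entry"}

# Field subset of the five spans in the sample, as printed (defaults omitted).
SAMPLE_SPANS = [
    {"spanId": 2, "parentSpanId": 1, "spanType": "Exit", "operationName": "H2/JDBI/PreparedStatement/execute"},
    {"spanId": 1, "spanType": "Local", "operationName": "DatabaseOperateDao.saveUser(java.lang.String)"},
    {"spanId": 4, "parentSpanId": 3, "spanType": "Exit", "operationName": "H2/JDBI/PreparedStatement/execute"},
    {"spanId": 3, "spanType": "Local", "operationName": "selectUser"},
    {"parentSpanId": -1, "operationName": "/projectB/{value}"},
]


def with_defaults(span: Dict) -> Dict:
    return {**PROTO3_DEFAULTS, **span}


def build_tree(spans: List[Dict]) -> Tuple[Dict, Dict[int, List[int]]]:
    """Return the root span (parentSpanId == -1) and a parent -> sorted child ids map."""
    full = [with_defaults(s) for s in spans]
    children: Dict[int, List[int]] = defaultdict(list)
    for span in full:
        if span["parentSpanId"] != -1:
            children[span["parentSpanId"]].append(span["spanId"])
    root = next(s for s in full if s["parentSpanId"] == -1)
    return root, {parent: sorted(ids) for parent, ids in children.items()}


def render(spans: List[Dict]) -> List[str]:
    """Indent each operation name by its depth in the span tree."""
    root, children = build_tree(spans)
    names = {s["spanId"]: s["operationName"] for s in map(with_defaults, spans)}
    lines: List[str] = []

    def walk(span_id: int, depth: int) -> None:
        lines.append("  " * depth + names[span_id])
        for child in children.get(span_id, []):
            walk(child, depth + 1)

    walk(root["spanId"], 0)
    return lines
```

Rendering the sample gives the entry span `/projectB/{value}` at the top, with `saveUser` and `selectUser` below it and one JDBC execute span under each.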

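Data Parsing

  • agent-analyzer.TraceAnalyzer#doAnalysis
  • Network address: NetworkAddressAliasMappingListener#parseEntry()
  • Service/instance/endpoint: MultiScopesAnalysisListener#parseEntry, taken from the server-side EntrySpan

EntrySpan: the provider side or entry endpoint of an application service, and the first span in a TraceSegment. For entry services such as an HTTP server, RPC server, or MQ consumer, the plugin creates the corresponding EntrySpan when it receives a request.
ExitSpan: created when a request leaves the current service and enters another one. Examples: an HTTP or RPC client making a remote call, an MQ producer publishing a message, a Redis client issuing a Redis call, or a MySQL client running a database query.
LocalSpan: the span type that may be created for a local method call; it represents a single ordinary Java method call and has nothing to do with crossing process boundaries.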
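To make the two parseEntry listeners concrete, the Python sketch below takes the segment's service and instance plus its spans, picks the Entry span, uses its operation name as the endpoint, and turns each CrossProcess reference's networkAddressUsedAtPeer into an (address, service, instance) alias. This is a simplified, hypothetical combination of MultiScopesAnalysisListener and NetworkAddressAliasMappingListener; the real listeners may apply further checks that are not modelled here, and skipping CrossThread refs is an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Ref:
    ref_type: str                      # "CrossProcess" or "CrossThread"
    network_address_used_at_peer: str


@dataclass
class Span:
    span_type: str                     # "Entry", "Exit" or "Local"
    operation_name: str
    refs: List[Ref] = field(default_factory=list)


@dataclass
class EntryResult:
    service: str
    instance: str
    endpoint: str
    # (address, service, instance): the address callers used, mapped to this service/instance
    aliases: List[Tuple[str, str, str]]


def parse_entry(service: str, instance: str, spans: List[Span]) -> Optional[EntryResult]:
    """Hypothetical combination of what the two parseEntry listeners read from an entry span."""
    entry = next((s for s in spans if s.span_type == "Entry"), None)
    if entry is None:
        return None  # no entry span: nothing to derive in this sketch
    aliases = [
        (ref.network_address_used_at_peer, service, instance)
        for ref in entry.refs
        if ref.ref_type == "CrossProcess"  # assumption: cross-thread refs carry no peer address
    ]
    return EntryResult(service, instance, entry.operation_name, aliases)
```

Fed with the sample segment (service demo::projectB, ref address PROJECTB:80), it maps PROJECTB:80 to demo::projectB and its instance, which is what the NetworkAddressAlias document in the Data Storage section records.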

Data Processing

Data sources and converters

  • Data sources:

    • Service
    • ServiceInstance
    • Endpoint
    • NetworkAddressAliasSetup
  • Type conversion (dispatchers):

    • ServiceTrafficDispatcher
    • InstanceTrafficDispatcher
    • EndpointTrafficDispatcher
    • NetworkAddressAliasSetupDispatcher
  • Raw data types:

    • ServiceTraffic
    • InstanceTraffic
    • EndpointTraffic
    • NetworkAddressAlias
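The dispatchers above turn analysis sources into traffic records. The Python sketch below covers only the Service and Endpoint rows, and its ID layout (base64 of the name, a `.1` suffix for the service, `_` before the child name) and the `service_group` rule (the part before `::`) are inferred from the Elasticsearch documents in the Data Storage section rather than taken from SkyWalking's source. The UTC+8 time zone is also an assumption: with it, the entry span's startTime 1613839814682 falls into the minute bucket 202102210050 seen in the stored Endpoint and NetworkAddressAlias documents. All function and class names are hypothetical.

```python
import base64
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict

UTC_PLUS_8 = timezone(timedelta(hours=8))  # assumption: time zone of the OAP in the samples


def encode(text: str) -> str:
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def service_id(name: str, is_normal: bool = True) -> str:
    # "<base64(name)>.1", matching the stored service ids
    return f"{encode(name)}.{1 if is_normal else 0}"


def minute_bucket(ts_millis: int, tz: timezone = UTC_PLUS_8) -> int:
    # yyyyMMddHHmm, the format of the stored time_bucket fields
    return int(datetime.fromtimestamp(ts_millis / 1000, tz).strftime("%Y%m%d%H%M"))


@dataclass
class ServiceSource:
    name: str


@dataclass
class EndpointSource:
    service_name: str
    endpoint_name: str
    timestamp_ms: int


def service_traffic(src: ServiceSource) -> Dict:
    group = src.name.split("::", 1)[0] if "::" in src.name else ""
    return {"id": service_id(src.name), "name": src.name, "node_type": 0, "service_group": group}


def endpoint_traffic(src: EndpointSource) -> Dict:
    sid = service_id(src.service_name)
    return {
        "id": f"{sid}_{encode(src.endpoint_name)}",
        "service_id": sid,
        "name": src.endpoint_name,
        "time_bucket": minute_bucket(src.timestamp_ms),
    }


# source type -> converter, mirroring the Service and Endpoint rows of the list above
DISPATCHERS: Dict[type, Callable] = {
    ServiceSource: service_traffic,
    EndpointSource: endpoint_traffic,
}


def dispatch(source) -> Dict:
    return DISPATCHERS[type(source)](source)
```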

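Stream processing

  • See: skywalking series | Metrics Stream Processing (Registration and Heartbeat), section "Data Processing"

Data Storage

Batch storage

Stored data examples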

Service

{
  "_index" : "ali02_service_traffic-20210219",
  "_type" : "_doc",
  "_id" : "ZGVtbzo6cHJvamVjdEE=.1",
  "_score" : 1.0,
  "_source" : {
    "node_type" : 0,
    "name" : "demo::projectA",
    "service_group" : "demo"
  }
}

Instance

{
  "_index" : "ali02_instance_traffic-20210219",
  "_type" : "_doc",
  "_id" : "ZGVtbzo6cHJvamVjdEE=.1_NjE1MTIxN2EyZjdiNGU2NWIxZDIyOTZmMjg5YTU3NTNAMTkyLjE2OC4zMS4zNg==",
  "_score" : 1.0,
  "_source" : {
    "last_ping" : 202102191528,
    "service_id" : "ZGVtbzo6cHJvamVjdEE=.1",
    "name" : "6151217a2f7b4e65b1d2296f289a5753@192.168.31.36",
    "time_bucket" : 202102191527,
    "properties" : """{"OS Name":"Mac OS X","hostname":"chenrqmac","Process No.":"86312","language":"java","ipv4s":"192.168.31.36"}"""
  }
}

Endpoint

{
  "_index" : "ali02_endpoint_traffic-20210221",
  "_type" : "_doc",
  "_id" : "ZGVtbzo6cHJvamVjdEM=.1_L3Byb2plY3RDL3t2YWx1ZX0=",
  "_score" : 1.0,
  "_source" : {
    "service_id" : "ZGVtbzo6cHJvamVjdEM=.1",
    "name" : "/projectC/{value}",
    "time_bucket" : 202102210050
  }
}

NetworkAddressAlias

{
  "_index" : "ali02_network_address_alias-20210221",
  "_type" : "_doc",
  "_id" : "UFJPSkVDVEI6ODA=",
  "_score" : 1.0,
  "_source" : {
    "address" : "PROJECTB:80",
    "last_update_time_bucket" : 202102210050,
    "represent_service_instance_id" : "ZGVtbzo6cHJvamVjdEI=.1_M2QzMTJmMTI4YmM5NGYyZWE3MGIzMTAwMzdlZDllZWNAMTkyLjE2OC4zMS4zNg==",
    "represent_service_id" : "ZGVtbzo6cHJvamVjdEI=.1",
    "time_bucket" : 202102210050
  }
}
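The `_id` values in these documents become readable once decoded. Assuming the layout they appear to follow (base64 service name plus `.1`, then `_` and a base64 instance or endpoint name, and a plain base64 address for NetworkAddressAlias), a few hypothetical Python helpers recover the names:

```python
import base64
from typing import Dict


def decode(part: str) -> str:
    return base64.b64decode(part).decode("utf-8")


def parse_service_id(sid: str) -> Dict:
    # "<base64(name)>.<flag>": standard base64 never contains ".", so split on the last one
    encoded, _, flag = sid.rpartition(".")
    return {"name": decode(encoded), "is_normal": flag == "1"}


def parse_child_id(cid: str) -> Dict:
    # "<service id>_<base64(name)>" for instances and endpoints; "_" is not a standard base64 char
    sid, _, encoded_name = cid.partition("_")
    return {"service": parse_service_id(sid), "name": decode(encoded_name)}


def parse_alias_id(aid: str) -> str:
    # the NetworkAddressAlias document id appears to be base64(address) on its own
    return decode(aid)
```

For instance, the Instance document's `_id` decodes to service demo::projectA and instance 6151217a2f7b4e65b1d2296f289a5753@192.168.31.36, matching its `name` field. The `.1` suffix is read here as a "normal service" flag, which is our understanding and worth checking against the OAP source.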
