队列内核
- 基于无锁环状队列,生产者-消费者 内存消息队列模型;
- 作用是在生产者和消费者之间创建一个缓冲异步内存队列,防止因skywalking收集数据方生产数据的速度大于往后端发送数据的速度而造成数据积压和生产方阻塞
Buffer
- 两种实现方式:
ArrayBlockingQueueBuffer
和Buffer
- skywalking队列内核中数据的载体,队列之中的数据都存储在
buffer
中。 - 属性
Object[] buffer
:存储数据数组, 固定长度(由buffer.bufferSize决定)BufferStrategy strategy
:入队策略;存在新旧数据冲突时的处理策略- 默认
OVERRIDE
:直接覆盖旧数据 BLOCKING
:使用ArrayBlockingQueueBuffer
,基于ArrayBlockingQueue
;不适合Agent端IF_POSSIBLE
策略:直接返回false
- 默认
AtomicRangeInteger index
:原子循环索引,基于AtomicIntegerArray
,范围同bufferSize
。
- 方法
obtain()
方法:一次性读取(并清理)Buffer中全部数据的功能,同时也提供了部分读取的重载save()
方法负责向底层数组中填充数据buffer
数据存储介质 和AtomicRangeInteger
原子性和循环性 组成skywalking队列内核
Channel
Channel
是管理Buffer
的载体- 属性
QueueBuffer<T>[] bufferChannels
:Buffer
的数组集合,是队列内核中的数据载体IDataPartitioner<T> dataPartitioner
:决定数据存储位置与重试次数ProducerThreadPartitioner
:根据ThreadID
,保证相同线程数据在一个Buffer
中。SimpleRollingPartitioner
:简单循环自增选择器,使用无锁整型(volatile
修饰)的自增并取模,选择要写入的Buffer
。
BufferStrategy strategy
:队列策略,与Buffer
保持一致long size
:Channels能够容纳的数据大小。其值 =1 * channelSize * bufferSize
channelSize
参数:Channel
的数量bufferSize
参数:每个Buffer
中buffer
的大小
DataCarrier
- 队列内核的门户:与skywalking其他模块进行交互
- 初始化必须参数:
channelSize
、bufferSize
- 属性
Channels<T> channels
IDriver driver
String name
实现Demo TraceSegmentServiceClient
生产者
- 初始化
CHANNEL_SIZE = Buffer.CHANNEL_SIZE=5
BUFFER_SIZE = Buffer.BUFFER_SIZE=300
BufferStrategy = IF_POSSIBLE
- 队列模型
- Buffer 自实现环形队列
- 生产消息
produce()
方法:调用Channels#save()
- 数据分发:
dataPartitioner#partition()
默认使用SimpleRollingPartitioner
- 判断当前数据应该存储在哪个
Buffer
:i++ % channelSize
- 判断当前数据应该存储在
buffer
的索引:index.getAndIncrement()
- 重试次数
dataPartitioner.maxRetryCount()
默认为 3
- 判断当前数据应该存储在哪个
消费管理 ConsumeDriver
IConsumer
消费者和ConsumerThread
线程组,按照一定的消费模式集成到一起,提供相关API。- 依赖
ConsumerThread
的实现是ConsumerDriver
- 依赖
MultipleChannelsConsumer
的实现是BulkConsumerPool
- 依赖
- 方法
isRunning
:检测当前IDriver
是否正在运行close
:关闭消费线程begin
:启动消费线程
ConsumerDriver
- 维护了固定数量的
ConsumerThread
[]线程,共同消费一个Channels
中的数据 - 消费的
Channels
、ConsumerThread
线程数以及两者的绑定关系一旦确定,在整个ConsumerDriver
的生命周期中不会再进行变更。
- 维护了固定数量的
- 消费线程分配逻辑
ConsumerDriver#allocateBuffer2Thread()
方法consumerThreads.length < channelSize
:每个ConsumerThread
线程会处理多个Channel
consumerThreads.length == channelSize
:每个ConsumerThread
线程处理一个Channel
consumerThreads.length > channelSize
:有一些消费线程就没有作用
消费线程 ConsumerThread
- 属性
threadName
consumer
消费对象:IConsumer
实例,反射新建,如TraceSegmentServiceClient
running
:状态dataSources
:绑定的Channel
即Buffer
消费数据源consumeCycle
:消费循环间隔时间 默认20毫秒
- 消费逻辑
run()
- 每个
ConsumerThread
初始化一个初始容量在1500的consumeList
- 从每个
dataSource
取数存入consumeList
- 由消费者对象重写的
consume()
方法进行消费,如TraceSegmentServiceClient
- 每次消费完毕,清空
consumeList
,休眠consumeCycle
毫秒 循环消费
- 每个
- 关闭逻辑
- 退出时,显示调用
consumerThread.shutdown()
修改running
状态 - 线程退出循环,执行最后一次保障消费逻辑
- 调用
onExit()
执行消费对象退出前动作
- 退出时,显示调用
实现Demo MetricsAggregateWorker
生产者
- 初始化
CHANNEL_SIZE = Buffer.CHANNEL_SIZE=2
BUFFER_SIZE = Buffer.BUFFER_SIZE=10000
BufferStrategy = BLOCKING
- 队列模型
ArrayBlockingQueueBuffer
基于 JDKArrayBlockingQueue
- 生产消息
produce()
方法:调用Channels#save()
- 数据分发:
dataPartitioner#partition()
默认使用SimpleRollingPartitioner
- 判断当前数据应该存储在哪个
Buffer
:i++ % channelSize
- 判断当前数据应该存储在
buffer
的索引:index.getAndIncrement()
- 判断当前数据应该存储在哪个
消费管理BulkConsumePool
- 一般统一维护在
ConsumerPoolFactory
中 ConsumerPoolFactory
是通过枚举方式实现的单例类,其底层维护了一个Map<String, ConsumerPool>
集合,其中Key就是BulkConsumePool
的名称-
allConsumers
字段(List类型)中维护了当前启动的MultipleChannelsConsumer
线程 -
BulkConsumePool
的核心实现在其add()
方法,通过该方法向BulkConsumePool
添加新Channels
以及对应IConsumer
时,会通过getLowestPayload()
方法选择负载最低的MultipleChannelsConsumer
线程进行处理(即当前处理Group
最少的线程)
消费线程池MultipleChannelsConsumer
MultipleChannelsConsumer
提供了另一种消费模式,与ConsumerThread
的区别在于:MultipleChannelsConsumer
线程可以处理多组Group
,每个Group
都是一个IConsumer + 一个 Channels
的组合MultipleChannelsConsumer
可以同时消费多个Group
,每个Group
中的IConsumer
对象包含了消费逻辑,Channels
对象包含了待消费的数据- 一旦
Channels
被添加到MultipleChannelsConsumer
中,将会被一个MultipleChannelsConsumer
完全消费,不会像ConsumerThread
那样分区域部分消费 - 在
run()
方法中,MultipleChannelsConsumer
线程会定时循环遍历其消费的全部Group
,一旦发现可消费的数据,就会循环调用consume()
方法处理每个Group
。 - 底层通过一个
ArrayList
维护Group
集合(consumeTargets
字段) - 通过 Copy-on-Write 的方式保证线程安全,即在调用
addNewTarget()
方法向consumeTargets
集合添加Group
时,会创建一个新的ArrayList
集合并拷贝原集合内容,然后向新集合中添加数据,待新集合添加完成之后,直接替换原有集合 - 之所以这样做是因为在添加的过程中,
MultipleChannelsConsumer
线程可能正在循环处理consumeTargets
集合,这也是consumeTargets
用volatile
修饰的原因
扫描二维码,分享此文章