队列内核
- 基于无锁环状队列,生产者-消费者 内存消息队列模型;
- 作用是在生产者和消费者之间创建一个缓冲异步内存队列,防止因skywalking收集数据方生产数据的速度大于往后端发送数据的速度而造成数据积压和生产方阻塞
Buffer
- 两种实现方式:
ArrayBlockingQueueBuffer和Buffer - skywalking队列内核中数据的载体,队列之中的数据都存储在
buffer中。 - 属性
Object[] buffer:存储数据数组, 固定长度(由buffer.bufferSize决定)BufferStrategy strategy:入队策略;存在新旧数据冲突时的处理策略- 默认
OVERRIDE:直接覆盖旧数据 BLOCKING:使用ArrayBlockingQueueBuffer,基于ArrayBlockingQueue;不适合Agent端IF_POSSIBLE策略:直接返回false
- 默认
AtomicRangeInteger index:原子循环索引,基于AtomicIntegerArray,范围同bufferSize。
- 方法
obtain()方法:一次性读取(并清理)Buffer中全部数据的功能,同时也提供了部分读取的重载save()方法负责向底层数组中填充数据buffer数据存储介质 和AtomicRangeInteger原子性和循环性 组成skywalking队列内核
Channel
Channel是管理Buffer的载体- 属性
QueueBuffer<T>[] bufferChannels:Buffer的数组集合,是队列内核中的数据载体IDataPartitioner<T> dataPartitioner:决定数据存储位置与重试次数ProducerThreadPartitioner:根据ThreadID,保证相同线程数据在一个Buffer中。SimpleRollingPartitioner:简单循环自增选择器,使用无锁整型(volatile修饰)的自增并取模,选择要写入的Buffer。
BufferStrategy strategy:队列策略,与Buffer保持一致long size:Channels能够容纳的数据大小。其值 =1 * channelSize * bufferSizechannelSize参数:Channel的数量bufferSize参数:每个Buffer中buffer的大小
DataCarrier
- 队列内核的门户:与skywalking其他模块进行交互
- 初始化必须参数:
channelSize、bufferSize - 属性
Channels<T> channelsIDriver driverString name
实现Demo TraceSegmentServiceClient
生产者
- 初始化
CHANNEL_SIZE = Buffer.CHANNEL_SIZE=5BUFFER_SIZE = Buffer.BUFFER_SIZE=300BufferStrategy = IF_POSSIBLE
- 队列模型
- Buffer 自实现环形队列
- 生产消息
produce()方法:调用Channels#save()
- 数据分发:
dataPartitioner#partition()默认使用SimpleRollingPartitioner- 判断当前数据应该存储在哪个
Buffer:i++ % channelSize - 判断当前数据应该存储在
buffer的索引:index.getAndIncrement() - 重试次数
dataPartitioner.maxRetryCount()默认为 3
- 判断当前数据应该存储在哪个
消费管理 ConsumeDriver
IConsumer消费者和ConsumerThread线程组,按照一定的消费模式集成到一起,提供相关API。- 依赖
ConsumerThread的实现是ConsumerDriver - 依赖
MultipleChannelsConsumer的实现是BulkConsumerPool
- 依赖
- 方法
isRunning:检测当前IDriver是否正在运行close:关闭消费线程begin:启动消费线程
ConsumerDriver- 维护了固定数量的
ConsumerThread[]线程,共同消费一个Channels中的数据 - 消费的
Channels、ConsumerThread线程数以及两者的绑定关系一旦确定,在整个ConsumerDriver的生命周期中不会再进行变更。
- 维护了固定数量的
- 消费线程分配逻辑
ConsumerDriver#allocateBuffer2Thread()方法consumerThreads.length < channelSize:每个ConsumerThread线程会处理多个ChannelconsumerThreads.length == channelSize:每个ConsumerThread线程处理一个ChannelconsumerThreads.length > channelSize:有一些消费线程就没有作用
消费线程 ConsumerThread
- 属性
threadNameconsumer消费对象:IConsumer实例,反射新建,如TraceSegmentServiceClientrunning:状态dataSources:绑定的Channel即Buffer消费数据源consumeCycle:消费循环间隔时间 默认20毫秒
- 消费逻辑
run()- 每个
ConsumerThread初始化一个初始容量在1500的consumeList - 从每个
dataSource取数存入consumeList - 由消费者对象重写的
consume()方法进行消费,如TraceSegmentServiceClient - 每次消费完毕,清空
consumeList,休眠consumeCycle毫秒 循环消费
- 每个
- 关闭逻辑
- 退出时,显示调用
consumerThread.shutdown()修改running状态 - 线程退出循环,执行最后一次保障消费逻辑
- 调用
onExit()执行消费对象退出前动作
- 退出时,显示调用
实现Demo MetricsAggregateWorker
生产者
- 初始化
CHANNEL_SIZE = Buffer.CHANNEL_SIZE=2BUFFER_SIZE = Buffer.BUFFER_SIZE=10000BufferStrategy = BLOCKING
- 队列模型
ArrayBlockingQueueBuffer基于 JDKArrayBlockingQueue
- 生产消息
produce()方法:调用Channels#save()
- 数据分发:
dataPartitioner#partition()默认使用SimpleRollingPartitioner- 判断当前数据应该存储在哪个
Buffer:i++ % channelSize - 判断当前数据应该存储在
buffer的索引:index.getAndIncrement()
- 判断当前数据应该存储在哪个
消费管理BulkConsumePool
- 一般统一维护在
ConsumerPoolFactory中 ConsumerPoolFactory是通过枚举方式实现的单例类,其底层维护了一个Map<String, ConsumerPool>集合,其中Key就是BulkConsumePool的名称-
allConsumers字段(List类型)中维护了当前启动的MultipleChannelsConsumer线程 -
BulkConsumePool的核心实现在其add()方法,通过该方法向BulkConsumePool添加新Channels以及对应IConsumer时,会通过getLowestPayload()方法选择负载最低的MultipleChannelsConsumer线程进行处理(即当前处理Group最少的线程)
消费线程池MultipleChannelsConsumer
MultipleChannelsConsumer提供了另一种消费模式,与ConsumerThread的区别在于:MultipleChannelsConsumer线程可以处理多组Group,每个Group都是一个IConsumer + 一个 Channels的组合
MultipleChannelsConsumer可以同时消费多个Group,每个Group中的IConsumer对象包含了消费逻辑,Channels对象包含了待消费的数据- 一旦
Channels被添加到MultipleChannelsConsumer中,将会被一个MultipleChannelsConsumer完全消费,不会像ConsumerThread那样分区域部分消费 - 在
run()方法中,MultipleChannelsConsumer线程会定时循环遍历其消费的全部Group,一旦发现可消费的数据,就会循环调用consume()方法处理每个Group。 - 底层通过一个
ArrayList维护Group集合(consumeTargets字段) - 通过 Copy-on-Write 的方式保证线程安全,即在调用
addNewTarget()方法向consumeTargets集合添加Group时,会创建一个新的ArrayList集合并拷贝原集合内容,然后向新集合中添加数据,待新集合添加完成之后,直接替换原有集合 - 之所以这样做是因为在添加的过程中,
MultipleChannelsConsumer线程可能正在循环处理consumeTargets集合,这也是consumeTargets用volatile修饰的原因
扫描二维码,分享此文章