™技术博客

skywalking | 队列内核

2021年2月20日

队列内核

  • 基于无锁环状队列,生产者-消费者 内存消息队列模型;
  • 作用是在生产者和消费者之间创建一个缓冲异步内存队列,防止因skywalking收集数据方生产数据的速度大于往后端发送数据的速度而造成数据积压和生产方阻塞

Buffer

  • 两种实现方式:ArrayBlockingQueueBufferBuffer
  • skywalking队列内核中数据的载体,队列之中的数据都存储在buffer中。
  • 属性
    • Object[] buffer:存储数据数组, 固定长度(由buffer.bufferSize决定)
    • BufferStrategy strategy:入队策略;存在新旧数据冲突时的处理策略
      • 默认OVERRIDE:直接覆盖旧数据
      • BLOCKING:使用ArrayBlockingQueueBuffer,基于ArrayBlockingQueue;不适合Agent端
      • IF_POSSIBLE 策略:直接返回false
    • AtomicRangeInteger index:原子循环索引,基于AtomicIntegerArray,范围同bufferSize
  • 方法
    • obtain()方法:一次性读取(并清理)Buffer中全部数据的功能,同时也提供了部分读取的重载
    • save() 方法负责向底层数组中填充数据

      buffer数据存储介质 和 AtomicRangeInteger原子性和循环性 组成skywalking队列内核

Channel

  • Channel是管理Buffer的载体
  • 属性
    • QueueBuffer<T>[] bufferChannelsBuffer的数组集合,是队列内核中的数据载体
    • IDataPartitioner<T> dataPartitioner:决定数据存储位置与重试次数
      • ProducerThreadPartitioner:根据ThreadID,保证相同线程数据在一个Buffer中。
      • SimpleRollingPartitioner:简单循环自增选择器,使用无锁整型(volatile修饰)的自增并取模,选择要写入的 Buffer
    • BufferStrategy strategy:队列策略,与Buffer保持一致
    • long size:Channels能够容纳的数据大小。其值 = 1 * channelSize * bufferSize
      • channelSize 参数:Channel的数量
      • bufferSize参数:每个Bufferbuffer的大小
         bufferSize 、channelSize 参数与 Channels之间的关系

DataCarrier

  • 队列内核的门户:与skywalking其他模块进行交互
  • 初始化必须参数:channelSizebufferSize
  • 属性
    • Channels<T> channels
    • IDriver driver
    • String name

实现Demo TraceSegmentServiceClient

生产者

  • 初始化
    • CHANNEL_SIZE = Buffer.CHANNEL_SIZE=5
    • BUFFER_SIZE = Buffer.BUFFER_SIZE=300
    • BufferStrategy = IF_POSSIBLE
  • 队列模型
    • Buffer 自实现环形队列
  • 生产消息
    • produce()方法:调用Channels#save()
  • 数据分发:dataPartitioner#partition() 默认使用 SimpleRollingPartitioner
    • 判断当前数据应该存储在哪个Bufferi++ % channelSize
    • 判断当前数据应该存储在buffer的索引:index.getAndIncrement()
    • 重试次数 dataPartitioner.maxRetryCount() 默认为 3

消费管理 ConsumeDriver

  • IConsumer消费者和ConsumerThread线程组,按照一定的消费模式集成到一起,提供相关API。
    • 依赖 ConsumerThread 的实现是 ConsumerDriver
    • 依赖 MultipleChannelsConsumer 的实现是 BulkConsumerPool
  • 方法
    • isRunning:检测当前IDriver是否正在运行
    • close:关闭消费线程
    • begin:启动消费线程
  • ConsumerDriver
    • 维护了固定数量的ConsumerThread[]线程,共同消费一个Channels中的数据
    • 消费的 ChannelsConsumerThread 线程数以及两者的绑定关系一旦确定,在整个 ConsumerDriver 的生命周期中不会再进行变更。
  • 消费线程分配逻辑 ConsumerDriver#allocateBuffer2Thread() 方法
    • consumerThreads.length < channelSize:每个ConsumerThread线程会处理多个Channel
    • consumerThreads.length == channelSize:每个ConsumerThread线程处理一个Channel
    • consumerThreads.length > channelSize:有一些消费线程就没有作用

消费线程 ConsumerThread

  • 属性
    • threadName
    • consumer消费对象:IConsumer实例,反射新建,如TraceSegmentServiceClient
    • running:状态
    • dataSources:绑定的ChannelBuffer消费数据源
    • consumeCycle:消费循环间隔时间 默认20毫秒
  • 消费逻辑 run()
    • 每个 ConsumerThread 初始化一个初始容量在1500的consumeList
    • 从每个dataSource取数存入 consumeList
    • 由消费者对象重写的 consume()方法进行消费,如TraceSegmentServiceClient
    • 每次消费完毕,清空consumeList,休眠consumeCycle毫秒 循环消费
  • 关闭逻辑
    • 退出时,显示调用 consumerThread.shutdown() 修改 running 状态
    • 线程退出循环,执行最后一次保障消费逻辑
    • 调用onExit()执行消费对象退出前动作

实现Demo MetricsAggregateWorker

生产者

  • 初始化
    • CHANNEL_SIZE = Buffer.CHANNEL_SIZE=2
    • BUFFER_SIZE = Buffer.BUFFER_SIZE=10000
    • BufferStrategy = BLOCKING
  • 队列模型
    • ArrayBlockingQueueBuffer 基于 JDK ArrayBlockingQueue
  • 生产消息
    • produce()方法:调用Channels#save()
  • 数据分发:dataPartitioner#partition() 默认使用 SimpleRollingPartitioner
    • 判断当前数据应该存储在哪个Bufferi++ % channelSize
    • 判断当前数据应该存储在buffer的索引:index.getAndIncrement()

消费管理BulkConsumePool

  • 一般统一维护在 ConsumerPoolFactory
  • ConsumerPoolFactory 是通过枚举方式实现的单例类,其底层维护了一个 Map<String, ConsumerPool>集合,其中Key就是 BulkConsumePool的名称
  • allConsumers 字段(List类型)中维护了当前启动的 MultipleChannelsConsumer 线程
  • BulkConsumePool的核心实现在其add()方法,通过该方法向BulkConsumePool添加新Channels以及对应 IConsumer 时,会通过 getLowestPayload()方法选择负载最低的MultipleChannelsConsumer线程进行处理(即当前处理Group最少的线程)

消费线程池MultipleChannelsConsumer

  • MultipleChannelsConsumer 提供了另一种消费模式,与ConsumerThread的区别在于:MultipleChannelsConsumer线程可以处理多组Group,每个Group都是一个IConsumer + 一个 Channels的组合
  • MultipleChannelsConsumer可以同时消费多个Group,每个Group中的IConsumer对象包含了消费逻辑,Channels对象包含了待消费的数据
  • 一旦Channels被添加到MultipleChannelsConsumer中,将会被一个 MultipleChannelsConsumer完全消费,不会像ConsumerThread那样分区域部分消费
  • run()方法中,MultipleChannelsConsumer线程会定时循环遍历其消费的全部Group,一旦发现可消费的数据,就会循环调用consume()方法处理每个Group
  • 底层通过一个ArrayList维护Group集合(consumeTargets 字段)
  • 通过 Copy-on-Write 的方式保证线程安全,即在调用addNewTarget()方法向 consumeTargets集合添加Group时,会创建一个新的ArrayList集合并拷贝原集合内容,然后向新集合中添加数据,待新集合添加完成之后,直接替换原有集合
  • 之所以这样做是因为在添加的过程中,MultipleChannelsConsumer线程可能正在循环处理consumeTargets集合,这也是consumeTargetsvolatile修饰的原因
    添加 Group 的核心逻辑

扫描二维码,分享此文章