kafka-ISR

Partition & Replica

First, what exactly is a Partition's Replica? Each Partition can have multiple Replicas:
the Leader replica handles reads and writes, while Follower replicas pull data from the Leader. Replicas are spread across different Brokers,
so a Replica only needs two attributes: the Partition it belongs to and the brokerId of the Kafka node it lives on.

k_partition

A Replica carries two important pieces of metadata: the HighWatermark (HW) and the LogEndOffset (LEO); a toy example after the list shows how they relate.

  • The HighWatermark is the highest offset that consumers are allowed to read; messages beyond this watermark are never visible to clients.
  • Since the Leader handles all reads and writes, only the Leader can advance the HW. As for when: presumably when a follower updates its LEO (via its fetch), the Leader is prompted to recompute the HW.
  • Every Replica has a LogEndOffset: the Leader updates its LEO after appending messages, and a follower updates its LEO as it fetches messages from the Leader.
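
A toy illustration of how the two offsets relate on the leader (the numbers are made up):

  // LEO marks the next offset to be appended; HW marks how far the whole ISR has replicated,
  // and therefore how far consumers may read.
  val leo = 10L                          // offsets 0..9 are in the leader's log
  val hw  = 7L                           // the ISR has fully replicated offsets 0..6
  val visibleToConsumers = 0L until hw   // consumers can fetch at most offsets 0..6
  val notYetCommitted    = hw until leo  // offsets 7..9 are appended but not yet exposed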

k_update_leo

Q: How does the last, optional Log parameter determine whether this is a local Replica? And what are Local and Remote relative to?
A: All Replicas of a Partition are kept in memory on the Leader's node, rather than each node managing only its own replica information.
If every node kept only its own, the information on each Kafka node would differ, and distributed cluster management needs one place that holds the information for all nodes.
Since this bookkeeping lives on the Leader node, and the Leader manages all the Replicas, the Leader itself is Local and the other Replicas are Remote:
Leader Replica = Local Replica, Follower Replicas = Remote Replicas.

Q: But Partitions are distributed across different Kafka nodes, so the Partitions each node records are already different anyway.
A: Right, the information stored on different nodes differs, but all Replicas of the same Partition should be managed by that Partition's Leader,
and the Replicas on the followers do not need to manage their own state, because the Leader manages it for them.

k_local_remote

class Replica(val brokerId: Int, val partition: Partition, time: Time = SystemTime, initialHighWatermarkValue: Long = 0L, val log: Option[Log] = None) {
  // the high watermark offset value, in non-leader replicas only its message offsets are kept
  @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
  // the log end offset value, kept in all replicas;
  // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
  @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata

  def isLocal: Boolean = log match {
    case Some(l) => true    // a Log was supplied when this Replica was created, so it is a local replica
    case None => false
  }

  // Set the LEO: this must not be done on a local replica, whose LEO is given by its Log's end offset rather than by this setter
  private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
    if (!isLocal) logEndOffsetMetadata = newLogEndOffset
  }
  // Read the LEO
  def logEndOffset = if (isLocal) log.get.logEndOffsetMetadata else logEndOffsetMetadata

  // Set the HW: this must not be done on a non-local replica, i.e. it can only be set on the local replica, which is also the Leader replica
  def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
    if (isLocal) highWatermarkMetadata = newHighWatermark
  }
  // Read the HW
  def highWatermark = highWatermarkMetadata
}
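
To make the local/remote split concrete, here is a tiny standalone model (my own toy classes, not Kafka's): a local replica derives its LEO from its Log, while a remote replica only records whatever the leader learns from that follower's fetch requests.

  // toy model, not Kafka code
  case class ToyLog(var endOffset: Long)

  class ToyReplica(val brokerId: Int, val log: Option[ToyLog] = None) {
    private var remoteLeo: Long = -1L
    def isLocal: Boolean = log.isDefined
    def logEndOffset: Long = log.map(_.endOffset).getOrElse(remoteLeo)
    // only meaningful for remote replicas; a local replica's LEO always comes from its log
    def updateLogEndOffset(leo: Long): Unit = if (!isLocal) remoteLeo = leo
  }

  object ToyReplicaDemo extends App {
    val local  = new ToyReplica(brokerId = 0, log = Some(ToyLog(10L)))
    val remote = new ToyReplica(brokerId = 1)
    remote.updateLogEndOffset(8L)   // recorded when broker 1's fetch request reaches the leader
    println(local.logEndOffset)     // 10 -- read straight from the log
    println(remote.logEndOffset)    // 8  -- the last LEO reported by broker 1's fetch
  }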

A Replica is created inside Partition: if it is not yet in assignedReplicaMap, a Replica instance is created according to whether it is local (the Leader).
A local Replica has an offset and a Log that a remote one lacks, and Replica.isLocal() decides local vs. remote simply by whether a Log is present.

  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
    val replicaOpt = getReplica(replicaId)
    replicaOpt match {
      case Some(replica) => replica
      case None =>
        if (isReplicaLocal(replicaId)) {
          val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
          // the Log for this TopicAndPartition
          val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
          // log.dirs is the parent directory of all TopicAndPartition directories, and the checkpoints map is likewise global
          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
          val offsetMap = checkpoint.read
          // the checkpoint records the offsets of all Partitions, so the offset for this TopicAndPartition can be looked up there
          if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
          val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
          // the local replica is the Leader, so it is given an offset and a Log
          val localReplica = new Replica(replicaId, this, time, offset, Some(log))
          addReplicaIfNotExists(localReplica)
        } else {
          // remote replicas are the followers
          val remoteReplica = new Replica(replicaId, this, time)
          addReplicaIfNotExists(remoteReplica)
        }
        getReplica(replicaId).get
    }
  }

Note that log.dir above is one TopicAndPartition's directory (each TopicAndPartition directory is unique),
while log.dir.getParentFile is the directory configured as log.dirs, not the Partition's directory.
log.dirs is the log.dirs setting in server.properties and is the parent directory of all TopicAndPartition directories.
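
A quick sanity check of that path relationship (using the same /data/kafka example as the listing below):

  import java.io.File

  // wikipedia-0 is one TopicAndPartition directory under the log.dirs entry /data/kafka
  val logDir = new File("/data/kafka/wikipedia-0")           // this is log.dir
  val logDirsEntry = logDir.getParentFile.getAbsolutePath    // "/data/kafka", where the checkpoint files live
  println(logDirsEntry)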

getOrCreateReplica is called from ReplicaManager.becomeLeaderOrFollower -> makeFollowers.

ReplicaManager->OffsetCheckpoint

The local Replica's initial offset comes from the high-watermark checkpoint (the HW matters, so it is checkpointed to disk).
ReplicaManager.highWatermarkCheckpoints is a Map from log directory (an entry of log.dirs) to OffsetCheckpoint.

  val highWatermarkCheckpoints = config.logDirs.map(dir => 
    (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))
  ).toMap

The /data/kafka directory below is the log.dirs setting from server.properties; the checkpoint files are created under this directory.

➜  kafka  ll                                                                        ⬅️ the log.dirs directory
-rw-r--r--  1 zhengqh  staff     0B  1 14 16:13 recovery-point-offset-checkpoint
-rw-r--r--  1 zhengqh  staff    18B  1 14 16:13 replication-offset-checkpoint       ⬅️ the offset-checkpoint file
drwxr-xr-x  4 zhengqh  staff   136B  1 14 16:13 wikipedia-0
➜  kafka  ll wikipedia-0                                                            ⬅️ the topic-partition directory
-rw-r--r--  1 zhengqh  staff    10M  1 14 16:13 00000000000000000000.index          ⬅️ segment log and index files
-rw-r--r--  1 zhengqh  staff     0B  1 14 16:13 00000000000000000000.log

ReplicaManager manages all Partitions, and the offsetMap read from the checkpoint is shared by all of them,
so the offset for a specific TopicAndPartition can be looked up there when creating its Replica.

k_offset_checkpoint

OffsetCheckpoint stores the mapping from TopicAndPartition to offset, so the class is nothing more than reading and writing that file.

class OffsetCheckpoint(val file: File) extends Logging {
  def write(offsets: Map[TopicAndPartition, Long]) {
    // write the current version and the number of entries, then the entries, finally flush to disk
    // (writer setup and the version/entry-count header lines are omitted in this excerpt)
    offsets.foreach { case (topicPart, offset) =>
        writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
        writer.newLine()
    }
  }
  def read(): Map[TopicAndPartition, Long] = {
    // (reader setup and the version/entry-count header handling are omitted in this excerpt)
    var offsets = Map[TopicAndPartition, Long]()
    var line = reader.readLine()
    while(line != null) {
        val pieces = line.split("\\s+")
        val topic = pieces(0)
        val partition = pieces(1).toInt
        val offset = pieces(2).toLong
        offsets += (TopicAndPartition(topic, partition) -> offset)
        line = reader.readLine()
    }
    offsets
  }
}
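
A minimal usage sketch of the class above (hypothetical values; per the comment in write, the real file also carries a version line and an entry count, and it lives directly under the log.dirs directory as shown in the listing earlier):

  import java.io.File
  import kafka.common.TopicAndPartition

  // the file name matches ReplicaManager.HighWatermarkFilename ("replication-offset-checkpoint")
  val cp = new OffsetCheckpoint(new File("/data/kafka", "replication-offset-checkpoint"))

  // persist the HW of wikipedia-0 (the offset 42 is made up)
  cp.write(Map(TopicAndPartition("wikipedia", 0) -> 42L))

  // read it back as a Map[TopicAndPartition, Long]
  println(cp.read().getOrElse(TopicAndPartition("wikipedia", 0), 0L))   // 42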

ReplicaManager schedules the checkpoint writes from becomeLeaderOrFollower; the task then runs periodically so that the checkpointed HW stays up to date.

  private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)   // atomic flag: ensures only one checkpoint-flushing thread is ever started
  private val allPartitions = new Pool[(String, Int), Partition]                // all Partitions on this broker

  def startHighWaterMarksCheckPointThread() = {
    if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
      scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
  }

  // Flushes the highwatermark value for all partitions to the highwatermark file
  def checkpointHighWatermarks() {
    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
    for((dir, reps) <- replicasByDir) {
      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
      highWatermarkCheckpoints(dir).write(hwms)
    }
  }

How the offset is obtained from a Partition: Partition -> Replica -> LogOffsetMetadata -> messageOffset.

  • Take all Partitions, look up each one's Replica on this broker, and keep only the Partitions that have one
  • Regroup by log directory (keeping only replicas that have a Log); each log.dirs entry maps to a list of replicas
  • For each log directory, turn its replicas into a TopicAndPartition -> HW map
  • Finally, each log directory gets the TopicAndPartition -> offset entries that belong to it written to its checkpoint file

Q: The HW is updated by the Leader, but a Broker does not host only Leader partitions. Is the checkpoint taken only for Leader partitions?
A: No, ReplicaManager checkpoints all Partitions. Although only the Leader advances the HW, the Leader also propagates the HW to the followers:
when a follower sends a fetch request, the Leader returns the HW along with the data, so the follower gets the HW too. That way every Replica of
the Partition has the HW; even if the Leader dies, the HW survives on the other Replicas, and when one of them becomes the new Leader the HW is not lost.

Q: Replica.highWatermark_= can only be called on a local replica (isLocal), so how does a follower actually apply the HW that the Leader propagates to it?
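
A guess at the answer (an assumption on my part, not verified against the fetch path here): isLocal really means "the replica that lives on this broker and therefore has a Log", so on the follower's own broker that follower replica is also local and highWatermark_= is allowed there. When the fetch response arrives carrying the leader's HW, the follower takes the smaller of that value and its own LEO, roughly:

  // a toy calculation with made-up numbers, not the real fetch-handling code
  val leaderHwFromFetchResponse = 7L   // the HW the leader piggybacks on the fetch response
  val followerLeo = 5L                 // this follower has only replicated offsets 0..4 so far
  val followerHw = math.min(followerLeo, leaderHwFromFetchResponse)   // 5: never expose more than what is locally appended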

ReplicaManager.allPartitions

allPartitions is also populated in becomeLeaderOrFollower, which shows that quite a lot of work happens when a Partition switches between Leader and follower.

  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
    var partition = allPartitions.get((topic, partitionId))
    if (partition == null) {
      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
      partition = allPartitions.get((topic, partitionId))
    }
    partition
  }

allPartitions holds all the Partitions on the current broker; they are not all Leaders.

  private def getLeaderPartitions() : List[Partition] = {
    allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
  }
  private def maybeShrinkIsr() {
    allPartitions.values.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))  // re-evaluate the ISR and check whether any replicas can be removed from it
  }

Left for later: how becomeLeaderOrFollower is invoked, and how shrinking the ISR affects the HW.

ReplicaManager.appendMessages

After Log.append, the followers need to be told to fetch the new messages: the Leader's LEO has just advanced, so the followers should catch up promptly.
If the Partition's ISR contains only one replica, there are no other replicas to wait for, so when the Leader's LEO advances its HW must advance with it (a special case).

k_hw_leo

To make the post-append steps easier to follow, the related code is gathered below; the parts analyzed earlier are omitted.

  //*****ReplicationManager.scala*****
  private def appendToLocalLog(internalTopicsAllowed: Boolean,messagesPerPartition: Map[TopicAndPartition, MessageSet], requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
    messagesPerPartition.map { case (topicAndPartition, messages) =>
      val info = partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
      (topicAndPartition, LogAppendResult(info))
    }
  }
  //*****Partition.scala*****
  def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
          // ① append to the Log of this Partition's Leader replica
          val log = leaderReplica.log.get       
          val info = log.append(messages, assignOffsets = true)
          // ② probably unblock some follower fetch requests since log end offset has been updated
          replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
          // ③ we may need to increment high watermark since ISR could be down to 1
          (info, maybeIncrementLeaderHW(leaderReplica))
    }     
    // some delayed operations may be unblocked after HW changed 
    // ④ if the HW changed, some delayed (consumer fetch) requests can now be completed; if it did not change, there is nothing to notify
    if (leaderHWIncremented) tryCompleteDelayedRequests()
  }
  //*****ReplicationManager.scala*****
  def appendMessages(timeout: Long, ... ){
      val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
      val produceStatus = localProduceResults.map { case (topicAndPartition, result) =>
        topicAndPartition -> ProducePartitionStatus(result.info.lastOffset + 1, ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // required offset, response status
      }
      if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
        // ⑤ create a delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
        // ⑥ try to complete the request immediately, otherwise put it into the purgatory (which effectively acts as a cache of pending operations)
        // this is because while the delayed produce operation is being created, new requests may arrive and hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
      } else {
        // ⑦ we can respond immediately: no need to wait for the ISR to replicate, so once the write to the Leader succeeds the response goes back to the Producer
        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
        responseCallback(produceResponseStatus)
      }
  }

①-④ After appending to the Leader's local log, pending follower fetches are unblocked so the followers in the ISR can replicate the new messages as quickly as possible.
⑤-⑥ If the follower replicas in the ISR must replicate the data first, a delayed ProducerRequest is created and handed to the dedicated Purgatory.
⑦ If there is no need to wait for the ISR to replicate, the response is returned to the Producer as soon as the write to the Leader succeeds.

Note how the DelayedProduce created here relates to the other Producer-side objects:

  • produceStatus: the produce status, a map from TopicAndPartition to ProducePartitionStatus; each Partition in the request has its own status
  • produceMetadata: wraps the produce status above plus requiredAcks, which comes from the Producer's acks setting
  • responseCallback: the callback passed into appendMessages from outside (defined in KafkaApis <- ultimately the client's callback)
  • produceResponseStatus: the response eventually returned to the client; when the request completes, this status is handed to responseCallback, which is then invoked to finish the client round trip

ReplicaManager->DelayOperation

Here we only take a quick look at when a produce request needs to be delayed; a later post is devoted to why the delay is needed (and to the cache that stores and removes these delayed operations).

delayedRequestRequired

After appendToLocalLog, a delayed Produce request is created, and replication is waited on, when all of the following hold:

  • required acks = -1: the Producer wants the whole ISR to receive the data (the key condition, and it is set by the client)
  • there is data to append: obviously, otherwise nothing is being produced
  • at least one partition append was successful, i.e. not every partition in the request failed (a single request can carry multiple Partitions)

  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet], localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = {
    requiredAcks == -1 && messagesPerPartition.size > 0 && localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
  }

DelayedProduce

localProduceResults -> (TopicAndPartition,LogAppendResult) -> ProducePartitionStatus -> ProduceMetadata -> DelayedProduce
the local (Leader) produce results -> the log append result      -> per-Partition produce status -> produce metadata -> the delayed Produce operation

The delayed Produce operation is created by ReplicaManager (new DelayedProduce) and watched by the produce operation purgatory.

// A delayed produce operation that can be created by the replica manager and watched in the produce operation purgatory
class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, 
                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) extends DelayedOperation(delayMs) 

abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
  override val expirationMs = delayMs + System.currentTimeMillis()
}

The DelayedProduce's delayMs and acks originate from the ProduceRequest handled in KafkaApis.handleProducerRequest:

      replicaManager.appendMessages(produceRequest.ackTimeoutMs.toLong, produceRequest.requiredAcks, internalTopicsAllowed, authorizedRequestInfo, sendResponseCallback)

DelayedOperationPurgatory

ReplicaManager's append path involves three tryCompleteXXX calls (Fetch, Requests, ElseWatch), all of which are handed to a DelayedOperationPurgatory.

class ReplicaManager:  
  val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
  val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
  def tryCompleteDelayedFetch(key: DelayedOperationKey)   { val completed = delayedFetchPurgatory.checkAndComplete(key) }
  def tryCompleteDelayedProduce(key: DelayedOperationKey) { val completed = delayedProducePurgatory.checkAndComplete(key) }

// A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)

A. A delayed fetch request may be completed when:

  • A.1 The Partition's HW changes (a normal fetch, i.e. a consumer fetch: the HW is defined for consumers, who can read at most up to the HW)
  • A.2 A new MessageSet is appended to the local log (a follower fetch: new messages mean a new LEO, and followers track the LEO)

B. A delayed Produce request may be completed when:

  • B.1 The Partition's HW changes (for the acks=-1 case)
  • B.2 A fetch from a follower replica is received (for the acks>1 case? but doesn't acks only take the values -1, 0 and 1?)

Partition->IncrementLeaderHW

The HW marks the messages that every node in the ISR has already replicated; it is also the largest offset a consumer can read, hence the name high watermark.
Note that the Leader partition holds the ISR, which is why maybeIncrementLeaderHW() runs together with the append inside appendToLocalLog().

C. The (Leader) Partition's High Watermark (HW) may be incremented when:

  • C.1 The (Leader) Partition's ISR changes (e.g. a replica that lagged far behind is removed from the ISR while most of the others have caught up, so the HW can advance)
  • C.2 Any replica's LEO changes (whenever any follower in the ISR moves its LEO, check whether the whole ISR has replicated further and advance the HW)

  private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
    // the smallest LEO across the in-sync replicas (each follower's LEO can differ): this is the candidate new HW
    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)

    // the Leader's current HW, i.e. the old one (a LogOffsetMetadata)
    val oldHighWatermark = leaderReplica.highWatermark
    if(oldHighWatermark.precedes(newHighWatermark)) {   // is the Leader's messageOffset behind the smallest ISR LEO?
      leaderReplica.highWatermark = newHighWatermark    // yes: advance the Leader's HW to the ISR minimum
      true
    } else false    // returns true if the HW was incremented, and false otherwise
  }

updateLogEndOffset(LogAppendInfo.lastOffset + 1) during Log.append updates the messageOffset of a LogOffsetMetadata,
and oldHighWatermark and newHighWatermark here are LogOffsetMetadata as well (messageOffset being the field that matters).
So what is really compared is the two messageOffsets: only when the Leader's HW is behind the smallest LEO in the ISR is the Leader's HW advanced.
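
A worked example of that comparison, with made-up offsets:

  // ISR = {leader, follower1, follower2} with LEOs 10, 8 and 9 (made-up numbers)
  val isrLeos = Seq(10L, 8L, 9L)
  val newHighWatermark = isrLeos.min       // 8: every ISR member has replicated offsets 0..7
  val oldHighWatermark = 6L                // the Leader's current HW
  val incremented = oldHighWatermark < newHighWatermark   // true, so the HW advances to 8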

  • leaderReplica is the Partition's Leader replica; a Partition has exactly one Leader, and all reads and writes go through it
  • inSyncReplicas are the followers of the Partition that have caught up with the Leader (not every follower is in sync)

Both attributes are Partition-level: a Partition knows which replica is its Leader and which ISR list that Leader maintains.

Miscellaneous

delay operation complete

The trigger conditions above (delayed requests and HW advancement) are all chained together through the ISR:

  • the leader appends new messages to its local log (a producer wrote new data) --> A.2 --> DelayedFetch
  • the leader replica's LEO changes (new messages were appended) --> C.2 --> HW
  • a follower sends a fetch request to the Leader (ISR followers keep in sync with the Leader) --> B.2 --> DelayedProduce
  • the follower replica's LEO changes (new messages were pulled over) --> C.2 --> HW
  • once every replica's LEO has advanced, the Leader's HW advances too (the messages are committed) --> C.2 --> HW
  • consumers read at most up to the Leader's HW, so when the HW moves, blocked consumers are released --> A.1 --> DelayedFetch
  • a producer waiting for the whole ISR to replicate gets its response once the HW moves --> B.1 --> DelayedProduce

k_trigger

Partition and Replica Creation

ReplicaManager is responsible for Partitions, so Partitions are created in ReplicaManager: getOrCreatePartition.
Partition is responsible for Replicas, so Replicas are created in Partition: getOrCreateReplica.

ReplicaManager manages every Partition on the Broker: allPartitions: Pool[(String, Int), Partition].
Partition manages all of its assigned Replicas, as well as the Leader Replica and the ISR.

k_replica_comps


Summary

  • A Partition's replicas consist of a Leader and followers; only the replicas in the ISR list are closely keeping up with the Leader
  • The Leader maintains the ISR list; a message counts as committed only once every replica in the ISR has replicated it
  • Leader and follower replicas are both called Replica; different replicas of the same Partition live on different Brokers
  • A Replica's two key pieces of state are the HighWatermark (HW) and the LogEndOffset (LEO)
  • Only the Leader partition serves client reads and writes; followers sync data from the Leader
  • Every Replica checkpoints its HW; the Leader broadcasts the HW to followers in response to their fetch requests
