Kafka源码深度剖析——LogManager是什么?

上一小节我们使用场景驱动的方式代码阅读到log.append方法,其实说白了代码就是调用了Log对象的append方法,不过相信很多同学还不知道Log对象是代表的啥?怎么出来的?所以代码看到这儿我们先停一下,现在不得不回头看一下之前的初始化方法了。

-     本次目标     -

上一小节我们使用场景驱动的方式代码阅读到log.append方法,其实说白了代码就是调用了Log对象的append方法,不过相信很多同学还不知道Log对象是代表的啥?怎么出来的?所以代码看到这儿我们先停一下,现在不得不回头看一下之前的初始化方法了。


-     源码剖析     -

首先我们上一小节分析源码的时候根据场景驱动的方式分析到了KafkaAPi里的handleProducerRequest方法,里面调用了ReplicaManager对象的appendMessages方法,代码如下:

defhandleProducerRequest(request: RequestChannel.Request) {
......
replicaManager.appendMessages(
produceRequest.timeout.toLong,
produceRequest.acks,
internalTopicsAllowed,
authorizedMessagesPerPartition,
sendResponseCallback)
. ......
}

所以正常情况下,这一讲我们需要回头看一看ReplicaManager是如何初始化的,我们现在回到KafkaServer的start方法,之前我们只是看了里面关于网络部分的代码,这次我们看里面比较重要的几个方法:

defstartup() {
......
/**
* 启动了LogManager
*/
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup();
/**
* 启动ReplicaManager
* ReplicaManger里面传进去了一个重要的参数logManager
*
*/
replicaManager = newReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
isShuttingDown, quotaManagers.follower)
replicaManager.startup()

}
......

在这个里面我们看到了ReplicaManager的初始化方法,我们发现其初始化的时候传进去了一个重要的参数:logManager所以要想了解ReplicaManager我们就不得不先去分析LogManager是啥,这就是咱们这次课的重点!看一下LogManager创建的方法:

   privatedefcreateLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
//读取配置文件
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)

val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
topic -> LogConfig.fromProps(defaultProps, configs)
}
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
ioBufferSize = config.logCleanerIoBufferSize,
maxMessageSize = config.messageMaxBytes,
maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
//TODO 创建了一个LogManager对象
//我们可以点进去看一下他的初始化操作
//注意logDirs代表的是就是我们配置Kafka的时候
//指定的数据存储的目录
newLogManager(logDirs = config.logDirs.map(newFile(_)).toArray,
topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
scheduler = kafkaScheduler,
brokerState = brokerState,
time = time)
}

我们知道在Scala里面new一个对象的时候会执行这个对象的主构造函数,一般主构造函数里面会有重要的初始化操作,故我们看一下其主构造函数:

classLogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extendsLogging {
valRecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
valLockFile = ".lock"
valInitialTaskDelayMs = 30*1000
privateval logCreationOrDeletionLock = newObject
//一个集合,一个分区对应一个Log对象
privateval logs = newPool[TopicAndPartition, Log]()
//创建和检验目录是否可读
createAndValidateLogDirs(logDirs)
privateval dirLocks = lockLogDirs(logDirs)
privateval recoveryPointCheckpoints = logDirs.map(dir => (dir, newOffsetCheckpoint(newFile(dir, RecoveryPointCheckpointFile)))).toMap
//TODO 加载目录
//每个目录就对应一个Log对象,把log对象存入了logs里面
loadLogs()

// public, so we can access this from kafka.admin.DeleteTopicTest
//TODO 创建了日志清理对象
val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
newLogCleaner(cleanerConfig, logDirs, logs, time = time)
else
null
........
}

这段代码里面出现了一个Log的概念,我用图现在给大家整理了一下LogManagerLog的关系:

attachments-2021-01-QC4EaKqh5ffd191ba88bd.png

logDir:表示用户配置的日志存放路径,通过log.dir配置,可以配置多个。LogManager会维护一个LogDir的列表。
Log: 每个partition的日志目录,代表topic的一个分区副本。LogManager会维护本broker上所有的Log对象。
LogSegment:partition中的日志段对象,每个Log都会有N个日志段。这个日志段包括了日志文件和对应的索引文件。如上的代码中我们看到LogManager创建的时候会做两个动作(1)创建和检查目录(2)加载日志目录的文件,创建和检查目录。

 private def createAndValidateLogDirs(dirs: Seq[File]) {
//TODO 检查目录是否配置重复
if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
thrownew KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
//TODO 遍历所有的目录
for(dir <- dirs) {
if(!dir.exists) {
info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
//如果目录不存在则创建目录
val created = dir.mkdirs()
if(!created)
thrownew KafkaException("Failed to create data directory " + dir.getAbsolutePath)
}
//TODO 检验目录是否可读
if(!dir.isDirectory || !dir.canRead)
thrownew KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
}
}

加载日志目录的文件

private def loadLogs(): Unit = {
info("Loading logs.")
val startMs = time.milliseconds
//数组里面有线程池
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
//遍历所有的目录
for (dir <- this.logDirs) {
//针对每个目录都创建一个线程池,一个线程池对应一个目录
//因为一个目录里面有多个分区,一个分区就一个线程。
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
//检查上一次关闭是否正常关闭
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
//如果文件存在说明是非正常关闭
if (cleanShutdownFile.exists) {
debug(
"Found clean shutdown file. " +
"Skipping recovery for all logs in data directory: " +
dir.getAbsolutePath)
} else {
// log recovery itself is being performed by `Log` class during initialization
brokerState.newState(RecoveringFromUncleanShutdown)
}
//读取日志检查点
var recoveryPoints = Map[TopicAndPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception =>
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}

val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
CoreUtils.runnable {
debug("Loading log '" + logDir.getName + "'")
//根据目录名解析partition的信息,获取到分区信息
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
//每个分区目录就是一个Log对象
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
//把每个Log存入logs里
val previous = this.logs.put(topicPartition, current)
//判断是否有重复的分区目录
if (previous != null) {
thrownew IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(
current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
}

jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
    }

遍历每个日志目录时,会先读取日志检查点文件,然后读取日志目录下的所有文件,然后创建相关的Log对象。需要注意的是,由于加载过程比较慢,对于每个日志目录都会创建一个线程来加载,最后等所有线程都加载完毕后才会退出loadLogs()方法。因此,创建LogManager的过程是阻塞的,当LogManager创建完成后,说明所有的分区目录都加载进来了。

到这儿LogManager的初始化操作就做完了,接着就调用了startup方法。

 logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup();

接着我们分析一下这个方法:

 /**
* Start the background threads to flush logs and do log cleanup
* 启动了三个后台线程
* 第一个:启动清理日志的线程
* 第二个:启动日志刷盘操作
* 第三个:设置检查点
*/
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}

我们看到里面启动了三个重要的周期性调度的任务:第一个:启动清理日志的线程第二个:启动日志刷盘操作第三个:设置检查点


-     总结     -

到目前为止我们根据场景驱动的方式看到了log.append的方法,但是很多同学其实会有点蒙圈,只是跟着我一直看这个代码,不知道我们这小节分析的代码里面的ReplicaManager、Log这些都是些啥对象,下一小节我们回过头来分析ReplicaManger和Log的初始化操作,分析一下到底这两个是啥对象。我们到现在用图总结一下我们的代码走到哪儿了?

这一小节我们重点分析了LogManager的一些初始化的操作,搞清楚了LogManager、Log、分区、LogDir他们之间的关系,我们看到了LogManger初始化完了会启动三个周期性的调度任务,下一讲我们重点剖析一下这三个调度任务都是干嘛的?

大家加油!

相关内容推荐:

  Kafka源码深度剖析系列(汇总)

更多技术资料及视频请关注微信公众号或添加 QQ交流群领取

attachments-2020-07-fDTAamMF5f1e8391948b7.png

  • 发表于 2021-01-12 11:36
  • 阅读 ( 27 )

0 条评论

请先 登录 后评论
李希沅 | 奈学教育
李希沅 | 奈学教育

奈学教育 | 金牌讲师

51 篇文章

作家榜 »

  1. NX小编 1251 文章
  2. 58沈剑 322 文章
  3. 热爱技术的小仓鼠 169 文章
  4. 奈学教育 150 文章
  5. 李希沅 | 奈学教育 51 文章
  6. 江帅帅 | 奈学教育 32 文章
  7. 林淮川 | 奈学教育 12 文章
  8. 邱鹏超 3 文章