- This Lesson's Goal -
In the previous lesson we followed the scenario-driven approach and read our way down to the log.append method. In plain terms, that line simply calls the append method of a Log object. But many readers probably still don't know what this Log object actually is or where it comes from. So let's pause the read-through here; it's time to go back and look at the initialization code we skipped earlier.
- Source Code Analysis -
Following the scenario-driven approach, the previous lesson took us into the handleProducerRequest method of KafkaApis, which calls the appendMessages method of the ReplicaManager object. The code looks like this:
def handleProducerRequest(request: RequestChannel.Request) {
  ......
  replicaManager.appendMessages(
    produceRequest.timeout.toLong,
    produceRequest.acks,
    internalTopicsAllowed,
    authorizedMessagesPerPartition,
    sendResponseCallback)
  ......
}
So in this lesson we need to go back and see how ReplicaManager is initialized. Let's return to KafkaServer's startup method. Last time we only looked at its networking code; this time we focus on a few of its other important calls:
def startup() {
  ......
  /**
   * Start the LogManager
   */
  logManager = createLogManager(zkUtils.zkClient, brokerState)
  logManager.startup()
  /**
   * Start the ReplicaManager.
   * Note that the logManager we just created is passed in
   * as an important constructor parameter.
   */
  replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
    isShuttingDown, quotaManagers.follower)
  replicaManager.startup()
  ......
}
Here we can see ReplicaManager being constructed, and we notice that an important parameter is passed into it: logManager. So before we can understand ReplicaManager, we first have to work out what LogManager is, and that is the focus of this lesson. Let's look at the method that creates the LogManager:
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
  // read the default log configuration from the broker config
  val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
  val defaultLogConfig = LogConfig(defaultProps)
  val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
    topic -> LogConfig.fromProps(defaultProps, configs)
  }
  // read the log configurations from zookeeper
  val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
    dedupeBufferSize = config.logCleanerDedupeBufferSize,
    dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
    ioBufferSize = config.logCleanerIoBufferSize,
    maxMessageSize = config.messageMaxBytes,
    maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
    backOffMs = config.logCleanerBackoffMs,
    enableCleaner = config.logCleanerEnable)
  // TODO: a LogManager object is created here.
  // Step into it to look at its initialization.
  // Note that logDirs is the data directory (or directories)
  // we specified when configuring Kafka (log.dirs).
  new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
    topicConfigs = configs,
    defaultConfig = defaultLogConfig,
    cleanerConfig = cleanerConfig,
    ioThreads = config.numRecoveryThreadsPerDataDir,
    flushCheckMs = config.logFlushSchedulerIntervalMs,
    flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
    retentionCheckMs = config.logCleanupIntervalMs,
    scheduler = kafkaScheduler,
    brokerState = brokerState,
    time = time)
}
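One detail worth noting: createLogManager builds a per-topic LogConfig map from ZooKeeper, and later on (in loadLogs) the broker-wide default config is used whenever a topic has no override. Here is a tiny sketch of that precedence rule, with made-up config values and a FakeLogConfig stand-in rather than the real LogConfig class:

// Sketch only: a per-topic override wins if present, otherwise the broker default applies.
object ConfigPrecedenceSketch extends App {
  case class FakeLogConfig(segmentBytes: Long)

  val defaultConfig = FakeLogConfig(segmentBytes = 1024L * 1024 * 1024)                   // broker default
  val topicConfigs  = Map("payments" -> FakeLogConfig(segmentBytes = 256L * 1024 * 1024)) // from ZooKeeper

  def configFor(topic: String): FakeLogConfig = topicConfigs.getOrElse(topic, defaultConfig)

  println(configFor("payments")) // FakeLogConfig(268435456)  -> per-topic override
  println(configFor("clicks"))   // FakeLogConfig(1073741824) -> broker default
}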
We know that in Scala, creating an object with new executes the class's primary constructor, and the primary constructor (i.e. the class body) usually contains the important initialization work. So let's look at LogManager's primary constructor:
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30 * 1000
  private val logCreationOrDeletionLock = new Object
  // a pool (map): one Log object per partition
  private val logs = new Pool[TopicAndPartition, Log]()
  // create the log directories and check that they are readable
  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  // TODO: load the log directories.
  // Every partition directory becomes one Log object, which is stored in logs.
  loadLogs()

  // public, so we can access this from kafka.admin.DeleteTopicTest
  // TODO: create the log cleaner (compaction) object
  val cleaner: LogCleaner =
    if (cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
    else
      null
  ......
}
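Notice that createAndValidateLogDirs(logDirs) and loadLogs() sit directly in the class body, so they run as part of the primary constructor. A tiny standalone Scala example (a made-up class, not Kafka code) shows that class-body statements execute the moment you call new:

// Made-up example: statements in a Scala class body belong to the primary
// constructor and run as soon as the object is instantiated with `new`.
class DirectoryManager(val dirs: Array[String]) {
  println(s"validating ${dirs.length} directories") // runs during construction
  private val index = dirs.zipWithIndex.toMap       // constructor work as well

  def lookup(dir: String): Option[Int] = index.get(dir)
}

object DirectoryManagerDemo extends App {
  // the two lines in the class body above execute right here, inside `new`
  val dm = new DirectoryManager(Array("/data/kafka-logs-1", "/data/kafka-logs-2"))
  println(dm.lookup("/data/kafka-logs-1")) // Some(0)
}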
The LogManager constructor introduces a new concept, Log. The diagram I have put together summarizes the relationship between LogManager and Log:
logDir: a log directory configured by the user via log.dirs; more than one can be configured. LogManager maintains the list of these directories.
Log: the log of one partition directory, representing one replica of a topic partition. LogManager maintains all of the Log objects on this broker.
LogSegment: a log segment inside a partition's Log. Each Log consists of N segments, and each segment is made up of a log file plus its corresponding index file.
As the code above shows, LogManager does two things when it is constructed: (1) create and validate the log directories, and (2) load the log files inside those directories. Before walking through each step, the sketch below makes the on-disk layout concrete.
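This minimal, self-contained Scala sketch is not Kafka code; the path /data/kafka-logs and the naming assumptions (partition directories named "<topic>-<partition>", segment files named "<baseOffset>.log") are illustrative. It walks one log.dirs entry and shows which on-disk pieces correspond to a Log and to its LogSegments:

import java.io.File

// Sketch only (not Kafka code): map the on-disk layout to LogManager / Log / LogSegment.
object LogLayoutSketch extends App {
  val logDir = new File("/data/kafka-logs") // one entry of log.dirs, managed by LogManager
  for (partitionDir <- Option(logDir.listFiles).getOrElse(Array.empty[File]) if partitionDir.isDirectory) {
    // each partition directory, e.g. "my-topic-0", corresponds to one Log object
    val name = partitionDir.getName
    val dash = name.lastIndexOf('-')
    val (topic, partition) = (name.substring(0, dash), name.substring(dash + 1).toInt)
    // each "<baseOffset>.log" file inside it corresponds to one LogSegment
    val segmentBaseOffsets = partitionDir.listFiles
      .filter(_.getName.endsWith(".log"))
      .map(f => f.getName.stripSuffix(".log").toLong)
      .sorted
    println(s"Log for $topic-$partition, segment base offsets: ${segmentBaseOffsets.mkString(", ")}")
  }
}

Now the first step, creating and validating the directories: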
private def createAndValidateLogDirs(dirs: Seq[File]) {
  // TODO: check whether any directory is configured more than once
  if (dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
    throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
  // TODO: iterate over all the directories
  for (dir <- dirs) {
    if (!dir.exists) {
      info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
      // if the directory does not exist, create it
      val created = dir.mkdirs()
      if (!created)
        throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
    }
    // TODO: check that the directory is readable
    if (!dir.isDirectory || !dir.canRead)
      throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
  }
}
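The duplicate check above works on canonical paths, so two differently written paths that resolve to the same directory (for example via "." components or symlinks) are still detected. A tiny standalone sketch of just that check, with made-up paths:

import java.io.File

// Standalone sketch of the duplicate-log-directory check: canonical paths
// collapse ".", "..", and symlinks, so different spellings of the same
// directory end up identical and the Set becomes smaller than the list.
object DuplicateLogDirCheck extends App {
  val dirs = Seq(new File("/data/kafka-logs"), new File("/data/./kafka-logs"))
  val hasDuplicates = dirs.map(_.getCanonicalPath).toSet.size < dirs.size
  println(s"duplicate log directories configured: $hasDuplicates") // prints: true
}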
Loading the files in the log directories:
private def loadLogs(): Unit = {
  info("Loading logs.")
  val startMs = time.milliseconds
  // one thread pool per data directory is collected in this array
  val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
  val jobs = mutable.Map.empty[File, Seq[Future[_]]]
  // iterate over all the data directories
  for (dir <- this.logDirs) {
    // create one thread pool per directory: a directory contains many
    // partition directories, and each partition is loaded as one task
    val pool = Executors.newFixedThreadPool(ioThreads)
    threadPools.append(pool)
    // check whether the previous shutdown was clean
    val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
    // if this file exists, the broker was shut down cleanly and recovery can be skipped
    if (cleanShutdownFile.exists) {
      debug(
        "Found clean shutdown file. " +
        "Skipping recovery for all logs in data directory: " +
        dir.getAbsolutePath)
    } else {
      // log recovery itself is being performed by `Log` class during initialization
      brokerState.newState(RecoveringFromUncleanShutdown)
    }
    // read the recovery-point checkpoints for this directory
    var recoveryPoints = Map[TopicAndPartition, Long]()
    try {
      recoveryPoints = this.recoveryPointCheckpoints(dir).read
    } catch {
      case e: Exception =>
        warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
        warn("Resetting the recovery checkpoint to 0")
    }
    val jobsForDir = for {
      dirContent <- Option(dir.listFiles).toList
      logDir <- dirContent if logDir.isDirectory
    } yield {
      CoreUtils.runnable {
        debug("Loading log '" + logDir.getName + "'")
        // parse the topic and partition number from the directory name
        val topicPartition = Log.parseTopicPartitionName(logDir)
        val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
        val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
        // each partition directory becomes one Log object
        val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
        // store the Log in logs
        val previous = this.logs.put(topicPartition, current)
        // guard against duplicate directories for the same partition
        if (previous != null) {
          throw new IllegalArgumentException(
            "Duplicate log directories found: %s, %s!".format(
              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
        }
      }
    }
    jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
  }
  // ... then all the submitted jobs are awaited, the clean-shutdown files are
  // deleted, and the thread pools are shut down (rest of the method omitted)
  ......
}
When iterating over each log directory, loadLogs first reads that directory's recovery-point checkpoint file, then lists the partition directories under it and creates a Log object for each one. Because this loading can be slow, one thread pool (with ioThreads threads, i.e. num.recovery.threads.per.data.dir) is created per log directory and one load task is submitted per partition directory; loadLogs() only returns after all of those tasks have finished. Constructing LogManager is therefore a blocking operation: once the LogManager has been created, every partition directory has been loaded. The sketch below shows this submit-then-await pattern in isolation.
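This is a simplified, self-contained sketch of the pattern described above, not the Kafka code itself; the directory names, the hard-coded partition directory list, and the loadPartitionDir callback are all placeholders:

import java.util.concurrent.{ExecutorService, Executors, Future}
import scala.collection.mutable

// Sketch only: one thread pool per data directory, one task per partition
// directory, then block until every task has finished before returning.
object LoadPatternSketch {
  def loadAll(dataDirs: Seq[String], ioThreads: Int)(loadPartitionDir: String => Unit): Unit = {
    val pools = mutable.ArrayBuffer.empty[ExecutorService]
    val jobs  = mutable.ArrayBuffer.empty[Future[_]]
    try {
      for (dir <- dataDirs) {
        val pool = Executors.newFixedThreadPool(ioThreads) // one pool per directory
        pools += pool
        // pretend each data directory holds these partition directories
        val partitionDirs = Seq(s"$dir/topicA-0", s"$dir/topicA-1", s"$dir/topicB-0")
        for (p <- partitionDirs)
          jobs += pool.submit(new Runnable { def run(): Unit = loadPartitionDir(p) })
      }
      // block here: loadAll only returns once every partition has been loaded
      jobs.foreach(_.get())
    } finally {
      pools.foreach(_.shutdown())
    }
  }

  def main(args: Array[String]): Unit =
    loadAll(Seq("/data/kafka-logs-1", "/data/kafka-logs-2"), ioThreads = 2) { p =>
      println(s"loading $p on ${Thread.currentThread.getName}")
    }
}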
At this point the initialization of LogManager is complete; next, startup is called on it:
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
Let's now analyze this method:
/**
 * Start the background threads to flush logs and do log cleanup
 *
 * Three background tasks are started here:
 *   1. the log retention (cleanup) task
 *   2. the log flush task
 *   3. the recovery-point checkpoint task
 */
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
So startup schedules three important periodic tasks: first, the log retention task that deletes old logs; second, the log flush task that flushes dirty logs to disk; third, the task that writes the recovery-point checkpoints. (If log compaction is enabled it also starts the LogCleaner.) The sketch below gives a rough feel for what this periodic scheduling amounts to.
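This is only a sketch using the plain JDK ScheduledExecutorService rather than Kafka's KafkaScheduler, and the task bodies and periods are illustrative stand-ins, not values read from a real broker config:

import java.util.concurrent.{Executors, TimeUnit}

// Sketch only: three periodic background tasks, each starting after an initial
// delay and then repeating at its own fixed period.
object LogManagerStartupSketch extends App {
  val scheduler          = Executors.newScheduledThreadPool(3)
  val initialTaskDelayMs = 30 * 1000L // mirrors InitialTaskDelayMs seen earlier

  def schedule(name: String, periodMs: Long)(task: => Unit): Unit = {
    println(s"scheduling $name every $periodMs ms")
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = task },
      initialTaskDelayMs, periodMs, TimeUnit.MILLISECONDS)
  }

  schedule("kafka-log-retention", periodMs = 5 * 60 * 1000L) { println("delete old log segments") }
  schedule("kafka-log-flusher", periodMs = 60 * 1000L) { println("flush dirty logs to disk") }
  schedule("kafka-recovery-point-checkpoint", periodMs = 60 * 1000L) { println("write recovery-point checkpoints") }
}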
- Summary -
Following the scenario-driven approach we had traced the produce path all the way to log.append, but at that point many readers were probably a bit lost about what the ReplicaManager and Log objects in that code actually are. In this lesson we therefore stepped back and focused on how LogManager is initialized, and sorted out the relationships between LogManager, Log, partitions, and the log directories (logDirs). We also saw that once LogManager finishes initializing, it starts three periodic scheduled tasks. In the upcoming lessons we will dig into what exactly these three tasks do, and take a closer look at the ReplicaManager and Log objects themselves.
Keep it up, everyone!!!