Kafka日志消息解析

我们知道,写入kafka的消息都需要指定一个Topic(主题),Kafka可以根据Topic来对消息进行区分,每个Topic分为多个Partition(分区)。

Partition的概念是为了实现高伸缩性和提供负载均衡的作用,可以很好的让一个比较大的(数据量级)Topic中的消息可以分布到多台broker机器上。不仅如此,也可以提高并行能力,因为水平扩展后可以以Partition为粒度进行读写,这样每个broker节点都能独立执行各自分区的读写请求;

Partition下就是Log的消息体,每条消息都只会保存在某一个分区中,而且在每个Partition下消息都是append模式写入的,也就是说,每个Partition下的消息都是顺序性的。

Kafka消息设计方式就是这样的三层结构:主题-分区-消息;说到设计,不同的分布式系统对分区的叫法也不大一样,在Kafka中的概念是Partition(分区),在ES中叫做Shard(分片),而在HBase中被称为Region。从表面上来看实现原理可能不尽相同,但对底层实现的思想却都是一致的。

话题扯回来,今天这篇文章分享的主题是:Kafka消息格式。

消息存储在哪,取决于broker端的log.dirs参数决定,目录文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
.
├── __consumer_offsets-0
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.timeindex
│   └── leader-epoch-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
├── replication-offset-checkpoint
├── test1-0
├── test2-0
└── test2-1
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
├── 00000000000000000013.index
├── 00000000000000000013.log
├── 00000000000000000013.timeindex
└── leader-epoch-checkpoint

位移主题

__consumer_offsets是位移主题,老版本的Consumer的位移管理是在Zookeeper,在新版本中位移管理机制中是作为一个内部Topic的方式来记录位移。

新版本位移管理机制其实也比较简单,是将Consumer的位移数据作为一条条普通的 Kafka 消息,提交到__consumer_offsets中,以前读写位移操作就由Zookeeper变成了Kafka自身。在位移主题中的消息包含三类,消费者组注册消息(Group Metadata)、消费者组的已提交位移消息(Offset Commit)和Tombstone消息;其写入提交过程都可以交由Kafka自动管理起来,应用上并没有太大的差异。

注册消息

我们随着获取注册消息的key和value方法一路看下去

1
2
val key = GroupMetadataManager.groupMetadataKey(group.groupId)
val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)

注册消息的key

1
2
3
4
case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
// 保存的是消费者组名称
override def toString: String = key
}

由GroupMetadataManager类中的groupMetadataKey方法而来

1
2
3
4
5
6
7
8
9
10
11
private[group] def groupMetadataKey(group: String): Array[Byte] = {

val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
key.set(GROUP_KEY_GROUP_FIELD, group)
// 构造ByteBuffer对象,容纳version和key
val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
// 写入byteBuffer
key.writeTo(byteBuffer)
byteBuffer.array()
}

注册消息的key构造完毕,接下来是value。groupMetadataValue 方法会将消费者组重要的元数据写入到字节数组并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* GroupMetadata 消费者组元数据对象
* assignment 分区消费分配方案
* apiVersion api版本号
*/
private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
assignment: Map[String, Array[Byte]],
apiVersion: ApiVersion): Array[Byte] = {
...
// 依次写入消费者组主要的元数据信息
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
value.set(GENERATION_KEY, groupMetadata.generationId)
value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
value.set(LEADER_KEY, groupMetadata.leaderOrNull)

val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
val memberStruct = value.instance(MEMBERS_KEY)
memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
...
// 写入消费订阅信息和费分配方案信息
val metadata = memberMetadata.metadata(protocol)
memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
val memberAssignment = assignment(memberMetadata.memberId)
assert(memberAssignment != null)
memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
memberStruct
}

value.set(MEMBERS_KEY, memberArray.toArray)
// 写入byteBuffer 并返回
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
byteBuffer.putShort(version)
value.writeTo(byteBuffer)
byteBuffer.array()
}

随后会组装成消息体,通过appendForGroup方法写入到位移主题中。

1
2
3
4
5
6
7
// build records
val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
Seq(new SimpleRecord(timestamp, key, value)).asJava))
val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
builder.append(timestamp, key, value)
builder.build()

注册消息体主体结构为:<消费者组名称 group.groupId , 消费者组主要的元数据信息 groupMetadata / 消费订阅信息 metadata / 分配方案信息 memberAssignment>。

位移消息

构造位移消息的key

1
2
3
4
5
6
7
8
// 参数是version和GroupTopicPartition
case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {

override def toString: String = key.toString
}

// GroupTopicPartition 类型是 <消费者组名,主题,分区> 的三元组。
OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))

位移消息的value,在offsetCommitValue 方法决定了 value 中都有哪些元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
...
private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
apiVersion: ApiVersion): Array[Byte] = {
// 根据不同的消息格式版本,创建对应的对象
val (version, value) = {

if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) {
...
} else if (apiVersion < KAFKA_2_1_IV1) {
...
} else {
// 依次写入位移值、Leader Epoch值、自定义元数据以及时间戳
val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
value.set(OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, offsetAndMetadata.commitTimestamp)
(3, value)
}
}
// 构建ByteBuffer,写入消息格式版本和结构体并返回
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
byteBuffer.putShort(version.toShort)
value.writeTo(byteBuffer)
byteBuffer.array()
}

随后也会组装消息体SimpleRecord,通过MemoryRecords.builder后,由appendForGroup方法发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
new SimpleRecord(timestamp, key, value)
}

// build MemoryRecords
val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
// 可能会有多个record,遍历append build
records.foreach(builder.append)

// entries终将会传入appendForGroup方法
val entries = Map(offsetTopicPartition -> builder.build())

位移消息的主体格式是:< version / <消费者组名,主题,分区> , 位移值 / Leader Epoch值 / 元数据信息 / 时间戳>。

Tombstone消息

翻译过来就是墓碑消息,Tombstone消息的Value 为 null。Tombstone消息在注册消息和位移消息中都可能出现。如果在注册消息中出现,表示Kafka可以将该消费者组元数据从位移主题中删除;如果在位移消息中出现了,则表示Kafka能够将该消费者组在某主题分区上的位移提交数据删除。这很好的保证了,内部位移主题不会持续增加磁盘占用空间。

Checkpoint文件

Log-start-offset-checkpoint文件是用来标识LogStartOffset(日志的起始偏移量),recovery-point-offset-checkpoint和replication-offset-checkpoint这两个文件分别对应了Log End Offset(日志末端位移)和High Watermark(副本的高水印值)。

会有定时任务负责将所有分区的LogStartOffset,LEO,HW写到以上文件中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.mslog.flush.offset. checkpoint.interval.msreplica.high.watermark.checkpoint.interval.ms决定。

对应的机制是为了解决数据丢失或数据不一致的问题,在Kafka中还需要结合Leader Epoch来共同解决,日后可详细再言。

消息日志

消息存储的目录格式默认为:topic-partition_num;展示的Topic为test1(一个分区)和test2(两个分区):

1
2
3
4
5
6
7
8
9
10
├── test1-0
├── test2-0
└── test2-1
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
├── 00000000000000000013.index
├── 00000000000000000013.log
├── 00000000000000000013.timeindex
└── leader-epoch-checkpoint

文章在开篇谈到,一个分区对应一个日志(Log),为了防止日志过大,引入了日志分段(LogSegment)概念,切分成多个较小文件;Log在物理上是以文件夹的形式存储,而每个LogSegment对应着磁盘上的日志文件:以”.log”为文件后缀,和两个索引文件:偏移量的索引文件(以”.index”为文件后缀)和时间戳的索引文件(以”.timeindex”为文件后缀),而leader-epoch-checkpoint保存的是Leader Epoch的值(解决副本数据一致性需要)。

LogSegment

LogSegment的大小取决于broker端的log.segment.bytes参数决定,我们先看看LogSegment的构造参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* log Kafka消息对象
* lazyOffsetIndex 位移索引文件
* lazyTimeIndex 时间戳索引文件
* txnIndex 已中止事务索引文件
* baseOffset 每个日志段对象的起始位移
* indexIntervalBytes 日志对象新增索引项的频率
*/
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyOffsetIndex,
val lazyTimeIndex: LazyTimeIndex,
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging

...
/**
* largestOffset 待写入消息批次中消息的最大位移值
* largestTimestamp 最大时间戳
* shallowOffsetOfMaxTimestamp 最大时间戳对应消息的位移
* records 待写入的消息体
*/
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {...}

/**
* startOffset 要读取的第一条消息的位移
* maxSize 能读取的最大字节数
* maxPosition 能读到的最大文件位置
* minOneMessage 是否允许在消息体过大时至少返回第一条消息
*/
def read(startOffset:
Long, maxOffset: Option[Long],
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {...}

/**
* leaderEpochCache 在恢复过程中缓存的leaderEpoch值
*/
def recover(producerStateManager: ProducerStateManager,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {...}

通过几项参数,大概可以猜测出这样构造LogSegment的意义。每个segment保存的是消息实体,FileRecords是必不可少,通过位移索引和时间戳索引可以快速定位到该segment的消息。在LogSegment类中,需要读写消息(append和read);需要恢复日志段(recover),broker启动时需要从磁盘文件中加载所有的日志段信息到内存中等功能的实现。

可以通过Kafka client命令来查看消息存储的详细内容:

1
2
3
4
5
6
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \ 
--files /tmp/kafka-logs/test2-1/00000000000000000000.log \
--print-data-log


| offset: 342 CreateTime: 1615706871552 keysize: -1 valuesize: 50 sequence: -1 headerKeys: [] payload: {"type":"bootstrap-insert","is_valid":0,"version":4,"ts":1613980836,"create_time":"2020-05-04 07:41:29","update_time":"2020-07-04 12:40:15"}

根据位移值,我们可以通过.index文件快速查找消息所在文件位置;根据时间戳,我们可以通过.timeindex查找;所以快速的查找的根本原因,是基于消息存储格式和机制来决定的。

Message

接下来继续看下消息Message内部结构,也就是LogSegment构造类中的FileRecords参数是怎么样的格式。

前面有追踪过位移主题内的消息体的定义,都是采用的map格式,这是便于快速取得位移值;但是真正的消息实体和位移主题的消息格式还是有很大的不同,要考虑消息的完整性、元数据信息的表示、消息属性等等,另外我们只是讨论消息未压缩的情景。

Kafka的消息结构在设计时,有过几次设计变化:在Kafka 0.10.0版本之前都是采用的是无timestamp字段(v0版本),在其之后的版本添加了timestamp字段,表示消息的时间戳(v1版本)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
v0
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes

v1 (supported since 0.10.0)
Message => Crc MagicByte Attributes Key Value
// CRC用于检查代理和消息的完整
Crc => int32
// 版本id,用于允许消息二进制格式的向后兼容演变。当前值为1。
MagicByte => int8
// 保存有关消息的元数据属性
// 最低3位包含用于消息的压缩解码器
// 最低的第4位用于表示时间戳类型,0代表createtime,1代表logappendtime。
Attributes => int8
// 消息时间戳
Timestamp => int64
// 分区分配的可选消息密钥
Key => bytes
// 实际消息内容,可能是包含的一个消息集。
Value => bytes

实现类是message类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
object Message {

// crc 消息的校验码,防止消息错误(4B)
val CrcOffset = 0
val CrcLength = 4

// magic 消息格式的版本号(1B)
val MagicOffset = CrcOffset + CrcLength
val MagicLength = 1

// attributes 消息的属性,比如压缩类型,时间戳类型,创建时间/追加时间 (1B)
val AttributesOffset = MagicOffset + MagicLength
val AttributesLength = 1
// timestamp 时间戳信息(8B)
val TimestampOffset = AttributesOffset + AttributesLength
val TimestampLength = 8
// key (4B)
val KeySizeOffset_V0 = AttributesOffset + AttributesLength
val KeySizeOffset_V1 = TimestampOffset + TimestampLength
val KeySizeLength = 4
val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
// value (4B)
val ValueSizeLength = 4
}

发送一条key=”key”,value=”value”的消息,在v1版本中最小则会占用22+8+12=42B。

在kafka0.11.0版本后,改动是比较大的,在以前的版本,value是一个消息集(Message Set),而在v2版本中,直接把Crc检查校验,attributes的元数据属性等信息提入了Record Batch属性中,还参考了Protocol Buffer引入了变长整型(Varints)和ZigZag编码大大减少消息体大小。

v2版本Record Batch格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 表示当前RecordBatch的起始位移 (8B)
baseOffset: int64
// 计算partition leader epoch到headers之间的长度 (4B)
batchLength: int32
// 用来确保数据可靠性 4B
partitionLeaderEpoch: int32
// magic等于2 (1B)
magic: int8 (current magic value is 2)
// 4B
crc: int32
// 2B
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
// 4B
lastOffsetDelta: int32
// 8B
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
// batch records
records: [Record]

v2版本Record格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 消息总长度 3B
length: varint
// 弃用
attributes: int8
bit 0~7: unused
// 时间戳增量 3B
timestampDelta: varint
// 位移增量 3B
offsetDelta: varint
// key
keyLength: varint
key: byte[]
// value
valueLen: varint
value: byte[]
Headers => [Header]

v2版本格式中增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息:

1
2
3
4
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]

根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节;而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节)。

所以发送一条key=”key”,value=”value”的消息,在v2版本中最小则会占9+3+3=15B。在v0和v1版本的消息格式中,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而v2版本的消息格式中与长度有关的字段都是采用Varints的编码,只需要一个字节,这也会节省很多空间大小。


Kafka日志消息存储结构是比较复杂的,底层结构决定着上层功能。从消息结构中,也可以窥得一丝kafka设计的原理和实现的机制。在消息体结构介绍中,对引用的变长整型(Varints)和ZigZag编码了解的较少,所以讲述的不是很详细,不过也需要适可而止的深究技术的深度。

完。

Refer

[1] A Guide To The Kafka Protocol

[2] kafka design documentation