我们知道,写入kafka的消息都需要指定一个Topic(主题),Kafka可以根据Topic来对消息进行区分,每个Topic分为多个Partition(分区)。
Partition的概念是为了实现高伸缩性和提供负载均衡的作用,可以很好的让一个比较大的(数据量级)Topic中的消息可以分布到多台broker机器上。不仅如此,也可以提高并行能力,因为水平扩展后可以以Partition为粒度进行读写,这样每个broker节点都能独立执行各自分区的读写请求;
Partition下就是Log的消息体,每条消息都只会保存在某一个分区中,而且在每个Partition下消息都是append模式写入的,也就是说,每个Partition下的消息都是顺序性的。
Kafka消息设计方式就是这样的三层结构:主题-分区-消息;说到设计,不同的分布式系统对分区的叫法也不大一样,在Kafka中的概念是Partition(分区),在ES中叫做Shard(分片),而在HBase中被称为Region。从表面上来看实现原理可能不尽相同,但对底层实现的思想却都是一致的。
话题扯回来,今天这篇文章分享的主题是:Kafka消息格式。
消息存储在哪,取决于broker端的log.dirs参数决定,目录文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 . ├── __consumer_offsets-0 │   ├── 00000000000000000000.index │   ├── 00000000000000000000.log │   ├── 00000000000000000000.timeindex │   └── leader-epoch-checkpoint ├── log-start-offset-checkpoint ├── meta.properties ├── recovery-point-offset-checkpoint ├── replication-offset-checkpoint ├── test1-0 ├── test2-0 └── test2-1     ├── 00000000000000000000.index     ├── 00000000000000000000.log     ├── 00000000000000000000.timeindex     ├── 00000000000000000013.index     ├── 00000000000000000013.log     ├── 00000000000000000013.timeindex     └── leader-epoch-checkpoint 
位移主题 __consumer_offsets是位移主题,老版本的Consumer的位移管理是在Zookeeper,在新版本中位移管理机制中是作为一个内部Topic的方式来记录位移。
新版本位移管理机制其实也比较简单,是将Consumer的位移数据作为一条条普通的 Kafka 消息,提交到__consumer_offsets中,以前读写位移操作就由Zookeeper变成了Kafka自身。在位移主题中的消息包含三类,消费者组注册消息(Group Metadata)、消费者组的已提交位移消息(Offset Commit)和Tombstone消息;其写入提交过程都可以交由Kafka自动管理起来,应用上并没有太大的差异。
注册消息 
我们随着获取注册消息的key和value方法一路看下去
1 2 val  key = GroupMetadataManager .groupMetadataKey(group.groupId)val  value = GroupMetadataManager .groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
注册消息的key
1 2 3 4 case  class  GroupMetadataKey (version: Short , key: String  ) extends  BaseKey       override  def  toString String  = key } 
由GroupMetadataManager类中的groupMetadataKey方法而来
1 2 3 4 5 6 7 8 9 10 11 private [group] def  groupMetadataKey String ): Array [Byte ] = {         val  key = new  Struct (CURRENT_GROUP_KEY_SCHEMA )     key.set(GROUP_KEY_GROUP_FIELD , group) 		     val  byteBuffer = ByteBuffer .allocate(2   + key.sizeOf)     byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION )   	     key.writeTo(byteBuffer)     byteBuffer.array()   } 
注册消息的key构造完毕,接下来是value。groupMetadataValue 方法会将消费者组重要的元数据写入到字节数组并返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private [group] def  groupMetadataValue GroupMetadata ,                                        assignment: Map [String , Array [Byte ]],                                         apiVersion: ApiVersion ): Array [Byte ] = {   	...   	     value.set(PROTOCOL_TYPE_KEY , groupMetadata.protocolType.getOrElse("" ))     value.set(GENERATION_KEY , groupMetadata.generationId)     value.set(PROTOCOL_KEY , groupMetadata.protocolOrNull)     value.set(LEADER_KEY , groupMetadata.leaderOrNull)        val  memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>       val  memberStruct = value.instance(MEMBERS_KEY )       memberStruct.set(MEMBER_ID_KEY , memberMetadata.memberId)       memberStruct.set(CLIENT_ID_KEY , memberMetadata.clientId)       memberStruct.set(CLIENT_HOST_KEY , memberMetadata.clientHost)       memberStruct.set(SESSION_TIMEOUT_KEY , memberMetadata.sessionTimeoutMs) 			...              val  metadata = memberMetadata.metadata(protocol)       memberStruct.set(SUBSCRIPTION_KEY , ByteBuffer .wrap(metadata))       val  memberAssignment = assignment(memberMetadata.memberId)       assert(memberAssignment != null )       memberStruct.set(ASSIGNMENT_KEY , ByteBuffer .wrap(memberAssignment))       memberStruct     }     value.set(MEMBERS_KEY , memberArray.toArray)   	     val  byteBuffer = ByteBuffer .allocate(2   + value.sizeOf)     byteBuffer.putShort(version)     value.writeTo(byteBuffer)     byteBuffer.array()   } 
随后会组装成消息体,通过appendForGroup方法写入到位移主题中。
1 2 3 4 5 6 7 val  records = {          val  buffer = ByteBuffer .allocate(AbstractRecords .estimateSizeInBytes(magicValue, compressionType,             Seq (new  SimpleRecord (timestamp, key, value)).asJava))           val  builder = MemoryRecords .builder(buffer, magicValue, compressionType, timestampType, 0 L)           builder.append(timestamp, key, value)           builder.build() 
注册消息体主体结构为:<消费者组名称 group.groupId , 消费者组主要的元数据信息 groupMetadata / 消费订阅信息 metadata / 分配方案信息 memberAssignment>。
位移消息 
构造位移消息的key
1 2 3 4 5 6 7 8 case  class  OffsetKey (version: Short , key: GroupTopicPartition  ) extends  BaseKey    override  def  toString String  = key.toString } OffsetKey (version, GroupTopicPartition (group, new  TopicPartition (topic, partition)))
位移消息的value,在offsetCommitValue 方法决定了 value 中都有哪些元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 val  offsetAndMetadata = GroupMetadataManager .readOffsetMessageValue(record.value)... private [group] def  offsetCommitValue OffsetAndMetadata ,                                       apiVersion: ApiVersion ): Array [Byte ] = {   	     val  (version, value) = {              if  (apiVersion < KAFKA_2_1_IV0  || offsetAndMetadata.expireTimestamp.nonEmpty) { 				...       } else  if  (apiVersion < KAFKA_2_1_IV1 ) {         ...       } else  {                  val  value = new  Struct (OFFSET_COMMIT_VALUE_SCHEMA_V3 )         value.set(OFFSET_VALUE_OFFSET_FIELD_V3 , offsetAndMetadata.offset)         value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 ,           offsetAndMetadata.leaderEpoch.orElse(RecordBatch .NO_PARTITION_LEADER_EPOCH ))         value.set(OFFSET_VALUE_METADATA_FIELD_V3 , offsetAndMetadata.metadata)         value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 , offsetAndMetadata.commitTimestamp)         (3 , value)       }     } 		     val  byteBuffer = ByteBuffer .allocate(2   + value.sizeOf)     byteBuffer.putShort(version.toShort)     value.writeTo(byteBuffer)     byteBuffer.array()   } 
随后也会组装消息体SimpleRecord,通过MemoryRecords.builder后,由appendForGroup方法发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 val  records = filteredOffsetMetadata.map { case  (topicPartition, offsetAndMetadata) =>            val  key = GroupMetadataManager .offsetCommitKey(group.groupId, topicPartition)             val  value = GroupMetadataManager .offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)             new  SimpleRecord (timestamp, key, value)           } val  builder = MemoryRecords .builder(buffer, magicValue, compressionType, timestampType, 0 L, time.milliseconds(),            producerId, producerEpoch, 0 , isTxnOffsetCommit, RecordBatch .NO_PARTITION_LEADER_EPOCH ) records.foreach(builder.append) val  entries = Map (offsetTopicPartition -> builder.build())
位移消息的主体格式是:< version / <消费者组名,主题,分区> , 位移值 / Leader Epoch值 / 元数据信息 / 时间戳>。
Tombstone消息 
翻译过来就是墓碑消息,Tombstone消息的Value 为 null。Tombstone消息在注册消息和位移消息中都可能出现。如果在注册消息中出现,表示Kafka可以将该消费者组元数据从位移主题中删除;如果在位移消息中出现了,则表示Kafka能够将该消费者组在某主题分区上的位移提交数据删除。这很好的保证了,内部位移主题不会持续增加磁盘占用空间。
Checkpoint文件 Log-start-offset-checkpoint文件是用来标识LogStartOffset(日志的起始偏移量),recovery-point-offset-checkpoint和replication-offset-checkpoint这两个文件分别对应了Log End Offset(日志末端位移)和High Watermark(副本的高水印值)。
会有定时任务负责将所有分区的LogStartOffset,LEO,HW写到以上文件中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms,log.flush.offset. checkpoint.interval.ms,replica.high.watermark.checkpoint.interval.ms决定。
对应的机制是为了解决数据丢失或数据不一致的问题,在Kafka中还需要结合Leader Epoch来共同解决,日后可详细再言。
消息日志 消息存储的目录格式默认为:topic-partition_num;展示的Topic为test1(一个分区)和test2(两个分区):
1 2 3 4 5 6 7 8 9 10 ├── test1-0 ├── test2-0 └── test2-1     ├── 00000000000000000000.index     ├── 00000000000000000000.log     ├── 00000000000000000000.timeindex     ├── 00000000000000000013.index     ├── 00000000000000000013.log     ├── 00000000000000000013.timeindex     └── leader-epoch-checkpoint 
文章在开篇谈到,一个分区对应一个日志(Log),为了防止日志过大,引入了日志分段(LogSegment)概念,切分成多个较小文件;Log在物理上是以文件夹的形式存储,而每个LogSegment对应着磁盘上的日志文件:以”.log”为文件后缀,和两个索引文件:偏移量的索引文件(以”.index”为文件后缀)和时间戳的索引文件(以”.timeindex”为文件后缀),而leader-epoch-checkpoint保存的是Leader Epoch的值(解决副本数据一致性需要)。
LogSegment 
LogSegment的大小取决于broker端的log.segment.bytes参数决定,我们先看看LogSegment的构造参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class  LogSegment  private [log] (val log: FileRecords ,                                val lazyOffsetIndex: LazyOffsetIndex ,                                val lazyTimeIndex: LazyTimeIndex ,                                val txnIndex: TransactionIndex ,                                val baseOffset: Long ,                                val indexIntervalBytes: Int ,                                val rollJitterMs: Long ,                                val time: Time  ) extends  Logging ... def  append Long ,             largestTimestamp: Long ,              shallowOffsetOfMaxTimestamp: Long ,              records: MemoryRecords ): Unit  = {...} def  read          Long , maxOffset: Option [Long ],           maxSize: Int ,           maxPosition: Long  = size,          minOneMessage: Boolean  = false ): FetchDataInfo  = {...} def  recover ProducerStateManager ,             leaderEpochCache: Option [LeaderEpochFileCache ] = None ): Int  = {...} 
通过几项参数,大概可以猜测出这样构造LogSegment的意义。每个segment保存的是消息实体,FileRecords是必不可少,通过位移索引和时间戳索引可以快速定位到该segment的消息。在LogSegment类中,需要读写消息(append和read);需要恢复日志段(recover),broker启动时需要从磁盘文件中加载所有的日志段信息到内存中等功能的实现。
可以通过Kafka client命令来查看消息存储的详细内容:
1 2 3 4 5 6 bin/kafka-run-class.sh kafka.tools.DumpLogSegments \  	--files /tmp/kafka-logs/test2-1/00000000000000000000.log \ 	--print-data-log  | offset: 342 CreateTime: 1615706871552 keysize: -1 valuesize: 50 sequence: -1 headerKeys: [] payload: {"type" :"bootstrap-insert" ,"is_valid" :0,"version" :4,"ts" :1613980836,"create_time" :"2020-05-04 07:41:29" ,"update_time" :"2020-07-04 12:40:15" } 
根据位移值,我们可以通过.index文件快速查找消息所在文件位置;根据时间戳,我们可以通过.timeindex查找;所以快速的查找的根本原因,是基于消息存储格式和机制来决定的。
Message 
接下来继续看下消息Message内部结构,也就是LogSegment构造类中的FileRecords参数是怎么样的格式。
前面有追踪过位移主题内的消息体的定义,都是采用的map格式,这是便于快速取得位移值;但是真正的消息实体和位移主题的消息格式还是有很大的不同,要考虑消息的完整性、元数据信息的表示、消息属性等等,另外我们只是讨论消息未压缩 的情景。
Kafka的消息结构在设计时,有过几次设计变化:在Kafka 0.10.0版本之前都是采用的是无timestamp字段(v0版本),在其之后的版本添加了timestamp字段,表示消息的时间戳(v1版本)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 v0 Message  => Crc  MagicByte  Attributes  Key  Value   Crc  => int32   MagicByte  => int8   Attributes  => int8   Key  => bytes   Value  => bytes    v1 (supported since 0.10 .0 ) Message  => Crc  MagicByte  Attributes  Key  Value 	   Crc  => int32 	   MagicByte  => int8 	 	 	   Attributes  => int8 	   Timestamp  => int64 	   Key  => bytes 	   Value  => bytes 
实现类是message类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 object  Message       val  CrcOffset  = 0    val  CrcLength  = 4          val  MagicOffset  = CrcOffset  + CrcLength    val  MagicLength  = 1          val  AttributesOffset  = MagicOffset  + MagicLength    val  AttributesLength  = 1       val  TimestampOffset  = AttributesOffset  + AttributesLength    val  TimestampLength  = 8       val  KeySizeOffset_V0  = AttributesOffset  + AttributesLength    val  KeySizeOffset_V1  = TimestampOffset  + TimestampLength    val  KeySizeLength  = 4     val  KeyOffset_V0  = KeySizeOffset_V0  + KeySizeLength    val  KeyOffset_V1  = KeySizeOffset_V1  + KeySizeLength       val  ValueSizeLength  = 4  } 
发送一条key=”key”,value=”value”的消息,在v1版本中最小则会占用22+8+12=42B。
在kafka0.11.0版本后,改动是比较大的,在以前的版本,value是一个消息集(Message Set),而在v2版本中,直接把Crc检查校验,attributes的元数据属性等信息提入了Record Batch属性中,还参考了Protocol Buffer引入了变长整型(Varints)和ZigZag编码大大减少消息体大小。
v2版本Record Batch格式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 baseOffset: int64  batchLength: int32 partitionLeaderEpoch: int32 magic: int8 (current magic value is 2 ) crc: int32 attributes: int16 	bit 0 ~2 : 		0 : no compression 		1 : gzip 		2 : snappy 		3 : lz4 	bit 3 : timestampType 	bit 4 : isTransactional (0  means not transactional) 	bit 5 : isControlBatch (0  means not a control batch) 	bit 6 ~15 : unused lastOffsetDelta: int32 firstTimestamp: int64 maxTimestamp: int64 producerId: int64 producerEpoch: int16 baseSequence: int32 records: [Record ] 
v2版本Record格式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 length: varint attributes: int8 	bit 0 ~7 : unused timestampDelta: varint offsetDelta: varint keyLength: varint key: byte[] valueLen: varint value: byte[] Headers  => [Header ]
v2版本格式中增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息:
1 2 3 4 headerKeyLength: varint headerKey: String  headerValueLength: varint Value : byte[]
根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节;而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节)。
所以发送一条key=”key”,value=”value”的消息,在v2版本中最小则会占9+3+3=15B。在v0和v1版本的消息格式中,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而v2版本的消息格式中与长度有关的字段都是采用Varints的编码,只需要一个字节,这也会节省很多空间大小。
Kafka日志消息存储结构是比较复杂的,底层结构决定着上层功能。从消息结构中,也可以窥得一丝kafka设计的原理和实现的机制。在消息体结构介绍中,对引用的变长整型(Varints)和ZigZag编码了解的较少,所以讲述的不是很详细,不过也需要适可而止的深究技术的深度。
完。
Refer 
[1] A Guide To The Kafka Protocol 
[2] kafka design documentation