Tech Ramblings for the New Year

intro1: Kafka IO anomaly graphs; the difference between Kafka rebalance and reassign
intro2: Can Kafka really only read and write through the leader partition?
intro3: Scaling out under an unusual Elasticsearch shard distribution

What System Monitoring Graphs Can Tell Us

While troubleshooting a production issue recently, I noticed the basic Zabbix IO/CPU monitoring on a few Kafka machines. Figure 1 shows steady read operations interspersed with some spikes; Figure 2 zooms in on an earlier period.


Zoomed in, the sustained IO is still clearly visible.

These graphs show that Kafka rarely does heavy disk reads; its workload is mostly periodic batched writes, and sequential disk writes are very efficient. So why have disk reads increased so much recently? Recall that when Kafka writes a message, the data actually goes into the page cache (memory) and is flushed to disk by an asynchronous thread. When reading, data is sent from the page cache straight to the socket (the well-known zero-copy technique). If the page cache misses the requested data (offset), Kafka uses the .index file to locate the corresponding segment file, loads its messages from disk into the page cache, and only then sends them out over the socket; that is when substantial disk reads occur.
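The zero-copy read path relies on the sendfile(2) system call (exposed in Java as FileChannel.transferTo). A minimal Python sketch of the same idea, assuming Linux, where os.sendfile is available:

```python
import os
import socket


def serve_segment_zero_copy(conn: socket.socket, path: str) -> int:
    """Send a log segment to a client without copying it through user space.

    os.sendfile asks the kernel to move bytes directly from the page cache
    into the socket buffer; if the file is not cached, the kernel must read
    it from disk first -- the extra disk IO seen for lagging consumers.
    """
    size = os.path.getsize(path)
    sent = 0
    with open(path, "rb") as f:
        while sent < size:
            sent += os.sendfile(conn.fileno(), f.fileno(), sent, size - sent)
    return sent
```

This is only a sketch of the mechanism; Kafka's actual broker code paths are in Java/Scala.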
In other words, the page cache was frequently missing the data being consumed, which is usually caused by consumer lag. Checking the consumer monitoring confirmed it: several consumer groups had severe lag, and some applications were even repeatedly restarting consumption from the beginning.
These situations should be avoided as much as possible: relying on the page cache means that dirty-page writeback and memory reclamation can delay message reads and writes, an oversized page cache also hurts the performance of the Linux system itself, and if swap has not been disabled the problem can be even worse. RocketMQ, from Alibaba, has reportedly made some optimizations in this area.
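Whether a fetch is served from the page cache is essentially a question of how far behind the consumer is. A rough back-of-the-envelope heuristic (all numbers below are illustrative, not real Kafka metrics):

```python
def likely_disk_reads(lag_messages: int, avg_msg_bytes: int,
                      page_cache_bytes: int) -> bool:
    """Rough heuristic: a consumer whose backlog is larger than the page
    cache must read old segments from disk (the spikes in the IO graph).
    The inputs are illustrative, not Kafka's actual accounting."""
    return lag_messages * avg_msg_bytes > page_cache_bytes


# A group lagging by 50M messages of ~1 KiB each needs ~50 GiB of history,
# far more than a 16 GiB page cache, so its fetches go to disk:
assert likely_disk_reads(50_000_000, 1024, 16 * 1024**3)
# A group lagging by a few thousand messages stays within the cache:
assert not likely_disk_reads(5_000, 1024, 16 * 1024**3)
```

In practice you would read the lag from consumer-group monitoring (e.g. the output of kafka-consumer-groups.sh --describe) rather than guessing.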

Kafka rebalance vs. Kafka reassign

Another point: Kafka reads and writes go through partitions, and each partition has leader/follower roles. Historically, only the leader of a partition handled reads and writes, while followers merely synchronized/replicated the data as backups (with a few exceptions, such as testing; there are more exceptions now, see below). Followers are transparent to consumer/producer clients, which can also be seen as part of how Kafka maintains data consistency.
Unlike MySQL, which splits into master and slave and typically gains performance through read/write splitting across them, Kafka splits a topic into multiple partitions and spreads reads and writes across those partitions; load balancing is achieved by balancing broker (machine) + partition + role (leader).

We assume that there are many partitions in a cluster and try to
spread the leaders evenly among brokers. That way, even though for a
particular partition all clients have to talk to a single broker,
the overall workload is still balanced.

That said, reading from replicas does have real benefits in multi-datacenter deployments, so Kafka was improved to let consumers fetch from the nearest replica: KIP-392: Allow consumers to fetch from closest replica
KAFKA-8443 Broker support for fetch from followers #6832, Release Plan 2.4.0
This improvement takes things a step further and shipped in the latest 2.4.0 release. A Baidu search turns up few Chinese articles about it, so you will need to read the official documentation yourself.
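Fetch-from-follower needs configuration on both sides: the broker sets broker.rack and replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector, and the consumer sets client.rack. A sketch of the consumer side, assuming the confluent-kafka (librdkafka) client; the addresses and names are made up:

```python
# Consumer-side configuration for fetch-from-follower (KIP-392, Kafka 2.4.0+).
# The broker side must set broker.rack and
# replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.
consumer_conf = {
    "bootstrap.servers": "kafka-dc1.example.com:9092",  # hypothetical address
    "group.id": "analytics",                            # hypothetical group
    # Matched against broker.rack so fetches go to the closest replica:
    "client.rack": "dc1",
}
# With confluent-kafka installed you would then do:
#   from confluent_kafka import Consumer
#   consumer = Consumer(consumer_conf)
```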
Also, some Kafka server monitoring tools, such as Kafka Manager, expose a "leader skew" metric to measure leader imbalance. Briefly:

Brokers Spread: effectively the broker utilization; the percentage of the cluster's brokers that host partitions of this topic
Brokers Skew: the percentage of the cluster's brokers holding more partitions of this topic than the average
Brokers Leader Skew: "Percentage of brokers having more partitions as leader than the average",
i.e. whether leader partitions are skewed: the share of brokers carrying more than the average leader load
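These three metrics can be computed from a topic's replica assignment. A sketch below is my own reconstruction of Kafka Manager's definitions, not its actual code; the convention that the first replica in the list is the (preferred) leader follows Kafka:

```python
from collections import Counter


def topic_metrics(assignment: dict[int, list[int]], all_brokers: set[int]):
    """assignment maps partition id -> replica list (first replica = leader).

    Returns (spread, skew, leader_skew) as percentages.
    """
    used = {b for replicas in assignment.values() for b in replicas}
    spread = len(used) / len(all_brokers) * 100

    per_broker = Counter(b for replicas in assignment.values() for b in replicas)
    avg = sum(per_broker.values()) / len(used)
    skew = sum(1 for c in per_broker.values() if c > avg) / len(all_brokers) * 100

    leaders = Counter(replicas[0] for replicas in assignment.values())
    lead_avg = len(assignment) / len(used)
    leader_skew = sum(1 for c in leaders.values() if c > lead_avg) / len(all_brokers) * 100
    return spread, skew, leader_skew
```

For example, a 3-partition topic whose leaders all sit on broker 1 in a 4-broker cluster yields a 25% leader skew (one of four brokers is above the average leader load).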

This matters because all Kafka reads and writes go through the leader, so a high broker leader skew means unbalanced read/write load.
Kafka has a parameter, auto.leader.rebalance.enable (default true), that makes Kafka periodically rebalance leaders automatically, eliminating the write imbalance.

So is this rebalance related to reassign?

To be clear, the Kafka rebalance here is not the Kafka consumer rebalance but the Kafka partition rebalance, i.e. the Kafka leader rebalance: balancing at the partition level. A related concept is partition reassign; both are mechanisms for balancing load across the brokers of a Kafka cluster.
Some articles conflate the two, claiming that a partition rebalance triggers partition reassignment, so that for high-throughput applications a leader rebalance may leave partitions unavailable for reads and writes, and that the feature should therefore be disabled in production, or at least that a rebalance may briefly make partitions unwritable. In fact this is unnecessary.
partition rebalance and partition reassign are two different concepts: a reassign is triggered by a ZooKeeper event listener, for example when we move partitions with the ReassignPartitionsCommand tool.
A Kafka rebalance is triggered based on the imbalanceRatio factor, and adding a broker does not change imbalanceRatio: totalTopicPartitionsForBroker stays the same, and totalTopicPartitionsNotLedByBroker has nothing to do with the new broker, unless the assignment has already been modified manually.
Can a rebalance cause a reassign? The official documentation does not go into detail; I am still reading this part of the code and so far have found no case where a rebalance leads to a reassign.
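The trigger check boils down to: for each broker, count the partitions whose preferred replica (first in the assignment list) is that broker but whose current leader is someone else. A Python paraphrase of the Scala logic below (simplified: no locks, no topic-deletion checks):

```python
def brokers_needing_rebalance(assignment, current_leader, threshold_pct=10):
    """assignment: (topic, partition) -> replica list (first = preferred leader).
    current_leader: (topic, partition) -> broker id currently leading it.
    threshold_pct mirrors leader.imbalance.per.broker.percentage (default 10).

    Note an empty new broker never shows up here: it is the preferred leader
    of nothing, so its ratio stays zero until partitions are reassigned to it,
    which is why adding a broker alone does not trigger a leader rebalance.
    """
    by_preferred = {}
    for tp, replicas in assignment.items():
        by_preferred.setdefault(replicas[0], []).append(tp)

    out = {}
    for broker, tps in by_preferred.items():
        not_led = sum(1 for tp in tps if current_leader.get(tp) != broker)
        ratio = not_led / len(tps)  # imbalanceRatio in the Scala code
        if ratio > threshold_pct / 100:
            out[broker] = ratio
    return out
```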

ControllerContext.scala
...
private def checkAndTriggerPartitionRebalance(): Unit = {
  if (isActive()) {
    trace("checking need to trigger partition rebalance")
    // get all the active brokers
    var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
    inLock(controllerContext.controllerLock) {
      preferredReplicasForTopicsByBrokers =
        controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
          case (topicAndPartition, assignedReplicas) => assignedReplicas.head
        }
    }
    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
    // for each broker, check if a preferred replica election needs to be triggered
    preferredReplicasForTopicsByBrokers.foreach {
      case (leaderBroker, topicAndPartitionsForBroker) => {
        var imbalanceRatio: Double = 0
        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
        inLock(controllerContext.controllerLock) {
          topicsNotInPreferredReplica =
            topicAndPartitionsForBroker.filter {
              case (topicPartition, replicas) => {
                controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
              }
            }
          debug("topics not in preferred replica " + topicsNotInPreferredReplica)
          val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
          val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
          imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
          trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
        }
        // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
        // that need to be on this broker
        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
          topicsNotInPreferredReplica.foreach {
            case (topicPartition, replicas) => {
              inLock(controllerContext.controllerLock) {
                // do this check only if the broker is live and there are no partitions being reassigned currently
                // and preferred replica election is not in progress
                if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                    controllerContext.partitionsBeingReassigned.size == 0 &&
                    controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                    controllerContext.allTopics.contains(topicPartition.topic)) {
                  onPreferredReplicaElection(Set(topicPartition), true)
                }
              }
            }
          }
        }
      }
    }
  }
}

/**
 * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
 * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
 * Reassigning replicas for a partition goes through a few steps listed in the code.
 * RAR = Reassigned replicas
 * OAR = Original list of replicas for partition
 * AR = current assigned replicas
 *
 * 1. Update AR in ZK with OAR + RAR.
 * 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
 *    of the leader epoch in zookeeper.
 * 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
 * 4. Wait until all replicas in RAR are in sync with the leader.
 * 5. Move all replicas in RAR to OnlineReplica state.
 * 6. Set AR to RAR in memory.
 * 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
 *    will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
 *    In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
 *    RAR - OAR back in the isr.
 * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
 *    isr to remove OAR - RAR in zookeeper and sent a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
 *    After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
 * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = false) to
 *    the replicas in OAR - RAR to physically delete the replicas on disk.
 * 10. Update AR in ZK with RAR.
 * 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
 * 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
 *
 * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
 * may go through the following transition.
 * AR                  leader/isr
 * {1,2,3}             1/{1,2,3}           (initial state)
 * {1,2,3,4,5,6}       1/{1,2,3}           (step 2)
 * {1,2,3,4,5,6}       1/{1,2,3,4,5,6}     (step 4)
 * {1,2,3,4,5,6}       4/{1,2,3,4,5,6}     (step 7)
 * {1,2,3,4,5,6}       4/{4,5,6}           (step 8)
 * {4,5,6}             4/{4,5,6}           (step 10)
 *
 * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
 * This way, if the controller crashes before that step, we can still recover.
 */
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
    case false =>
      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned not yet caught up with the leader")
      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
      //1. Update AR in ZK with OAR + RAR.
      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
      //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
        newAndOldReplicas.toSeq)
      //3. replicas in RAR - OAR -> NewReplica
      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned to catch up with the leader")
    case true =>
      //4. Wait until all replicas in RAR are in sync with the leader.
      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
      //5. replicas in RAR -> OnlineReplica
      reassignedReplicas.foreach { replica =>
        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
          replica)), OnlineReplica)
      }
      //6. Set AR to RAR in memory.
      //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
      //   a new AR (using RAR) and same isr to every broker in RAR
      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
      //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
      //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
      //10. Update AR in ZK with RAR.
      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
      //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
      removePartitionFromReassignedPartitions(topicAndPartition)
      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
      //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}

What exactly does partition reassign do? The onPartitionReassignment code above gives the outline. Without tracing every implementation detail, the method names and comments tell us roughly this: after the controller receives a reassign command and calls updateLeaderEpochAndSendRequest, new replicas are created and work to catch up with the leader's offsets so they can join the ISR (In Sync Replicas); once all RAR replicas (Reassigned replicas) have joined the ISR, a partition leader election is performed and the old replicas are deleted afterwards. When adding brokers to a Kafka cluster, the --generate/--execute options of the kafka-reassign-partitions.sh script together accomplish the changes above.
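The JSON that kafka-reassign-partitions.sh --execute consumes has a simple shape. A sketch of building it by hand; the topic name and broker ids are made up, and the round-robin placement is a naive stand-in for what --generate actually computes:

```python
import json


def reassignment_plan(topic: str, partitions: int, target_brokers: list[int],
                      replication_factor: int = 2) -> str:
    """Build an input file for kafka-reassign-partitions.sh by round-robining
    replicas over the target brokers. The first replica in each list becomes
    the preferred leader."""
    plan = {"version": 1, "partitions": []}
    n = len(target_brokers)
    for p in range(partitions):
        replicas = [target_brokers[(p + i) % n] for i in range(replication_factor)]
        plan["partitions"].append({"topic": topic, "partition": p, "replicas": replicas})
    return json.dumps(plan)
```

You would then feed the result to the tool, e.g. kafka-reassign-partitions.sh --reassignment-json-file plan.json --execute (plus the cluster connection flags for your Kafka version).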

Scaling Elasticsearch

Just like Kafka's partition reassign, Elasticsearch faces shard balancing when nodes are added. The master node can rebalance shards manually or automatically; for example, the cluster settings cluster.routing.allocation.enable / cluster.routing.rebalance.enable control which kinds of shards may be allocated and which may be rebalanced, so that production reads and writes are disturbed as little as possible.
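The two settings are toggled through the cluster settings API. A sketch of the request body (setting names and accepted values per the Elasticsearch docs; the host is made up):

```python
import json

# PUT http://es-master.example.com:9200/_cluster/settings  (hypothetical host)
# "primaries" allows only primary shards to be allocated; "none" freezes
# rebalancing entirely for the duration of a maintenance window.
settings = {
    "transient": {
        "cluster.routing.allocation.enable": "primaries",
        "cluster.routing.rebalance.enable": "none",
    }
}
body = json.dumps(settings)
```

Remember to set both back to "all" once the new nodes are in place, or shards will never migrate onto them.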
This mechanism works well when every primary shard has at least one replica. But in some extreme cases, certain shards have no replicas for unavoidable reasons; how do you scale out then without affecting production reads and writes?
I will cover that in the next article. For now, here is a CPU load graph of one Elasticsearch data node; the disk IO and network IO graphs follow a similar trend. Can you point out on the graph the moments corresponding to: normal operation, disk usage above 85%, disk usage above 95%, mounting MFS storage, unmounting MFS storage, and scaling out? (Assume indices are daily; day-level granularity is enough.)

Miscellaneous

Finally, the customary personal-opinion section.
Design patterns are still treated as gospel by many programmers in China and have become a favorite interview topic; if you work on architecture or middleware, they are practically a guaranteed question.
Ten years ago, as a student, I was a design-pattern devotee. That year I read everything from Head First Design Patterns to the thousand-plus-page Java与模式 ( https://book.douban.com/subject/1214074/ , by Dr. Yan Hong), and then spent a small fortune on the GoF book Design Patterns: Elements of Reusable Object-Oriented Software.
But the longer I have worked, the more I feel that while design patterns can broaden a programmer's world, they also have their limits.
Why do I say that?
The 23 design patterns were distilled by Erich Gamma and his three co-authors (the Gang of Four) from the experience of their predecessors: 23 techniques, selected from object-oriented design practice, for building robust and flexible code. As their preface puts it, the book assumes you are already reasonably proficient in at least one object-oriented programming language and have some object-oriented design experience; when "types" and "polymorphism", or "interface" versus "implementation" inheritance, come up, these concepts should be second nature to you.
Even the boundaries were debated: whether, say, the simple factory, as distinct from the abstract factory, counts as a design pattern at all was once a point of contention.
Today I would go so far as to say design patterns, as a canon, do not really exist. If you want material on similar themes, the two volumes of Pattern-Oriented Software Architecture come closer to teaching how to actually design software.
For a Java developer, memorizing design patterns is arguably less useful than memorizing Alibaba's Java Development Manual; although my overall opinion of the latter is negative, I grant its practicality in day-to-day development.
Those who disagree may counter with Java's blocking IO, where the decorator pattern builds all those convenient readers/streams, or with the strategy pattern in Dubbo. But ask yourself: when reading a small file or a config file, do you wrap stream after stream as decorators, or do you sweep all that aside with IOUtils.toString or the Files.readAllLines-style methods added in Java 7? And when copying files for performance, do you really assemble stacks of streams?
The moderns have their twenty-three design patterns; the ancients had their thirty-six stratagems.