Kafka数据迁移
date
Jan 6, 2024
slug
kafka-data-placement
status
Published
tags
Kafka
Messaging System
summary
Kafka在进行副本重新平衡或维护活动时,Kafka需要移动分区副本从一个Broker到另一个,此过程中涉及数据迁移。
type
Post
Kafka-Kit
- scale
- 一定需要有NewBroker,适用于新增节点,进行负载均衡;
- 无storage-threshold、storage-threshold-gb限制,所有老的Broker都可以进行Offload Partition。
- rebuild
- 节点不一定要存在,可以适用于降低配置。
- rebalance
- 适用于全局基于存储量进行负载均衡。
- 无需NewBroker,仅满足storage-threshold、storage-threshold-gb限制的Broker以及老的Broker都可以进行Offload Partition。
rebalance:
- 适用于节点变化时,全局进行数据负载均衡的场景
- 参数
- topics:待重均衡的Topic列表
- brokers:分区的目标Broker
- -1 : 所有mapped brokers:当前所有已经存在Replica分布的Broker
- -2 : 集群中的所有brokers
- storage-threshold:Broker的freeStorage低于平均值,才需要进行Partition的卸载
- storage-threshold-gb:Broker的freeStorage低于具体大小,才需要进行Partition的卸载
- tolerance:被调度的存储空间的限制 / 平均存储空闲量
- partition-limit:Broker维度的待迁移Partition的限制,每次最多考虑N(配置值)个头部的Partition用于迁移。
- partition-size-threshold:Partition大小低于此阈值不会进行迁移
- locality-scoped:所有分区的迁移是否会在rack内
- optimize-leadership:Broker维度的leader/follower比例是否需要优化,避免Leader聚集倾斜。
- 流程:params.requireNewBrokers = false
- brokerMeta = getBrokerMeta → 获取集群中的所有Broker信息
- partitionMeta = getPartitionMeta → 从ZK获取所有的分区信息
- partitionMapIn = getPartitionMaps → 基于参数中的Topic获取对应的分区信息
- excluded = 在partitionMapIn中获取需要被exclude的topic列表
- brokersIn = 基于partitionMapIn计算出涉及Broker的Replica分布信息,并且基于brokerMeta添加Rack信息和FreeStorage信息。
- BrokerId : {Used(当前分布的Replica数量,0为新Broker。)}
- Broker校验:params.broker、brokersIn、brokerMeta、params.requireNewBrokers,存在Missing、Replace、OldMissing则异常;如果requireNewBrokers=true,则必须要有新Broker,否则也异常。
- b: Replica对应Broker当前的分布
- bl: params.broker
- bm : 集群中的全量Broker信息
- providedBrokers : 参数的Broker列表。分区迁移的目标Broker?
- 检查一:b中的Broker在bm中不存在时 - broker非法,里面的分区也需要进行迁移
- b中标记Replace为true、Missing为true
- 发送消息 - BrokerId为Missing的消息
- 在providedBrokers中,则BrokerStatus中记录Missing + 1,否则OldMissing +1。
- 检查二:b中的Broker不存在于providedBrokers中 - broker不在目标迁移Broker的列表,里面的分区需要进行迁移
- BrokerStatus.Replace + 1
- b中标记Replace为true
- 发送消息 - Broker标记待Removal
- 检查三:遍历providedBrokers
- 在b中不存在,则在判断在bm中是否存在
- 存在则在b中新增,证明是新Broker
- 不存在,则BrokerStatus的Missing +1,发送消息 - Broker在ZK中不存在
- 检查四: 检查 Rack.ID是否存在
- offloadTargets = 基于params和BrokersIn计算需要带进行分区卸载的Broker列表
- 优先基于storageThresholdGB、storageThreshold配置,最后看是否老的Broker。
- results = 计算重分配结果computeReassignmentBundles
- otm = key为offloadTarget中的Broker列表
- results = 所有的ReassignmentBundles的结果
- 基于tolerate进行遍历 0.01 - 0.98,单个分区中的副本迁移完成之后,来源Broker的FreeStorage > (1 + tolerate) * meanFreeStorage不可接受;去向Broker的FreeStorage < (1 - tolerate) * meanFreeStorage不可接受;
- 为所有OffloadedBroker进行计算,
- 对于每个Broker,每次选择一个TopPartition进行迁移,直到最均衡没有Partition再满足迁移计划为止。
- m = 获取最小storageRange(MaxFreeStorageDiff),最小stdDev(标准差)的ressignmentBundle
- partitionMapOut = m.partitionMap
- brokersOut = m.brokers
- relos = m.relocations
- 如果params.optimizeLeadership = true,执行partitionMapOut.OptimizeLeaderFollower.
- partitionMapIn、partitionMapOut = 跳过不操作的Assignment,计算出Broker维度需要进的Replica以及需要出的Replica。
rebuild:重建分区信息,支持基于Count和Storage,适用于Topic的分区变化时使用。
参数
- topics:重建的topic列表
- map-string:重建的分区列表
- use-meta:在placement限制中,是否使用broker的元数据?
- force-rebuild:是否强制进行全量重建?
- replication:在所有的副本集合中标准化topic的副本因子
- sub-affinity:Broker替换的置换亲和性
- placement:分区放置策略,基于count还是storage。
- count:不会做分区的排序,直接Broker基于分区数量进行排列。
- min-rack-ids:副本集合的最小要求rack数量(0代表全部需要不一样)。
- optimize:存储防止策略的优化优先级:分布?存储?
- 先进行分区大小的排序,如果是distribution,先选分区存储量大的进行放置,基于Broker的分区均衡性进行放置
- 先进行分区大小的排序,如果是storage,基于Broker的存储大小进行放置
- partition-size-factor:当placement为storage时,乘以分区数量的因子
- brokers:放置所有分区的broker列表
- -1: mapped broker
- -2:集群中所有的broker
- optimize-leadership:优化leader/follower比例
- phase-reassignment:创建两阶段的output map。
- leader-evac-brokers:会移除leadership的broker列表
- leader-evac-topics:会移除leadership的topic列表,和上面leader-evac-brokers的结合适用
- chunk-step-size:一次移动数据到的Broker数量
- 多次进行迁移。
流程:
- evacTopics = 基于leader-evac-topics进行解析,并且在集群中存在
- brokerMeta =
- 如果params.useMetadata为true,则从集群中获取全量的Broker列表
- 否则,为空
- partitionMeta =
- 如果params.placement为storage,从ZK中获取全量的分区元数据
- 否则为空
- partitionMapIn =
- 如果 params.mapString 不为空,则直接反序列化参数中的分区信息
- 如果 params.topics 不为空,则获取topic对应的全部分区信息
- brokers
- brokers:基于partitionMapIn计算出涉及Broker的Replica分布信息,并且基于brokerMeta添加Rack信息和FreeStorage信息。
- BrokerId : {Used(当前分布的Replica数量,0为新Broker。)}
- params.force为true,Used无值;params.force = false, Used为分布的Replica数量。
Kafka官方文档
数据迁移
分区重均衡工具:
bin/kafka-reassign-partitions.sh
。- --generate: 在这个模式下,给定一个主题列表和一个代理列表,该工具生成一个候选重分配,将指定主题的所有分区移动到新的代理。这个选项仅提供了一种方便的方式,根据主题列表和目标代理生成分区重分配计划。
- --execute: 在这个模式下,工具根据用户提供的重分配计划开始分区的重分配(使用 --reassignment-json-file 选项)。这可以是由管理员手工制作的自定义重分配计划,也可以是使用 --generate 选项提供的。
- --verify: 在这个模式下,工具验证在上一次 --execute 期间列出的所有分区的重分配状态。状态可以是成功完成、失败或进行中。
自动化将数据迁移到新的Broker上
定制化数据迁移
Kafka运维操作
Kafka-Kit
Kafka-Kit包括
topicmappr
和autothrottle
等工具。topicmappr
是一个替代Kafka的kafka-reassign-partitions.sh
脚本的--generate
功能的工具。它提供了多种附加功能,如确定性输出、最小移动代理替换、可配置的机架感知分区放置、复制因子更新和清晰的操作总结。此外,
topicmappr
提供了不同的分区副本放置策略,适用于各种工作负载。例如,count
策略旨在跨主题的所有分区均匀分布数据流,而storage
策略则侧重于平衡分区大小而不是数量,适用于分区大小不平衡的主题。Kafka-Kit中的
autothrottle
工具有助于通过动态更新基于观察负载、已知节流率和可配置参数的节流来管理数据复制。随着Kafka集群的增长,面临着不断的故障和数据迁移,这就需要在复制速度和网络影响之间找到平衡。限流 - Autothrottle
Kafka-Kit中的
autothrottle
组件是一个用于自动管理Kafka集群中副本重分配速度的工具。在进行副本重新平衡或维护活动时,Kafka需要移动分区副本从一个Broker到另一个。这些操作可能会产生大量网络和磁盘I/O,进而影响集群的性能。autothrottle
的目的是在保持集群性能的同时,高效地执行这些副本移动操作。功能和特点:
- 自动调节副本移动速率:
autothrottle
能够根据集群的实时性能指标,如网络带宽使用率和磁盘I/O,自动调整副本移动的速率。- 这有助于避免由于副本重分配而导致的集群过载和性能下降。
- 集成Kafka副本重分配工具:
autothrottle
通常与Kafka的副本重分配工具(如kafka-reassign-partitions.sh
)一起使用,以实现高效且平稳的副本移动。
- 动态调整:
- 根据集群的负载和性能指标,
autothrottle
可以动态地调整副本移动的速率,而不是靠静态的速率限制。
- 监控和反馈:
autothrottle
提供了监控副本移动进度的功能,并能根据当前的集群状态实时调整副本移动速度。
- 简化集群管理:
- 通过自动调节副本移动速度,
autothrottle
简化了集群管理,减少了管理员需要手动干预的频率和复杂性。
- 提高数据迁移效率:
- 它能够确保数据迁移过程尽可能高效,同时最小化对生产环境的影响。
使用场景:
- 集群扩展:在添加新Broker或扩展现有集群时,
autothrottle
可以帮助平衡副本分配的速率,减少对集群性能的影响。
- 维护和升级:在执行硬件升级或其他维护任务期间,
autothrottle
有助于确保集群的稳定运行。
- 灾难恢复:在进行灾难恢复操作时,
autothrottle
可以平滑地进行数据复制,提高恢复效率。
总的来说,
autothrottle
是Kafka-Kit中一个非常有用的工具,特别是在需要进行大规模副本迁移和重分配的场合。通过动态调整副本移动的速率,它帮助保持Kafka集群的稳定性和高性能。更多
autothrottle
在 Kafka 中实现 Rebalance 限流的机制主要是通过动态调整 Kafka Broker 配置中与副本移动速率相关的参数。Kafka 提供了一些动态可配置的参数,允许管理员在不重启集群的情况下,调整数据迁移(Rebalance)的速率。Kafka 相关动态配置项:
leader.replication.throttled.rate
和follower.replication.throttled.rate
:- 这些参数控制了领导副本(Leader Replica)和追随副本(Follower Replica)的复制速率。通过设置这些值,可以限制Kafka在Rebalance期间数据迁移的带宽使用。
replica.fetch.max.bytes
:- 这个参数定义了Follower副本从Leader副本那里一次性抓取的最大数据量。调整这个值也可以影响数据复制的速度。
num.replica.fetchers
:- 这个参数确定了每个Broker用于复制数据的线程数。虽然主要用于性能优化,但间接也影响了数据迁移速度。
autothrottle
的工作原理:
- 监控集群性能:
autothrottle
会监控 Kafka 集群的关键性能指标,如网络带宽利用率和磁盘I/O。
- 动态调整:
- 根据当前的集群负载和性能指标,
autothrottle
动态调整上述参数的值来限制副本迁移的速度。 - 这有助于确保集群在 Rebalance 过程中不会因为过高的数据迁移负载而影响正常操作。
- 反馈机制:
- 通过持续监控集群性能和副本迁移的进度,
autothrottle
可以根据需要进一步调整限流参数。
使用场景:
- 在大规模的集群扩展或维护活动中,使用
autothrottle
可以避免因为过度的数据迁移而导致的集群性能下降。
- 在进行副本重分配或Broker迁移时,
autothrottle
确保了平滑的迁移过程,同时最小化对生产环境的影响。
总之,
autothrottle
是通过动态调整 Kafka 的内置配置参数来实现对 Rebalance 操作的限流,从而帮助维护 Kafka 集群在副本迁移期间的稳定性和性能。Ref:
- Kafka-Kit: