Kafka数据迁移

date
Jan 6, 2024
slug
kafka-data-placement
status
Published
tags
Kafka
Messaging System
summary
Kafka在进行副本重新平衡或维护活动时,Kafka需要移动分区副本从一个Broker到另一个,此过程中涉及数据迁移。
type
Post
 
Kafka-Kit
  • scale
    • 一定需要有NewBroker,适用于新增节点,进行负载均衡;
    • 无storage-threshold、storage-threshold-gb限制,所有老的Broker都可以进行Offload Partition。
  • rebuild
    • 节点不一定要存在,可以适用于降低配置。
  • rebalance
    • 适用于全局基于存储量进行负载均衡。
    • 无需NewBroker,仅满足storage-threshold、storage-threshold-gb限制的Broker以及老的Broker都可以进行Offload Partition。
 
 
 
rebalance:
  • 适用于节点变化时,全局进行数据负载均衡的场景
  • 参数
    • topics:待重均衡的Topic列表
    • brokers:分区的目标Broker
      • -1 : 所有mapped brokers:当前所有已经存在Replica分布的Broker
      • -2 : 集群中的所有brokers
    • storage-threshold:Broker的freeStorage低于平均值,才需要进行Partition的卸载
    • storage-threshold-gb:Broker的freeStorage低于具体大小,才需要进行Partition的卸载
    • tolerance:被调度的存储空间的限制 / 平均存储空闲量
    • partition-limit:Broker维度的待迁移Partition的限制,每次最多考虑N(配置值)个头部的Partition用于迁移。
    • partition-size-threshold:Partition大小低于此阈值不会进行迁移
    • locality-scoped:所有分区的迁移是否会在rack内
    • optimize-leadership:Broker维度的leader/follower比例是否需要优化,避免Leader聚集倾斜。
  • 流程:params.requireNewBrokers = false
    • brokerMeta = getBrokerMeta → 获取集群中的所有Broker信息
    • partitionMeta = getPartitionMeta → 从ZK获取所有的分区信息
    • partitionMapIn = getPartitionMaps → 基于参数中的Topic获取对应的分区信息
    • excluded = 在partitionMapIn中获取需要被exclude的topic列表
    • brokersIn = 基于partitionMapIn计算出涉及Broker的Replica分布信息,并且基于brokerMeta添加Rack信息和FreeStorage信息。
      • BrokerId : {Used(当前分布的Replica数量,0为新Broker。)}
    • Broker校验:params.broker、brokersIn、brokerMeta、params.requireNewBrokers,存在Missing、Replace、OldMissing则异常;如果requireNewBrokers=true,则必须要有新Broker,否则也异常。
      • b: Replica对应Broker当前的分布
      • bl: params.broker
      • bm : 集群中的全量Broker信息
      • providedBrokers : 参数的Broker列表。分区迁移的目标Broker?
      • 检查一:b中的Broker在bm中不存在时 - broker非法,里面的分区也需要进行迁移
        • b中标记Replace为true、Missing为true
        • 发送消息 - BrokerId为Missing的消息
        • 在providedBrokers中,则BrokerStatus中记录Missing + 1,否则OldMissing +1
      • 检查二:b中的Broker不存在于providedBrokers中 - broker不在目标迁移Broker的列表,里面的分区需要进行迁移
        • BrokerStatus.Replace + 1
        • b中标记Replace为true
        • 发送消息 - Broker标记待Removal
      • 检查三:遍历providedBrokers
        • 在b中不存在,则在判断在bm中是否存在
          • 存在则在b中新增,证明是新Broker
          • 不存在,则BrokerStatus的Missing +1,发送消息 - Broker在ZK中不存在
      • 检查四: 检查 Rack.ID是否存在
    • offloadTargets = 基于params和BrokersIn计算需要带进行分区卸载的Broker列表
      • 优先基于storageThresholdGB、storageThreshold配置,最后看是否老的Broker。
    • results = 计算重分配结果computeReassignmentBundles
      • otm = key为offloadTarget中的Broker列表
      • results = 所有的ReassignmentBundles的结果
      • 基于tolerate进行遍历 0.01 - 0.98,单个分区中的副本迁移完成之后,来源Broker的FreeStorage > (1 + tolerate) * meanFreeStorage不可接受;去向Broker的FreeStorage < (1 - tolerate) * meanFreeStorage不可接受;
        • 为所有OffloadedBroker进行计算,
          • 对于每个Broker,每次选择一个TopPartition进行迁移,直到最均衡没有Partition再满足迁移计划为止。
    • m = 获取最小storageRange(MaxFreeStorageDiff),最小stdDev(标准差)的ressignmentBundle
    • partitionMapOut = m.partitionMap
    • brokersOut = m.brokers
    • relos = m.relocations
    • 如果params.optimizeLeadership = true,执行partitionMapOut.OptimizeLeaderFollower.
    • partitionMapIn、partitionMapOut = 跳过不操作的Assignment,计算出Broker维度需要进的Replica以及需要出的Replica。
    •  
       
       
rebuild:重建分区信息,支持基于Count和Storage,适用于Topic的分区变化时使用。
参数
  • topics:重建的topic列表
  • map-string:重建的分区列表
  • use-meta:在placement限制中,是否使用broker的元数据?
  • force-rebuild:是否强制进行全量重建?
  • replication:在所有的副本集合中标准化topic的副本因子
  • sub-affinity:Broker替换的置换亲和性
  • placement:分区放置策略,基于count还是storage。
    • count:不会做分区的排序,直接Broker基于分区数量进行排列。
  • min-rack-ids:副本集合的最小要求rack数量(0代表全部需要不一样)。
  • optimize:存储防止策略的优化优先级:分布?存储?
    • 先进行分区大小的排序,如果是distribution,先选分区存储量大的进行放置,基于Broker的分区均衡性进行放置
    • 先进行分区大小的排序,如果是storage,基于Broker的存储大小进行放置
  • partition-size-factor:当placement为storage时,乘以分区数量的因子
  • brokers:放置所有分区的broker列表
    • -1: mapped broker
    • -2:集群中所有的broker
  • optimize-leadership:优化leader/follower比例
  • phase-reassignment:创建两阶段的output map。
  • leader-evac-brokers:会移除leadership的broker列表
  • leader-evac-topics:会移除leadership的topic列表,和上面leader-evac-brokers的结合适用
  • chunk-step-size:一次移动数据到的Broker数量
    • 多次进行迁移。
 
流程:
  • evacTopics = 基于leader-evac-topics进行解析,并且在集群中存在
  • brokerMeta =
    • 如果params.useMetadata为true,则从集群中获取全量的Broker列表
    • 否则,为空
  • partitionMeta =
    • 如果params.placement为storage,从ZK中获取全量的分区元数据
    • 否则为空
  • partitionMapIn =
    • 如果 params.mapString 不为空,则直接反序列化参数中的分区信息
    • 如果 params.topics 不为空,则获取topic对应的全部分区信息
  • brokers
    • brokers:基于partitionMapIn计算出涉及Broker的Replica分布信息,并且基于brokerMeta添加Rack信息和FreeStorage信息。
      • BrokerId : {Used(当前分布的Replica数量,0为新Broker。)}
      • params.force为true,Used无值;params.force = false, Used为分布的Replica数量。
 
 

Kafka官方文档

数据迁移

分区重均衡工具:bin/kafka-reassign-partitions.sh
  • --generate: 在这个模式下,给定一个主题列表和一个代理列表,该工具生成一个候选重分配,将指定主题的所有分区移动到新的代理。这个选项仅提供了一种方便的方式,根据主题列表和目标代理生成分区重分配计划。
  • --execute: 在这个模式下,工具根据用户提供的重分配计划开始分区的重分配(使用 --reassignment-json-file 选项)。这可以是由管理员手工制作的自定义重分配计划,也可以是使用 --generate 选项提供的。
  • --verify: 在这个模式下,工具验证在上一次 --execute 期间列出的所有分区的重分配状态。状态可以是成功完成、失败或进行中。
 

自动化将数据迁移到新的Broker上

定制化数据迁移

 

Kafka运维操作

Kafka-Kit

Kafka-Kit包括topicmapprautothrottle等工具。topicmappr是一个替代Kafka的kafka-reassign-partitions.sh脚本的--generate功能的工具。它提供了多种附加功能,如确定性输出、最小移动代理替换、可配置的机架感知分区放置、复制因子更新和清晰的操作总结。
此外,topicmappr提供了不同的分区副本放置策略,适用于各种工作负载。例如,count策略旨在跨主题的所有分区均匀分布数据流,而storage策略则侧重于平衡分区大小而不是数量,适用于分区大小不平衡的主题。
Kafka-Kit中的autothrottle工具有助于通过动态更新基于观察负载、已知节流率和可配置参数的节流来管理数据复制。随着Kafka集群的增长,面临着不断的故障和数据迁移,这就需要在复制速度和网络影响之间找到平衡。
 

限流 - Autothrottle

Kafka-Kit中的autothrottle组件是一个用于自动管理Kafka集群中副本重分配速度的工具。在进行副本重新平衡或维护活动时,Kafka需要移动分区副本从一个Broker到另一个。这些操作可能会产生大量网络和磁盘I/O,进而影响集群的性能。autothrottle的目的是在保持集群性能的同时,高效地执行这些副本移动操作。

功能和特点:

  1. 自动调节副本移动速率
      • autothrottle能够根据集群的实时性能指标,如网络带宽使用率和磁盘I/O,自动调整副本移动的速率。
      • 这有助于避免由于副本重分配而导致的集群过载和性能下降。
  1. 集成Kafka副本重分配工具
      • autothrottle通常与Kafka的副本重分配工具(如kafka-reassign-partitions.sh)一起使用,以实现高效且平稳的副本移动。
  1. 动态调整
      • 根据集群的负载和性能指标,autothrottle可以动态地调整副本移动的速率,而不是靠静态的速率限制。
  1. 监控和反馈
      • autothrottle提供了监控副本移动进度的功能,并能根据当前的集群状态实时调整副本移动速度。
  1. 简化集群管理
      • 通过自动调节副本移动速度,autothrottle简化了集群管理,减少了管理员需要手动干预的频率和复杂性。
  1. 提高数据迁移效率
      • 它能够确保数据迁移过程尽可能高效,同时最小化对生产环境的影响。

使用场景:

  • 集群扩展:在添加新Broker或扩展现有集群时,autothrottle可以帮助平衡副本分配的速率,减少对集群性能的影响。
  • 维护和升级:在执行硬件升级或其他维护任务期间,autothrottle有助于确保集群的稳定运行。
  • 灾难恢复:在进行灾难恢复操作时,autothrottle可以平滑地进行数据复制,提高恢复效率。
总的来说,autothrottle是Kafka-Kit中一个非常有用的工具,特别是在需要进行大规模副本迁移和重分配的场合。通过动态调整副本移动的速率,它帮助保持Kafka集群的稳定性和高性能。
 

更多

autothrottle 在 Kafka 中实现 Rebalance 限流的机制主要是通过动态调整 Kafka Broker 配置中与副本移动速率相关的参数。Kafka 提供了一些动态可配置的参数,允许管理员在不重启集群的情况下,调整数据迁移(Rebalance)的速率。

Kafka 相关动态配置项:

  1. leader.replication.throttled.ratefollower.replication.throttled.rate
      • 这些参数控制了领导副本(Leader Replica)和追随副本(Follower Replica)的复制速率。通过设置这些值,可以限制Kafka在Rebalance期间数据迁移的带宽使用。
  1. replica.fetch.max.bytes
      • 这个参数定义了Follower副本从Leader副本那里一次性抓取的最大数据量。调整这个值也可以影响数据复制的速度。
  1. num.replica.fetchers
      • 这个参数确定了每个Broker用于复制数据的线程数。虽然主要用于性能优化,但间接也影响了数据迁移速度。

autothrottle 的工作原理:

  1. 监控集群性能
      • autothrottle 会监控 Kafka 集群的关键性能指标,如网络带宽利用率和磁盘I/O。
  1. 动态调整
      • 根据当前的集群负载和性能指标,autothrottle 动态调整上述参数的值来限制副本迁移的速度。
      • 这有助于确保集群在 Rebalance 过程中不会因为过高的数据迁移负载而影响正常操作。
  1. 反馈机制
      • 通过持续监控集群性能和副本迁移的进度,autothrottle 可以根据需要进一步调整限流参数。

使用场景:

  • 在大规模的集群扩展或维护活动中,使用 autothrottle 可以避免因为过度的数据迁移而导致的集群性能下降。
  • 在进行副本重分配或Broker迁移时,autothrottle 确保了平滑的迁移过程,同时最小化对生产环境的影响。
总之,autothrottle 是通过动态调整 Kafka 的内置配置参数来实现对 Rebalance 操作的限流,从而帮助维护 Kafka 集群在副本迁移期间的稳定性和性能。
 
 
Ref:
  • Kafka-Kit:

© IBeyondy 2024