【Flink】Flink 的八种分区策略(源码解读)

04-11 1444阅读

Flink 的八种分区策略(源码解读)

  • 1.继承关系图
    • 1.1 接口:ChannelSelector
    • 1.2 抽象类:StreamPartitioner
    • 1.3 继承关系图
    • 2.分区策略
      • 2.1 GlobalPartitioner
      • 2.2 ShufflePartitioner
      • 2.3 BroadcastPartitioner
      • 2.4 RebalancePartitioner
      • 2.5 RescalePartitioner
      • 2.6 ForwardPartitioner
      • 2.7 KeyGroupStreamPartitioner
      • 2.8 CustomPartitionerWrapper

        Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

        【Flink】Flink 的八种分区策略(源码解读)

        • GlobalPartitioner
        • ShufflePartitioner
        • RebalancePartitioner
        • RescalePartitioner
        • BroadcastPartitioner
        • ForwardPartitioner
        • KeyGroupStreamPartitioner
        • CustomPartitionerWrapper

          1.继承关系图

          1.1 接口:ChannelSelector

          public interface ChannelSelector {
              /**
               * 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).
               */
              void setup(int numberOfChannels);
              /**
               *根据当前的record以及Channel总数,
               *决定应将record发送到下游哪个Channel。
               *不同的分区策略会实现不同的该方法。
               */
              int selectChannel(T record);
              /**
              *是否以广播的形式发送到下游所有的算子实例
               */
              boolean isBroadcast();
          }
          

          1.2 抽象类:StreamPartitioner

          public abstract class StreamPartitioner implements
                  ChannelSelector, Serializable {
              private static final long serialVersionUID = 1L;
              protected int numberOfChannels;
              @Override
              public void setup(int numberOfChannels) {
                  this.numberOfChannels = numberOfChannels;
              }
              @Override
              public boolean isBroadcast() {
                  return false;
              }
              public abstract StreamPartitioner copy();
          }
          

          1.3 继承关系图

          【Flink】Flink 的八种分区策略(源码解读)

          2.分区策略

          2.1 GlobalPartitioner

          该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

          【Flink】Flink 的八种分区策略(源码解读)

          /**
           * 发送所有的数据到下游算子的第一个task(ID = 0)
           * @param 
           */
          @Internal
          public class GlobalPartitioner extends StreamPartitioner {
              private static final long serialVersionUID = 1L;
              @Override
              public int selectChannel(SerializationDelegate record) {
                  //只返回0,即只发送给下游算子的第一个task
                  return 0;
              }
              @Override
              public StreamPartitioner copy() {
                  return this;
              }
              @Override
              public String toString() {
                  return "GLOBAL";
              }
          }
          

          2.2 ShufflePartitioner

          随机选择一个下游算子实例进行发送。

          【Flink】Flink 的八种分区策略(源码解读)

          /**
           * 随机的选择一个channel进行发送
           * @param 
           */
          @Internal
          public class ShufflePartitioner extends StreamPartitioner {
              private static final long serialVersionUID = 1L;
              private Random random = new Random();
              @Override
              public int selectChannel(SerializationDelegate record) {
                  //产生[0,numberOfChannels)伪随机数,随机发送到下游的某个task
                  return random.nextInt(numberOfChannels);
              }
              @Override
              public StreamPartitioner copy() {
                  return new ShufflePartitioner();
              }
              @Override
              public String toString() {
                  return "SHUFFLE";
              }
          }
          

          2.3 BroadcastPartitioner

          发送到下游所有的算子实例。

          【Flink】Flink 的八种分区策略(源码解读)

          /**
           * 发送到所有的channel
           */
          @Internal
          public class BroadcastPartitioner extends StreamPartitioner {
              private static final long serialVersionUID = 1L;
              /**
               * Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道
               */
              @Override
              public int selectChannel(SerializationDelegate record) {
                  throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
              }
              @Override
              public boolean isBroadcast() {
                  return true;
              }
              @Override
              public StreamPartitioner copy() {
                  return this;
              }
              @Override
              public String toString() {
                  return "BROADCAST";
              }
          }
          

          2.4 RebalancePartitioner

          通过循环的方式依次发送到下游的 task。

          【Flink】Flink 的八种分区策略(源码解读)

          /**
           *通过循环的方式依次发送到下游的task
           * @param 
           */
          @Internal
          public class RebalancePartitioner extends StreamPartitioner {
              private static final long serialVersionUID = 1L;
              private int nextChannelToSendTo;
              @Override
              public void setup(int numberOfChannels) {
                  super.setup(numberOfChannels);
                  //初始化channel的id,返回[0,numberOfChannels)的伪随机数
                  nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
              }
              @Override
              public int selectChannel(SerializationDelegate record) {
                  //循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2
                  //则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推
                  nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
                  return nextChannelToSendTo;
              }
              public StreamPartitioner copy() {
                  return this;
              }
              @Override
              public String toString() {
                  return "REBALANCE";
              }
          }
          

          2.5 RescalePartitioner

          基于上下游 Operator 的并行度,将记录以循环的方式输出到下游 Operator 的每个实例。

          举例:

          • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
          • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

            【Flink】Flink 的八种分区策略(源码解读)

            @Internal
            public class RescalePartitioner extends StreamPartitioner {
                private static final long serialVersionUID = 1L;
                private int nextChannelToSendTo = -1;
                @Override
                public int selectChannel(SerializationDelegate record) {
                    if (++nextChannelToSendTo >= numberOfChannels) {
                        nextChannelToSendTo = 0;
                    }
                    return nextChannelToSendTo;
                }
                public StreamPartitioner copy() {
                    return this;
                }
                @Override
                public String toString() {
                    return "RESCALE";
                }
            }
            

            Flink 中的执行图可以分成四层:StreamGraph ➡ JobGraph ➡ ExecutionGraph ➡ 物理执行图。

            • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
            • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
            • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
            • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “图”,并不是一个具体的数据结构。

              而 StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitioner 和 RescalePartitioner 列为 POINTWISE 分配模式,其他的为 ALL_TO_ALL 分配模式。代码如下:

              if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
                          jobEdge = downStreamVertex.connectNewDataSetAsInput(
                              headVertex,
                              // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)
                              DistributionPattern.POINTWISE,
                              resultPartitionType);
                      } else {
                          jobEdge = downStreamVertex.connectNewDataSetAsInput(
                              headVertex,
                              // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)
                              DistributionPattern.ALL_TO_ALL,
                              resultPartitionType);
                      }
              

              2.6 ForwardPartitioner

              发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

              【Flink】Flink 的八种分区策略(源码解读)

              /**
               * 发送到下游对应的第一个task
               * @param 
               */
              @Internal
              public class ForwardPartitioner extends StreamPartitioner {
                  private static final long serialVersionUID = 1L;
                  @Override
                  public int selectChannel(SerializationDelegate record) {
                      return 0;
                  }
                  public StreamPartitioner copy() {
                      return this;
                  }
                  @Override
                  public String toString() {
                      return "FORWARD";
                  }
              }
              

              在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner,对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

              //在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
              if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                  partitioner = new ForwardPartitioner();
              } else if (partitioner == null) {
                  partitioner = new RebalancePartitioner();
              }
              if (partitioner instanceof ForwardPartitioner) {
                  //如果上下游的并行度不一致,会抛出异常
                  if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                      throw new UnsupportedOperationException("Forward partitioning does not allow " +
                          "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                          ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                          " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                  }
              }
              

              2.7 KeyGroupStreamPartitioner

              根据 key 的分组索引选择发送到相对应的下游 subtask。

              【Flink】Flink 的八种分区策略(源码解读)

              • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
                /**
                 * 根据key的分组索引选择发送到相对应的下游subtask
                 * @param 
                 * @param 
                 */
                @Internal
                public class KeyGroupStreamPartitioner extends StreamPartitioner implements ConfigurableStreamPartitioner {
                ...
                    @Override
                    public int selectChannel(SerializationDelegate record) {
                        K key;
                        try {
                            key = keySelector.getKey(record.getInstance().getValue());
                        } catch (Exception e) {
                            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
                        }
                        //调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示
                        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
                    }
                ...
                }
                
                • org.apache.flink.runtime.state.KeyGroupRangeAssignment
                  public final class KeyGroupRangeAssignment {
                  ...
                      /**
                       * 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,
                       * 即该key发送到哪一个task
                       */
                      public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
                          Preconditions.checkNotNull(key, "Assigned key must not be null!");
                          return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
                      }
                      /**
                       *根据key分配一个分组id(keyGroupId)
                       */
                      public static int assignToKeyGroup(Object key, int maxParallelism) {
                          Preconditions.checkNotNull(key, "Assigned key must not be null!");
                          //获取key的hashcode
                          return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
                      }
                      /**
                       * 根据key分配一个分组id(keyGroupId),
                       */
                      public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
                          //与maxParallelism取余,获取keyGroupId
                          return MathUtils.murmurHash(keyHash) % maxParallelism;
                      }
                      //计算分区index,即该key group应该发送到下游的哪一个算子实例
                      public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
                          return keyGroupId * parallelism / maxParallelism;
                      }
                  ...
                  

                  2.8 CustomPartitionerWrapper

                  通过 Partitioner 实例的 Partition 方法(自定义的)将记录输出到下游。

                  public class CustomPartitionerWrapper extends StreamPartitioner {
                      private static final long serialVersionUID = 1L;
                      Partitioner partitioner;
                      KeySelector keySelector;
                      public CustomPartitionerWrapper(Partitioner partitioner, KeySelector keySelector) {
                          this.partitioner = partitioner;
                          this.keySelector = keySelector;
                      }
                      @Override
                      public int selectChannel(SerializationDelegate record) {
                          K key;
                          try {
                              key = keySelector.getKey(record.getInstance().getValue());
                          } catch (Exception e) {
                              throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
                          }
                  		//实现Partitioner接口,重写partition方法
                          return partitioner.partition(key, numberOfChannels);
                      }
                      @Override
                      public StreamPartitioner copy() {
                          return this;
                      }
                      @Override
                      public String toString() {
                          return "CUSTOM";
                      }
                  }
                  

                  比如:

                  public class CustomPartitioner implements Partitioner {
                        // key: 根据key的值来分区
                        // numPartitions: 下游算子并行度
                        @Override
                        public int partition(String key, int numPartitions) {
                           return key.length() % numPartitions;//在此处定义分区策略
                        }
                    }
                  
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]