Flink协调器Coordinator及自定义Operator

2024-06-15 1238阅读

Flink协调器Coordinator及自定义Operator

最近的项目开发过程中,使用到了Flink中的协调器以及自定义算子相关的内容,本篇文章主要介绍Flink中的协调器是什么,如何用,以及协调器与算子间的交互。

Flink协调器Coordinator及自定义Operator
(图片来源网络,侵删)

协调器Coordinator

Flink中的协调器是用来协调运行时的算子,运行在JobManager中,通过事件的方式与算子通信。例如Source和Sink算子中的协调器是用来发现和分配工作或者聚合和提交元数据。

线程模型

所有协调器方法都由作业管理器的主线程(邮箱线程)调用。这意味着这些方法在任何情况下都不得执行阻塞操作(如 I/ O 或等待锁或或Futures)。这很有可能使整个 JobManager 瘫痪。

因此,涉及更复杂操作的协调器应生成线程来处理 I/ O 工作。上 OperatorCoordinator. Context 的方法可以安全地从另一个线程调用,而不是从调用协调器方法的线程调用。

一致性

与调度程序的视图相比,协调器对任务执行的视图高度简化,但允许与在并行子任务上运行的操作员进行一致的交互。具体而言,保证严格按顺序调用以下方法:

  1. executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway):在子任务就绪的时候调用一次。SubtaskGateway是用来与子任务交互的网关。这是与子任务尝试交互的开始。

    executionAttemptFailed(int, int, Throwable):在尝试失败或取消后立即调用每个子任务。此时,应停止与子任务尝试的交互。

  2. subtaskReset(int, long) 或 resetToCheckpoint(long, byte[]):一旦调度程序确定了要还原的检查点,这些方法就会通知协调器。前一种方法在发生区域故障/ 恢复(可能影响子任务的子集)时调用,后一种方法在全局故障/ 恢复的情况下调用。此方法应用于确定要恢复的操作,因为它会告诉要回退到哪个检查点。协调器实现需要恢复自还原的检查点以来与相关任务的交互。只有在子任务的所有尝试被调用后 executionAttemptFailed(int, int, Throwable) ,才会调用它。
  3. executionAttemptReady(int, int, OperatorCoordinator. SubtaskGateway):在恢复的任务(新尝试)准备就绪后再次调用。这晚于 subtaskReset(int, long),因为在这些方法之间,会计划和部署新的尝试。

接口方法说明

实现自定义的协调器需要实现OperatorCoordinator接口方法,各方法说明如下所示:

public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
    // ------------------------------------------------------------------------
    /**
     * 启动协调器,启动时调用一次当前方法在所有方法之前
     * 此方法抛出的异常都会导致当前作业失败
     */
    void start() throws Exception;
    /**
     * 释放协调器时调用当前方法,此方法应当释放持有的资源
     * 此方法抛出的异常不会导致作业失败
     */
    @Override
    void close() throws Exception;
    // ------------------------------------------------------------------------
    /**
     * 处理来自并行算子实例的事件
     * 此方法抛出的异常会导致作业失败并恢复
     */
    void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
            throws Exception;
    // ------------------------------------------------------------------------
    /**
     * 为协调器做checkpoint,将当前协调器中的状态序列化到checkpoint中,执行成功需要调用CompletableFuture的complete方法,失败需要调用CompletableFuture的completeExceptionally方法
     */
    void checkpointCoordinator(long checkpointId, CompletableFuture resultFuture)
            throws Exception;
    /**
     * We override the method here to remove the checked exception. Please check the Java docs of
     * {@link CheckpointListener#notifyCheckpointComplete(long)} for more detail semantic of the
     * method.
     */
    @Override
    void notifyCheckpointComplete(long checkpointId);
    /**
     * We override the method here to remove the checked exception. Please check the Java docs of
     * {@link CheckpointListener#notifyCheckpointAborted(long)} for more detail semantic of the
     * method.
     */
    @Override
    default void notifyCheckpointAborted(long checkpointId) {}
    /**
     * 从checkpoint重置当前的协调器
     */
    void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;
    // ------------------------------------------------------------------------
    /**
     * 子任务重置时调用此方法
     */
    void subtaskReset(int subtask, long checkpointId);
    /**
     * 子任务失败时调用此方法 
     */
    void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);
    /**
     * 子任务就绪时调用此方法
     */
    void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
}

算子Operator

Flink中执行计算任务的算子,像使用DataStream API时调用的map、flatmap、process传入的自定义函数最终都会封装为一个一个的算子。使用UDF已经能够满足大多数的开发场景,但涉及到与协调器打交道时需要自定义算子,自定义算子相对比较好简单,具体可以参考org.apache.flink.streaming.api.operators.KeyedProcessOperator的实现。

自定义算子需要实现AbstractStreamOperator和OneInputStreamOperator接口方法

实现定时器功能,需要实现Triggerable接口方法

实现处理协调器的事件功能,需要实现OperatorEventHandler接口方法

示例

自定义算子

这里实现一个自定义的算子,用来处理KeyedStream的数据,它能够接受来自协调器的事件,并且能够给协调器发送事件。

MyKeyedProcessOperator实现代码如下:

package com.examples.operator;
import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义的KeyedProcessOperator
 * @author shirukai
 */
public class MyKeyedProcessOperator extends AbstractStreamOperator
        implements OneInputStreamOperator,
        Triggerable,
        OperatorEventHandler {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessOperator.class);
    private transient TimestampedCollector collector;
    private transient TimerService timerService;
    private final OperatorEventGateway operatorEventGateway;
    public MyKeyedProcessOperator(ProcessingTimeService processingTimeService, OperatorEventGateway operatorEventGateway) {
        this.processingTimeService = processingTimeService;
        this.operatorEventGateway = operatorEventGateway;
    }
    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector(output);
        InternalTimerService internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        timerService = new SimpleTimerService(internalTimerService);
    }
    @Override
    public void processElement(StreamRecord element) throws Exception {
        LOG.info("processElement: {}", element);
        collector.setTimestamp(element);
        // 注册事件时间定时器
        timerService.registerEventTimeTimer(element.getTimestamp() + 10);
        // 注册处理时间定时器
        timerService.registerProcessingTimeTimer(element.getTimestamp() + 100);
        // 给协调器发送消息
        operatorEventGateway.sendEventToCoordinator(new MyEvent("hello,I'm from operator"));
        // 不做任何处理直接发送到下游
        collector.collect((OUT) element.getValue());
    }
    @Override
    public void onEventTime(InternalTimer timer) throws Exception {
        LOG.info("onEventTime: {}", timer);
    }
    @Override
    public void onProcessingTime(InternalTimer timer) throws Exception {
        LOG.info("onProcessingTime: {}", timer);
    }
    @Override
    public void handleOperatorEvent(OperatorEvent evt) {
        LOG.info("handleOperatorEvent: {}", evt);
    }
}

算子工厂类MyKeyedProcessOperatorFactory:

package com.examples.operator;
import com.examples.coordinator.MyKeyedProcessCoordinatorProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
/**
 * 自定义算子工厂类
 * @author shirukai
 */
public class MyKeyedProcessOperatorFactory extends AbstractStreamOperatorFactory
        implements OneInputStreamOperatorFactory,
        CoordinatedOperatorFactory,
        ProcessingTimeServiceAware {
    @Override
    public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
        return new MyKeyedProcessCoordinatorProvider(operatorName, operatorID);
    }
    @Override
    public  T createStreamOperator(StreamOperatorParameters parameters) {
        final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
        final OperatorEventGateway gateway =
                parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
        try {
            final MyKeyedProcessOperator
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]