Apache Flink 是一个流处理框架,它的 checkpoin机制可以保证应用程序的容错性。在本文中,我们将对 Flink 的 checkpoint 机制进行深入讲解,同时通过源码分析,了解它的实现细节。
Checkpoint 机制
Checkpoint 是指将应用程序的状态保存到外部存储介质中,以便在应用程序失败时恢复状态。在 Flink 中,checkpoint 机制由 JobManager 和 TaskManager 共同协作完成。JobManager 是 Flink 中的控制节点,负责管理应用程序的任务执行、数据分发等工作,TaskManager 则是具体执行任务的节点。当应用程序进行 checkpoint 时,JobManager 会向所有 TaskManager 发送消息,通知它们进行 checkpoint。每个 TaskManager 会将它所管理的任务的状态保存到外部存储介质中。
Checkpoint 机制可以保证 Flink 应用程序的容错性。当应用程序执行过程中发生故障时,它可以通过从外部存储介质中读取 checkpoint 来恢复状态。这种机制可以有效地减少应用程序的停机时间,提高应用程序的可用性。
在 Flink 中,checkpoint 机制是可配置的。应用程序可以设置 checkpoint 的频率和持久化方式。例如,应用程序可以设置每隔 1 分钟进行一次 checkpoint,并将状态保存到 HDFS 中。下面我们将详细讲解 Flink 的 checkpoint 机制的实现细节。
Checkpoint 的实现细节
Checkpoint 生命周期
在 Flink 中,一个 checkpoint 包括三个阶段:
- 前置操作(Pre-checkpoint):在此阶段,JobManager 向 TaskManager 发送消息,通知它们准备进行 checkpoint。TaskManager 会对其管理的任务进行一些准备工作,如将任务的状态保存到内存中,暂停任务的执行等。
- Checkpoint 操作:在此阶段,TaskManager 会将其管理的任务的状态保存到外部存储介质中。JobManager 会等待所有 TaskManager 完成此操作。
- 后置操作(Post-checkpoint):在此阶段,JobManager 会将所有 TaskManager 保存的状态合并起来,并将其保存到外部存储介质中。完成后,任务会继续执行。
Checkpoint 的实现方式
Flink 的 checkpoint 机制有两种实现方式:异步快照(Asynchronous Checkpointing)和同步快照(Synchronous Checkpointing)。两者的区别在于 TaskManager 执行 checkpoint 的时间点。
在异步快照中,TaskManager 可以在任意时间点执行 checkpoint。JobManager 在收到所有 TaskManager 的 checkpoint 后,会将状态合并并保存到外部存储介质中。异步快照可以减少应用程序的停机时间,但是可能会导致数据不
一致性问题。例如,如果一个任务在 TaskManager 执行 checkpoint 前完成了一些操作,但在 checkpoint 完成后失败了,那么这些操作将会丢失。
在同步快照中,TaskManager 需要在一个全局的时间点同时执行 checkpoint。JobManager 会等待所有 TaskManager 完成 checkpoint,然后将状态合并并保存到外部存储介质中。同步快照可以保证数据的一致性,但是可能会导致应用程序的停机时间增加。
在 Flink 中,默认使用异步快照实现 checkpoint。应用程序可以通过设置参数来切换到同步快照模式。例如,应用程序可以设置:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
来开启同步快照模式。
Checkpoint 存储介质
Flink 支持将 checkpoint 存储在多种存储介质中,如本地文件系统、HDFS、S3 等。在 Flink 中,checkpoint 存储介质由 StateBackend 管理。StateBackend 定义了如何将应用程序的状态保存到外部存储介质中。
Flink 提供了三种 StateBackend:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。其中 MemoryStateBackend 会将状态保存在 TaskManager 的内存中,适合小型应用程序。FsStateBackend 会将状态保存在本地文件系统或 HDFS 中,适合中型应用程序。RocksDBStateBackend 会将状态保存在 RocksDB 中,适合大型应用程序。
应用程序可以通过设置参数来选择 StateBackend。例如,应用程序可以设置:
env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/flink/checkpoints"));
来将状态保存在 RocksDB 中,并将 checkpoint 存储在 HDFS 中。
Checkpoint 配置参数
Flink 提供了许多参数来配置 checkpoint。下面列出了一些常用的参数:
- checkpoint.interval:指定 checkpoint 的时间间隔。
- checkpoint.timeout:指定 checkpoint 的超时时间。
- checkpoint.max-concurrent-checkpoints:指定同时进行 checkpoint 的最大数量。
- checkpointing.mode:指定 checkpoint 的实现方式,可以是 EXACTLY_ONCE 或 AT_LEAST_ONCE。
- state.backend:指定 StateBackend 的类型。
- state.checkpoints.dir:指定 checkpoint 存储的目录。
应用程序可以通过设置这些参数来调整 checkpoint 的性能和容错性。
Checkpoint 源码分析
下面我们将通过分析 Flink 的源码,了解 checkpoint 的实现细节。
JobManager 和 TaskManager 的协作
在 Flink 中,JobManager 和 TaskManager 通过 Actor 模型进行通信。JobManager 和 TaskManager 之间的通信由两个 Actor 承担:
- CheckpointCoordinator:负责管理 checkpoint 的执行,包括发送 checkpoint 请求、处理 checkpoint 请求、合并状态等工作。
- Task:负责处理具体的任务,包括执行任务、保存任务的
状态、处理 checkpoint 等工作。
CheckpointCoordinator 和 Task 之间的通信是通过 CheckpointMessages 实现的。CheckpointMessages 是一个包含了所有与 checkpoint 相关的消息类型的枚举类型。例如,CheckpointMessages.TriggerCheckpoint 消息表示触发 checkpoint,CheckpointMessages.AcknowledgeCheckpoint 消息表示确认 checkpoint。
在 Flink 中,当应用程序启动时,JobManager 会创建 CheckpointCoordinator,并向 TaskManager 发送注册消息。TaskManager 收到注册消息后,会创建一个 CheckpointResponder,并将自己的地址发送给 CheckpointCoordinator。
CheckpointResponder 负责响应 checkpoint 请求。当 CheckpointCoordinator 向 TaskManager 发送 checkpoint 请求时,CheckpointResponder 会在 TaskManager 上执行 checkpoint,并将结果返回给 CheckpointCoordinator。
Checkpoint 触发
Flink 支持两种方式触发 checkpoint:基于时间的触发和基于数据量的触发。基于时间的触发会在固定的时间间隔内触发 checkpoint,而基于数据量的触发会在处理一定数量的数据后触发 checkpoint。
当应用程序使用基于时间的触发时,JobManager 会定期向 TaskManager 发送 TriggerCheckpoint 消息。TaskManager 收到 TriggerCheckpoint 消息后,会向 CheckpointCoordinator 发送 TriggerCheckpoint 消息。CheckpointCoordinator 收到 TriggerCheckpoint 消息后,会向所有 TaskManager 发送 TriggerCheckpoint message,并等待所有 TaskManager 的响应。
当应用程序使用基于数据量的触发时,TaskManager 会维护一个计数器,记录处理的数据量。当计数器达到指定值时,TaskManager 会向 CheckpointCoordinator 发送 TriggerCheckpoint 消息。CheckpointCoordinator 收到 TriggerCheckpoint 消息后,会向所有 TaskManager 发送 TriggerCheckpoint 消息,并等待所有 TaskManager 的响应。
Checkpoint 处理
当 TaskManager 收到 TriggerCheckpoint 消息后,会执行 checkpoint,并将 checkpoint 结果发送给 CheckpointCoordinator。CheckpointCoordinator 收到 checkpoint 结果后,会将结果存储在 StateBackend 中。
在异步快照模式下,TaskManager 可能会在完成 checkpoint 后立即开始执行下一个任务。因此,CheckpointCoordinator 需要等待所有 TaskManager 的响应,以确保所有 TaskManager 都已经完成 checkpoint。
当所有 TaskManager 的响应都到达后,CheckpointCoordinator 会将状态合并,并将结果保存在外部存储介质中。在同步快照模式下,CheckpointCoordinator 会等待所有 TaskManager 完成 checkpoint 后再进行合并和保存操作。
Checkpoint 恢复
在 Flink 中,当应用程序发生故障时,可以通过 checkpoint 来恢复状态。Flink 支持两种恢复模式:精确一次(exactly-once)和至少一次(at-least-once)。
在精确一次模式下,Flink 可以保证在任何情况下都只会恢复一次状态。在这种模式下,Flink 使用两阶段提交来实现精确一次。Flink 在第一阶段将所有 TaskManager 的状态从内存中写入到外部存储介质中。如果写入成功,则在第二阶段将状态从外部存储介质中恢复到内存中。如果写入失败,则不会进行恢复操作。
在至少一次模式下,Flink 可以在应用程序发生故障时恢复状态,但是可能会出现重复计算。在这种模式下,Flink 不会使用两阶段提交来保证恢复一次,而是在每个 checkpoint 中将状态写入外部存储介质中。在恢复时,Flink 会选择最新的可用 checkpoint 来进行恢复。
Checkpoint 算法
Flink 中的 checkpoint 算法是基于 Chandy-Lamport 算法的。Chandy-Lamport 算法是一种基于消息传递的分布式快照算法,用于捕获分布式系统的全局状态。在 Chandy-Lamport 算法中,每个节点都会向其它节点发送 Marker 消息,并记录收到的 Marker 消息。当节点接收到 Marker 消息后,它会暂停消息传递,并生成一个快照,然后继续消息传递。
Flink 中的 checkpoint 算法是在 Chandy-Lamport 算法的基础上进行扩展的。在 Flink 中,CheckpointCoordinator 会向所有 TaskManager 发送 TriggerCheckpoint 消息,并等待所有 TaskManager 的响应。当 TaskManager 收到 TriggerCheckpoint 消息后,它会停止处理任务,并将状态写入外部存储介质中。然后,TaskManager 会向 CheckpointCoordinator 发送 AcknowledgeCheckpoint 消息,表示已经完成 checkpoint。
当所有 TaskManager 的 AcknowledgeCheckpoint 消息都到达后,CheckpointCoordinator 会将状态合并,并将结果保存到外部存储介质中。在恢复时,Flink 会选择最新的可用 checkpoint 进行恢复,并从外部存储介质中读取 checkpoint 状态并将其合并到内存中。
Flink 中的 checkpoint 算法是一种轻量级的算法,因为它只在任务开始时向所有 TaskManager 发送 TriggerCheckpoint 消息,并且只在任务结束时等待所有 TaskManager 的响应。因此,Flink 可以在运行时持续生成 checkpoint,而不会对任务的性能产生重大影响。
Checkpoint 并发控制
Flink 中的 checkpoint 算法是一种并发算法,因此需要对并发访问进行控制。在 Flink 中,CheckpointCoordinator 使用一个 ConcurrentLinkedQueue 来保存 TriggerCheckpoint 消息,该队列是线程安全的。当 TriggerCheckpoint 消息进入队列时,CheckpointCoordinator 会在队列中插入一个 BarrierMarker,用于标记 TriggerCheckpoint 消息之前的所有消息都已经被处理。当所有 TaskManager 的状态都被写入到外部存储介质中并且 AcknowledgeCheckpoint 消息被接收到时,CheckpointCoordinator 会删除 BarrierMarker 并继续处理 TriggerCheckpoint 消息。
在 Flink 中,每个任务都有一个单独的 checkpoint 算法实例,用于控制任务的 checkpoint。在任务开始时,checkpoint 算法会向其它任务发送 BarrierMarker 消息,并等待 BarrierMarker 消息的响应。当收到 BarrierMarker 消息时,任务会停止处理新消息,并将状态写入外部存储介质中。然后,任务会向发送 BarrierMarker 消息的任务发送 AcknowledgeBarrier 消息,表示已经完成 checkpoint。
当所有任务的 AcknowledgeBarrier 消息都到达后,checkpoint 算法会将状态合并,并将结果保存到外部存储介质中。在恢复时,Flink 会选择最新的可用 checkpoint 进行恢复,并从外部存储介质中读取 checkpoint 状态并将其合并到内存中。
Checkpoint 存储
Flink 支持多种外部存储介质,包括文件系统、HDFS、S3 和 RocksDB 等。在存储 checkpoint 时,Flink 会将 checkpoint 的 metadata 和状态分别保存到不同的文件中。
checkpoint metadata 包括 checkpoint 的 ID、触发时间、状态大小、状态文件列表等信息。状态文件列表是一个包含了状态的文件路径和大小的列表。
checkpoint 状态包括所有已经注册的状态,并且可以通过 KeyGroupPartitioner 将状态分成多个片段,以便在处理大型状态时进行分布式处理。每个状态片段都会被保存到单独的文件中。
在使用外部存储介质时,Flink 会使用 StateBackend 接口来访问外部存储介质。StateBackend 接口包括
如下方法:
- CheckpointStreamFactory#createCheckpointStateOutputStream(long, long)
: 用于创建用于写入 checkpoint 状态的输出流。
- CheckpointStreamFactory#createCheckpointStateInputStream(long, long)
: 用于创建用于读取 checkpoint 状态的输入流。
- CheckpointStreamFactory#createTaskOwnedStateOutputStream(long, long)
: 用于创建用于写入 task-owned 状态的输出流。
- CheckpointStreamFactory#createSharedStateOutputStream(String, long, long)
: 用于创建用于写入 shared 状态的输出流。
- CheckpointStreamFactory#supportsHighlyAvailableStreams()
: 用于指示该 StateBackend 是否支持高可用的流式存储。
对于支持高可用的流式存储介质(如 HDFS),Flink 会在写入 checkpoint 状态时使用可重入的文件锁来避免多个任务同时写入同一个文件。对于不支持高可用的流式存储介质(如本地文件系统),Flink 会在写入 checkpoint 状态时使用不同的文件名,并在恢复时按照先后顺序逐个读取文件。
Checkpoint 回退
在某些情况下,Flink 可能需要回退到先前的 checkpoint。例如,当出现故障时,Flink 可能会回退到最近的 checkpoint。在 Flink 中,CheckpointCoordinator 可以回退到先前的 checkpoint,并重新启动任务。Flink 支持两种类型的回退:任务级别的回退和全局级别的回退。
任务级别的回退是指只回退单个任务的状态,而全局级别的回退是指回退所有任务的状态。在任务级别的回退中,CheckpointCoordinator 将发送一条 CancelCheckpoint 消息来中止当前的 checkpoint,并将状态恢复到上一个 checkpoint 的状态。在全局级别的回退中,CheckpointCoordinator 将回退到指定的 checkpoint,并将状态恢复到该 checkpoint 的状态。
源码分析
Flink 的 checkpoint 算法的实现主要在 CheckpointCoordinator 和 CheckpointBarrierHandler 类中。
CheckpointCoordinator 是 Flink 中的核心组件之一,用于协调 checkpoint。它负责触发 checkpoint、接收任务的 checkpoint 状态、处理 checkpoint 的确认信息等。CheckpointCoordinator 还负责管理多个 checkpoint,并在任务失败时协调回退到先前的 checkpoint。CheckpointCoordinator 实现了 CheckpointCoordinatorInterface 接口,并通过调用 TriggerCheckpoint 方法触发 checkpoint。
CheckpointBarrierHandler 是一个用于处理 BarrierMarker 和 AcknowledgeBarrier 消息的线程。它负责在任务中发送 BarrierMarker 消息,处理收到的 BarrierMarker 消息,并向发送 BarrierMarker 消息的任务发送 AcknowledgeBarrier 消息。
在 Flink 中,TaskManager 用于执行任务。每个任务都有一个 CheckpointCoordinator 和一个 CheckpointBarrierHandler。当 TaskManager 收到 TriggerCheckpoint 消息时,CheckpointCoordinator 将触发 checkpoint,并将状态写入外部存储介质中。当任务收到 BarrierMarker 消息时,CheckpointBarrierHandler 将停止任务的消息处理,并将任务的状态写入外部存储介质中。在所有任务都收到了 BarrierMarker 消息并将状态写入外部存储介质中之后,CheckpointCoordinator 将开始确认 checkpoint,并在所有任务都确认了 checkpoint 之后将该 checkpoint 标记为已完成。
下面我们来分析 Flink 的 checkpoint 算法的源代码实现。
CheckpointCoordinator
CheckpointCoordinator 是 Flink 的核心组件之一,用于协调 checkpoint。它负责触发 checkpoint、接收任务的 checkpoint 状态、处理 checkpoint 的确认信息等。CheckpointCoordinator 还负责管理多个 checkpoint,并在任务失败时协调回退到先前的 checkpoint。
CheckpointCoordinator 还负责管理多个 checkpoint,并在任务失败时协调回退到先前的 checkpoint。
首先,我们来看一下 CheckpointCoordinator 的主要字段和构造函数:
public class CheckpointCoordinator implements CheckpointCoordinatorInterface { private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); private final Object lock = new Object(); private final JobID jobId; private final CheckpointIDCounter checkpointIDCounter; private final long checkpointTimeout; private final long minPauseBetweenCheckpoints; private final int maxConcurrentCheckpoints; private final int checkpointsCleanUpInterval; private final CheckpointFailureManager failureManager; private final ScheduledExecutorService timer; private final ScheduledFuture<?> periodicScheduler; private final Map<Long, PendingCheckpoint> pendingCheckpoints; private final Map<Long, CompletedCheckpoint> completedCheckpoints; private final Set<ExecutionAttemptID> tasksToCommitTo; private final Map<ExecutionAttemptID, Execution> executions; private final Map<JobVertexID, Integer> numSubtasksPerTask; private final CheckpointStorage coordinatorCheckpointStorage; private final CheckpointIDCounter completedCheckpointCounter; private final CompletedCheckpointStore completedCheckpointStore; private final CheckpointStatsTracker statsTracker; private volatile boolean shutdown; public CheckpointCoordinator( JobID jobId, CheckpointIDCounter checkpointIDCounter, long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, int checkpointsCleanUpInterval, CheckpointFailureManager failureManager, ScheduledExecutorService timer, Map<ExecutionAttemptID, Execution> executions, Map<JobVertexID, Integer> numSubtasksPerTask, CheckpointStorage coordinatorCheckpointStorage, CompletedCheckpointStore completedCheckpointStore, CheckpointStatsTracker statsTracker) { this.jobId = Preconditions.checkNotNull(jobId); this.checkpointIDCounter = Preconditions.checkNotNull(checkpointIDCounter); this.checkpointTimeout = checkpointTimeout; this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.checkpointsCleanUpInterval = checkpointsCleanUpInterval; this.failureManager = Preconditions.checkNotNull(failureManager); this.timer = Preconditions.checkNotNull(timer); this.pendingCheckpoints = new HashMap<>(); this.completedCheckpoints = new HashMap<>(); this.tasksToCommitTo = new HashSet<>(); this.executions = Preconditions.checkNotNull(executions); this.numSubtasksPerTask = Preconditions.checkNotNull(numSubtasksPerTask); this.coordinatorCheckpointStorage = Preconditions.checkNotNull(coordinatorCheckpointStorage); this.completedCheckpointCounter = new CheckpointIDCounter(); this.completedCheckpointStore = Preconditions.checkNotNull(completedCheckpointStore); this.statsTracker = Preconditions.checkNotNull(statsTracker); this.shutdown = false; // 启动周期性调度器 this.periodicScheduler = timer.scheduleAtFixedRate( new Runnable() { @Override public void run() { try { triggerPeriodicCheckpoint(true); } catch (Exception e) { LOG.error("Exception while triggering periodic checkpoint", e); } } }, minPauseBetweenCheckpoints, minPauseBetweenCheckpoints, TimeUnit.MILLISECONDS); } ... }
我们可以看到,CheckpointCoordinator 中有许多字段,其中包括:
- jobId:该作业的 JobID。
- checkpointIDCounter:用于生成 checkpoint ID 的计数器。
- checkpointTimeout:checkpoint 的超时时间。
- minPauseBetweenCheckpoints:两次 checkpoint 之间的最小间隔时间。
- maxConcurrentCheckpoints:最大并发 checkpoint 数量。
- checkpointsCleanUpInterval:清理过期 checkpoint 的时间间隔。
- failureManager:用于处理 checkpoint 失败的 CheckpointFailureManager。
- timer:用于周期性执行 checkpoint 的 ScheduledExecutorService。
- periodicScheduler:周期性执行 checkpoint 的 ScheduledFuture。
- pendingCheckpoints:尚未完成的 checkpoint 集合。
- completedCheckpoints:已完成的 checkpoint 集合。
- tasksToCommitTo:需要将 checkpoint commit 给的任务集合。
- executions:当前正在运行的任务 Execution 的集合。
- numSubtasksPerTask:每个任务(JobVertex)的子任务数量。
- coordinatorCheckpointStorage:协调员 checkpoint 存储。
- completedCheckpointCounter:已完成 checkpoint 的计数器。
- completedCheckpointStore:已完成 checkpoint 的存储。
- statsTracker:Checkpoint 统计信息的跟踪器。
- shutdown:是否已关闭。
在构造函数中,我们可以看到 CheckpointCoordinator 启动了一个周期性调度器 periodicScheduler,用于周期性触发 checkpoint。这里传入了一个 Runnable 对象,其中的 triggerPeriodicCheckpoint(true) 方法用于触发周期性 checkpoint。这里的 true 参数表示该 checkpoint 是周期性的。
CheckpointCoordinator 主要有三个方法:
- triggerCheckpoint:触发一个 checkpoint。
- triggerPeriodicCheckpoint:触发周期性 checkpoint。
- receiveAcknowledgeMessage:接收 checkpoint 完成的确认消息。
其中,receiveAcknowledgeMessage 方法是接收 checkpoint 完成确认消息的方法,因此,我们先来看一下这个方法:
@Override public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String senderAddress) throws Exception { if (shutdown) { LOG.debug("Coordinator is shutdown. Discarding message."); return; } ExecutionAttemptID executionAttemptId = message.getTaskExecutionId(); // 将接收到的 AcknowledgeCheckpoint 消息的 Execution 标记为已确认 checkpoint final Execution execution = executions.get(executionAttemptId); if (execution != null) { execution.acknowledgeCheckpoint(message.getCheckpointId(), message.getSubtaskState(), senderAddress); } else { LOG.warn("Received AcknowledgeCheckpoint message for non-existing execution " + executionAttemptId); } PendingCheckpoint p = pendingCheckpoints.get(message.getCheckpointId()); if (p == null) { // checkpoint 已过期,直接返回 if (isCheckpointExpired(message.getCheckpointId())) { LOG.debug("Received late message for now expired checkpoint attempt " + message.getCheckpointId() + " : " + message); return; } else { // checkpoint 尚未过期,可能是因为 Master 节点和 Worker 节点网络不稳定导致消息延迟,将消息存储起来等待 checkpoint 完成 // 如果还没有 PendingCheckpoint 对象,则创建一个新的 PendingCheckpoint 对象并将 AcknowledgeCheckpoint 消息存储到其中。 // 由于 AcknowledgeCheckpoint 消息可能会先于 TriggerCheckpoint 消息到达,因此 PendingCheckpoint 对象可能还不存在。 LOG.debug("Received AcknowledgeCheckpoint message for an unknown checkpoint attempt " + message.getCheckpointId() + " : " + message); p = new PendingCheckpoint( jobId, message, numSubtasksPerTask, checkpointTimeout, checkpointProperties, alignmentTimeout);
// 将消息存储到 PendingCheckpoint 对象中 p.addSubtaskAcknowledgement(message.getTaskExecutionId(), message.getSubtaskState(), senderAddress); pendingCheckpoints.put(p.getCheckpointId(), p); } } else { // checkpoint 尚未完成,将 AcknowledgeCheckpoint 消息存储到对应的 PendingCheckpoint 对象中 // 添加一个子任务的确认消息 p.addSubtaskAcknowledgement(message.getTaskExecutionId(), message.getSubtaskState(), senderAddress); // 如果该 PendingCheckpoint 对象已收到所有子任务的确认消息,则将该 checkpoint 切换为完成状态 if (p.isFullyAcknowledged()) { pendingCheckpoints.remove(p.getCheckpointId()); try { finalizeCheckpoint(p); } catch (Throwable t) { // 处理 checkpoint 失败 handleCheckpointException(p, t); } } }
}
receiveAcknowledgeMessage 方法主要实现了接收 checkpoint 完成确认消息的逻辑。当接收到 AcknowledgeCheckpoint 消息时,首先根据 ExecutionAttemptID 找到对应的 Execution,然后将该 Execution 的 checkpoint 状态设置为已完成。 接着,如果该 checkpoint 已经完成或过期,就直接返回。如果该 checkpoint 尚未完成,就将 AcknowledgeCheckpoint 消息存储到对应的 PendingCheckpoint 对象中,并判断该 checkpoint 是否已经完成。如果该 PendingCheckpoint 对象已经收到所有子任务的确认消息,则将该 checkpoint 切换为完成状态,并调用 finalizeCheckpoint 方法进行后续处理。 finalizeCheckpoint 方法的具体实现如下: ```java private void finalizeCheckpoint(PendingCheckpoint checkpoint) throws Exception { // Checkpoints have to be locked while they are being confirmed. Otherwise, we risk that // messages get sent between the the confirmation of the checkpoint and the execution of // the subsequent commands, and the messages are not covered by the checkpoint. synchronized (checkpoint) { // 检查 checkpoint 是否过期 if (isCheckpointExpired(checkpoint.getCheckpointId())) { discardCheckpoint(checkpoint); return; } // 将 PendingCheckpoint 转换为 CompletedCheckpoint CompletedCheckpoint completed = checkpoint.finalizeCheckpoint(); // 将 CompletedCheckpoint 存储到 CompletedCheckpointStore 中 try { completedCheckpointStore.addCheckpoint(completed); } catch (Exception e) { // 存储失败,触发检查点回滚 handleCheckpointException(checkpoint, new Exception("Could not complete the checkpoint: " + e.getMessage(), e)); return; } // 将 CompletedCheckpoint 发送给需要将其 commit 的任务 for (TaskStateSnapshot subtaskState : completed.getTaskStates()) { tasksToCommitTo.add(new TaskStateSnapshotEntry(subtaskState)); } // 更新 checkpoint 计数器 completedCheckpointCounter.incrementAndGet(); // 清理过期的 checkpoint cleanUpCheckpoints(true); // 发送 CompletedCheckpoint 给外部 sink notifyCheckpointComplete(completed); } }
finalizeCheckpoint 方法主要实现了 checkpoint 完成时的后续处理逻辑。该方法首先检查 checkpoint 是否过期,如果已经过期,则将该 checkpoint 删除。
如果 checkpoint 没有过期,则将 PendingCheckpoint 对象转换为 CompletedCheckpoint 对象,并将其存储到 CompletedCheckpointStore 中。
CompletedCheckpointStore 是 Flink 内部用来存储已完成的 checkpoint 的存储系统,它默认使用 Flink 的内存状态后端进行存储。Flink 提供了多种状态后端,可以将已完成的 checkpoint 存储在内存、文件系统、HDFS、RocksDB 等不同的存储系统中。
在存储 CompletedCheckpoint 之后,finalizeCheckpoint 方法会将 CompletedCheckpoint 发送给需要将其 commit 的任务,并更新 checkpoint 计数器和清理过期的 checkpoint。最后,该方法会发送 CompletedCheckpoint 给外部 sink,通知 sink 已经完成该 checkpoint。
Checkpoint 的恢复和恢复流程
当 Flink 任务发生故障时,需要将其恢复到故障发生时的状态,以便于继续运行。Checkpoint 提供了一种实现任务恢复的机制。Flink 的恢复流程一般如下:
- 首先,Flink 会根据 CompletedCheckpointStore 中存储的 checkpoint 信息来恢复任务的状态。
- 然后,Flink 会将从 checkpoint 中恢复的状态重新分配给任务,以便于任务继续执行。
- 最后,Flink 会使用数据源中的数据来恢复任务的计算结果。
在 Flink 中,任务的恢复流程是由 JobManager 控制的。具体来说,当任务发生故障时,JobManager 会尝试从最近的一个 CompletedCheckpoint 中恢复任务状态。如果恢复成功,则任务可以从故障发生时的状态继续执行;否则,任务会被停止。
下面我们来看一下 Flink 的恢复流程具体是如何实现的。当任务启动时,JobManager 会创建一个 ExecutionGraph 对象,用来描述整个任务的执行图。ExecutionGraph 中包含了所有的 Execution 和它们之间的依赖关系。同时,JobManager 也会创建一个 ExecutionEnvironment 对象,用来管理整个任务的执行过程。
当任务发生故障时,JobManager 会根据 ExecutionGraph 和 ExecutionEnvironment 来尝试恢复任务的状态。具体来说,JobManager 会根据 ExecutionGraph 中存储的任务执行信息和 CompletedCheckpointStore 中存储的 checkpoint 信息,来创建需要恢复的 Execution 和它们的任务状态。然后,JobManager 会将这些任务状态重新分配给对应的 Execution,并使用数据源中的数据来恢复任务的计算结果。如果恢复成功,则任务可以从故障发生时的状态继续执行;否则,任务会被停止。
Flink 支持两种恢复模式:精确一次和至少一次。精确一次恢复模式要求所有的数据都要被恰好一次地处理,因此在恢复时需要保证所有的数据都只被处理一次,从而保证结果的准确性。至少一次恢复模式则不需要保证所有的数据只被处理一次,但可以在数据重复处理时快速恢复任务的状态。
在精确一次恢复模式下,Flink 会将已完成的 checkpoint 的所有状态都恢复到任务中。Flink 还提供了恢复状态时的两种模式:重放模式和合并模式。在重放模式下,Flink 会将所有的状态都恢复到任务中,然后重新计算所有的数据;在合并模式下,Flink 会将已完成的 checkpoint 中的状态合并到任务中,然后继续从故障发生点继续执行。重放模式需要更多的时间和资源来恢复状态,但它能够保证数据只被处理一次,从而保证结果的准确性。而合并模式则可以快速地恢复任务的状态,但可能会导致数据重复处理,从而影响结果的准确性。
在至少一次恢复模式下,Flink 会尝试恢复任务的状态,但不会保证数据只被处理一次。在这种模式下,Flink 会将已完成的 checkpoint 的状态合并到任务中,然后继续从故障发生点继续执行。当 Flink 接收到一条新的数据时,它会先检查该数据是否已经被处理过。如果该数据已经被处理过,则 Flink 会将该数据丢弃,否则 Flink 会将该数据处理并将其输出。由于数据可能会重复处理,因此在至少一次恢复模式下,Flink 不会保证结果的准确性,但可以快速地恢复任务的状态。
Checkpoint 的并发和容错性
Flink 的 checkpoint 机制具有很好的并发和容错性。当 Flink 处理大量数据时,它可以将数据分成多个分区,并将每个分区分配给不同的任务来处理。由于每个任务都可以在不同的线程中执行,因此可以利用多核 CPU 来提高处理速度。在这种情况下,每个任务都会在它自己的时间轴上执行,因此它们不需要相互协调。
同时,Flink 的 checkpoint 机制还具有很好的容错性。当任务发生故障时,Flink 可以通过恢复 checkpoint 来恢复任务的状态。如果一个任务执行失败,那么它的所有状态都会被恢复到 checkpoint 的状态,从而保证任务的执行结果的正确性。
总结
Checkpoint 是 Flink 中非常重要的一个机制,它可以保证 Flink 的任务在发生故障时能够快速恢复到故障发生前的状态。Checkpoint 不仅能够提高 Flink 的容错性,还能够提高任务的吞吐量和性能。在 checkpoint 机制中,Flink 通过周期性地创建 checkpoint 来保存任务的状态。Flink 支持两种恢复模式:精确一次恢复模式和至少一次恢复模式。精确一次恢复模式需要保证所有的数据只被处理一次,从而保证结果的准确性。至少一次恢复模式不需要保证所有的数据只被处理一次,但可以在数据重复处理时快速恢复任务的状态。在 checkpoint 机制中,Flink 还具有很好的并发性和容错性,它可以将数据分成多个分区,并将每个分区分配给不同的任务来处理,同时它还能够通过恢复 checkpoint 来恢复任务的状态。
下面是 checkpoint 的源码分析:
Flink 中 checkpoint 的实现主要是由 CheckpointCoordinator 和 CheckpointBarrierHandler 两个类实现的。
CheckpointCoordinator
CheckpointCoordinator 是 Flink 中 checkpoint 的核心类。它负责周期性地创建 checkpoint 并管理 checkpoint 的状态。CheckpointCoordinator 主要有以下几个重要的方法:
- triggerCheckpoint():触发一个新的 checkpoint。
- receiveAcknowledgeMessage():处理任务对 checkpoint 的确认消息。
- receiveDeclineMessage():处理任务对 checkpoint 的拒绝消息。
- receiveBarrier():处理任务发送的 checkpoint barrier。
- abortPendingCheckpoints():取消所有未完成的 checkpoint。
在 CheckpointCoordinator 中,Flink 使用了一个双重循环的算法来实现 checkpoint。该算法中,Flink 首先创建一个新的 checkpoint,然后向所有任务发送一个 checkpoint barrier。当任务收到 checkpoint barrier 后,它会停止处理新的数据,并将已处理的数据缓存起来,然后向 CheckpointCoordinator 发送确认消息。当 CheckpointCoordinator 收到所有任务的确认消息后,它会将 checkpoint 标记为完成,并将 checkpoint 的状态保存到持久化存储中。如果任务发生故障,Flink 可以通过恢复 checkpoint 来恢复任务的状态。
CheckpointBarrierHandler
CheckpointBarrierHandler 是 Flink 中处理 checkpoint barrier 的核心类。它负责接收和处理任务发送的 checkpoint barrier,然后将 checkpoint barrier 发送给下游任务。
CheckpointBarrierHandler 主要有以下几个重要的方法:
- processBarrier():处理任务发送的 checkpoint barrier。
- processSubtaskState():处理任务发送的子任务状态。
在 CheckpointBarrierHandler 中,Flink 使用了一种基于链表的数据结构来实现任务之间的通信。该数据结构中,每个任务都维护了一个链表,该链表包含了所有已接收但未处理的 checkpoint barrier。当任务处理完一个 checkpoint barrier 后,它会将该 checkpoint barrier 从链表中删除,并将下一个 checkpoint barrier 发送给下游任务。如果任务发生故障,Flink 可以通过恢复 checkpoint来恢复任务的状态,并使用链表来重放已接收但未处理的 checkpoint barrier。
Checkpoint 的配置
在 Flink 中,可以通过配置来调整 checkpoint 的行为和性能。以下是一些常用的配置参数:
- state.checkpoints.dir:指定 checkpoint 的保存目录。
- state.backend: 设置 state backend,支持的 state backend 有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend 等。
- state.checkpoints.num-retained:设置保留的最大 checkpoint 数量。
- state.checkpoints.interval:设置两个 checkpoint 之间的间隔时间。
- state.checkpoints.timeout:设置 checkpoint 的超时时间。
当然,这只是部分常用的配置参数,具体的配置参数可以参考 Flink 的官方文档。
总结
在分布式计算中,容错性和恢复能力是非常重要的,而 checkpoint 机制正是解决这个问题的关键。Flink 通过 checkpoint 机制实现了高效的容错和恢复,不仅可以提高任务的吞吐量和性能,还能够保证结果的准确性和一致性。本文通过对 Flink checkpoint 的源码分析,介绍了 checkpoint 的实现原理和核心类,以及常用的配置参数。
本文暂时没有评论,来添加一个吧(●'◡'●)