分享免费的编程资源和教程

网站首页 > 技术教程 正文

聊聊flink的KvStateRegistryGateway

goqiw 2024-09-04 18:49:38 技术教程 14 ℃ 0 评论

本文主要研究一下flink的KvStateRegistryGateway

KvStateRegistryGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java

public interface KvStateRegistryGateway {
?
 /**
 * Notifies that queryable state has been registered.
 *
 * @param jobId identifying the job for which to register a key value state
 * @param jobVertexId JobVertexID the KvState instance belongs to.
 * @param keyGroupRange Key group range the KvState instance belongs to.
 * @param registrationName Name under which the KvState has been registered.
 * @param kvStateId ID of the registered KvState instance.
 * @param kvStateServerAddress Server address where to find the KvState instance.
 * @return Future acknowledge if the key-value state has been registered
 */
 CompletableFuture<Acknowledge> notifyKvStateRegistered(
 final JobID jobId,
 final JobVertexID jobVertexId,
 final KeyGroupRange keyGroupRange,
 final String registrationName,
 final KvStateID kvStateId,
 final InetSocketAddress kvStateServerAddress);
?
 /**
 * Notifies that queryable state has been unregistered.
 *
 * @param jobId identifying the job for which to unregister a key value state
 * @param jobVertexId JobVertexID the KvState instance belongs to.
 * @param keyGroupRange Key group index the KvState instance belongs to.
 * @param registrationName Name under which the KvState has been registered.
 * @return Future acknowledge if the key-value state has been unregistered
 */
 CompletableFuture<Acknowledge> notifyKvStateUnregistered(
 final JobID jobId,
 final JobVertexID jobVertexId,
 final KeyGroupRange keyGroupRange,
 final String registrationName);
}
  • KvStateRegistryGateway接口定义了notifyKvStateRegistered、notifyKvStateUnregistered两个方法;JobMaster实现了这两个方法

JobMaster

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
?
 /** Default names for Flink's distributed components. */
 public static final String JOB_MANAGER_NAME = "jobmanager";
 public static final String ARCHIVE_NAME = "archive";
?
 // ------------------------------------------------------------------------
?
 private final JobMasterConfiguration jobMasterConfiguration;
?
 private final ResourceID resourceId;
?
 private final JobGraph jobGraph;
?
 private final Time rpcTimeout;
?
 private final HighAvailabilityServices highAvailabilityServices;
?
 private final BlobServer blobServer;
?
 private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
?
 private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;
?
 private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
?
 private final ScheduledExecutorService scheduledExecutorService;
?
 private final OnCompletionActions jobCompletionActions;
?
 private final FatalErrorHandler fatalErrorHandler;
?
 private final ClassLoader userCodeLoader;
?
 private final SlotPool slotPool;
?
 private final SlotPoolGateway slotPoolGateway;
?
 private final RestartStrategy restartStrategy;
?
 // --------- BackPressure --------
?
 private final BackPressureStatsTracker backPressureStatsTracker;
?
 // --------- ResourceManager --------
?
 private final LeaderRetrievalService resourceManagerLeaderRetriever;
?
 // --------- TaskManagers --------
?
 private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
?
 // -------- Mutable fields ---------
?
 private ExecutionGraph executionGraph;
?
 @Nullable
 private JobManagerJobStatusListener jobStatusListener;
?
 @Nullable
 private JobManagerJobMetricGroup jobManagerJobMetricGroup;
?
 @Nullable
 private String lastInternalSavepoint;
?
 @Nullable
 private ResourceManagerAddress resourceManagerAddress;
?
 @Nullable
 private ResourceManagerConnection resourceManagerConnection;
?
 @Nullable
 private EstablishedResourceManagerConnection establishedResourceManagerConnection;
?
 //......
?
 @Override
 public CompletableFuture<Acknowledge> notifyKvStateRegistered(
 final JobID jobId,
 final JobVertexID jobVertexId,
 final KeyGroupRange keyGroupRange,
 final String registrationName,
 final KvStateID kvStateId,
 final InetSocketAddress kvStateServerAddress) {
 if (jobGraph.getJobID().equals(jobId)) {
 if (log.isDebugEnabled()) {
 log.debug("Key value state registered for job {} under name {}.",
 jobGraph.getJobID(), registrationName);
 }
?
 try {
 executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
 jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
?
 return CompletableFuture.completedFuture(Acknowledge.get());
 } catch (Exception e) {
 log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
 return FutureUtils.completedExceptionally(e);
 }
 } else {
 if (log.isDebugEnabled()) {
 log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
 }
 return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 }
 }
?
 @Override
 public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
 JobID jobId,
 JobVertexID jobVertexId,
 KeyGroupRange keyGroupRange,
 String registrationName) {
 if (jobGraph.getJobID().equals(jobId)) {
 if (log.isDebugEnabled()) {
 log.debug("Key value state unregistered for job {} under name {}.",
 jobGraph.getJobID(), registrationName);
 }
?
 try {
 executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
 jobVertexId, keyGroupRange, registrationName);
?
 return CompletableFuture.completedFuture(Acknowledge.get());
 } catch (Exception e) {
 log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
 return FutureUtils.completedExceptionally(e);
 }
 } else {
 if (log.isDebugEnabled()) {
 log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
 }
 return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 }
 }
?
 //......
}
  • JobMaster的notifyKvStateRegistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered;notifyKvStateUnregistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered

KvStateLocationRegistry

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java

public class KvStateLocationRegistry {
?
 /** JobID this coordinator belongs to. */
 private final JobID jobId;
?
 /** Job vertices for determining parallelism per key. */
 private final Map<JobVertexID, ExecutionJobVertex> jobVertices;
?
 /**
 * Location info keyed by registration name. The name needs to be unique
 * per JobID, i.e. two operators cannot register KvState with the same
 * name.
 */
 private final Map<String, KvStateLocation> lookupTable = new HashMap<>();
?
 /**
 * Creates the registry for the job.
 *
 * @param jobId JobID this coordinator belongs to.
 * @param jobVertices Job vertices map of all vertices of this job.
 */
 public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
 this.jobId = Preconditions.checkNotNull(jobId, "JobID");
 this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
 }
?
 /**
 * Returns the {@link KvStateLocation} for the registered KvState instance
 * or <code>null</code> if no location information is available.
 *
 * @param registrationName Name under which the KvState instance is registered.
 * @return Location information or <code>null</code>.
 */
 public KvStateLocation getKvStateLocation(String registrationName) {
 return lookupTable.get(registrationName);
 }
?
 /**
 * Notifies the registry about a registered KvState instance.
 *
 * @param jobVertexId JobVertexID the KvState instance belongs to
 * @param keyGroupRange Key group range the KvState instance belongs to
 * @param registrationName Name under which the KvState has been registered
 * @param kvStateId ID of the registered KvState instance
 * @param kvStateServerAddress Server address where to find the KvState instance
 *
 * @throws IllegalArgumentException If JobVertexID does not belong to job
 * @throws IllegalArgumentException If state has been registered with same
 * name by another operator.
 * @throws IndexOutOfBoundsException If key group index is out of bounds.
 */
 public void notifyKvStateRegistered(
 JobVertexID jobVertexId,
 KeyGroupRange keyGroupRange,
 String registrationName,
 KvStateID kvStateId,
 InetSocketAddress kvStateServerAddress) {
?
 KvStateLocation location = lookupTable.get(registrationName);
?
 if (location == null) {
 // First registration for this operator, create the location info
 ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
?
 if (vertex != null) {
 int parallelism = vertex.getMaxParallelism();
 location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
 lookupTable.put(registrationName, location);
 } else {
 throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
 }
 }
?
 // Duplicated name if vertex IDs don't match
 if (!location.getJobVertexId().equals(jobVertexId)) {
 IllegalStateException duplicate = new IllegalStateException(
 "Registration name clash. KvState with name '" + registrationName +
 "' has already been registered by another operator (" +
 location.getJobVertexId() + ").");
?
 ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
 if (vertex != null) {
 vertex.fail(new SuppressRestartsException(duplicate));
 }
?
 throw duplicate;
 }
 location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
 }
?
 /**
 * Notifies the registry about an unregistered KvState instance.
 *
 * @param jobVertexId JobVertexID the KvState instance belongs to
 * @param keyGroupRange Key group index the KvState instance belongs to
 * @param registrationName Name under which the KvState has been registered
 * @throws IllegalArgumentException If another operator registered the state instance
 * @throws IllegalArgumentException If the registration name is not known
 */
 public void notifyKvStateUnregistered(
 JobVertexID jobVertexId,
 KeyGroupRange keyGroupRange,
 String registrationName) {
?
 KvStateLocation location = lookupTable.get(registrationName);
?
 if (location != null) {
 // Duplicate name if vertex IDs don't match
 if (!location.getJobVertexId().equals(jobVertexId)) {
 throw new IllegalArgumentException("Another operator (" +
 location.getJobVertexId() + ") registered the KvState " +
 "under '" + registrationName + "'.");
 }
?
 location.unregisterKvState(keyGroupRange);
?
 if (location.getNumRegisteredKeyGroups() == 0) {
 lookupTable.remove(registrationName);
 }
 } else {
 throw new IllegalArgumentException("Unknown registration name '" +
 registrationName + "'. " + "Probably registration/unregistration race.");
 }
 }
?
}
  • KvStateLocationRegistry的构造器要求传入jobId及jobVertices;它有一个属性为lookupTable,存储了registrationName与KvStateLocation的映射关系
  • notifyKvStateRegistered方法在lookupTable查找不到对应的KvStateLocation的时候会创建一个KvStateLocation并存放入lookupTable,最后调用location.registerKvState方法
  • notifyKvStateUnregistere方法在lookupTable查找对应KvStateLocation的时候会触发location.unregisterKvState,然后将该KvStateLocation从lookupTable中移除

小结

  • KvStateRegistryGateway接口定义了notifyKvStateRegistered、notifyKvStateUnregistered两个方法;JobMaster实现了这两个方法
  • JobMaster的notifyKvStateRegistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered;notifyKvStateUnregistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered
  • KvStateLocationRegistry的构造器要求传入jobId及jobVertices;它有一个属性为lookupTable,存储了registrationName与KvStateLocation的映射关系;notifyKvStateRegistered方法在lookupTable查找不到对应的KvStateLocation的时候会创建一个KvStateLocation并存放入lookupTable,最后调用location.registerKvState方法;notifyKvStateUnregistere方法在lookupTable查找对应KvStateLocation的时候会触发location.unregisterKvState,然后将该KvStateLocation从lookupTable中移除

doc

  • KvStateRegistryGateway

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表