Preface
This article takes a look at Flink's KvStateRegistryGateway.
KvStateRegistryGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
public interface KvStateRegistryGateway {

    /**
     * Notifies that queryable state has been registered.
     *
     * @param jobId identifying the job for which to register a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group range the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @param kvStateId ID of the registered KvState instance.
     * @param kvStateServerAddress Server address where to find the KvState instance.
     * @return Future acknowledge if the key-value state has been registered
     */
    CompletableFuture<Acknowledge> notifyKvStateRegistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName,
        final KvStateID kvStateId,
        final InetSocketAddress kvStateServerAddress);

    /**
     * Notifies that queryable state has been unregistered.
     *
     * @param jobId identifying the job for which to unregister a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group index the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @return Future acknowledge if the key-value state has been unregistered
     */
    CompletableFuture<Acknowledge> notifyKvStateUnregistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName);
}
- The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered, each returning a CompletableFuture<Acknowledge>; JobMaster implements both of them
JobMaster
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;

    private final ResourceID resourceId;

    private final JobGraph jobGraph;

    private final Time rpcTimeout;

    private final HighAvailabilityServices highAvailabilityServices;

    private final BlobServer blobServer;

    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private final ScheduledExecutorService scheduledExecutorService;

    private final OnCompletionActions jobCompletionActions;

    private final FatalErrorHandler fatalErrorHandler;

    private final ClassLoader userCodeLoader;

    private final SlotPool slotPool;

    private final SlotPoolGateway slotPoolGateway;

    private final RestartStrategy restartStrategy;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    // -------- Mutable fields ---------

    private ExecutionGraph executionGraph;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    //......

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(
            final JobID jobId,
            final JobVertexID jobVertexId,
            final KeyGroupRange keyGroupRange,
            final String registrationName,
            final KvStateID kvStateId,
            final InetSocketAddress kvStateServerAddress) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state registered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
                    jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state unregistered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
                    jobVertexId, keyGroupRange, registrationName);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    //......
}
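- JobMaster's notifyKvStateRegistered first checks that the given jobId matches jobGraph.getJobID() and then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered does the same with executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. In both methods a mismatched jobId yields a future completed exceptionally with FlinkJobNotFoundException, and any exception thrown by the registry is likewise returned as an exceptionally completed future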
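The delegation pattern described above can be sketched without any Flink dependency. This is only a minimal illustration under stated assumptions: `JobMasterDelegationSketch`, `Ack`, and `Registry` are hypothetical stand-ins (plain strings replace JobID, JobVertexID and the other Flink types), and `IllegalArgumentException` stands in for FlinkJobNotFoundException.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, dependency-free model of the JobMaster delegation shown above.
final class JobMasterDelegationSketch {

    /** Stand-in for Flink's Acknowledge. */
    enum Ack { INSTANCE }

    /** Stand-in for KvStateLocationRegistry; only the call shape matters here. */
    interface Registry {
        void notifyKvStateRegistered(String jobVertexId, String registrationName);
    }

    private final String ownJobId;
    private final Registry registry;

    JobMasterDelegationSketch(String ownJobId, Registry registry) {
        this.ownJobId = ownJobId;
        this.registry = registry;
    }

    /** Checks the job ID, delegates to the registry, and maps any failure to a failed future. */
    public CompletableFuture<Ack> notifyKvStateRegistered(
            String jobId, String jobVertexId, String registrationName) {
        if (!ownJobId.equals(jobId)) {
            // The real code uses FlinkJobNotFoundException here.
            return failed(new IllegalArgumentException("Unknown job " + jobId));
        }
        try {
            registry.notifyKvStateRegistered(jobVertexId, registrationName);
            return CompletableFuture.completedFuture(Ack.INSTANCE);
        } catch (Exception e) {
            return failed(e);
        }
    }

    /** Java 8 compatible way to build an exceptionally completed future. */
    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    public static void main(String[] args) {
        JobMasterDelegationSketch jobMaster =
            new JobMasterDelegationSketch("job-1", (vertex, name) -> { });

        System.out.println("own job acknowledged: "
            + (jobMaster.notifyKvStateRegistered("job-1", "vertex-a", "counts").join() == Ack.INSTANCE));
        System.out.println("foreign job failed: "
            + jobMaster.notifyKvStateRegistered("job-2", "vertex-a", "counts").isCompletedExceptionally());
    }
}
```

Because the method never throws, a caller only has to inspect the returned future to find out whether the notification was accepted.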
KvStateLocationRegistry
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
public class KvStateLocationRegistry {

    /** JobID this coordinator belongs to. */
    private final JobID jobId;

    /** Job vertices for determining parallelism per key. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

    /**
     * Location info keyed by registration name. The name needs to be unique
     * per JobID, i.e. two operators cannot register KvState with the same
     * name.
     */
    private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

    /**
     * Creates the registry for the job.
     *
     * @param jobId JobID this coordinator belongs to.
     * @param jobVertices Job vertices map of all vertices of this job.
     */
    public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
        this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
    }

    /**
     * Returns the {@link KvStateLocation} for the registered KvState instance
     * or <code>null</code> if no location information is available.
     *
     * @param registrationName Name under which the KvState instance is registered.
     * @return Location information or <code>null</code>.
     */
    public KvStateLocation getKvStateLocation(String registrationName) {
        return lookupTable.get(registrationName);
    }

    /**
     * Notifies the registry about a registered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group range the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @param kvStateId ID of the registered KvState instance
     * @param kvStateServerAddress Server address where to find the KvState instance
     *
     * @throws IllegalArgumentException If JobVertexID does not belong to job
     * @throws IllegalArgumentException If state has been registered with same
     * name by another operator.
     * @throws IndexOutOfBoundsException If key group index is out of bounds.
     */
    public void notifyKvStateRegistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location == null) {
            // First registration for this operator, create the location info
            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

            if (vertex != null) {
                int parallelism = vertex.getMaxParallelism();
                location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
                lookupTable.put(registrationName, location);
            } else {
                throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
            }
        }

        // Duplicated name if vertex IDs don't match
        if (!location.getJobVertexId().equals(jobVertexId)) {
            IllegalStateException duplicate = new IllegalStateException(
                "Registration name clash. KvState with name '" + registrationName +
                "' has already been registered by another operator (" +
                location.getJobVertexId() + ").");

            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
            if (vertex != null) {
                vertex.fail(new SuppressRestartsException(duplicate));
            }

            throw duplicate;
        }
        location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
    }

    /**
     * Notifies the registry about an unregistered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group index the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @throws IllegalArgumentException If another operator registered the state instance
     * @throws IllegalArgumentException If the registration name is not known
     */
    public void notifyKvStateUnregistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location != null) {
            // Duplicate name if vertex IDs don't match
            if (!location.getJobVertexId().equals(jobVertexId)) {
                throw new IllegalArgumentException("Another operator (" +
                    location.getJobVertexId() + ") registered the KvState " +
                    "under '" + registrationName + "'.");
            }

            location.unregisterKvState(keyGroupRange);

            if (location.getNumRegisteredKeyGroups() == 0) {
                lookupTable.remove(registrationName);
            }
        } else {
            throw new IllegalArgumentException("Unknown registration name '" +
                registrationName + "'. " + "Probably registration/unregistration race.");
        }
    }

}
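- The KvStateLocationRegistry constructor requires a jobId and the jobVertices map; its lookupTable field stores the mapping from registrationName to KvStateLocation
- When notifyKvStateRegistered finds no KvStateLocation for the name in lookupTable, it creates one (using the vertex's max parallelism) and puts it into lookupTable, throwing IllegalArgumentException if the jobVertexId is unknown. If the name is already registered under a different jobVertexId, it fails the vertex with a SuppressRestartsException and throws IllegalStateException; otherwise it finishes by calling location.registerKvState
- When notifyKvStateUnregistered finds the KvStateLocation in lookupTable, it calls location.unregisterKvState and removes the KvStateLocation from lookupTable only once location.getNumRegisteredKeyGroups() reaches 0; an unknown registration name, or a name registered by another operator, results in IllegalArgumentException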
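The lookup-table behaviour in the bullets above can be modelled in a few lines of plain Java. This is a simplified sketch, not Flink's implementation: `LookupTableSketch` and `Location` are hypothetical names, strings replace JobVertexID, a key group range is passed as an inclusive start/end pair, and the check against the job's vertex map is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of KvStateLocationRegistry's lookup table.
final class LookupTableSketch {

    /** Stand-in for KvStateLocation: the owning vertex plus a flag per key group. */
    static final class Location {
        final String jobVertexId;
        final boolean[] registered;
        int numRegistered;

        Location(String jobVertexId, int numKeyGroups) {
            this.jobVertexId = jobVertexId;
            this.registered = new boolean[numKeyGroups];
        }
    }

    private final Map<String, Location> lookupTable = new HashMap<>();
    private final int numKeyGroups;

    LookupTableSketch(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
    }

    /** Creates the entry on first registration; rejects a name owned by another vertex. */
    public void register(String jobVertexId, String name, int start, int end) {
        Location location = lookupTable.get(name);
        if (location == null) {
            location = new Location(jobVertexId, numKeyGroups);
            lookupTable.put(name, location);
        }
        if (!location.jobVertexId.equals(jobVertexId)) {
            throw new IllegalStateException("Registration name clash for '" + name + "'");
        }
        for (int i = start; i <= end; i++) {
            if (!location.registered[i]) {
                location.registered[i] = true;
                location.numRegistered++;
            }
        }
    }

    /** Clears the key groups; drops the entry only when no key group is left. */
    public void unregister(String jobVertexId, String name, int start, int end) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.jobVertexId.equals(jobVertexId)) {
            throw new IllegalArgumentException("Another operator registered '" + name + "'");
        }
        for (int i = start; i <= end; i++) {
            if (location.registered[i]) {
                location.registered[i] = false;
                location.numRegistered--;
            }
        }
        if (location.numRegistered == 0) {
            lookupTable.remove(name);
        }
    }

    public boolean isKnown(String name) {
        return lookupTable.containsKey(name);
    }

    public static void main(String[] args) {
        LookupTableSketch registry = new LookupTableSketch(4);
        registry.register("vertex-a", "counts", 0, 1);
        registry.register("vertex-a", "counts", 2, 3);
        registry.unregister("vertex-a", "counts", 0, 1);
        System.out.println("known after partial unregister: " + registry.isKnown("counts"));
        registry.unregister("vertex-a", "counts", 2, 3);
        System.out.println("known after full unregister: " + registry.isKnown("counts"));
    }
}
```

The point of the sketch is the removal condition: several subtasks register different key group ranges under the same name, so the entry has to survive until the last range is unregistered.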
Summary
- The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster implements both of them
- After checking the jobId, JobMaster's notifyKvStateRegistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered, and notifyKvStateUnregistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered
- The KvStateLocationRegistry constructor requires a jobId and the jobVertices map, and its lookupTable field stores the mapping from registrationName to KvStateLocation. notifyKvStateRegistered creates a KvStateLocation and puts it into lookupTable when none exists for the name, then calls location.registerKvState. notifyKvStateUnregistered calls location.unregisterKvState on the KvStateLocation it finds and removes it from lookupTable once no key groups remain registered