Today we will look at the last of Flink's execution graphs: how the ExecutionGraph is generated.
Basic Concepts
Before reading the source code, let's go over some basic concepts used in the ExecutionGraph.

- ExecutionJobVertex: a node in the ExecutionGraph, corresponding to a JobVertex in the JobGraph.
- ExecutionVertex: each ExecutionJobVertex contains a group of ExecutionVertexes; the number of ExecutionVertexes equals the node's parallelism.
- IntermediateResult: the output of a node, corresponding to an IntermediateDataSet in the JobGraph.
- IntermediateResultPartition: the output of a single ExecutionVertex.
- EdgeManager: stores all the connections between vertices and result partitions in the ExecutionGraph.
- Execution: a single actual execution attempt. Each time an ExecutionVertex is executed, Flink wraps it in an Execution, which is uniquely identified by an ExecutionAttemptID.

How the ExecutionGraph Is Generated

With these concepts in place, let's walk through how the ExecutionGraph is generated. The entry point is the DefaultExecutionGraphBuilder.build method. It first gathers some basic information, such as jobInformation and jobStatusChangedListeners, and then creates a DefaultExecutionGraph and generates the JSON execution plan:

```java
// create a new execution graph, if none exists so far
final DefaultExecutionGraph executionGraph =
        new DefaultExecutionGraph(
                jobInformation,
                futureExecutor,
                ioExecutor,
                rpcTimeout,
                executionHistorySizeLimit,
                classLoader,
                blobWriter,
                partitionGroupReleaseStrategyFactory,
                shuffleMaster,
                partitionTracker,
                executionDeploymentListener,
                executionStateUpdateListener,
                initializationTimestamp,
                vertexAttemptNumberStore,
                vertexParallelismStore,
                isDynamicGraph,
                executionJobVertexFactory,
                jobGraph.getJobStatusHooks(),
                markPartitionFinishedStrategy,
                taskDeploymentDescriptorFactory,
                jobStatusChangedListeners,
                executionPlanSchedulingContext);

try {
    executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));
} catch (Throwable t) {
    log.warn("Cannot create plan for job", t);
    // give the graph an empty plan
    executionGraph.setPlan(new JobPlanInfo.Plan("", "", "", new ArrayList<>()));
}
```

Next come the two core calls, getVerticesSortedTopologicallyFromSources and attachJobGraph:

```java
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology, jobManagerJobMetricGroup);
```

These two methods first sort the JobVertexes topologically and then build the ExecutionGraph topology from the sorted list.

getVerticesSortedTopologicallyFromSources

```java
public List<JobVertex> getVerticesSortedTopologicallyFromSources()
        throws InvalidProgramException {
    // early out on empty lists
    if (this.taskVertices.isEmpty()) {
        return Collections.emptyList();
    }

    List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
    Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

    // start by finding the vertices with no input edges
    // and the ones with disconnected inputs (that refer to some standalone data set)
    {
        Iterator<JobVertex> iter = remaining.iterator();
        while (iter.hasNext()) {
            JobVertex vertex = iter.next();

            if (vertex.isInputVertex()) {
                sorted.add(vertex);
                iter.remove();
            }
        }
    }

    int startNodePos = 0;

    // traverse from the nodes that were added until we found all elements
    while (!remaining.isEmpty()) {
        // first check if we have more candidates to start traversing from. if not, then the
        // graph is cyclic, which is not permitted
        if (startNodePos >= sorted.size()) {
            throw new InvalidProgramException("The job graph is cyclic.");
        }

        JobVertex current = sorted.get(startNodePos++);
        addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
    }

    return sorted;
}
```

This code sorts all the vertices: it first filters out the source vertices (those with no inputs), then repeatedly appends the remaining vertices whose predecessors have all been added. If no new traversal candidate can be found while vertices remain, the graph contains a cycle and an InvalidProgramException is thrown. The result is the final topological order.

attachJobGraph

```java
@Override
public void attachJobGraph(
        List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
        throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            verticesToAttach.size(),
            tasks.size(),
            intermediateResults.size());

    attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
    if (!isDynamic) {
        initializeJobVertices(verticesToAttach);
    }

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    partitionGroupReleaseStrategy =
            partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
```

attachJobGraph consists of two main steps: first it calls attachJobVertices to create the ExecutionJobVertex instances; then it calls DefaultExecutionTopology.fromExecutionGraph to create the remaining core objects.

attachJobVertices

attachJobVertices simply iterates over all the JobVertexes and builds an ExecutionJobVertex from each one.

```java
/** Attach job vertices without initializing them. */
private void attachJobVertices(
        List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
        throws JobException {
    for (JobVertex jobVertex : topologicallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        VertexParallelismInformation parallelismInfo =
                parallelismStore.getParallelismInfo(jobVertex.getID());

        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv =
                executionJobVertexFactory.createExecutionJobVertex(
                        this, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(
                    String.format(
                            "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                            jobVertex.getID(), ejv, previousTask));
        }

        this.verticesInCreationOrder.add(ejv);
        this.numJobVerticesTotal++;
    }
}
```

initializeJobVertices

DefaultExecutionGraph.initializeJobVertices iterates over the sorted JobVertexes, fetches the corresponding ExecutionJobVertex for each, and then calls ExecutionGraph.initializeJobVertex. Let's look at that method directly:

```java
default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
        throws JobException {
    initializeJobVertex(
            ejv,
            createTimestamp,
            VertexInputInfoComputationUtils.computeVertexInputInfos(
                    ejv, getAllIntermediateResults()::get));
}
```

This first calls VertexInputInfoComputationUtils.computeVertexInputInfos to produce a Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos, which describes the range of upstream IntermediateResultPartitions that each ExecutionVertex consumes. There are two distribution modes: POINTWISE (point-to-point) and ALL_TO_ALL. In POINTWISE mode the partitions are distributed as evenly as possible. For example, if the upstream parallelism is 4 and the downstream parallelism is 2, the first two IntermediateResultPartitions are consumed by the first ExecutionVertex and the last two by the second. If the upstream parallelism is 2 and the downstream parallelism is 3, the first IntermediateResultPartition is consumed by the first two ExecutionVertexes and the second IntermediateResultPartition by the third ExecutionVertex.
The POINTWISE case is implemented as follows:

```java
public static JobVertexInputInfo computeVertexInputInfoForPointwise(
        int sourceCount,
        int targetCount,
        Function<Integer, Integer> numOfSubpartitionsRetriever,
        boolean isDynamicGraph) {

    final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();

    if (sourceCount >= targetCount) {
        for (int index = 0; index < targetCount; index++) {

            int start = index * sourceCount / targetCount;
            int end = (index + 1) * sourceCount / targetCount;

            IndexRange partitionRange = new IndexRange(start, end - 1);
            IndexRange subpartitionRange =
                    computeConsumedSubpartitionRange(
                            index,
                            1,
                            () -> numOfSubpartitionsRetriever.apply(start),
                            isDynamicGraph,
                            false,
                            false);
            executionVertexInputInfos.add(
                    new ExecutionVertexInputInfo(index, partitionRange, subpartitionRange));
        }
    } else {
        for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {

            int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
            int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
            int numConsumers = end - start;

            IndexRange partitionRange = new IndexRange(partitionNum, partitionNum);
            // Variable used in lambda expression should be final or effectively final
            final int finalPartitionNum = partitionNum;
            for (int i = start; i < end; i++) {
                IndexRange subpartitionRange =
                        computeConsumedSubpartitionRange(
                                i,
                                numConsumers,
                                () -> numOfSubpartitionsRetriever.apply(finalPartitionNum),
                                isDynamicGraph,
                                false,
                                false);
                executionVertexInputInfos.add(
                        new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
            }
        }
    }
    return new JobVertexInputInfo(executionVertexInputInfos);
}
```
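The integer arithmetic in the POINTWISE computation is easy to check outside of Flink. Below is a small standalone sketch (not Flink code; the class and method names are made up) that reproduces the two range formulas:

```java
// Sketch of the POINTWISE range arithmetic: which upstream partitions a
// downstream subtask consumes (sourceCount >= targetCount), and which
// downstream subtasks consume a given partition (sourceCount < targetCount).
public class PointwiseRanges {
    // sourceCount >= targetCount: each target consumes a contiguous, inclusive
    // range of partition indices
    static int[] partitionRange(int index, int sourceCount, int targetCount) {
        int start = index * sourceCount / targetCount;
        int end = (index + 1) * sourceCount / targetCount;
        return new int[] {start, end - 1};
    }

    // sourceCount < targetCount: inclusive range of consumer (subtask) indices
    // that read the given partition
    static int[] consumerRange(int partitionNum, int sourceCount, int targetCount) {
        int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
        int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
        return new int[] {start, end - 1};
    }

    public static void main(String[] args) {
        // upstream 4, downstream 2: vertex 0 -> partitions [0, 1], vertex 1 -> [2, 3]
        System.out.println(java.util.Arrays.toString(partitionRange(0, 4, 2))); // [0, 1]
        System.out.println(java.util.Arrays.toString(partitionRange(1, 4, 2))); // [2, 3]
        // upstream 2, downstream 3: partition 0 -> vertices [0, 1], partition 1 -> [2, 2]
        System.out.println(java.util.Arrays.toString(consumerRange(0, 2, 3))); // [0, 1]
        System.out.println(java.util.Arrays.toString(consumerRange(1, 2, 3))); // [2, 2]
    }
}
```

The output matches the two examples described above: with 4 upstream and 2 downstream subtasks the partition set splits evenly, and with 2 upstream and 3 downstream subtasks the first partition is shared by two consumers.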
In ALL_TO_ALL mode, every downstream ExecutionVertex consumes all of the upstream partitions:

```java
public static JobVertexInputInfo computeVertexInputInfoForAllToAll(
        int sourceCount,
        int targetCount,
        Function<Integer, Integer> numOfSubpartitionsRetriever,
        boolean isDynamicGraph,
        boolean isBroadcast,
        boolean isSingleSubpartitionContainsAllData) {
    final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
    IndexRange partitionRange = new IndexRange(0, sourceCount - 1);
    for (int i = 0; i < targetCount; ++i) {
        IndexRange subpartitionRange =
                computeConsumedSubpartitionRange(
                        i,
                        targetCount,
                        () -> numOfSubpartitionsRetriever.apply(0),
                        isDynamicGraph,
                        isBroadcast,
                        isSingleSubpartitionContainsAllData);
        executionVertexInputInfos.add(
                new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
    }
    return new JobVertexInputInfo(executionVertexInputInfos);
}
```

Once jobVertexInputInfos has been generated, we return to DefaultExecutionGraph.initializeJobVertex:

```java
@Override
public void initializeJobVertex(
        ExecutionJobVertex ejv,
        long createTimestamp,
        Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
        throws JobException {

    checkNotNull(ejv);
    checkNotNull(jobVertexInputInfos);

    jobVertexInputInfos.forEach(
            (resultId, info) ->
                    this.vertexInputInfoStore.put(ejv.getJobVertexId(), resultId, info));

    ejv.initialize(
            executionHistorySizeLimit,
            rpcTimeout,
            createTimestamp,
            this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
            executionPlanSchedulingContext);

    ejv.connectToPredecessors(this.intermediateResults);

    for (IntermediateResult res : ejv.getProducedDataSets()) {
        IntermediateResult previousDataSet =
                this.intermediateResults.putIfAbsent(res.getId(), res);
        if (previousDataSet != null) {
            throw new JobException(
                    String.format(
                            "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                            res.getId(), res, previousDataSet));
        }
    }

    registerExecutionVerticesAndResultPartitionsFor(ejv);

    // enrich network memory.
    SlotSharingGroup slotSharingGroup = ejv.getSlotSharingGroup();
    if (areJobVerticesAllInitialized(slotSharingGroup)) {
        SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(
                slotSharingGroup, this::getJobVertex, shuffleMaster);
    }
}
```
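For completeness, the ALL_TO_ALL partition range shown above is much simpler than the POINTWISE one; this tiny sketch (made-up names, not Flink code) states the rule:

```java
// Sketch of the ALL_TO_ALL rule: every downstream subtask consumes the full
// inclusive range of upstream partition indices, [0, sourceCount - 1].
public class AllToAllRanges {
    static int[] partitionRange(int sourceCount) {
        return new int[] {0, sourceCount - 1};
    }

    public static void main(String[] args) {
        // upstream parallelism 3: every consumer reads partitions [0, 2]
        System.out.println(java.util.Arrays.toString(partitionRange(3))); // [0, 2]
    }
}
```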
Let's first look at ExecutionJobVertex.initialize. This method mainly creates the IntermediateResults and the ExecutionVertexes:

```java
protected void initialize(
        int executionHistorySizeLimit,
        Duration timeout,
        long createTimestamp,
        SubtaskAttemptNumberStore initialAttemptCounts,
        ExecutionPlanSchedulingContext executionPlanSchedulingContext)
        throws JobException {

    checkState(parallelismInfo.getParallelism() > 0);
    checkState(!isInitialized());

    this.taskVertices = new ExecutionVertex[parallelismInfo.getParallelism()];

    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // create the intermediate results
    this.producedDataSets =
            new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        this.producedDataSets[i] =
                new IntermediateResult(
                        result,
                        this,
                        this.parallelismInfo.getParallelism(),
                        result.getResultType(),
                        executionPlanSchedulingContext);
    }

    // create all task vertices
    for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
        ExecutionVertex vertex =
                createExecutionVertex(
                        this,
                        i,
                        producedDataSets,
                        timeout,
                        createTimestamp,
                        executionHistorySizeLimit,
                        initialAttemptCounts.getAttemptCount(i));

        this.taskVertices[i] = vertex;
    }

    // sanity check for the double referencing between intermediate result partitions and
    // execution vertices
    for (IntermediateResult ir : this.producedDataSets) {
        if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
            throw new RuntimeException(
                    "The intermediate results partitions were not correctly assigned.");
        }
    }

    // set up the input splits, if the vertex has any
    try {
        @SuppressWarnings("unchecked")
        InputSplitSource<InputSplit> splitSource =
                (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

        if (splitSource != null) {
            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(graph.getUserClassLoader());
            try {
                inputSplits = splitSource.createInputSplits(this.parallelismInfo.getParallelism());

                if (inputSplits != null) {
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        } else {
            inputSplits = null;
        }
    } catch (Throwable t) {
        throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
    }
}
```

When an ExecutionVertex is created, its IntermediateResultPartitions and its Execution are created along with it. When the Execution is created it is given an attemptNumber, which defaults to 0; if the ExecutionVertex is rescheduled, the attemptNumber is incremented by 1.

ExecutionJobVertex.connectToPredecessors then establishes the associations between ExecutionVertexes and IntermediateResultPartitions. Here, too, the point-to-point and all-to-all modes are handled separately: in the point-to-point mode the index range of the IntermediateResultPartitions for each ExecutionVertex has to be computed. Both modes ultimately call connectInternal:

```java
/** Connect all execution vertices to all partitions. */
private static void connectInternal(
        List<ExecutionVertex> taskVertices,
        List<IntermediateResultPartition> partitions,
        ResultPartitionType resultPartitionType,
        EdgeManager edgeManager) {
    checkState(!taskVertices.isEmpty());
    checkState(!partitions.isEmpty());

    ConsumedPartitionGroup consumedPartitionGroup =
            createAndRegisterConsumedPartitionGroupToEdgeManager(
                    taskVertices.size(), partitions, resultPartitionType, edgeManager);
    for (ExecutionVertex ev : taskVertices) {
        ev.addConsumedPartitionGroup(consumedPartitionGroup);
    }

    List<ExecutionVertexID> consumerVertices =
            taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());
    ConsumerVertexGroup consumerVertexGroup =
            ConsumerVertexGroup.fromMultipleVertices(consumerVertices, resultPartitionType);
    for (IntermediateResultPartition partition : partitions) {
        partition.addConsumers(consumerVertexGroup);
    }

    consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);
    consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);
}
```

In this method, ev.addConsumedPartitionGroup(consumedPartitionGroup) stores the ExecutionVertex-to-IntermediateResultPartition association in EdgeManager.vertexConsumedPartitions, while partition.addConsumers(consumerVertexGroup) stores the reverse IntermediateResultPartition-to-ExecutionVertex association in EdgeManager.partitionConsumers.

Summary

In this article we saw how Flink converts a JobGraph into an ExecutionGraph. Several of the core concepts involved have very similar names, so it is worth understanding them thoroughly before studying how they are generated and how they relate to one another; the ExecutionGraph diagram from the earlier article in this series can also help.