Today we will walk through the last of Flink's graph representations: how the ExecutionGraph is generated.

Basic concepts

Before reading the source code, let's go over some basic concepts in the ExecutionGraph.

- ExecutionJobVertex: a node in the ExecutionGraph, corresponding to a JobVertex in the JobGraph.
- ExecutionVertex: each ExecutionJobVertex contains a group of ExecutionVertex instances; their number equals the vertex's parallelism.
- IntermediateResult: the output of a node, corresponding to an IntermediateDataSet in the JobGraph.
- IntermediateResultPartition: the output of a single ExecutionVertex.
- EdgeManager: mainly responsible for storing all the connections between vertices and result partitions in the ExecutionGraph.
- Execution: one actual execution attempt. Each time an ExecutionVertex is executed, Flink wraps it in an Execution, uniquely identified by an ExecutionAttemptID.

The ExecutionGraph generation process

With these concepts in mind, let's look at how the ExecutionGraph is actually generated. The entry point is the DefaultExecutionGraphBuilder.build method. It first gathers some basic information, such as jobInformation and jobStatusChangedListeners, and then creates a DefaultExecutionGraph and generates the execution plan.

```java
// create a new execution graph, if none exists so far
final DefaultExecutionGraph executionGraph =
        new DefaultExecutionGraph(
                jobInformation,
                futureExecutor,
                ioExecutor,
                rpcTimeout,
                executionHistorySizeLimit,
                classLoader,
                blobWriter,
                partitionGroupReleaseStrategyFactory,
                shuffleMaster,
                partitionTracker,
                executionDeploymentListener,
                executionStateUpdateListener,
                initializationTimestamp,
                vertexAttemptNumberStore,
                vertexParallelismStore,
                isDynamicGraph,
                executionJobVertexFactory,
                jobGraph.getJobStatusHooks(),
                markPartitionFinishedStrategy,
                taskDeploymentDescriptorFactory,
                jobStatusChangedListeners,
                executionPlanSchedulingContext);

try {
    executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));
} catch (Throwable t) {
    log.warn("Cannot create plan for job", t);
    // give the graph an empty plan
    executionGraph.setPlan(new JobPlanInfo.Plan("", "", "", new ArrayList<>()));
}
```
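To keep the concepts above straight, here is a toy sketch of the expansion from one logical vertex to per-subtask vertices. The class and field names are invented for illustration and are not Flink's real API; it only shows that an ExecutionJobVertex-like node produces exactly `parallelism` ExecutionVertex-like subtasks.

```java
// Toy model (not Flink's real classes): one ExecutionJobVertex expands into
// `parallelism` ExecutionVertex instances, one per subtask index.
import java.util.ArrayList;
import java.util.List;

public class GraphExpansionSketch {
    static class ExecutionVertexSketch {
        final String jobVertexName;
        final int subtaskIndex;

        ExecutionVertexSketch(String name, int index) {
            this.jobVertexName = name;
            this.subtaskIndex = index;
        }
    }

    // expand a logical vertex into its parallel subtasks
    static List<ExecutionVertexSketch> expand(String jobVertexName, int parallelism) {
        List<ExecutionVertexSketch> vertices = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            vertices.add(new ExecutionVertexSketch(jobVertexName, i));
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<ExecutionVertexSketch> subtasks = expand("map", 4);
        System.out.println(subtasks.size()); // 4
    }
}
```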
Next come the two core calls: getVerticesSortedTopologicallyFromSources and attachJobGraph.

```java
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology, jobManagerJobMetricGroup);
```

These two calls first sort the JobVertices topologically and then build the ExecutionGraph topology from them.

getVerticesSortedTopologicallyFromSources

```java
public List<JobVertex> getVerticesSortedTopologicallyFromSources()
        throws InvalidProgramException {
    // early out on empty lists
    if (this.taskVertices.isEmpty()) {
        return Collections.emptyList();
    }

    List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
    Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

    // start by finding the vertices with no input edges
    // and the ones with disconnected inputs (that refer to some standalone data set)
    {
        Iterator<JobVertex> iter = remaining.iterator();
        while (iter.hasNext()) {
            JobVertex vertex = iter.next();
            if (vertex.isInputVertex()) {
                sorted.add(vertex);
                iter.remove();
            }
        }
    }

    int startNodePos = 0;

    // traverse from the nodes that were added until we found all elements
    while (!remaining.isEmpty()) {
        // first check if we have more candidates to start traversing from. if not, then the
        // graph is cyclic, which is not permitted
        if (startNodePos >= sorted.size()) {
            throw new InvalidProgramException("The job graph is cyclic.");
        }

        JobVertex current = sorted.get(startNodePos++);
        addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
    }

    return sorted;
}
```

This code sorts all the vertices: the Source vertices are picked out first, and the remaining vertices are then appended to the list one by one, which produces the final topology.

attachJobGraph

```java
@Override
public void attachJobGraph(
        List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
        throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            verticesToAttach.size(),
            tasks.size(),
            intermediateResults.size());

    attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
    if (!isDynamic) {
        initializeJobVertices(verticesToAttach);
    }

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    partitionGroupReleaseStrategy =
            partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
```

attachJobGraph does two main things: first it calls attachJobVertices to create the ExecutionJobVertex instances, and then it calls DefaultExecutionTopology.fromExecutionGraph to create several other core objects.

attachJobVertices

attachJobVertices iterates over all the JobVertices and builds an ExecutionJobVertex from each one.
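The source-first traversal used by the topological sort can be reproduced as a small standalone sketch. This is an illustrative reimplementation over plain strings, not Flink's actual code (Flink walks successors via addNodesThatHaveNoNewPredecessors, while this sketch rescans the remaining set); the startNodePos-based cycle check mirrors the real method.

```java
// A minimal Kahn-style topological sort in the spirit of
// getVerticesSortedTopologicallyFromSources: seed with vertices that have
// no inputs, then keep adding vertices whose predecessors are all sorted.
import java.util.*;

public class TopoSortSketch {
    // inputs: vertex name -> set of its predecessor vertex names
    static List<String> sortFromSources(Map<String, Set<String>> inputs) {
        List<String> sorted = new ArrayList<>();
        Set<String> remaining = new LinkedHashSet<>(inputs.keySet());

        // start with the source vertices (no input edges)
        remaining.removeIf(v -> {
            if (inputs.get(v).isEmpty()) {
                sorted.add(v);
                return true;
            }
            return false;
        });

        int startNodePos = 0;
        while (!remaining.isEmpty()) {
            // no more traversal candidates means the graph is cyclic
            if (startNodePos >= sorted.size()) {
                throw new IllegalStateException("The job graph is cyclic.");
            }
            startNodePos++; // consume one traversal candidate

            // add every vertex whose predecessors are now all sorted
            for (Iterator<String> it = remaining.iterator(); it.hasNext(); ) {
                String v = it.next();
                if (sorted.containsAll(inputs.get(v))) {
                    sorted.add(v);
                    it.remove();
                }
            }
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> inputs = new HashMap<>();
        inputs.put("source", Set.of());
        inputs.put("map", Set.of("source"));
        inputs.put("sink", Set.of("map"));
        System.out.println(sortFromSources(inputs)); // [source, map, sink]
    }
}
```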
```java
/** Attach job vertices without initializing them. */
private void attachJobVertices(
        List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
        throws JobException {
    for (JobVertex jobVertex : topologicallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        VertexParallelismInformation parallelismInfo =
                parallelismStore.getParallelismInfo(jobVertex.getID());

        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv =
                executionJobVertexFactory.createExecutionJobVertex(
                        this, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(
                    String.format(
                            "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                            jobVertex.getID(), ejv, previousTask));
        }

        this.verticesInCreationOrder.add(ejv);
        this.numJobVerticesTotal++;
    }
}
```

initializeJobVertices

DefaultExecutionGraph.initializeJobVertices iterates over the sorted JobVertices, fetches each ExecutionJobVertex, and calls ExecutionGraph.initializeJobVertex. Let's go straight to the logic of ExecutionGraph.initializeJobVertex.

```java
default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
        throws JobException {
    initializeJobVertex(
            ejv,
            createTimestamp,
            VertexInputInfoComputationUtils.computeVertexInputInfos(
                    ejv, getAllIntermediateResults()::get));
}
```

This first calls VertexInputInfoComputationUtils.computeVertexInputInfos to produce a Map<IntermediateDataSetID, JobVertexInputInfo> (jobVertexInputInfos), which describes the range of upstream IntermediateResultPartitions each ExecutionVertex consumes. There are two distribution patterns: POINTWISE (point-to-point) and ALL_TO_ALL.

In POINTWISE mode, partitions are spread as evenly as possible. For example, with an upstream parallelism of 4 and a downstream parallelism of 2, the first two IntermediateResultPartitions are consumed by the first ExecutionVertex and the last two by the second. With an upstream parallelism of 2 and a downstream parallelism of 3, the first IntermediateResultPartition is consumed by the first two ExecutionVertices and the second partition by the third.
```java
public static JobVertexInputInfo computeVertexInputInfoForPointwise(
        int sourceCount,
        int targetCount,
        Function<Integer, Integer> numOfSubpartitionsRetriever,
        boolean isDynamicGraph) {

    final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();

    if (sourceCount >= targetCount) {
        for (int index = 0; index < targetCount; index++) {

            int start = index * sourceCount / targetCount;
            int end = (index + 1) * sourceCount / targetCount;

            IndexRange partitionRange = new IndexRange(start, end - 1);
            IndexRange subpartitionRange =
                    computeConsumedSubpartitionRange(
                            index,
                            1,
                            () -> numOfSubpartitionsRetriever.apply(start),
                            isDynamicGraph,
                            false,
                            false);
            executionVertexInputInfos.add(
                    new ExecutionVertexInputInfo(index, partitionRange, subpartitionRange));
        }
    } else {
        for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {

            int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
            int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
            int numConsumers = end - start;

            IndexRange partitionRange = new IndexRange(partitionNum, partitionNum);
            // Variable used in lambda expression should be final or effectively final
            final int finalPartitionNum = partitionNum;
            for (int i = start; i < end; i++) {
                IndexRange subpartitionRange =
                        computeConsumedSubpartitionRange(
                                i,
                                numConsumers,
                                () -> numOfSubpartitionsRetriever.apply(finalPartitionNum),
                                isDynamicGraph,
                                false,
                                false);
                executionVertexInputInfos.add(
                        new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
            }
        }
    }
    return new JobVertexInputInfo(executionVertexInputInfos);
}
```
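The integer arithmetic that produces these ranges is easy to check in isolation. The following sketch (the helper names are mine, not Flink's) extracts the two formulas from the method above and reproduces the two examples from the text:

```java
// Recomputes the POINTWISE ranges with the same integer arithmetic as
// computeVertexInputInfoForPointwise; subpartition ranges are ignored here.
public class PointwiseRangeSketch {
    // sourceCount >= targetCount: consumer `index` reads this inclusive
    // partition range (mirrors the IndexRange in the first branch)
    static int[] partitionRange(int index, int sourceCount, int targetCount) {
        int start = index * sourceCount / targetCount;
        int end = (index + 1) * sourceCount / targetCount;
        return new int[] {start, end - 1};
    }

    // sourceCount < targetCount: partition `p` is read by this inclusive
    // range of consumer indices (mirrors the second branch)
    static int[] consumerRange(int p, int sourceCount, int targetCount) {
        int start = (p * targetCount + sourceCount - 1) / sourceCount;
        int end = ((p + 1) * targetCount + sourceCount - 1) / sourceCount;
        return new int[] {start, end - 1};
    }

    public static void main(String[] args) {
        // upstream 4, downstream 2: consumers get partitions [0,1] and [2,3]
        System.out.println(java.util.Arrays.toString(partitionRange(0, 4, 2))); // [0, 1]
        System.out.println(java.util.Arrays.toString(partitionRange(1, 4, 2))); // [2, 3]
        // upstream 2, downstream 3: partition 0 -> consumers [0,1], partition 1 -> consumer [2,2]
        System.out.println(java.util.Arrays.toString(consumerRange(0, 2, 3))); // [0, 1]
        System.out.println(java.util.Arrays.toString(consumerRange(1, 2, 3))); // [2, 2]
    }
}
```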
In ALL_TO_ALL mode, every downstream vertex consumes all the upstream partitions.

```java
public static JobVertexInputInfo computeVertexInputInfoForAllToAll(
        int sourceCount,
        int targetCount,
        Function<Integer, Integer> numOfSubpartitionsRetriever,
        boolean isDynamicGraph,
        boolean isBroadcast,
        boolean isSingleSubpartitionContainsAllData) {
    final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
    IndexRange partitionRange = new IndexRange(0, sourceCount - 1);
    for (int i = 0; i < targetCount; i++) {
        IndexRange subpartitionRange =
                computeConsumedSubpartitionRange(
                        i,
                        targetCount,
                        () -> numOfSubpartitionsRetriever.apply(0),
                        isDynamicGraph,
                        isBroadcast,
                        isSingleSubpartitionContainsAllData);
        executionVertexInputInfos.add(
                new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
    }
    return new JobVertexInputInfo(executionVertexInputInfos);
}
```

With jobVertexInputInfos generated, let's return to DefaultExecutionGraph.initializeJobVertex.

```java
@Override
public void initializeJobVertex(
        ExecutionJobVertex ejv,
        long createTimestamp,
        Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
        throws JobException {

    checkNotNull(ejv);
    checkNotNull(jobVertexInputInfos);

    jobVertexInputInfos.forEach(
            (resultId, info) ->
                    this.vertexInputInfoStore.put(ejv.getJobVertexId(), resultId, info));

    ejv.initialize(
            executionHistorySizeLimit,
            rpcTimeout,
            createTimestamp,
            this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
            executionPlanSchedulingContext);

    ejv.connectToPredecessors(this.intermediateResults);

    for (IntermediateResult res : ejv.getProducedDataSets()) {
        IntermediateResult previousDataSet =
                this.intermediateResults.putIfAbsent(res.getId(), res);
        if (previousDataSet != null) {
            throw new JobException(
                    String.format(
                            "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                            res.getId(), res, previousDataSet));
        }
    }

    registerExecutionVerticesAndResultPartitionsFor(ejv);

    // enrich network memory.
    SlotSharingGroup slotSharingGroup = ejv.getSlotSharingGroup();
    if (areJobVerticesAllInitialized(slotSharingGroup)) {
        SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(
                slotSharingGroup, this::getJobVertex, shuffleMaster);
    }
}
```
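Both computeVertexInputInfoForPointwise and computeVertexInputInfoForAllToAll delegate the subpartition side to computeConsumedSubpartitionRange. My reading of the static (non-dynamic, non-broadcast) case is that it collapses to each consumer reading the single subpartition matching its index within the consumer group; the sketch below captures only that case under that assumption and is not the full method, which also handles dynamic graphs and broadcast edges.

```java
// Static, non-broadcast case only (an assumption about the common path):
// every partition produces one subpartition per consumer, and consumer i
// simply reads subpartition i within its consumer group.
public class SubpartitionRangeSketch {
    static int[] consumedSubpartitionRange(int consumerSubtaskIndex, int numConsumers) {
        int consumerIndex = consumerSubtaskIndex % numConsumers;
        return new int[] {consumerIndex, consumerIndex}; // inclusive IndexRange
    }

    public static void main(String[] args) {
        // ALL_TO_ALL, downstream parallelism 3: consumer 2 reads subpartition 2
        System.out.println(java.util.Arrays.toString(consumedSubpartitionRange(2, 3))); // [2, 2]
    }
}
```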
Let's first look at the ExecutionJobVertex.initialize method. It mainly creates the IntermediateResult and ExecutionVertex instances.

```java
protected void initialize(
        int executionHistorySizeLimit,
        Duration timeout,
        long createTimestamp,
        SubtaskAttemptNumberStore initialAttemptCounts,
        ExecutionPlanSchedulingContext executionPlanSchedulingContext)
        throws JobException {

    checkState(parallelismInfo.getParallelism() > 0);
    checkState(!isInitialized());

    this.taskVertices = new ExecutionVertex[parallelismInfo.getParallelism()];
    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // create the intermediate results
    this.producedDataSets =
            new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        this.producedDataSets[i] =
                new IntermediateResult(
                        result,
                        this,
                        this.parallelismInfo.getParallelism(),
                        result.getResultType(),
                        executionPlanSchedulingContext);
    }

    // create all task vertices
    for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
        ExecutionVertex vertex =
                createExecutionVertex(
                        this,
                        i,
                        producedDataSets,
                        timeout,
                        createTimestamp,
                        executionHistorySizeLimit,
                        initialAttemptCounts.getAttemptCount(i));

        this.taskVertices[i] = vertex;
    }

    // sanity check for the double referencing between intermediate result partitions and
    // execution vertices
    for (IntermediateResult ir : this.producedDataSets) {
        if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
            throw new RuntimeException(
                    "The intermediate results partitions were not correctly assigned.");
        }
    }

    // set up the input splits, if the vertex has any
    try {
        @SuppressWarnings("unchecked")
        InputSplitSource<InputSplit> splitSource =
                (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

        if (splitSource != null) {
            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(graph.getUserClassLoader());
            try {
                inputSplits =
                        splitSource.createInputSplits(this.parallelismInfo.getParallelism());

                if (inputSplits != null) {
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        } else {
            inputSplits = null;
        }
    } catch (Throwable t) {
        throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
    }
}
```

When an ExecutionVertex is created, it in turn creates its IntermediateResultPartitions and an Execution. The Execution is given an attemptNumber, which defaults to 0; if the ExecutionVertex is rescheduled, the attemptNumber is incremented by 1.

ExecutionJobVertex.connectToPredecessors then wires each ExecutionVertex to its upstream IntermediateResultPartitions. Here too the wiring is handled separately for the point-to-point and all-to-all patterns: the point-to-point case needs to compute the IntermediateResultPartition index range for each ExecutionVertex. Both patterns ultimately call connectInternal.
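Before reading the real method, here is a toy model of the bidirectional bookkeeping connectInternal performs. Strings stand in for vertex and partition IDs, and plain maps stand in for EdgeManager's vertexConsumedPartitions and partitionConsumers; none of this is Flink's actual API.

```java
// Toy EdgeManager: register the consumer<->partition relation in both
// directions, with one shared group object per side (as a shared List here).
import java.util.*;

public class EdgeManagerSketch {
    // vertex ID -> groups of partitions it consumes (ConsumedPartitionGroups)
    final Map<String, List<List<String>>> vertexConsumedPartitions = new HashMap<>();
    // partition ID -> groups of vertices consuming it (ConsumerVertexGroups)
    final Map<String, List<List<String>>> partitionConsumers = new HashMap<>();

    void connect(List<String> consumerVertices, List<String> partitions) {
        // every consumer references the same partition group
        for (String vertex : consumerVertices) {
            vertexConsumedPartitions
                    .computeIfAbsent(vertex, k -> new ArrayList<>())
                    .add(partitions);
        }
        // every partition references the same consumer group
        for (String partition : partitions) {
            partitionConsumers
                    .computeIfAbsent(partition, k -> new ArrayList<>())
                    .add(consumerVertices);
        }
    }

    public static void main(String[] args) {
        EdgeManagerSketch em = new EdgeManagerSketch();
        em.connect(List.of("map-0", "map-1"), List.of("src-p0", "src-p1"));
        System.out.println(em.vertexConsumedPartitions.get("map-0")); // [[src-p0, src-p1]]
        System.out.println(em.partitionConsumers.get("src-p0")); // [[map-0, map-1]]
    }
}
```

Sharing one group object between all its members is the point of the design: an ALL_TO_ALL edge needs only one ConsumedPartitionGroup and one ConsumerVertexGroup rather than parallelism-squared individual edges.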
```java
/** Connect all execution vertices to all partitions. */
private static void connectInternal(
        List<ExecutionVertex> taskVertices,
        List<IntermediateResultPartition> partitions,
        ResultPartitionType resultPartitionType,
        EdgeManager edgeManager) {
    checkState(!taskVertices.isEmpty());
    checkState(!partitions.isEmpty());

    ConsumedPartitionGroup consumedPartitionGroup =
            createAndRegisterConsumedPartitionGroupToEdgeManager(
                    taskVertices.size(), partitions, resultPartitionType, edgeManager);
    for (ExecutionVertex ev : taskVertices) {
        ev.addConsumedPartitionGroup(consumedPartitionGroup);
    }

    List<ExecutionVertexID> consumerVertices =
            taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());
    ConsumerVertexGroup consumerVertexGroup =
            ConsumerVertexGroup.fromMultipleVertices(consumerVertices, resultPartitionType);
    for (IntermediateResultPartition partition : partitions) {
        partition.addConsumers(consumerVertexGroup);
    }

    consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);
    consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);
}
```

In this method, ev.addConsumedPartitionGroup(consumedPartitionGroup) records the ExecutionVertex-to-IntermediateResultPartition relation in EdgeManager.vertexConsumedPartitions, while partition.addConsumers(consumerVertexGroup) records the reverse relation, from IntermediateResultPartition to ExecutionVertex, in EdgeManager.partitionConsumers.

Summary

In this article we saw how Flink turns a JobGraph into an ExecutionGraph. Several of the core concepts involved have very similar names, so it is worth understanding them thoroughly before studying how they are generated and how they relate to one another; the ExecutionGraph diagram from the previous article can also help.