Flink has four execution graphs: StreamGraph, JobGraph, ExecutionGraph, and Physical Graph. In this article we look at how the Flink program code we write is turned into a StreamGraph.

Before reading any code, here is a quick overview of how the four graphs relate and differ. The StreamGraph is built from the user's DataStream API code and represents the topology of the whole program. The JobGraph is generated from the StreamGraph; on top of it, it chains some operators together and merges them into single nodes, avoiding the serialization and deserialization cost of shipping data between those nodes. The ExecutionGraph is generated from the JobGraph; its defining trait is parallelism: each node is expanded into its parallel subtasks. The Physical Graph is what results from actually deploying the ExecutionGraph; it is not a data structure.

StreamExecutionEnvironment

With the four execution graphs covered, let's start exploring the source code. We begin with StreamExecutionEnvironment, a class no Flink program can do without. It provides a set of methods to configure the execution environment of a streaming program, such as parallelism, checkpoint settings, and time characteristics.

Since this article focuses on how the StreamGraph is generated, we start at the entry point of the data flow: the source node. StreamExecutionEnvironment has methods such as addSource and fromSource that define which data source to read from and return a DataStreamSource (a subclass of DataStream). Once we have a DataStream, it flows through the operators and finally reaches the sink.

Let's start with addSource. The method does three things:

1. Resolve the data type, preferring the type explicitly specified by the user, otherwise inferring it automatically.
2. Clean the closure, so that the user-supplied function can be serialized and shipped to the distributed runtime.
3. Create and return a DataStreamSource.

```java
private <OUT> DataStreamSource<OUT> addSource(
        final SourceFunction<OUT> function,
        final String sourceName,
        @Nullable final TypeInformation<OUT> typeInfo,
        final Boundedness boundedness) {
    checkNotNull(function);
    checkNotNull(sourceName);
    checkNotNull(boundedness);

    TypeInformation<OUT> resolvedTypeInfo =
            getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);

    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(
            this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}
```

Now we have a DataStream, but how does Flink know which transformations to apply afterwards? The answer lies in the transformations field, which records every subsequent transformation:

```java
protected final List<Transformation<?>> transformations = new ArrayList<>();
```

Transformation

Let's see how a Transformation is created and how it describes the transformation flow of a DataStream, taking the most common method, map, as the example:

```java
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
```

It calls transform, which in turn calls doTransform:

```java
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism(),
                    false);

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
```

doTransform creates a Transformation and a SingleOutputStreamOperator (a subclass of DataStream), then calls addOperator to store the transformation in the transformations list of the StreamExecutionEnvironment. Every Transformation carries information such as its id, name, parallelism, and slotSharingGroup. Subclasses also record their inputs, for example OneInputTransformation and TwoInputTransformation.

StreamOperator

When we call map we pass in a user-defined function, and it too is kept inside the Transformation. Flink defines the StreamOperator interface to abstract over such processing functions. In map, the function we pass in is wrapped in a StreamMap, which extends AbstractUdfStreamOperator and implements the OneInputStreamOperator interface.

StreamOperator defines the operator lifecycle methods:

```java
void open() throws Exception;

void finish() throws Exception;

void close() throws Exception;

OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory storageLocation)
        throws Exception;

void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;
```

OneInputStreamOperator is a sub-interface of StreamOperator. On top of it, it adds per-element handling, mainly the extraction of the key:

```java
default void setKeyContextElement(StreamRecord<IN> record) throws Exception {
    setKeyContextElement1(record);
}
```
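The bookkeeping that doTransform performs, where each operation creates a Transformation pointing at its input and registers it with the environment, can be sketched with simplified stand-in classes. These are hypothetical names for illustration, not Flink's real types:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch (NOT Flink's real classes) of how each map()-style call
// creates a Transformation that records its input, and registers it with the
// environment's `transformations` list, mirroring DataStream#doTransform.
public class TransformationSketch {

    // Stand-in for Flink's Transformation: an id, a name, and the input's id.
    static final class Transformation {
        final int id;
        final String name;
        final Integer inputId; // null for sources
        Transformation(int id, String name, Integer inputId) {
            this.id = id; this.name = name; this.inputId = inputId;
        }
    }

    // Stand-in for StreamExecutionEnvironment: owns the transformation list.
    static final class Env {
        final List<Transformation> transformations = new ArrayList<>();
        int nextId = 0;
        Stream fromSource(String name) {
            // Sources are NOT added to the list; they are reached via inputs.
            return new Stream(this, new Transformation(nextId++, name, null));
        }
    }

    // Stand-in for DataStream: each operation wraps a new Transformation
    // pointing at the previous one and adds it to the environment.
    static final class Stream {
        final Env env;
        final Transformation transformation;
        Stream(Env env, Transformation t) { this.env = env; this.transformation = t; }
        Stream map(String name) {
            Transformation result = new Transformation(env.nextId++, name, transformation.id);
            env.transformations.add(result); // like addOperator(resultTransform)
            return new Stream(env, result);
        }
    }

    static List<String> describe(Env env) {
        List<String> out = new ArrayList<>();
        for (Transformation t : env.transformations) {
            out.add(t.name + "<-" + t.inputId);
        }
        return out;
    }

    public static void main(String[] args) {
        Env env = new Env();
        env.fromSource("source").map("map1").map("map2");
        System.out.println(describe(env)); // [map1<-0, map2<-1]
    }
}
```

Note that the source itself never appears in the list; it is reachable only through the input pointer of the first map, which matches how legacy sources are handled in Flink.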
AbstractUdfStreamOperator, in turn, provides the lifecycle management of the user-defined function:

```java
@Override
public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE);
}

@Override
public void finish() throws Exception {
    super.finish();
    if (userFunction instanceof SinkFunction) {
        ((SinkFunction<?>) userFunction).finish();
    }
}

@Override
public void close() throws Exception {
    super.close();
    FunctionUtils.closeFunction(userFunction);
}
```

At this point we know how a DataStream is transformed in Flink: the processing logic is stored in Transformations. Next, let's see how a list of Transformations becomes a StreamGraph.

StreamGraph

The entry point for generating the StreamGraph is org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#generateStreamGraph. The generate method iterates over all Transformations and calls transform on each one; before a node's transform method is invoked, all of its inputs are transformed first. The following Transformations are currently registered:

```java
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(GlobalCommitterTransform.class, new GlobalCommitterTransformationTranslator());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(StubTransformation.class, new StubTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    tmp.put(CacheTransformation.class, new CacheTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
```

Flink dispatches on the concrete Transformation class and calls the matching translator's translateInternal method, which is where nodes and edges are added:

```java
streamGraph.addOperator(
        transformationId,
        slotSharingGroup,
        transformation.getCoLocationGroupKey(),
        operatorFactory,
        inputType,
        transformation.getOutputType(),
        transformation.getName());

for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
    streamGraph.addEdge(inputId, transformationId, 0);
}
```

addOperator creates a StreamNode by calling addNode:

```java
protected StreamNode addNode(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends TaskInvokable> vertexClass,
        @Nullable StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    StreamNode vertex =
            new StreamNode(
                    vertexID,
                    slotSharingGroup,
                    coLocationGroup,
                    operatorFactory,
                    operatorName,
                    vertexClass);

    streamNodes.put(vertexID, vertex);
    isEmpty = false;

    return vertex;
}
```
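The dispatch driven by the static translator map shown above, where a TransformationTranslator is looked up by the concrete Transformation class, can be sketched in a few lines. The types here are simplified stand-ins, not Flink's real classes:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch (hypothetical types, not Flink's) of the translator-dispatch
// pattern used during StreamGraph generation: look up a translator by the
// concrete Transformation class and delegate to it.
public class TranslatorDispatchSketch {

    interface Transformation {}
    static final class OneInputTransformation implements Transformation {}
    static final class SourceTransformation implements Transformation {}

    interface TransformationTranslator {
        String translate(Transformation t);
    }

    static final Map<Class<? extends Transformation>, TransformationTranslator> translatorMap;
    static {
        Map<Class<? extends Transformation>, TransformationTranslator> tmp = new HashMap<>();
        tmp.put(OneInputTransformation.class, t -> "one-input node");
        tmp.put(SourceTransformation.class, t -> "source node");
        // Frozen after construction, just like Flink's static initializer.
        translatorMap = Collections.unmodifiableMap(tmp);
    }

    static String translate(Transformation t) {
        TransformationTranslator translator = translatorMap.get(t.getClass());
        if (translator == null) {
            throw new IllegalStateException("No translator for " + t.getClass().getSimpleName());
        }
        return translator.translate(t);
    }

    public static void main(String[] args) {
        System.out.println(translate(new SourceTransformation()));   // source node
        System.out.println(translate(new OneInputTransformation())); // one-input node
    }
}
```

The unmodifiable map makes the registry effectively a compile-time table: adding a new Transformation type means adding a translator entry, without touching the traversal logic.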
In addEdgeInternal, virtual nodes such as sideOutput and partition nodes are first resolved back to the original upstream node before the actual edge is created:

```java
private void addEdgeInternal(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        StreamExchangeMode exchangeMode,
        IntermediateDataSetID intermediateDataSetId) {

    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                null,
                outputTag,
                exchangeMode,
                intermediateDataSetId);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        exchangeMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputNames,
                outputTag,
                exchangeMode,
                intermediateDataSetId);
    } else {
        createActualEdge(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputTag,
                exchangeMode,
                intermediateDataSetId);
    }
}
```

Finally, a StreamEdge is created to connect the two physical nodes:

```java
private void createActualEdge(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        OutputTag outputTag,
        StreamExchangeMode exchangeMode,
        IntermediateDataSetID intermediateDataSetId) {

    StreamNode upstreamNode = getStreamNode(upStreamVertexID);
    StreamNode downstreamNode = getStreamNode(downStreamVertexID);

    // If no partitioner was specified and the parallelism of upstream and downstream
    // operator matches use forward partitioning, use rebalance otherwise.
    if (partitioner == null
            && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner =
                dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }

    if (partitioner instanceof ForwardPartitioner) {
        if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
            if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
                partitioner =
                        ((ForwardForConsecutiveHashPartitioner<?>) partitioner)
                                .getHashPartitioner();
            } else {
                throw new UnsupportedOperationException(
                        "Forward partitioning does not allow "
                                + "change of parallelism. Upstream operation: "
                                + upstreamNode
                                + " parallelism: "
                                + upstreamNode.getParallelism()
                                + ", downstream operation: "
                                + downstreamNode
                                + " parallelism: "
                                + downstreamNode.getParallelism()
                                + " You must use another partitioning strategy, such as "
                                + "broadcast, rebalance, shuffle or global.");
            }
        }
    }

    if (exchangeMode == null) {
        exchangeMode = StreamExchangeMode.UNDEFINED;
    }

    /*
     * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
     * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
     * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
     * StreamEdge}.
     */
    int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

    StreamEdge edge =
            new StreamEdge(
                    upstreamNode,
                    downstreamNode,
                    typeNumber,
                    partitioner,
                    outputTag,
                    exchangeMode,
                    uniqueId,
                    intermediateDataSetId);

    getStreamNode(edge.getSourceId()).addOutEdge(edge);
    getStreamNode(edge.getTargetId()).addInEdge(edge);
}
```

With the StreamNodes and StreamEdges in place, we have all the nodes and edges, and our StreamGraph is complete.

Summary

This article first introduced Flink's four execution graphs and how they relate to each other. We then explored, through the source code, how the StreamGraph is generated: Flink stores the processing logic in Transformations, and from those Transformations the StreamGraph is built.
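As a closing aside, the default-partitioner decision inside createActualEdge boils down to a small rule, sketched here with a toy enum. This is a simplification: the real method also handles dynamic graphs (ForwardForUnspecifiedPartitioner) and ForwardForConsecutiveHashPartitioner fallback:

```java
// Sketch of createActualEdge's default-partitioner rule: if no partitioner
// was requested, use forward when upstream and downstream parallelism match,
// otherwise rebalance; an explicit forward with mismatched parallelism is
// rejected. Simplified stand-in, not Flink's real types.
public class PartitionerRuleSketch {

    enum Partitioner { FORWARD, REBALANCE }

    static Partitioner choose(
            Partitioner requested, int upstreamParallelism, int downstreamParallelism) {
        Partitioner partitioner = requested;
        if (partitioner == null && upstreamParallelism == downstreamParallelism) {
            partitioner = Partitioner.FORWARD;
        } else if (partitioner == null) {
            partitioner = Partitioner.REBALANCE;
        }
        // Forward ties each upstream subtask to one downstream subtask,
        // so it cannot bridge a parallelism change.
        if (partitioner == Partitioner.FORWARD
                && upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism");
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(choose(null, 4, 4)); // FORWARD
        System.out.println(choose(null, 4, 8)); // REBALANCE
    }
}
```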
