无限站点建站系统济南智能网站建设费用
2026/6/7 0:37:49 网站建设 项目流程
无限站点建站系统,济南智能网站建设费用,免费招聘模板下载,网站容易被百度收录你好#xff0c;我是程序员贵哥。 今天我要与你分享的主题是“WordCount Beam Pipeline实战”。 前面我们已经学习了Beam的基础数据结构PCollection#xff0c;基本数据转换操作Transform#xff0c;还有Pipeline等技术。你一定跃跃欲试#xff0c;想要在实际项目中使用了…你好我是程序员贵哥。今天我要与你分享的主题是“WordCount Beam Pipeline实战”。前面我们已经学习了Beam的基础数据结构PCollection基本数据转换操作Transform还有Pipeline等技术。你一定跃跃欲试想要在实际项目中使用了。这一讲我们就一起学习一下怎样用Beam解决数据处理领域的教科书级案例——WordCount。WordCount你一定不陌生在第18讲中我们就已经接触过了。WordCount问题是起源于MapReduce时代就广泛使用的案例。顾名思义WordCount想要解决的问题是统计一个文本库中的词频。比如你可以用WordCount找出莎士比亚最喜欢使用的单词那么你的输入是莎士比亚全集输出就是每个单词出现的次数。举个例子比如这一段HAMLET ACT I SCENE I Elsinore. A platform before the castle. [FRANCISCO at his post. Enter to him BERNARDO] BERNARDO Whos there? FRANCISCO Nay, answer me: stand, and unfold yourself. BERNARDO Long live the king! FRANCISCO Bernardo? BERNARDO He. FRANCISCO You come most carefully upon your hour. BERNARDO Tis now struck twelve; get thee to bed, Francisco. FRANCISCO For this relief much thanks: tis bitter cold, And I am sick at heart. BERNARDO Have you had quiet guard? FRANCISCO Not a mouse stirring. BERNARDO Well, good night. If you do meet Horatio and Marcellus, The rivals of my watch, bid them make haste. FRANCISCO I think I hear them. Stand, ho! Whos there?在这个文本库中我们用“the: 数字”表示the出现了几次数字就是单词出现的次数。The: 3 And: 3 Him: 1 ...那么我们怎样在Beam中处理这个问题呢结合前面所学的知识我们可以把Pipeline分为这样几步用Pipeline IO读取文本库参考第27讲用Transform对文本进行分词和词频统计操作参考第25讲用Pipeline IO输出结果参考第27讲所有的步骤会被打包进一个Beam Pipeline参考第26讲。整个过程就如同下图所示。创建Pipeline首先我们先用代码创建一个PipelineOptions的实例。PipelineOptions能够让我们对Pipeline进行必要的配置比如配置执行程序的Runner和Runner所需要的参数。我们在这里先采用默认配置。记得第30讲中我们讲过Beam Pipeline可以配置在不同的Runner上跑比如SparkRunnerFlinkRunner。如果PipelineOptions不配置的情况下默认的就是DirectRunner也就是说会在本机执行。JavaPipelineOptions options PipelineOptionsFactory.create();接下来我们就可以用这个PipelineOptions去创建一个Pipeline了。一个Pipeline实例会去构建一个数据处理流水线所需要的数据处理DAG以及这个DAG所需要进行的Transform。JavaPipeline p Pipeline.create(options);应用Transform在上面的设计框图中我们可以看到我们需要进行好几种Transform。比如TextIO.Read、ParDo、Count去读取数据操纵数据以及存储数据。每一种Transform都需要一些参数并且会输出特定的数据。输入和输出往往会用PCollection的数据结构表示。简单回顾一下PCollection是Beam对于数据集的抽象表示任意大小、无序的数据甚至可以是无边界的Streaming数据。在我们这个WordCount例子中我们的Transform依次是这样几个。第一个Transform是先要用TextIO.Read来读取一个外部的莎士比亚文集生成一个PCollection包含这个文集里的所有文本行。这个PCollection中的每个元素都是文本中的一行。JavaPCollectionString lines p.apply(TextIO.read().from(gs://apache-beam-samples/shakespeare/*));第二个Transform我们要把文本行中的单词提取出来也就是做分词tokenization。这一步的输入PCollection中的每个元素都表示了一行。那么输出呢输出还是一个PCollection但是每个元素变成了单词。你可以留意一下我们这里做分词时用的正则表达式[^\p{L}]意思是非Unicode Letters所以它会按空格或者标点符号等把词分开。JavaPCollectionString words lines.apply(ExtractWords, FlatMapElements .into(TypeDescriptors.strings()) .via((String word) - Arrays.asList(word.split([^\\p{L}]))));第三个Transform我们就会使用Beam SDK提供的Count Transform。Count Transform会把任意一个PCollection转换成有key/value的组合每一个key是原来PCollection中的非重复的元素value则是元素出现的次数。JavaPCollectionKVString, Long counts words.apply(Count.StringperElement());第四个Transform会把刚才的key/value组成的PCollection转换成我们想要的输出格式方便我们输出词频。因为大部分的时候我们都是想要把输出存储到另一个文件里的。JavaPCollectionString formatted counts.apply(FormatResults, MapElements .into(TypeDescriptors.strings()) .via((KVString, Long wordCount) - wordCount.getKey() : wordCount.getValue()));最后一个Transform就是TextIO.Write用来把最终的PCollection写进文本文档。PCollection中的每一个元素都会被写为文本文件中的独立一行。运行Pipeline调用Pipeline的run()方法会把这个Pipeline所包含的Transform优化并放到你指定的Runner上执行。这里你需要注意run()方法是异步的如果你想要同步等待Pipeline的执行结果需要调用waitUntilFinish()方法。Javap.run().waitUntilFinish();改进代码的建议代码看起来都完成了不过我们还可以对代码再做些改进。编写独立的DoFn在上面的示例代码中我们把Transform都inline地写在了apply()方法里。Javalines.apply(ExtractWords, FlatMapElements .into(TypeDescriptors.strings()) .via((String word) - Arrays.asList(word.split([^\\p{L}]))));但是这样的写法在实际工作中很难维护。一是因为真实的业务逻辑往往比较复杂很难用一两行的代码写清楚强行写成inline的话可读性非常糟糕。二是因为这样inline的Transform几乎不可复用和测试。所以实际工作中我们更多地会去继承DoFn来实现我们的数据操作。这样每个DoFn我们都可以单独复用和测试。接下来我们看看怎样用用DoFn来实现刚才的分词Transform其实很简单我们继承DoFn作为我们的子类ExtracrtWordsFn然后把单词的拆分放在DoFn的processElement成员函数里。Javastatic class ExtractWordsFn extends DoFnString, String { private final Counter emptyLines Metrics.counter(ExtractWordsFn.class, emptyLines); private final Distribution lineLenDist Metrics.distribution(ExtractWordsFn.class, lineLenDistro); ProcessElement public void processElement(Element String element, OutputReceiverString receiver) { lineLenDist.update(element.length()); if (element.trim().isEmpty()) { emptyLines.inc(); // Split the line into words. String[] words element.split(“[^\\p{L}]”, -1); // Output each word encountered into the output PCollection. for (String word : words) { if (!word.isEmpty()) { receiver.output(word); } } } }创建PTransform合并相关联的TransformPTransform类可以用来整合一些相关联的Transform。比如你有一些数据处理的操作包含几个Transform或者ParDo你可以把他们封装在一个PTransform里。我们这里试着把上面的ExtractWordsFn和Count两个Transform封装起来。这样可以对这样一整套数据处理操作复用和测试。当定义PTransform的子类时它的输入输出类型就是一连串Transform的最初输入和最终输出。那么在这里输入类型是String输出类型是KVString, Long。就如同下面的代码一样。Java/** * A PTransform that converts a PCollection containing lines of text into a PCollection of * formatted word counts. * * pThis is a custom composite transform that bundles two transforms (ParDo and * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, * modular testing, and an improved monitoring experience. */ public static class CountWords extends PTransformPCollectionString, PCollectionKVString, Long { Override public PCollectionKVString, Long expand(PCollectionString lines) { // Convert lines of text into individual words. PCollectionString words lines.apply(ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollectionKVString, Long wordCounts words.apply(Count.perElement()); return wordCounts; } }参数化PipelineOptions刚才我们把输入文件的路径和输出文件的路径都写在了代码中。但实际工作中我们很少会这样做。因为这些文件的路径往往是运行时才会决定比如测试环境和生产环境会去操作不同的文件。在真正的实际工作中我们往往把它们作为命令行参数放在PipelineOptions里面。这就需要去继承PipelineOptions。比如我们创建一个WordCountOptions把输出文件作为参数output。Javapublic static interface WordCountOptions extends PipelineOptions { Description(Path of the file to write to) Required String getOutput(); void setOutput(String value); }完成上面两个方面的改进后我们最终的数据处理代码会是这个样子Javapublic static void main(String[] args) { WordCountOptions options PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class); Pipeline p Pipeline.create(options); p.apply(ReadLines, TextIO.read().from(options.getInputFile())) .apply(new CountWords()) .apply(ParDo.of(new FormatAsTextFn())) .apply(WriteCounts, TextIO.write().to(options.getOutput())); p.run().waitUntilFinish(); }DoFn和PTransform的单元测试如同第29讲“如何测试Pipeline”中所讲的那样我们用PAssert测试Beam Pipeline。具体在我们这个例子中我一再强调要把数据处理操作封装成DoFn和PTransform因为它们可以独立地进行测试。什么意思呢比如ExtractWordsFn我们想要测试它能把一个句子分拆出单词比如“ some input words 我们期待的输出是[“some”, “input”, “words”]。在测试中我们可以这样表达/** Example test that tests a specific {link DoFn}. */ Test public void testExtractWordsFn() throws Exception { DoFnTesterString, String extractWordsFn DoFnTester.of(new ExtractWordsFn()); Assert.assertThat( extractWordsFn.processBundle( some input words ), CoreMatchers.hasItems(some, input, words)); Assert.assertThat(extractWordsFn.processBundle( ), CoreMatchers.hasItems()); Assert.assertThat( extractWordsFn.processBundle( some , input, words), CoreMatchers.hasItems(some, input, words)); }

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询