网站开发实例百度云网站建设+深圳+凡科
2026/4/17 2:08:49 网站建设 项目流程
网站开发实例百度云,网站建设+深圳+凡科,网站建设推广谷得网络,wordpress制作相册原文#xff1a;towardsdatascience.com/mastering-data-streaming-in-python-a88d4b3abf8b 在本文中#xff0c;我将讨论数据工程师在设计流数据管道时可能遇到的关键挑战。我们将探讨用例场景#xff0c;提供 Python 代码示例#xff0c;讨论使用流式框架进行的窗口计算towardsdatascience.com/mastering-data-streaming-in-python-a88d4b3abf8b在本文中我将讨论数据工程师在设计流数据管道时可能遇到的关键挑战。我们将探讨用例场景提供 Python 代码示例讨论使用流式框架进行的窗口计算并分享与这些主题相关的最佳实践。在许多应用中访问实时和持续更新的数据至关重要。欺诈检测、客户流失预防和推荐是流式处理的最佳候选者。这些数据管道实时处理来自各种源到多个目标目的地的数据捕捉事件的发生并使它们的转换、丰富和分析成为可能。流数据管道在我之前的一篇文章中我描述了最常见的数据管道设计模式和何时使用它们 [1]。数据管道设计模式数据管道是一系列数据处理步骤的序列其中每个阶段的输出成为下一个阶段的输入创建一个逻辑数据流。当数据在两点之间被处理时就存在数据管道例如从源到目的地。数据管道的三个关键组件是源、处理步骤和目的地。例如从外部 API源提取的数据可以加载到数据仓库目的地这说明了源和目的地不同的常见场景。在流式处理中源通常是发布者服务而目的地是消费者– 例如应用程序或另一个处理数据的端点。这些数据通常通过窗口计算进行转换。一个很好的例子是定义在最后事件之后的非活动期间的会话窗口例如 Google Analytics 4 等。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/05ebed7b97a127a78e91f09e48399498.png概念性数据管道设计。图片由作者提供实际应用示例考虑以下使用 Python、Kafka 和 Faust 构建的简单流数据处理应用程序示例。高级应用逻辑如下API 服务app/app_main.py允许将有效的用户参与事件 POST 到 Kafka 的producer主题。这些事件可以来自网站、移动应用程序或由其他服务如某种类型的数据发布者发送。{event_type:page_view,user_id:e659e3e7-22e1-4a6b,action:alternative_handset,timestamp:2024-06-27T15:43:43.315342,metadata:{session_id:4b481fd1-9973-4498-89fb,page:/search,item_id:05efee91,user_agent:Opera/8.81.(X11; Linux x86_64; hi-IN) Presto/2.9.181 Version/12.00}}事件验证可以通过pydantic进行并接受具有有效类型和操作等的事件 [2]我们的应用程序不断从consumer主题消费处理过的事件并将它们发送到WebSocket以便可视化实时处理。Python 数据工程师数据处理服务从producer主题消费原始事件然后每 10 秒应用一个窗口计算滚动表并将按user分组的聚合结果发送到consumer主题。这可能是一种流式框架如kafka-streams或faust。这不断处理的数据流可以立即通过 API 服务在localhost:8000/monitor上可视化。例如我们可以每 10 秒处理一次原始用户参与事件根据每个用户的简单事件计数生成用户排行榜。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/8b110767140eead76ebd39d44d7fd75e.png流式排行榜应用示例。图片由作者提供。importosimportjsonimportloggingimportasyncioimportuvicornfromcontextlibimportasynccontextmanagerfromfastapiimportFastAPI,WebSocket,HTTPExceptionfromfastapi.staticfilesimportStaticFilesfromaiokafkaimportAIOKafkaConsumer,AIOKafkaProducerfromapp.modelsimportEventfrom.httpimportevents,usersfromdotenvimportload_dotenv load_dotenv()logging.basicConfig(levellogging.INFO)# kafka_brokers os.getenv(REDPANDA_BROKERS)kafka_brokers(redpanda-0:9092ifos.getenv(RUNTIME_ENVIRONMENT)DOCKERelselocalhost:19092)consumer_topicos.getenv(CONSUMER_TOPIC)producer_topicos.getenv(PRODUCER_TOPIC)error_topicos.getenv(ERROR_TOPIC)defkafka_json_deserializer(serialized):returnjson.loads(serialized)asynccontextmanagerasyncdefstartup(app):app.producerAIOKafkaProducer(bootstrap_servers[kafka_brokers],)awaitapp.producer.start()app.consumerAIOKafkaConsumer(consumer_topic,# agg-events,# group_iddemo-group,# looploop,bootstrap_servers[kafka_brokers],enable_auto_commitTrue,auto_commit_interval_ms1000,# commit every secondauto_offset_resetearliest,# If committed offset not found, start from beginningvalue_deserializerkafka_json_deserializer,)awaitapp.consumer.start()yieldappFastAPI(lifespanstartup)app.mount(/static,StaticFiles(directorystatic),namestatic)app.include_router(events.router)app.include_router(users.router)# WebSocket endpointapp.websocket(/ws)asyncdefwebsocket_endpoint(websocket:WebSocket):awaitwebsocket.accept()asyncdefsend_message_to_websocket(msg):textstr(msg.value)awaitwebsocket.send_text(text)asyncdefconsume_from_topic(topic,callback):print(fConsuming from{topic})asyncformsginapp.consumer:print(fReceived message:{msg.value})awaitcallback(msg)# Start consumingasyncio.create_task(consume_from_topic(consumer_topic,send_message_to_websocket))# Keep the connection openwhileTrue:awaitasyncio.sleep(3)app.post(/track)asyncdefsend_event_to_topic(event:Event):try:dataevent.model_dump_json()datadata.encode()# # Validate the presence of required fields# We could do something like this but Pydantic will do# everything for us.# if user_id not in data or action not in data:# raise HTTPException(# status_code422, detailIncomplete data provided)user_idevent.user_id# Send filename to Redpandaawaitapp.producer.send(producer_topic,data)# Returning a confirmation messagereturn{message:User data submitted successfully!,user_data:{user_id:user_id}}exceptHTTPExceptionase:# Re-raise HTTPException to return the specified# status code and detailprint(e)raiseeexceptExceptionase:# Handle other unexpected exceptions and return a# 500 Internal Server Errorprint(e)raiseHTTPException(status_code500,detailfAn error occurred:{str(e)})if__name____main__:uvicorn.run(app.app_main:app,host0.0.0.0,port8000)# Run:# uvicorn app.app_main:app --reload --host 0.0.0.0 --port 8000# python -m app.app_main api -l info流处理服务的代码可以在下面的部分找到。Kafka、Kinesis、RabbitMQ 以及其他流处理工具让我们来看看在过去几年中证明自己最有用的流行数据流平台和框架。Apache Spark– 一个用于大规模分析和复杂数据转换的分布式数据计算框架。Apache Kafka– 一个具有分布式消息系统的实时数据管道工具用于应用程序。它使用发布-订阅模型其中生产者向主题发送数据消费者从这些主题中拉取数据。每个主题都被分割成分区这些分区在不同的服务器上复制以提高可用性和平衡负载。此外Kafka 具有内置的容错功能因此你可以为每个主题设置复制因子和所需的同步副本ISR数量。这意味着即使某些服务器宕机数据仍然可访问。AWS Kinesis是一个用于分析和应用的实时流平台。我之前在这里写过关于它的内容 [3]。使用 Redshift Serverless 和 Kinesis 构建流数据管道Google Cloud Dataflow– Google 的实时事件处理和分析管道流平台。Apache Flink– 一个专为低延迟数据处理设计的分布式流数据平台。RabbitMQ是一个开源的消息代理它促进了应用程序之间的通信。它使用基于高级消息队列协议AMQP的排队系统允许你高效地发送、接收和路由消息。RabbitMQ 有助于解耦应用程序组件使它们能够独立工作并轻松扩展。Kafka 是我最喜欢的分布式流平台之一它允许你发布和订阅数据流实时处理它们并可靠地存储它们。它最初是为 Java 构建的但现在也适用于 Python 开发者kafka-python。我喜欢它的一个特点是内置的窗口方法这简化了会话计算。例如使用faust-streaming框架这可以轻松实现。考虑以下代码。它展示了我们的应用程序如何为每个用户计算窗口聚合importosimportrandomfromdatetimeimportdatetime,timedeltaimportfaust SINKos.getenv(CONSUMER_TOPIC)TOPICos.getenv(PRODUCER_TOPIC)BROKER(redpanda-0:9092ifos.getenv(RUNTIME_ENVIRONMENT)DOCKERelselocalhost:19092)TABLEtumbling-eventsCLEANUP_INTERVAL1.0WINDOW10# 10 seconds windowWINDOW_EXPIRES10PARTITIONS1appfaust.App(event-stream,brokerfkafka://{BROKER})app.conf.table_cleanup_intervalCLEANUP_INTERVAL sourceapp.topic(TOPIC,value_typeEvent)sinkapp.topic(SINK,value_typeUserStats)app.timer(interval3.0,on_leaderTrue)asyncdefgenerate_event_data():events_topicapp.topic(TOPIC,key_typestr,value_typeEvent)allowed_events[e.valueforeinAllowedEvents]allowed_actions[e.valueforeinAllowedActions]# Create a loop to send data to the Redpanda topic# Send 20 messages every time the timer is triggered (every 5 seconds)foriinrange(20):# Send the data to the Redpanda topicawaitevents_topic.send(keyrandom.choice([User1,User2,User3,User4,User5]),valueEvent(event_typerandom.choice([page_view,scroll]),user_idrandom.choice([User1,User2,User3,User4,User5]),# noqa: E501actionrandom.choice([action_1,action_2]),timestampdatetime.now().strftime(%Y-%m-%d %H:%M:%S.%f),),)print(Producer is sleeping for 3 seconds )defwindow_processor(key,events):try:timestampkey[1][0]# key[1] is the tuple (ts, ts window)print(fkey::{key})users[event.user_idforeventinevents]countlen(users)user_counterCounter(event.user_idforeventinevents)fori,(user,count)inenumerate(user_counter.items()):print(f{i}.{user}:{count})aggregated_eventUserStats(timestamptimestamp,user_iduser,countcount)print(fProcessing window:{len(users)}events, Aggreged results:{aggregated_event}# noqa: E501)sink.send_soon(valueaggregated_event)exceptExceptionase:print(e)tumbling_table(app.Table(TABLE,defaultlist,key_typestr,value_typeEvent,partitionsPARTITIONS,on_window_closewindow_processor,).tumbling(WINDOW,expirestimedelta(secondsWINDOW_EXPIRES)).relative_to_field(Event.timestamp))app.agent(app.topic(TOPIC,key_typestr,value_typeEvent))asyncdefcalculate_tumbling_events(events):asyncforeventinevents:value_listtumbling_table[events].value()value_list.append(event)tumbling_table[events]value_listif__name____main__:app.main()现在在命令行中输入以下内容以启动我们的流模块faust-A streamer.event_stream worker-l info因此我们应该看到事件聚合在实时发生https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/8290d2b708ebbd08872a83c59ea85480.png作者图片选择合适的 Python 客户端一些流行的 Kafka Python 客户端包括confluent-kafka、pykafka和kafka-python。confluent-kafka是高性能librdkafkaC 库的 Python 包装器提供了完整的 Kafka 功能支持以及额外的 AVRO 序列化和模式注册表集成。它包括高级和低级 API以提供更大的灵活性和对 Kafka 应用程序的控制。大数据文件格式解释pykafka和kafka-python客户端是最广泛使用的它们的设计考虑了性能和简单性。它们支持所有 Kafka 功能并包括平衡消费者、Zookeeper 集成和分区管理等功能。每个 Python 库都有其自身的优缺点。使用简单性对于初学者来说可能很重要。这主要涉及文档、代码可读性、API 设计和错误处理。Pykafka通常被认为是简单的具有清晰的 API、良好的文档和用户友好的错误消息。相反kafka-python和confluent-kafka-python则不那么简单具有更复杂的 API、更不全面的文档和不太清晰的错误消息。另一方面confluent-kafka-python和kafka-python提供了最全面的Kafka 功能支持涵盖了所有 Kafka 功能并提供了高级和低级 API。相比之下pykafka的功能支持有限仅提供高级 API。一般来说confluent-kafka-python由于其基于优化的librdkafkaC 库而提供最佳性能而kafka-python和pykafka由于是纯 Python 实现且开销更大性能较低。您可以使用 pip 安装kafka-pythonpip install kafka-python我们的应用程序文件将如下所示# app.pyfromkafkaimportKafkaProducer,KafkaConsumerimportjson producerKafkaProducer(bootstrap_servers[brokerA:9092,brokerB:9092],key_serializerlambdak:json.dumps(k).encode(),value_serializerlambdav:json.dumps(v).encode())# Send data to Kafka topic my_topicfutureproducer.send(my_topic,keyhello,valueworld)try:resultfuture.get(timeout10)print(result)exceptExceptionase:print(e)要从主题中消费数据我们希望使用poll方法。它将返回包含我们的 JSON 记录的主题分区列表importjson consumerKafkaConsumer(bootstrap_servers[brokerA:9092,brokerB:9092],group_idmy-group,topics[my_topic],key_deserializerlambdak:json.loads(k.decode()),value_deserializerlambdav:json.loads(v.decode()),auto_offset_resetearliest,enable_auto_commitTrue,max_poll_records10)forrecordinconsumer:print(record.key,record.value)Kafka 最佳实践那么我们如何提高我们的 Kafka 应用程序的可靠性和性能优化 Kafka 应用程序没有通用的解决方案我们 Kafka 应用程序的性能和可靠性可能受到各种因素的影响包括以下方面配置和生产者与消费者的数量消息大小和频率网络带宽和延迟硬件和软件资源可能的故障场景。通常以下是一般性的容错和性能设计建议使用复制复制在多个代理之间创建多个数据副本确保在故障期间的可访问性并通过负载均衡提高性能。然而它可能导致磁盘使用量和网络流量增加。根据您的可用性和一致性要求设置复制因子和同步副本ISR的数量。启用批处理批处理将多个消息组合成一个请求减少网络开销并提高吞吐量。它还提高了压缩效率。在生产和消费端都启用批处理调整批大小和 linger 时间以满足您的延迟和吞吐量目标。明智地分区每个主题的分区数量会影响可伸缩性、并行性和性能。更多的分区允许更大的生产者和消费者容量有效地在代理之间分配负载。然而过多的分区会增加网络流量。因此根据您预期的吞吐量和延迟选择分区数量。启用压缩压缩可以减少消息大小。就是这样简单。它可以减少磁盘空间使用量和数据传输。虽然它会产生一些处理压缩的 CPU 开销但选择您的压缩算法和级别也被认为是最佳实践。错误处理Python Kafka 客户端提供了一系列方法和选项来管理错误和异常包括错误处理策略策略涉及在错误发生时重试、忽略或快速失败。数据工程师可以确定如何处理不同的错误类型同时确保可靠性和数据一致性。例如可以使用max_in_flight_requests_per_connection、retries和retry_backoff_ms等参数来配置kafka-python生产者。在confluent-kafka-python消费者中我们可以使用enable.auto.commit、auto.commit.interval.ms和enable.auto.offset.store等参数。异常处理可以使用try、except和finally块来处理各种异常类型并执行清理或恢复程序。这是标准的 Python 异常处理。例如在kafka-python和confluent-kafka-python客户端中有一个KafkaError类。在pykafka客户端中我们有PyKafkaException类。错误回调当我们需要记录错误、引发异常或实现恢复操作时回调函数非常有用。回调函数可以被分配给生产者和消费者的构造函数在客户端级别发生错误或异常时触发。例如error_cb参数在kafka-python和confluent-kafka-python客户端中可用而on_error参数在pykafka客户端中使用。例如最常见的一种错误处理模式是死信队列。来自源主题的事件可以分成两条独立的路径应用程序成功处理源主题中的每个事件并在一切正常的情况下将它们发布到目标主题一个汇点。无法处理的事件例如缺少预期格式或缺少所需属性的事件将被定向到错误主题。另一个有用的错误处理模式是实现重试队列。例如如果此时项目的参数不可用事件可以被路由到重试主题。例如如果该功能正在由另一个服务处理并且请求时不可用这种情况可能会发生。这种可恢复的状态不应被归类为错误相反它应该定期重试直到满足所需条件。引入重试主题允许立即处理大多数事件同时将某些事件的处理推迟到满足必要条件。结论流式处理技术如滚动和跳跃窗口、会话等与 Python 结合是一种多功能的解决方案。像 Kafka 这样的工具帮助数据工程师创建能够支持实时分析、消息传递和事件驱动系统的流式应用程序。例如Kafka 的关键优势包括高吞吐量和可伸缩性使其在其用途上极为强大**。** Kafka 通过批量写入和读取降低磁盘 I/O 和网络开销并利用数据压缩以低延迟处理每秒数百万条消息。它可以通过添加代理进行水平扩展通过增加分区和副本进行垂直扩展。它支持消费者组以有效地分配工作负载。Kafka 通过领导者-跟随者模型确保数据可用性即使在故障期间跟随者也会复制数据。如果领导者失败跟随者将接管Kafka 的提交日志允许进行持久和可重放的数据存储。选择正确的 Python 库将是首要任务。如果您最关心的是性能指标如吞吐量、延迟、CPU 使用率和内存消耗那么请选择confluent-kafka-python。如果您在寻找简单性那么pykafka或kafka-python将是更好的解决方案。适当的错误处理以及以最大化延迟、吞吐量和可用性的方式设计我们的流式应用程序也是需要考虑的另一件事。我希望遵循这个故事中的提示能帮助您正确配置 Kafka 集群并启用最佳设置以进行复制、分区、批处理和压缩。数据工程中的流式处理推荐阅读[1]towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3[2]towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd[3]towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603[4]towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603[5]medium.com/towards-data-science/big-data-file-formats-explained-275876dc1fc9

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询