2026/5/18 23:45:21
网站建设
项目流程
dw网页设计制作网站的成品,锐旗网站建设,国产做网站,烟台网站建设托管问题概述
在运行异步数据库测试时#xff0c;我们遇到了一个典型的 “Event loop is closed” 或 “got Future attached to a different loop” 错误。具体表现为#xff1a;
第一个异步测试用例成功通过第二个及后续异步测试用例立即失败#xff0c;抛出 RuntimeError: Ta…问题概述在运行异步数据库测试时我们遇到了一个典型的 “Event loop is closed” 或 “got Future attached to a different loop” 错误。具体表现为第一个异步测试用例成功通过第二个及后续异步测试用例立即失败抛出RuntimeError: Task got Future attached to a different loop错误发生在数据库连接尝试建立时具体在asyncpg协议层错误堆栈分析RuntimeError:TaskTask pending nameTask-3corotest_create_and_get_conversations()running at/workspace/test/dao/test_conversation_dao.py:39cb[_run_until_complete_cb()at/usr/lib/python3.12/asyncio/base_events.py:182]got FutureFuture pending cb[Protocol._on_waiter_completed()]attached to a different loop asyncpg/protocol/protocol.pyx:374:RuntimeError错误堆栈显示问题发生在asyncpg协议层表明数据库连接尝试使用不同的事件循环。技术背景1. Pytest-Asyncio 的事件循环管理pytest-asyncio是 Pytest 的异步测试插件它管理异步测试的事件循环。关键配置选项asyncio_mode: 控制异步测试的模式auto/strictasyncio_default_fixture_loop_scope: 控制事件循环的作用域function/session2. SQLAlchemy 异步引擎缓存在我们的代码中数据库引擎使用lru_cache装饰器进行缓存# src/configs/db.pyfromfunctoolsimportlru_cachelru_cache()defget_async_engine(): Returns a cached async engine instance. The engine is created on the first call and reused on subsequent calls within the same event loop. logger.info(Creating new async engine instance.)returncreate_async_engine(DATABASE_URL,pool_pre_pingTrue,echoFalse,)3. AsyncSessionFactory 的模块级初始化AsyncSessionFactory在模块导入时就被创建# Create a sessionmaker for creating AsyncSession instances# The bind is deferred until the engine is created.AsyncSessionFactoryasync_sessionmaker(bindget_async_engine(),expire_on_commitFalse,)根本原因分析问题发生序列测试 1 开始Pytest 创建Loop Aget_async_engine()被调用创建Engine 1并绑定到Loop AAsyncSessionFactory绑定到Engine 1测试 1 结束Pytest 可能关闭Loop A取决于配置测试 2 开始Pytest 创建Loop B测试代码尝试使用数据库连接由于get_async_engine()有缓存返回Engine 1Engine 1仍然绑定在已关闭的Loop A上当 SQLAlchemy 尝试使用Engine 1在Loop B中连接数据库时失败关键冲突点缓存与事件循环生命周期的冲突lru_cache导致引擎在第一次创建后被缓存缓存的引擎绑定到创建时的事件循环当新测试使用新事件循环时缓存的引擎仍然绑定到旧循环模块级初始化与测试隔离的冲突AsyncSessionFactory在模块导入时初始化即使清除引擎缓存AsyncSessionFactory仍然引用旧的引擎实例Pytest 配置与应用程序设计的冲突默认的asyncio_default_fixture_loop_scope session意味着所有测试共享同一个事件循环但某些情况下pytest-asyncio 可能为每个测试创建新的事件循环排查过程第一步分析错误模式首先确认错误是否可重现pytest test/dao/test_conversation_dao.py::test_create_and_get_conversations -v错误确实发生且堆栈指向数据库连接层。第二步检查相关配置检查 pytest.ini[pytest] addopts -s pythonpath . asyncio_mode auto asyncio_default_fixture_loop_scope session检查 conftest.pypytest.fixture(scopesession)defevent_loop(): Creates an instance of the default event loop for the session. This fixes the Event loop is closed error in asyncpg during tests. try:loopasyncio.get_running_loop()exceptRuntimeError:loopasyncio.new_event_loop()yieldloop loop.close()第三步分析数据库配置检查src/configs/db.py发现关键问题get_async_engine使用lru_cacheAsyncSessionFactory在模块级别创建第四步验证假设创建简单测试验证事件循环问题# test/test_simple_async.pyimportpytestimportasynciopytest.mark.asyncioasyncdeftest_simple_async():awaitasyncio.sleep(0.1)assertTruepytest.mark.asyncioasyncdeftest_another_simple_async():awaitasyncio.sleep(0.1)assertTrue简单测试通过确认问题特定于数据库操作。解决方案方案一清除引擎缓存初步尝试在测试 fixture 中清除get_async_engine缓存# test/services/test_chat_service_e2e.pypytest.fixtureasyncdefdb_session():Fixture to provide a real database session.# Clear the cache to ensure we get a new engine bound to the current event loopget_async_engine.cache_clear()session_generatorget_db_session()sessionawaitanext(session_generator)try:yieldsessionfinally:awaitsession.close()结果部分缓解但问题仍然存在因为AsyncSessionFactory仍然引用旧的引擎。方案二直接创建引擎改进方案绕过get_async_engine和AsyncSessionFactory直接创建引擎# test/services/test_chat_service_e2e.pypytest.fixtureasyncdefdb_session():Fixture to provide a real database session.# Clear the cache to ensure we get a new engine bound to the current event loopget_async_engine.cache_clear()# Create a new engine bound to the current event loopenginecreate_async_engine(DATABASE_URL,pool_pre_pingTrue,echoFalse)# Create a new session factory with the new engineAsyncSessionFactoryasync_sessionmaker(bindengine,expire_on_commitFalse,)asyncwithAsyncSessionFactory()assession:try:yieldsessionfinally:awaitsession.close()# Dispose the engine after useawaitengine.dispose()方案三调整 Pytest 配置关键修复修改pytest.ini改变事件循环的作用域[pytest] addopts -s pythonpath . asyncio_mode auto asyncio_default_fixture_loop_scope function # 从 session 改为 function方案四全局缓存管理最终方案在conftest.py中添加全局缓存管理 fixture# test/conftest.pypytest.fixture(autouseTrue)asyncdefreset_db_cache(): Fixture to reset the database engine cache before each test. This fixes the Event loop is closed error when running multiple async tests. # Clear the cache of get_async_engineimportsrc.configs.dbifhasattr(src.configs.db.get_async_engine,cache_clear):src.configs.db.get_async_engine.cache_clear()# Also try to reinitialize the AsyncSessionFactory# by forcing a re-import of the moduleimportlib.reload(src.configs.db)yield完整修复代码1. pytest.ini 修改[pytest] addopts -s pythonpath . asyncio_mode auto asyncio_default_fixture_loop_scope function # 关键修改2. conftest.py 更新importpytestimportasyncioimportsysimportimportlibfromloguruimportloggerpytest.fixture(scopesession)defevent_loop(): Creates an instance of the default event loop for the session. This fixture is required by pytest-asyncio to avoid event loop issues. # Try to get the running loop firsttry:loopasyncio.get_running_loop()exceptRuntimeError:loopasyncio.new_event_loop()yieldloop# Dont close the loop here - let pytest-asyncio handle it# Closing the loop can cause Event loop is closed errorspytest.fixture(scopesession,autouseTrue)defconfigure_logging(): Configures Loguru logger for pytest execution. logger.remove()logger.add(sys.stdout,levelDEBUG)pytest.fixture(autouseTrue)asyncdefreset_db_cache(): Fixture to reset the database engine cache before each test. This fixes the Event loop is closed error when running multiple async tests. # Clear the cache of get_async_engineimportsrc.configs.dbifhasattr(src.configs.db.get_async_engine,cache_clear):src.configs.db.get_async_engine.cache_clear()# Also try to reinitialize the AsyncSessionFactory# by forcing a re-import of the moduleimportlib.reload(src.configs.db)yield3. 测试文件优化# test/services/test_chat_service_e2e.pypytest.fixtureasyncdefdb_session():Fixture to provide a real database session.# Clear the cache to ensure we get a new engine bound to the current event loopget_async_engine.cache_clear()# Create a new engine bound to the current event loopenginecreate_async_engine(DATABASE_URL,pool_pre_pingTrue,echoFalse)# Create a new session factory with the new engineAsyncSessionFactoryasync_sessionmaker(bindengine,expire_on_commitFalse,)asyncwithAsyncSessionFactory()assession:try:yieldsessionfinally:awaitsession.close()# Dispose the engine after useawaitengine.dispose()修复原理总结1. 事件循环隔离将asyncio_default_fixture_loop_scope从session改为function确保每个测试函数获得独立的事件循环避免测试间的事件循环污染符合测试隔离的最佳实践2. 缓存一致性通过reset_db_cachefixture 确保每个测试前清除引擎缓存重新加载数据库模块刷新AsyncSessionFactory保证引擎绑定到当前测试的事件循环3. 资源管理在测试 fixture 中显式创建和销毁引擎确保引擎与当前事件循环匹配避免资源泄漏测试验证修复后运行测试验证# 验证数据库DAO测试pytest test/dao/test_conversation_dao.py::test_create_and_get_conversations -v# 验证端到端聊天服务测试pytest test/services/test_chat_service_e2e.py::test_stream_chat_response_gemini_e2e -v pytest test/services/test_chat_service_e2e.py::test_stream_chat_response_deepseek_e2e -v所有测试均通过不再出现事件循环错误。经验教训1. 异步测试设计原则避免在模块级别初始化有状态的对象确保资源生命周期与测试生命周期匹配使用适当的测试隔离策略2. SQLAlchemy 最佳实践谨慎使用缓存特别是在测试环境中考虑测试专用的数据库配置确保引擎和会话的事件循环一致性3. Pytest 配置管理理解pytest-asyncio的事件循环管理策略根据测试需求调整作用域配置使用 fixture 进行资源管理和清理真正原因深度分析核心问题事件循环绑定与缓存的生命周期不匹配问题的本质是资源生命周期管理的冲突SQLAlchemy 引擎的缓存策略lru_cache()defget_async_engine():# 引擎创建时绑定到当前事件循环returncreate_async_engine(DATABASE_URL)lru_cache导致引擎在第一次调用后被缓存引擎在创建时绑定到创建时的事件循环缓存机制假设事件循环在整个应用生命周期中保持不变Pytest-Asyncio 的测试执行模型默认配置 (asyncio_default_fixture_loop_scope session)所有测试共享同一个事件循环但实际行为可能因版本、配置或测试结构而变化某些情况下pytest-asyncio 可能为每个测试创建新的事件循环冲突发生时机测试1开始 → 创建Loop A → 创建Engine 1 → 绑定到Loop A → 测试1结束 测试2开始 → 创建Loop B → 获取缓存的Engine 1 → Engine 1仍绑定到Loop A → 冲突根本原因总结缓存的对象引擎与其依赖的资源事件循环具有不同的生命周期引擎被缓存生命周期跨越多个测试事件循环可能为每个测试重新创建生命周期仅限于单个测试当缓存的引擎尝试在错误的事件循环中操作时发生RuntimeError: got Future attached to a different loop最佳实践指南1. 异步测试配置最佳实践✅ 推荐配置# pytest.ini [pytest] asyncio_mode auto asyncio_default_fixture_loop_scope function # 关键每个测试独立的事件循环❌ 避免的配置asyncio_default_fixture_loop_scope session # 可能导致事件循环冲突 配置说明function作用域每个测试函数获得独立的事件循环提供最好的隔离性session作用域所有测试共享事件循环性能更好但容易出问题对于数据库测试优先选择function作用域以确保稳定性2. SQLAlchemy 异步引擎管理最佳实践✅ 推荐模式测试专用引擎# 在测试fixture中创建专用引擎pytest.fixtureasyncdefdb_session():# 清除可能存在的缓存get_async_engine.cache_clear()# 创建新的引擎确保绑定到当前事件循环enginecreate_async_engine(DATABASE_URL,pool_pre_pingTrue)# 创建会话工厂AsyncSessionFactoryasync_sessionmaker(bindengine,expire_on_commitFalse,)asyncwithAsyncSessionFactory()assession:yieldsession# 测试结束后清理awaitengine.dispose()❌ 避免的模式直接使用缓存的引擎# 不要这样使用sessionAsyncSessionFactory()# 可能使用绑定到旧事件循环的引擎 引擎管理原则测试隔离每个测试应该使用独立的引擎实例生命周期管理引擎的创建和销毁应该与测试的生命周期匹配事件循环一致性确保引擎绑定到当前测试的事件循环3. 测试Fixture设计最佳实践✅ 推荐显式资源管理pytest.fixtureasyncdefdb_session():提供数据库会话确保事件循环一致性# 1. 清理旧状态get_async_engine.cache_clear()# 2. 创建新资源enginecreate_async_engine(DATABASE_URL)# 3. 使用资源asyncwithasync_sessionmaker(bindengine)()assession:yieldsession# 4. 清理资源awaitengine.dispose()✅ 推荐使用上下文管理器pytest.fixtureasyncdefdb_engine():提供数据库引擎自动清理enginecreate_async_engine(DATABASE_URL)try:yieldenginefinally:awaitengine.dispose()pytest.fixtureasyncdefdb_session(db_engine):基于引擎创建会话asyncwithasync_sessionmaker(binddb_engine)()assession:yieldsession4. 生产代码与测试代码的分离✅ 推荐为测试环境提供专用配置# src/configs/db.pydefget_async_engine(use_cache:boolTrue):获取异步引擎支持禁用缓存用于测试ifnotuse_cache:returncreate_async_engine(DATABASE_URL)lru_cache()def_cached_engine():returncreate_async_engine(DATABASE_URL)return_cached_engine()# 测试代码pytest.fixtureasyncdefdb_session():# 使用无缓存的引擎engineget_async_engine(use_cacheFalse)# ...✅ 推荐环境感知的缓存策略# src/configs/db.pyimportosdefget_async_engine():根据环境决定是否使用缓存# 测试环境禁用缓存ifos.getenv(PYTEST_CURRENT_TEST):returncreate_async_engine(DATABASE_URL)# 生产环境使用缓存lru_cache()def_cached_engine():returncreate_async_engine(DATABASE_URL)return_cached_engine()5. 调试与诊断最佳实践✅ 推荐添加诊断日志pytest.fixture(autouseTrue)asyncdefevent_loop_debug():诊断事件循环问题importasyncio loopasyncio.get_event_loop()print(fTest using event loop:{id(loop)})yield✅ 推荐验证事件循环一致性defassert_event_loop_consistency(engine):验证引擎绑定的事件循环与当前事件循环一致importasyncio current_loopasyncio.get_event_loop()# 检查引擎是否绑定到当前循环# 具体实现取决于SQLAlchemy版本经验教训总结1. 异步资源管理的黄金法则“谁创建谁管理谁清理”创建资源的代码应该负责管理其生命周期测试代码应该创建自己专用的资源避免跨测试共享有状态的资源2. 缓存使用的注意事项缓存适合无状态或状态不变的对象避免缓存绑定到特定上下文如事件循环的对象在测试环境中考虑禁用或限制缓存3. 测试隔离的重要性每个测试应该尽可能独立避免测试间的隐式依赖使用适当的fixture作用域function class module session4. 配置明确化显式配置优于隐式行为文档化配置的假设和影响为不同环境提供适当的默认配置结论数据库会话事件循环问题的根本原因是缓存对象的生命周期与其依赖资源事件循环的生命周期不匹配。通过以下措施可以彻底解决和预防此类问题配置层面使用asyncio_default_fixture_loop_scope function确保测试隔离代码层面在测试中创建专用的数据库引擎避免使用缓存的引擎架构层面分离生产代码和测试代码的资源管理策略遵循这些最佳实践可以构建稳定、可靠的异步测试套件避免事件循环相关的难以调试的问题。核心洞察在异步编程中资源的生命周期管理比同步编程更加关键。必须确保每个异步资源如数据库引擎与其执行上下文事件循环的生命周期保持一致。