Collecting Historical Fund NAV Data with Python Asynchronous Crawlers
1. Project Background and Significance

Historical fund net asset value (NAV) data is a fundamental input for financial analysis, investment decision-making, and quantitative research. Whether you are evaluating fund performance, analyzing risk, or constructing a portfolio, accurate and complete NAV history is essential. However, many financial data platforms restrict access to this data in various ways, and traditional collection methods tend to be slow and unreliable. This article shows how to use modern asynchronous Python crawling techniques to collect historical fund NAV data efficiently and reliably.

2. Technology Selection and Tooling

2.1 Core technology stack

- aiohttp/asyncio: asynchronous HTTP client and event loop for Python, enabling highly concurrent collection
- Playwright: Microsoft's modern browser-automation tool with async support
- Pandas: data processing and analysis library, used for cleaning and storage
- SQLAlchemy: database ORM supporting multiple backends
- BeautifulSoup4: HTML parsing library
- Redis: distributed cache, used for request de-duplication and state management

2.2 Environment setup

```python
# requirements.txt
aiohttp==3.8.5
playwright==1.40.0
pandas==2.1.4
sqlalchemy==2.0.23
beautifulsoup4==4.12.2
redis==5.0.1
aioredis==2.0.1
lxml==4.9.3
httpx==0.25.2
pydantic==2.5.0
pydantic-settings==2.1.0  # provides BaseSettings under pydantic v2
```

3. System Architecture

3.1 Architecture overview

```text
Data collection system architecture
User interface layer → Scheduling layer → Collection layer → Parsing layer → Storage layer → Monitoring layer
```

3.2 Modular design

```text
# Project structure
fund_crawler/
├── core/
│   ├── __init__.py
│   ├── async_spider.py        # Async crawler core
│   ├── playwright_handler.py  # Browser-automation handling
│   └── proxy_rotator.py       # Proxy IP management
├── parsers/
│   ├── fund_parser.py         # Fund data parser
│   └── data_cleaner.py        # Data cleaner
├── storage/
│   ├── database.py            # Database operations
│   └── file_storage.py        # File storage
├── utils/
│   ├── logger.py              # Logging configuration
│   ├── rate_limiter.py        # Rate limiting
│   └── exception_handler.py   # Exception handling
├── config/
│   └── settings.py            # Configuration
└── main.py                    # Entry point
```

4. Full Code Implementation

4.1 Configuration module

```python
# config/settings.py
from typing import Optional

# Under pydantic v2, BaseSettings lives in the separate pydantic-settings package
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Crawler settings
    USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    REQUEST_TIMEOUT: int = 30
    MAX_CONCURRENT_REQUESTS: int = 100
    RETRY_ATTEMPTS: int = 3

    # Target site settings
    FUND_DATA_BASE_URL: str = "https://fund.eastmoney.com"
    FUND_DETAIL_URL: str = "https://fund.eastmoney.com/f10/jjjz_{code}.html"
    FUND_HISTORY_API: str = "https://api.fund.eastmoney.com/f10/lsjz"

    # Database settings
    DATABASE_URL: str = "postgresql://user:password@localhost:5432/fund_data"
    REDIS_URL: str = "redis://localhost:6379/0"

    # Storage paths
    DATA_DIR: str = "./data/fund_history"
    LOG_DIR: str = "./logs"

    # Proxy settings
    USE_PROXY: bool = False
    PROXY_POOL_URL: Optional[str] = None

    class Config:
        env_file = ".env"


settings = Settings()
```
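The modules below import `setup_logger` from `utils/logger.py`, which appears in the project layout above but is never shown in the article. A minimal sketch of what it might look like is given here; the log format and file-handler behavior are assumptions, not the author's implementation:

```python
# utils/logger.py (illustrative sketch only)
import logging
import os

from config.settings import settings


def setup_logger(name: str) -> logging.Logger:
    """Return a logger that writes to the console and to LOG_DIR/<name>.log."""
    logger = logging.getLogger(name)
    if logger.handlers:  # avoid attaching duplicate handlers on repeated imports
        return logger
    logger.setLevel(logging.INFO)

    formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")

    console = logging.StreamHandler()
    console.setFormatter(formatter)
    logger.addHandler(console)

    os.makedirs(settings.LOG_DIR, exist_ok=True)
    file_handler = logging.FileHandler(os.path.join(settings.LOG_DIR, f"{name}.log"))
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    return logger
```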
4.2 Core asynchronous spider class

```python
# core/async_spider.py
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import aiohttp

from config.settings import settings
from core.proxy_rotator import ProxyRotator
from utils.logger import setup_logger
from utils.rate_limiter import RateLimiter

logger = setup_logger(__name__)


@dataclass
class RequestConfig:
    """Per-request configuration."""
    method: str = "GET"
    headers: Optional[Dict] = None
    params: Optional[Dict] = None
    data: Optional[Dict] = None
    json: Optional[Dict] = None
    timeout: int = settings.REQUEST_TIMEOUT
    retry_count: int = settings.RETRY_ATTEMPTS


class AsyncFundSpider:
    """Asynchronous fund data crawler."""

    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.rate_limiter = RateLimiter(max_requests=50, time_window=60)
        self.proxy_rotator = ProxyRotator() if settings.USE_PROXY else None
        self.cookies = {}

    async def __aenter__(self):
        await self.init_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def init_session(self):
        """Initialize the aiohttp session."""
        timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT)
        connector = aiohttp.TCPConnector(
            limit=settings.MAX_CONCURRENT_REQUESTS,
            force_close=True,
            enable_cleanup_closed=True
        )
        headers = {
            "User-Agent": settings.USER_AGENT,
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Referer": "https://fund.eastmoney.com/"
        }
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers=headers
        )
        # Prime the cookie jar
        await self._init_cookies()

    async def _init_cookies(self):
        """Fetch the home page once to obtain the cookies the API expects."""
        try:
            async with self.session.get(settings.FUND_DATA_BASE_URL) as response:
                if response.status == 200:
                    self.cookies = {c.key: c.value for c in response.cookies.values()}
        except Exception as e:
            logger.warning(f"Failed to initialize cookies: {e}")

    async def fetch_fund_list(self, page: int = 1, page_size: int = 100) -> List[Dict]:
        """Fetch one page of the fund list."""
        url = f"{settings.FUND_DATA_BASE_URL}/DataFundList.aspx"
        params = {
            "op": "ph",
            "dt": "kf",
            "ft": "all",
            "rs": "",
            "gs": 0,
            "sc": "zzf",
            "st": "desc",
            "sd": datetime.now().strftime("%Y-%m-%d"),
            "ed": datetime.now().strftime("%Y-%m-%d"),
            "qdii": "",
            "tabSubtype": ",,,,,",
            "pi": page,
            "pn": page_size,
            "dx": 1
        }
        response = await self.request(url=url, config=RequestConfig(params=params))
        if response:
            return self._parse_fund_list(response)
        return []

    async def fetch_fund_history(self, fund_code: str,
                                 start_date: str = "2000-01-01",
                                 end_date: Optional[str] = None) -> List[Dict]:
        """Fetch historical NAV records for a single fund."""
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")

        # Build the API query
        params = {
            "fundCode": fund_code,
            "pageIndex": 1,
            "pageSize": 10000,  # fetch everything in one request
            "startDate": start_date,
            "endDate": end_date,
            "_": int(datetime.now().timestamp() * 1000)
        }
        headers = {
            "Referer": f"https://fundf10.eastmoney.com/jjjz_{fund_code}.html",
            "Host": "api.fund.eastmoney.com"
        }
        response = await self.request(
            url=settings.FUND_HISTORY_API,
            config=RequestConfig(params=params, headers=headers)
        )
        if response and "Data" in response and "LSJZList" in response["Data"]:
            return response["Data"]["LSJZList"]
        return []

    async def request(self, url: str, config: RequestConfig) -> Optional[Any]:
        """Generic async request with throttling, proxy support and retries."""
        await self.rate_limiter.acquire()

        for attempt in range(config.retry_count):
            try:
                proxy = None
                if self.proxy_rotator:
                    proxy = await self.proxy_rotator.get_proxy()

                async with self.session.request(
                    method=config.method,
                    url=url,
                    headers=config.headers,
                    params=config.params,
                    data=config.data,
                    json=config.json,
                    proxy=proxy,
                    cookies=self.cookies
                ) as response:
                    if response.status == 200:
                        content_type = response.headers.get("Content-Type", "")
                        if "application/json" in content_type:
                            data = await response.json()
                        elif "text/html" in content_type:
                            text = await response.text()
                            data = {"html": text}
                        else:
                            data = await response.read()
                        logger.info(f"Request succeeded: {url}")
                        return data
                    elif response.status in [403, 429]:
                        logger.warning(f"Request throttled or blocked: {response.status}")
                        await asyncio.sleep(2 ** attempt)
                        continue
                    else:
                        logger.error(f"Request failed: {response.status}")
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error(f"Request error (attempt {attempt + 1}/{config.retry_count}): {e}")
                if attempt < config.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)
                else:
                    raise
        return None

    def _parse_fund_list(self, response: Dict) -> List[Dict]:
        """Parse the fund-list payload."""
        funds = []
        try:
            if "datas" in response:
                for fund_data in response["datas"]:
                    fund_info = {
                        "fund_code": fund_data[0],
                        "fund_name": fund_data[1],
                        "fund_type": fund_data[3],
                        "nav": float(fund_data[4]) if fund_data[4] else None,
                        "acc_nav": float(fund_data[5]) if fund_data[5] else None,
                        "daily_return": float(fund_data[6].rstrip("%")) if fund_data[6] else None,
                        "update_date": fund_data[9]
                    }
                    funds.append(fund_info)
        except Exception as e:
            logger.error(f"Failed to parse fund list: {e}")
        return funds

    async def close(self):
        """Close the session."""
        if self.session and not self.session.closed:
            await self.session.close()
```
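`AsyncFundSpider` expects a `ProxyRotator` with an async `get_proxy()` method; `core/proxy_rotator.py` is listed in the project layout but never shown. A minimal sketch is given below under the assumption that `PROXY_POOL_URL` returns one bare proxy address per request; both that response format and the static round-robin fallback are assumptions, not the author's design:

```python
# core/proxy_rotator.py (illustrative sketch only)
import itertools
from typing import List, Optional

import aiohttp

from config.settings import settings


class ProxyRotator:
    """Hand out proxy URLs, either from a remote pool or from a static list."""

    def __init__(self, fallback_proxies: Optional[List[str]] = None):
        # Static fallback list, cycled round-robin when no pool URL is configured
        self._has_static = bool(fallback_proxies)
        self._cycle = itertools.cycle(fallback_proxies or [])

    async def get_proxy(self) -> Optional[str]:
        """Return a proxy URL such as 'http://1.2.3.4:8080', or None."""
        if settings.PROXY_POOL_URL:
            # Assumes the pool endpoint returns a single proxy address as plain text
            async with aiohttp.ClientSession() as session:
                async with session.get(settings.PROXY_POOL_URL) as resp:
                    if resp.status == 200:
                        return (await resp.text()).strip()
        if self._has_static:
            return next(self._cycle)
        return None
```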
4.3 Handling complex pages with Playwright

```python
# core/playwright_handler.py
import logging
from typing import Any, Dict, Optional

from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)


class PlaywrightHandler:
    """Handle pages that require JavaScript rendering."""

    def __init__(self, headless: bool = True):
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None

    async def __aenter__(self):
        await self.init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def init_browser(self):
        """Start the browser and create a context."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.headless,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--no-sandbox"
            ]
        )
        # Create a browsing context
        self.context = await self.browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        )
        # Basic stealth: hide the webdriver flag to reduce bot detection
        await self.context.add_init_script(
            "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
        )

    async def scrape_fund_detail(self, fund_code: str) -> Dict[str, Any]:
        """Scrape the detail page of a fund."""
        page = None
        try:
            page = await self.context.new_page()

            # Navigate to the fund detail page
            url = f"https://fund.eastmoney.com/{fund_code}.html"
            await page.goto(url, wait_until="networkidle")

            # Wait for the key element to load
            await page.wait_for_selector(".dataOfFund", timeout=10000)

            # Extract basic information
            fund_info = {
                "fund_code": fund_code,
                "fund_name": await self._extract_text(page, ".fundDetail-tit"),
                "nav": await self._extract_number(page, ".dataItem1 .dataNums span"),
                "daily_change": await self._extract_text(page, ".dataItem1 .dataNums .ui-font-middle"),
                "establishment_date": await self._extract_text(page, ".infoOfFund table tr:nth-child(1) td:nth-child(2)"),
                "fund_size": await self._extract_text(page, ".infoOfFund table tr:nth-child(1) td:nth-child(3)"),
                "fund_manager": await self._extract_text(page, ".infoOfFund table tr:nth-child(2) td:nth-child(1)"),
                "fund_company": await self._extract_text(page, ".infoOfFund table tr:nth-child(2) td:nth-child(3)")
            }

            # Extract the historical NAV table
            await page.click("#historyNav")
            await page.wait_for_selector(".gmHistory", timeout=5000)

            # Read the table rows
            table_data = await page.eval_on_selector_all(
                ".gmHistory table tbody tr",
                "rows => rows.map(row => Array.from(row.querySelectorAll('td'))"
                ".map(cell => cell.textContent.trim()))"
            )
            fund_info["history_data"] = table_data

            return fund_info
        except Exception as e:
            logger.error(f"Playwright scrape failed: {e}")
            return {}
        finally:
            if page:
                await page.close()

    async def _extract_text(self, page, selector: str) -> str:
        """Extract the text content of an element."""
        try:
            element = await page.wait_for_selector(selector, timeout=5000)
            return await element.text_content() if element else ""
        except Exception:
            return ""

    async def _extract_number(self, page, selector: str) -> Optional[float]:
        """Extract a numeric value from an element."""
        try:
            text = await self._extract_text(page, selector)
            if text:
                # Strip everything that is not part of a number
                cleaned = "".join(c for c in text if c.isdigit() or c in ".-")
                return float(cleaned) if cleaned else None
        except Exception:
            return None

    async def close(self):
        """Close the browser and stop Playwright."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
```

4.4 Data parsing and cleaning

```python
# parsers/fund_parser.py
import logging
from typing import Any, Dict, List

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class FundDataParser:
    """Parser for raw fund data."""

    @staticmethod
    def parse_history_data(raw_data: List[Dict]) -> pd.DataFrame:
        """Parse historical NAV records into a DataFrame."""
        records = []
        for item in raw_data:
            try:
                record = {
                    "date": pd.to_datetime(item["FSRQ"]),
                    "nav": float(item["DWJZ"]) if item["DWJZ"] else None,
                    "acc_nav": float(item["LJJZ"]) if item["LJJZ"] else None,
                    "daily_return": item["JZZZL"],
                    "purchase_status": item["SGZT"],
                    "redemption_status": item["SHZT"],
                    "bonus": item["FHFCZ"] if item["FHFCZ"] else 0.0
                }
                # Normalize the daily change percentage
                if record["daily_return"]:
                    try:
                        record["daily_return"] = float(record["daily_return"].rstrip("%"))
                    except (TypeError, ValueError):
                        record["daily_return"] = None
                records.append(record)
            except Exception as e:
                logger.warning(f"Failed to parse record {item}: {e}")
                continue

        df = pd.DataFrame(records)
        if not df.empty:
            # Sort by date
            df = df.sort_values("date")
            # Cumulative return
            if "nav" in df.columns:
                df["cumulative_return"] = df["nav"].pct_change().add(1).cumprod().sub(1) * 100
        return df

    @staticmethod
    def calculate_metrics(df: pd.DataFrame) -> Dict[str, Any]:
        """Compute basic performance metrics for a fund."""
        if df.empty:
            return {}

        metrics = {}

        # Basic statistics
        metrics["data_points"] = len(df)
        metrics["date_range"] = {
            "start": df["date"].min().strftime("%Y-%m-%d"),
            "end": df["date"].max().strftime("%Y-%m-%d")
        }

        # Returns
        if "nav" in df.columns:
            df["returns"] = df["nav"].pct_change()

            # Annualized return
            days = (df["date"].max() - df["date"].min()).days
            if days > 0:
                total_return = (df["nav"].iloc[-1] / df["nav"].iloc[0] - 1) * 100
                metrics["total_return"] = total_return
                metrics["annualized_return"] = ((1 + total_return / 100) ** (365 / days) - 1) * 100

            # Volatility
            if len(df["returns"]) > 1:
                daily_volatility = df["returns"].std()
                metrics["annualized_volatility"] = daily_volatility * np.sqrt(252) * 100

            # Maximum drawdown
            cumulative = (1 + df["returns"]).cumprod()
            running_max = cumulative.expanding().max()
            drawdown = (cumulative - running_max) / running_max
            metrics["max_drawdown"] = drawdown.min() * 100

        return metrics
```
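A quick way to sanity-check the parser is to feed it a couple of records shaped like the `LSJZList` items referenced above. The values below are dummy data purely for illustration, not real market data:

```python
# Illustrative usage of FundDataParser with dummy records (not real market data)
from parsers.fund_parser import FundDataParser

raw = [
    {"FSRQ": "2024-01-02", "DWJZ": "1.2000", "LJJZ": "2.1000", "JZZZL": "0.50",
     "SGZT": "开放申购", "SHZT": "开放赎回", "FHFCZ": ""},
    {"FSRQ": "2024-01-03", "DWJZ": "1.2120", "LJJZ": "2.1120", "JZZZL": "1.00",
     "SGZT": "开放申购", "SHZT": "开放赎回", "FHFCZ": ""},
]

df = FundDataParser.parse_history_data(raw)
print(df[["date", "nav", "daily_return"]])
print(FundDataParser.calculate_metrics(df))
```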
4.5 Data storage module

```python
# storage/database.py
import logging
from typing import Any, Dict

import pandas as pd
from sqlalchemy import Column, DateTime, Float, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)

Base = declarative_base()


class FundHistory(Base):
    """Historical NAV table."""
    __tablename__ = "fund_history"

    id = Column(String(50), primary_key=True)
    fund_code = Column(String(10), index=True)
    date = Column(DateTime, index=True)
    nav = Column(Float)
    acc_nav = Column(Float)
    daily_return = Column(Float)
    purchase_status = Column(String(10))
    redemption_status = Column(String(10))
    bonus = Column(Float)
    created_at = Column(DateTime, default=pd.Timestamp.now)
    updated_at = Column(DateTime, default=pd.Timestamp.now, onupdate=pd.Timestamp.now)


class FundInfo(Base):
    """Fund information table."""
    __tablename__ = "fund_info"

    fund_code = Column(String(10), primary_key=True)
    fund_name = Column(String(100))
    fund_type = Column(String(50))
    establishment_date = Column(DateTime)
    fund_size = Column(Float)
    fund_manager = Column(String(100))
    fund_company = Column(String(100))
    created_at = Column(DateTime, default=pd.Timestamp.now)


class DatabaseManager:
    """Database access helper."""

    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)

    def init_database(self):
        """Create the tables if they do not exist."""
        Base.metadata.create_all(self.engine)

    def save_fund_history(self, fund_code: str, history_data: pd.DataFrame):
        """Persist historical NAV rows for a fund."""
        session = self.Session()
        try:
            for _, row in history_data.iterrows():
                record_id = f"{fund_code}_{row['date'].strftime('%Y%m%d')}"
                record = FundHistory(
                    id=record_id,
                    fund_code=fund_code,
                    date=row["date"],
                    nav=row.get("nav"),
                    acc_nav=row.get("acc_nav"),
                    daily_return=row.get("daily_return"),
                    purchase_status=row.get("purchase_status"),
                    redemption_status=row.get("redemption_status"),
                    bonus=row.get("bonus", 0.0)
                )
                # merge() behaves as an upsert
                session.merge(record)
            session.commit()
            logger.info(f"Saved {len(history_data)} history rows for fund {fund_code}")
        except Exception as e:
            session.rollback()
            logger.error(f"Failed to save history data: {e}")
            raise
        finally:
            session.close()

    def save_fund_info(self, fund_info: Dict[str, Any]):
        """Persist a fund's basic information."""
        session = self.Session()
        try:
            record = FundInfo(
                fund_code=fund_info["fund_code"],
                fund_name=fund_info["fund_name"],
                fund_type=fund_info.get("fund_type"),
                establishment_date=fund_info.get("establishment_date"),
                fund_size=fund_info.get("fund_size"),
                fund_manager=fund_info.get("fund_manager"),
                fund_company=fund_info.get("fund_company")
            )
            session.merge(record)
            session.commit()
            logger.info(f"Saved basic info for fund {fund_info['fund_code']}")
        except Exception as e:
            session.rollback()
            logger.error(f"Failed to save fund info: {e}")
            raise
        finally:
            session.close()
```
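For a quick local run you do not need PostgreSQL: pointing the manager at SQLite is enough to exercise the schema. The file name below is arbitrary and only an example:

```python
# Local smoke test with SQLite instead of PostgreSQL (file name is arbitrary)
from storage.database import DatabaseManager

db = DatabaseManager("sqlite:///fund_data_dev.db")
db.init_database()  # creates the fund_history and fund_info tables
```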
4.6 Main program and task scheduling

```python
# main.py
import asyncio
import os
from datetime import datetime, timedelta
from typing import List

import pandas as pd

from config.settings import settings
from core.async_spider import AsyncFundSpider
from core.playwright_handler import PlaywrightHandler
from parsers.fund_parser import FundDataParser
from storage.database import DatabaseManager
from utils.logger import setup_logger

logger = setup_logger(__name__)


class FundDataCrawler:
    """Crawler entry point and task orchestration."""

    def __init__(self):
        self.spider = None
        self.playwright = None
        self.db_manager = DatabaseManager(settings.DATABASE_URL)
        self.parser = FundDataParser()

    async def run(self, fund_codes: List[str] = None):
        """Run the crawler."""
        async with AsyncFundSpider() as spider:
            self.spider = spider

            # The spider must exist before we can enumerate fund codes
            if not fund_codes:
                fund_codes = await self.get_all_fund_codes()

            logger.info(f"Starting collection for {len(fund_codes)} funds")

            tasks = []
            for fund_code in fund_codes:
                task = asyncio.create_task(self.process_fund(fund_code))
                tasks.append(task)

                # Cap the number of in-flight tasks
                if len(tasks) >= 10:
                    await asyncio.gather(*tasks)
                    tasks = []
                    await asyncio.sleep(1)  # avoid hammering the server

            # Flush the remaining tasks
            if tasks:
                await asyncio.gather(*tasks)

        logger.info("Data collection finished")

    async def get_all_fund_codes(self) -> List[str]:
        """Fetch every fund code from the paginated list."""
        logger.info("Fetching the fund list...")
        all_funds = []
        page = 1

        while True:
            try:
                funds = await self.spider.fetch_fund_list(page=page, page_size=100)
                if not funds:
                    break
                all_funds.extend([f["fund_code"] for f in funds])
                logger.info(f"Fetched page {page} with {len(funds)} funds")
                page += 1
                await asyncio.sleep(0.5)  # keep the request rate low
            except Exception as e:
                logger.error(f"Failed to fetch the fund list: {e}")
                break

        return list(set(all_funds))  # de-duplicate

    async def process_fund(self, fund_code: str):
        """Collect, parse and store the data of one fund."""
        try:
            logger.info(f"Processing fund {fund_code}")

            # Historical NAV data for the last five years
            end_date = datetime.now().strftime("%Y-%m-%d")
            start_date = (datetime.now() - timedelta(days=365 * 5)).strftime("%Y-%m-%d")

            raw_data = await self.spider.fetch_fund_history(
                fund_code=fund_code,
                start_date=start_date,
                end_date=end_date
            )
            if not raw_data:
                logger.warning(f"No history data for fund {fund_code}")
                return

            # Parse the raw records
            history_df = self.parser.parse_history_data(raw_data)
            if history_df.empty:
                logger.warning(f"Parsed history for fund {fund_code} is empty")
                return

            # Compute metrics
            metrics = self.parser.calculate_metrics(history_df)
            logger.info(f"Metrics for fund {fund_code}: {metrics}")

            # Store in the database
            self.db_manager.save_fund_history(fund_code, history_df)

            # Fetch fund details with Playwright (JavaScript-rendered page)
            async with PlaywrightHandler(headless=True) as playwright:
                fund_detail = await playwright.scrape_fund_detail(fund_code)
                if fund_detail:
                    self.db_manager.save_fund_info(fund_detail)

            # Also export to CSV
            self.save_to_csv(fund_code, history_df)

            logger.info(f"Fund {fund_code} done, {len(history_df)} records")
        except Exception as e:
            logger.error(f"Failed to process fund {fund_code}: {e}")

    def save_to_csv(self, fund_code: str, df: pd.DataFrame):
        """Export a fund's history to a CSV file."""
        # Make sure the output directory exists
        os.makedirs(settings.DATA_DIR, exist_ok=True)

        # Write the file
        filename = f"{settings.DATA_DIR}/fund_{fund_code}_{datetime.now().strftime('%Y%m%d')}.csv"
        df.to_csv(filename, index=False, encoding="utf-8-sig")
        logger.info(f"Data written to {filename}")

    async def batch_update(self, days: int = 7):
        """Refresh the last few days of data for every known fund."""
        logger.info(f"Updating data for the last {days} days")

        # Fetch the funds that need updating from the database
        session = self.db_manager.Session()
        try:
            # All fund codes currently in the database
            from storage.database import FundInfo
            funds = session.query(FundInfo.fund_code).all()
            fund_codes = [f[0] for f in funds]
            logger.info(f"{len(fund_codes)} funds to update")

            # Refresh recent data for each fund
            update_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

            async with AsyncFundSpider() as spider:
                for fund_code in fund_codes:
                    try:
                        raw_data = await spider.fetch_fund_history(
                            fund_code=fund_code,
                            start_date=update_date
                        )
                        if raw_data:
                            history_df = self.parser.parse_history_data(raw_data)
                            self.db_manager.save_fund_history(fund_code, history_df)
                            logger.info(f"Updated recent data for fund {fund_code}")
                        await asyncio.sleep(0.1)  # throttle the request rate
                    except Exception as e:
                        logger.error(f"Failed to update fund {fund_code}: {e}")
                        continue
        finally:
            session.close()

        logger.info("Batch update finished")


async def main():
    """Entry point."""
    crawler = FundDataCrawler()

    # Initialize the database
    crawler.db_manager.init_database()

    # Run the crawler for specific fund codes...
    # await crawler.run(["000001", "110011"])
    # ...or collect every fund
    await crawler.run()
    # ...or refresh recent data only
    # await crawler.batch_update(days=30)


if __name__ == "__main__":
    # Windows needs the Proactor event loop policy
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    except AttributeError:
        pass

    # Run the main program
    asyncio.run(main())
```

4.7 Utility classes

```python
# utils/rate_limiter.py
import asyncio
import time
from collections import deque
from typing import Deque


class RateLimiter:
    """Sliding-window rate limiter."""

    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Deque[float] = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request slot is available."""
        async with self.lock:
            while True:
                now = time.time()

                # Drop timestamps that have fallen out of the window
                while self.requests and self.requests[0] <= now - self.time_window:
                    self.requests.popleft()

                # Room left in the window: record the request and return
                if len(self.requests) < self.max_requests:
                    self.requests.append(now)
                    return

                # Window is full: sleep until the oldest request expires, then re-check
                sleep_time = self.requests[0] + self.time_window - now
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
```
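`run()` above caps concurrency by gathering tasks in batches of ten, which means one slow fund stalls the whole batch. An alternative worth considering is a per-task `asyncio.Semaphore`; the sketch below is not part of the original code, and the limit of 10 simply mirrors the batch size used above:

```python
# Alternative concurrency cap using a semaphore instead of fixed batches (sketch)
import asyncio

from core.async_spider import AsyncFundSpider


async def run_with_semaphore(crawler, fund_codes, limit: int = 10):
    semaphore = asyncio.Semaphore(limit)

    async def bounded(code: str):
        async with semaphore:  # at most `limit` funds are processed at once
            await crawler.process_fund(code)

    async with AsyncFundSpider() as spider:
        crawler.spider = spider
        await asyncio.gather(*(bounded(code) for code in fund_codes))
```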
5. Advanced Features and Optimization

5.1 Distributed crawling support

```python
# core/distributed_spider.py
import pickle
import time
from typing import List, Optional

import aioredis

from config.settings import settings


class DistributedSpider:
    """Coordinator for distributed crawling."""

    def __init__(self):
        self.redis = None
        self.queue_key = "fund:crawler:queue"
        self.progress_key = "fund:crawler:progress"

    async def connect_redis(self):
        """Connect to Redis (from_url is synchronous in aioredis 2.x)."""
        self.redis = aioredis.from_url(settings.REDIS_URL)

    async def distribute_tasks(self, fund_codes: List[str]):
        """Push one task per fund code onto the queue."""
        for code in fund_codes:
            task = {
                "fund_code": code,
                "status": "pending",
                "created_at": time.time()
            }
            await self.redis.lpush(self.queue_key, pickle.dumps(task))

    async def get_next_task(self) -> Optional[dict]:
        """Pop the next task from the queue, if any."""
        task_data = await self.redis.rpop(self.queue_key)
        if task_data:
            return pickle.loads(task_data)
        return None

    async def update_progress(self, fund_code: str, status: str):
        """Record the processing status of a fund."""
        await self.redis.hset(self.progress_key, fund_code, status)
```

5.2 Data quality monitoring

```python
# utils/data_monitor.py
import pandas as pd


class DataQualityMonitor:
    """Basic data-quality checks."""

    @staticmethod
    def check_data_quality(df: pd.DataFrame) -> dict:
        """Run a set of sanity checks and return a report."""
        report = {
            "total_records": len(df),
            "missing_values": {},
            "date_consistency": True,
            "value_consistency": True
        }

        # Missing values per column
        for column in df.columns:
            missing_count = df[column].isnull().sum()
            if missing_count > 0:
                report["missing_values"][column] = missing_count

        # Date continuity
        if "date" in df.columns:
            dates = pd.to_datetime(df["date"]).sort_values()
            date_diff = dates.diff().dt.days
            if (date_diff.iloc[1:] > 5).any():  # allow gaps of at most 5 days
                report["date_consistency"] = False
                report["max_date_gap"] = int(date_diff.max())

        # NAV plausibility
        if "nav" in df.columns:
            if (df["nav"] <= 0).any() or (df["nav"] > 100).any():
                report["value_consistency"] = False

            # Extreme daily moves
            returns = df["nav"].pct_change()
            if (returns.abs() > 0.1).any():  # daily move above 10%
                report["has_extreme_returns"] = True

        return report
```

6. Deployment and Operation

6.1 Docker deployment configuration

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
        wget \
        gnupg \
        unzip \
    && rm -rf /var/lib/apt/lists/*

# Install Chrome
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Playwright browsers
RUN playwright install chromium

# Application code
COPY . .

# Run the crawler
CMD ["python", "main.py"]
```
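With this Dockerfile in the project root, a typical workflow is `docker build -t fund-crawler .` followed by `docker run --rm --env-file .env fund-crawler`; the image name and the use of an `.env` file are only examples, not something the article prescribes.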
6.2 Scheduled task configuration

```python
# scheduler.py
import asyncio
import time
from datetime import datetime

import schedule  # extra dependency: pip install schedule

from main import FundDataCrawler


async def scheduled_task():
    """Daily incremental update."""
    crawler = FundDataCrawler()
    await crawler.batch_update(days=1)
    print(f"[{datetime.now()}] Daily update finished")


def run_scheduler():
    """Run the job scheduler."""
    # Every day at 02:00
    schedule.every().day.at("02:00").do(
        lambda: asyncio.run(scheduled_task())
    )

    # Hourly heartbeat
    schedule.every().hour.do(
        lambda: print(f"[{datetime.now()}] Scheduler is running...")
    )

    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == "__main__":
    run_scheduler()
```

7. Caveats and Best Practices

7.1 Compliance

- Respect robots.txt: check the target site's robots.txt file
- Throttle requests: avoid putting excessive load on the server
- Respect copyright: use the data for personal study and research only; commercial use requires authorization
- Observe data-usage limits: follow the site's terms of use

7.2 Performance optimization tips

- Use connection pooling: reuse HTTP connections to reduce overhead
- Implement a caching strategy: cache data that has already been fetched
- Retry with backoff: implement exponential-backoff retries on errors
- Distribute the workload: consider a distributed deployment for very large volumes

7.3 Anti-bot countermeasures

- Rotate the User-Agent: change request headers periodically
- Use a proxy IP pool: avoid getting your IP banned
- Mimic human behavior: add random delays and scrolling actions
- Handle CAPTCHAs: integrate a CAPTCHA-recognition service

8. Summary

This article has shown how to collect historical fund NAV data with modern asynchronous Python crawling techniques. By combining aiohttp, Playwright and related tools, we built an efficient and stable data-collection system with the following characteristics:

- High performance: asynchronous I/O supports highly concurrent collection
- Robustness: comprehensive error handling and retry logic
- Extensibility: a modular design that is easy to extend
- Data quality: built-in data cleaning and quality checks
- Easy deployment: container-friendly, with scheduled-task support
