1. Overview: Why Collecting Industry Data Reports Matters, and Why It Is Hard

In today's data-driven business environment, industry data reports play a crucial role in market analysis, competitor research, and strategic planning. These valuable documents, however, are scattered across many websites, platforms, and PDF files, and collecting them by hand is slow and labor-intensive. This article shows how to use current Python crawling techniques to build an efficient, stable collection system for industry data reports.

The article focuses on:

- modern web data collection with Playwright
- asynchronous programming to raise crawling throughput
- parsing the content of PDF reports
- data cleaning and storage
- anti-crawling countermeasures and the ethics of scraping

2. Choosing the Tech Stack: Why These Newer Tools

2.1 Playwright vs Selenium vs Requests

Playwright is a modern browser-automation tool developed by Microsoft. Compared with the traditional Selenium and Requests approaches, it offers:

- support for all modern browser engines (Chromium, Firefox, WebKit)
- automatic waiting for elements to load, which removes most manual sleeps
- more powerful selectors and event handling
- built-in screenshots and video recording
- good TypeScript and Python bindings

2.2 Asynchronous programming (asyncio)

Asynchronous I/O lets the crawler handle many page requests at the same time, which noticeably improves throughput, especially when a large number of pages has to be processed. A short sketch after the list in section 2.3 shows Playwright's async API and asyncio working together.

2.3 Other key technologies

- Pandas: data processing and analysis
- PyPDF2 / pdfplumber: PDF content extraction
- MongoDB / PostgreSQL: data storage
- FastAPI: exposing the collected data as an API service
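Before the full system in section 3, here is a minimal sketch of the two points above working together: Playwright's async API (which auto-waits on navigation) driven concurrently through asyncio.gather. The URLs and the h1 selector are illustrative placeholders, not part of the system that follows.

```python
# Minimal sketch: fetch several pages concurrently with Playwright + asyncio.
# The URLs and the "h1" selector are placeholders for illustration only.
import asyncio
from playwright.async_api import async_playwright


async def fetch_title(browser, url: str) -> str:
    page = await browser.new_page()
    try:
        # goto() waits for the navigation to finish; no manual sleeps needed
        await page.goto(url, timeout=30000)
        heading = await page.query_selector("h1")
        if heading:
            text = await heading.text_content()
            return (text or "").strip()
        return await page.title()
    finally:
        await page.close()


async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        urls = [
            "https://www.iresearch.com.cn",
            "https://www.analysys.cn",
        ]
        # asyncio.gather runs the fetches concurrently instead of one by one
        titles = await asyncio.gather(
            *(fetch_title(browser, u) for u in urls),
            return_exceptions=True,
        )
        for url, title in zip(urls, titles):
            print(url, "->", title)
        await browser.close()


if __name__ == "__main__":
    asyncio.run(main())
```

The same pattern, scaled up and combined with parsing and storage, is what the full implementation below is built on.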
3. Complete Crawler System Architecture Design

```python
"""
Industry data report collection system - full implementation.
Supports web crawling, PDF parsing, data cleaning, and persistent storage.
"""
import asyncio
import io  # needed for BytesIO when parsing PDFs (moved here from the __main__ block)
import re
import json
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, asdict
import logging
from urllib.parse import urljoin, urlparse
import hashlib

# PDF handling
import pdfplumber
from PyPDF2 import PdfReader

# Database
from sqlalchemy import create_engine, Column, String, Text, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Async crawling core
from playwright.async_api import async_playwright, Browser, Page, Response
import aiohttp
from aiohttp import ClientSession, TCPConnector
import aiofiles

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("industry_data_crawler.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Data model definitions
Base = declarative_base()


class IndustryReport(Base):
    """Industry report ORM model."""
    __tablename__ = "industry_reports"

    id = Column(String(64), primary_key=True)
    title = Column(String(500), nullable=False)
    source_url = Column(String(1000), nullable=False)
    publish_date = Column(DateTime)
    industry_category = Column(String(200))
    data_source = Column(String(200))
    file_url = Column(String(1000))
    file_type = Column(String(50))  # pdf, doc, html, etc.
    content_summary = Column(Text)
    full_content = Column(Text)
    keywords = Column(JSON)
    # "metadata" is reserved by SQLAlchemy's declarative base, so the attribute
    # is named report_metadata while the underlying column keeps the name.
    report_metadata = Column("metadata", JSON)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f"IndustryReport(title={self.title[:50]}..., source={self.data_source})"


@dataclass
class ReportData:
    """Plain report data structure used before persisting."""
    title: str
    source_url: str
    publish_date: Optional[datetime]
    industry_category: str
    data_source: str
    file_url: Optional[str]
    file_type: str
    content_summary: str
    full_content: str
    keywords: List[str]
    metadata: Dict[str, Any]

    def generate_id(self):
        """Generate a unique ID."""
        content = f"{self.source_url}{self.title}{self.publish_date}"
        return hashlib.sha256(content.encode()).hexdigest()

    def to_dict(self):
        """Convert to a dict whose keys match the ORM model's attributes."""
        data = asdict(self)
        data["id"] = self.generate_id()
        data["publish_date"] = self.publish_date.isoformat() if self.publish_date else None
        data["report_metadata"] = data.pop("metadata")
        return data


class IndustryDataCrawler:
    """Main class of the industry data report crawler."""

    def __init__(self, config_path: str = "config.json"):
        """Initialize the crawler.

        Args:
            config_path: path to the configuration file
        """
        self.config = self._load_config(config_path)
        self.db_engine = None
        self.db_session = None
        self._init_database()
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]

    def _load_config(self, config_path: str) -> Dict:
        """Load the configuration file, falling back to defaults."""
        default_config = {
            "database": {
                "url": "sqlite:///industry_reports.db",
                "echo": False
            },
            "crawler": {
                "max_concurrent": 5,
                "request_timeout": 30,
                "retry_times": 3,
                "delay_range": [1, 3]
            },
            "target_sites": {
                "艾瑞咨询": "https://www.iresearch.com.cn",
                "易观分析": "https://www.analysys.cn",
                "艾媒网": "https://www.iimedia.cn",
                "199IT": "http://www.199it.com",
                "数据局": "https://www.shujuju.cn"
            },
            "proxies": [],  # proxy configuration
            "keywords": ["行业报告", "白皮书", "市场分析", "趋势报告"]
        }
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                user_config = json.load(f)
            # Merge user config over the defaults
            for key in default_config:
                if key in user_config:
                    if isinstance(default_config[key], dict) and isinstance(user_config[key], dict):
                        default_config[key].update(user_config[key])
                    else:
                        default_config[key] = user_config[key]
        except FileNotFoundError:
            logger.warning(f"配置文件 {config_path} 不存在，使用默认配置")
        return default_config

    def _init_database(self):
        """Initialize the database connection."""
        db_url = self.config["database"]["url"]
        self.db_engine = create_engine(
            db_url,
            echo=self.config["database"]["echo"]
        )
        Base.metadata.create_all(self.db_engine)
        Session = sessionmaker(bind=self.db_engine)
        self.db_session = Session()
        logger.info(f"数据库初始化完成: {db_url}")

    async def crawl_site(self, site_name: str, site_url: str):
        """Crawl the reports of a single site.

        Args:
            site_name: site name
            site_url: site URL
        """
        logger.info(f"开始爬取网站: {site_name} ({site_url})")

        async with async_playwright() as p:
            # Launch the browser
            browser = await p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"]
            )
            # Create a context
            context = await browser.new_context(
                viewport={"width": 1920, "height": 1080},
                user_agent=self.user_agents[0]
            )
            # Create a page
            page = await context.new_page()

            try:
                # Open the site
                await page.goto(site_url, timeout=60000)

                # Pick a crawling strategy per site
                if "iresearch" in site_url:
                    reports = await self._crawl_iresearch(page, site_url)
                elif "analysys" in site_url:
                    reports = await self._crawl_analysys(page, site_url)
                elif "iimedia" in site_url:
                    reports = await self._crawl_iimedia(page, site_url)
                else:
                    reports = await self._crawl_general_site(page, site_url)

                # Process the collected report entries
                for report_info in reports:
                    try:
                        report_data = await self._process_report(report_info)
                        if report_data:
                            self._save_to_database(report_data)
                            logger.info(f"成功保存报告: {report_data.title}")
                    except Exception as e:
                        logger.error(f"处理报告失败: {report_info.get('url', '未知')}, 错误: {str(e)}")

                logger.info(f"网站 {site_name} 爬取完成，共获取 {len(reports)} 个报告")

            except Exception as e:
                logger.error(f"爬取网站 {site_name} 失败: {str(e)}")
            finally:
                await browser.close()

    async def _crawl_iresearch(self, page: Page, base_url: str) -> List[Dict]:
        """Crawl iResearch (艾瑞咨询)."""
        reports = []
        try:
            # Look for report links on the landing page
            report_links = await page.query_selector_all(
                'a[href*="report"], a:has-text("报告"), a:has-text("白皮书")'
            )
            for link in report_links[:20]:  # cap the number of links
                href = await link.get_attribute("href")
                title = await link.text_content() or await link.get_attribute("title") or ""
                if href and any(kw in title for kw in self.config["keywords"]):
                    full_url = urljoin(base_url, href)
                    reports.append({
                        "url": full_url,
                        "title": title.strip(),
                        "source": "艾瑞咨询"
                    })

            # Also try the report listing page
            await page.goto(f"{base_url}/report.shtml", timeout=30000)
            # Wait for the list to load
            await page.wait_for_selector(".report-list, .list-content", timeout=10000)

            # Extract more report items
            items = await page.query_selector_all(".report-item, .list-item")
            for item in items[:30]:
                try:
                    link = await item.query_selector("a")
                    if link:
                        href = await link.get_attribute("href")
                        title = await link.text_content() or ""
                        if href and any(kw in title for kw in self.config["keywords"]):
                            full_url = urljoin(base_url, href)
                            # Extract the date
                            date_elem = await item.query_selector(".date, .time")
                            date_str = await date_elem.text_content() if date_elem else ""
                            reports.append({
                                "url": full_url,
                                "title": title.strip(),
                                "date": self._parse_date(date_str.strip()),
                                "source": "艾瑞咨询"
                            })
                except Exception as e:
                    logger.debug(f"提取报告项失败: {str(e)}")
                    continue

        except Exception as e:
            logger.error(f"爬取艾瑞咨询失败: {str(e)}")

        return reports
    async def _crawl_analysys(self, page: Page, base_url: str) -> List[Dict]:
        """Crawl Analysys (易观分析)."""
        reports = []
        try:
            # Navigate to the report page
            await page.goto(f"{base_url}/report", timeout=30000)
            await page.wait_for_load_state("networkidle")

            # Extract report entries
            report_elements = await page.query_selector_all(".report-card, .article-item")
            for elem in report_elements[:25]:
                try:
                    # Title and link
                    title_elem = await elem.query_selector("h3, h4, .title")
                    link_elem = await elem.query_selector("a")
                    if not link_elem:
                        continue

                    href = await link_elem.get_attribute("href")
                    title = await title_elem.text_content() if title_elem else await link_elem.text_content()

                    if href and title:
                        full_url = urljoin(base_url, href)
                        # Date and summary
                        date_elem = await elem.query_selector(".date, .time, .publish-date")
                        date_str = await date_elem.text_content() if date_elem else ""
                        summary_elem = await elem.query_selector(".summary, .description, .abstract")
                        summary = await summary_elem.text_content() if summary_elem else ""

                        reports.append({
                            "url": full_url,
                            "title": title.strip(),
                            "date": self._parse_date(date_str.strip()),
                            "summary": summary.strip(),
                            "source": "易观分析"
                        })
                except Exception as e:
                    logger.debug(f"提取报告元素失败: {str(e)}")
                    continue

        except Exception as e:
            logger.error(f"爬取易观分析失败: {str(e)}")

        return reports

    async def _crawl_iimedia(self, page: Page, base_url: str) -> List[Dict]:
        """Crawl iiMedia (艾媒网)."""
        reports = []
        try:
            # Go straight to the report listing page
            await page.goto(f"{base_url}/report", timeout=30000)
            # Use a fairly generic selector
            await page.wait_for_selector('a[href*="report"]:visible', timeout=10000)

            # Find all links that look like reports
            links = await page.query_selector_all(
                'a[href*="report"]:visible, a:has-text("报告"):visible, a:has-text("Research"):visible'
            )

            seen_urls = set()
            for link in links:
                try:
                    href = await link.get_attribute("href")
                    if not href or href in seen_urls:
                        continue

                    title = await link.text_content() or await link.get_attribute("title") or ""
                    # Skip links that are not reports
                    if not any(kw in title for kw in self.config["keywords"]):
                        continue

                    full_url = urljoin(base_url, href)
                    seen_urls.add(href)
                    reports.append({
                        "url": full_url,
                        "title": title.strip()[:200],
                        "source": "艾媒网"
                    })
                except Exception:
                    continue

        except Exception as e:
            logger.error(f"爬取艾媒网失败: {str(e)}")

        return reports

    async def _crawl_general_site(self, page: Page, base_url: str) -> List[Dict]:
        """Generic crawling strategy for other sites."""
        reports = []
        unique_reports = []
        try:
            # Selectors for report-like links
            search_selectors = [
                'a[href*="report"]',
                'a[href*="white-paper"]',
                'a[href*="research"]',
                'a:has-text("报告")',
                'a:has-text("白皮书")',
                'a:has-text("研究")'
            ]

            for selector in search_selectors:
                try:
                    links = await page.query_selector_all(selector)
                    for link in links:
                        try:
                            href = await link.get_attribute("href")
                            title = await link.text_content() or ""
                            if href and title:
                                full_url = urljoin(base_url, href)
                                # Skip URLs we already collected
                                if not any(r["url"] == full_url for r in reports):
                                    reports.append({
                                        "url": full_url,
                                        "title": title.strip()[:300],
                                        "source": urlparse(base_url).netloc
                                    })
                        except Exception:
                            continue
                except Exception:
                    continue

            # Deduplicate
            seen_urls = set()
            for report in reports:
                if report["url"] not in seen_urls:
                    seen_urls.add(report["url"])
                    unique_reports.append(report)

        except Exception as e:
            logger.error(f"通用爬取失败: {str(e)}")

        return unique_reports[:50]  # cap the number of reports

    async def _process_report(self, report_info: Dict) -> Optional[ReportData]:
        """Process a single report entry and extract its details.

        Args:
            report_info: basic report info

        Returns:
            a ReportData object, or None on failure
        """
        try:
            url = report_info["url"]
            # PDF file or HTML page?
            if url.lower().endswith(".pdf"):
                return await self._process_pdf_report(url, report_info)
            else:
                return await self._process_html_report(url, report_info)
        except Exception as e:
            logger.error(f"处理报告失败 {report_info.get('url', '未知')}: {str(e)}")
            return None
    async def _process_pdf_report(self, pdf_url: str, report_info: Dict) -> Optional[ReportData]:
        """Process a PDF report."""
        try:
            logger.info(f"处理PDF报告: {pdf_url}")

            # Download the PDF file
            pdf_content = await self._download_file(pdf_url)
            if not pdf_content:
                return None

            # Parse the PDF content
            pdf_info = await self._parse_pdf_content(pdf_content, pdf_url)

            # Build the report data
            report_data = ReportData(
                title=report_info.get("title") or pdf_info.get("title", "未知标题"),
                source_url=pdf_url,
                publish_date=report_info.get("date") or pdf_info.get("date"),
                industry_category=self._categorize_industry(report_info.get("title", "")),
                data_source=report_info.get("source", "未知来源"),
                file_url=pdf_url,
                file_type="pdf",
                content_summary=pdf_info.get("summary", "")[:500],
                full_content=pdf_info.get("full_text", "")[:10000],  # cap the length
                keywords=self._extract_keywords(pdf_info.get("full_text", "")),
                metadata={
                    "page_count": pdf_info.get("page_count", 0),
                    "file_size": len(pdf_content),
                    "extracted_time": datetime.now().isoformat()
                }
            )
            return report_data

        except Exception as e:
            logger.error(f"处理PDF报告失败 {pdf_url}: {str(e)}")
            return None

    async def _process_html_report(self, url: str, report_info: Dict) -> Optional[ReportData]:
        """Process an HTML report page."""
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                context = await browser.new_context(
                    user_agent=self.user_agents[1]
                )
                page = await context.new_page()

                await page.goto(url, timeout=60000)
                await page.wait_for_load_state("networkidle")

                # Extract page content
                title = await self._extract_page_title(page)
                content = await self._extract_main_content(page)
                date = await self._extract_publish_date(page)

                # Look for a PDF download link
                pdf_links = await page.query_selector_all('a[href$=".pdf"]')
                pdf_url = None
                if pdf_links:
                    pdf_href = await pdf_links[0].get_attribute("href")
                    pdf_url = urljoin(url, pdf_href) if pdf_href else None

                await browser.close()

            # Build the report data
            report_data = ReportData(
                title=title or report_info.get("title", "未知标题"),
                source_url=url,
                publish_date=date or report_info.get("date"),
                industry_category=self._categorize_industry(title or ""),
                data_source=report_info.get("source", "未知来源"),
                file_url=pdf_url,
                file_type="html" if not pdf_url else "pdf",
                content_summary=self._generate_summary(content)[:500],
                full_content=content[:15000],  # cap the length
                keywords=self._extract_keywords(content),
                metadata={
                    "has_pdf": bool(pdf_url),
                    "word_count": len(content),
                    "extracted_time": datetime.now().isoformat()
                }
            )
            return report_data

        except Exception as e:
            logger.error(f"处理HTML报告失败 {url}: {str(e)}")
            return None

    async def _download_file(self, url: str) -> Optional[bytes]:
        """Download a file."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        return await response.read()
                    logger.warning(f"下载文件失败 {url}: HTTP {response.status}")
                    return None
        except Exception as e:
            logger.error(f"下载文件失败 {url}: {str(e)}")
            return None

    async def _parse_pdf_content(self, pdf_content: bytes, pdf_url: str) -> Dict:
        """Parse PDF content."""
        pdf_info = {
            "title": "",
            "date": None,
            "summary": "",
            "full_text": "",
            "page_count": 0
        }
        try:
            # Extract text with pdfplumber
            with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
                pdf_info["page_count"] = len(pdf.pages)

                # Extract text page by page (first 50 pages only)
                full_text = ""
                for i, page in enumerate(pdf.pages[:50]):
                    text = page.extract_text()
                    if text:
                        full_text += text + "\n"
                pdf_info["full_text"] = full_text

                # Try to take the title from the first page
                if pdf.pages:
                    first_page_text = pdf.pages[0].extract_text() or ""
                    lines = first_page_text.split("\n")
                    if lines:
                        pdf_info["title"] = lines[0][:200]

                # Generate a summary
                if full_text:
                    pdf_info["summary"] = self._generate_summary(full_text[:5000])

        except Exception as e:
            logger.error(f"解析PDF失败 {pdf_url}: {str(e)}")
            # Fall back to PyPDF2
            try:
                reader = PdfReader(io.BytesIO(pdf_content))
                pdf_info["page_count"] = len(reader.pages)
                full_text = ""
                for page in reader.pages[:10]:
                    text = page.extract_text()
                    if text:
                        full_text += text
                pdf_info["full_text"] = full_text
            except Exception as e2:
                logger.error(f"PyPDF2解析也失败 {pdf_url}: {str(e2)}")

        return pdf_info
    async def _extract_page_title(self, page: Page) -> str:
        """Extract the page title."""
        try:
            # Try several selectors in turn
            selectors = ["h1", ".title", ".article-title", ".report-title", "title"]
            for selector in selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        title = await element.text_content()
                        if title and len(title) > 10:
                            return title.strip()
                except Exception:
                    continue
            # Fall back to the document title
            return (await page.title())[:200]
        except Exception:
            return ""

    async def _extract_main_content(self, page: Page) -> str:
        """Extract the main content of the page."""
        try:
            # Common content selectors
            content_selectors = [
                ".article-content", ".report-content", ".content",
                ".main-content", "article", ".details", ".body"
            ]
            for selector in content_selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        text = await element.text_content()
                        if text and len(text) > 200:
                            return self._clean_text(text)
                except Exception:
                    continue

            # Fall back to the whole body
            body = await page.query_selector("body")
            if body:
                text = await body.text_content()
                return self._clean_text(text)[:10000]
            return ""
        except Exception as e:
            logger.error(f"提取内容失败: {str(e)}")
            return ""

    async def _extract_publish_date(self, page: Page) -> Optional[datetime]:
        """Extract the publish date."""
        try:
            date_selectors = [
                ".publish-date", ".date", ".time",
                '[itemprop="datePublished"]',
                'meta[property="article:published_time"]',
                'meta[name="publish_date"]'
            ]
            for selector in date_selectors:
                try:
                    if selector.startswith("meta"):
                        element = await page.query_selector(selector)
                        if element:
                            date_str = await element.get_attribute("content")
                            if date_str:
                                return self._parse_date(date_str)
                    else:
                        element = await page.query_selector(selector)
                        if element:
                            date_text = await element.text_content()
                            if date_text:
                                return self._parse_date(date_text.strip())
                except Exception:
                    continue
            return None
        except Exception:
            return None

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a date string."""
        if not date_str:
            return None

        # Common date formats
        date_patterns = [
            r"(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[日]?",
            r"(\d{4})\.(\d{1,2})\.(\d{1,2})",
            r"(\d{1,2})[-/](\d{1,2})[-/](\d{4})",
        ]
        for pattern in date_patterns:
            match = re.search(pattern, date_str)
            if match:
                try:
                    groups = match.groups()
                    if len(groups) == 3:
                        # Normalize to year, month, day
                        if len(groups[0]) == 4:  # YYYY-MM-DD
                            year, month, day = groups
                        else:  # DD-MM-YYYY
                            day, month, year = groups
                        return datetime(int(year), int(month), int(day))
                except Exception:
                    continue
        return None

    def _categorize_industry(self, text: str) -> str:
        """Very simple keyword-based industry classification."""
        industry_keywords = {
            "互联网": ["互联网", "电商", "社交", "游戏", "在线教育", "短视频", "直播"],
            "金融": ["金融", "银行", "保险", "证券", "支付", "区块链", "数字货币"],
            "科技": ["科技", "人工智能", "AI", "大数据", "云计算", "5G", "物联网"],
            "消费": ["消费", "零售", "餐饮", "食品", "饮料", "美妆", "服装"],
            "医疗": ["医疗", "医药", "健康", "医院", "疫苗", "生物"],
            "汽车": ["汽车", "新能源", "自动驾驶", "电动车", "造车"],
            "房地产": ["房地产", "房产", "楼市", "房价", "住宅"],
            "教育": ["教育", "培训", "学校", "在线教育", "K12"],
            "旅游": ["旅游", "酒店", "航空", "景区", "出行"]
        }
        text_lower = text.lower()
        for industry, keywords in industry_keywords.items():
            for keyword in keywords:
                if keyword.lower() in text_lower:
                    return industry
        return "其他"

    def _extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Extract keywords."""
        if not text:
            return []

        # Naive keyword extraction; use jieba or a similar library in real projects
        words = re.findall(r"[\u4e00-\u9fa5]{2,6}", text)
        # Filter stop words
        stop_words = {"的", "了", "在", "是", "和", "与", "及", "或", "等", "有", "这个", "这些"}
        filtered_words = [w for w in words if w not in stop_words]

        # Word frequency count
        from collections import Counter
        word_counts = Counter(filtered_words)
        return [word for word, count in word_counts.most_common(top_n)]

    def _generate_summary(self, text: str, max_length: int = 500) -> str:
        """Generate a summary."""
        if not text:
            return ""

        # Naive summary: take the leading sentences; use a proper
        # text-summarization algorithm in real projects
        sentences = re.split(r"[。!?]", text)
        summary = ""
        for sentence in sentences:
            if len(summary) + len(sentence) <= max_length:
                summary += sentence + "。"
            else:
                break
        return summary.strip() or text[:max_length]

    def _clean_text(self, text: str) -> str:
        """Clean up extracted text."""
        if not text:
            return ""
        # Collapse whitespace
        text = re.sub(r"\s+", " ", text)
        # Drop special characters
        text = re.sub(r"[^\w\u4e00-\u9fa5\s.,!?;:。、()【】\[\]《》]", "", text)
        return text.strip()
    def _save_to_database(self, report_data: ReportData):
        """Persist a report to the database."""
        try:
            # Does the report already exist?
            existing = self.db_session.query(IndustryReport).filter_by(
                id=report_data.generate_id()
            ).first()

            if existing:
                # Update the existing record
                existing.title = report_data.title
                existing.content_summary = report_data.content_summary
                existing.full_content = report_data.full_content
                existing.updated_at = datetime.now()
                logger.info(f"更新报告: {report_data.title}")
            else:
                # Create a new record
                report_dict = report_data.to_dict()
                # The DateTime column needs a datetime object, not the ISO string
                report_dict["publish_date"] = report_data.publish_date
                db_report = IndustryReport(**report_dict)
                self.db_session.add(db_report)
                logger.info(f"新增报告: {report_data.title}")

            self.db_session.commit()
        except Exception as e:
            logger.error(f"保存到数据库失败: {str(e)}")
            self.db_session.rollback()

    async def run(self):
        """Run the crawler."""
        logger.info("开始行业数据报告采集任务")
        start_time = datetime.now()

        sites = self.config["target_sites"]

        # Create one async task per site
        tasks = []
        for site_name, site_url in sites.items():
            task = asyncio.create_task(self.crawl_site(site_name, site_url))
            tasks.append(task)

        # Wait for all tasks to finish
        await asyncio.gather(*tasks, return_exceptions=True)

        # Collect statistics
        total_reports = self.db_session.query(IndustryReport).count()
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        logger.info("采集任务完成!")
        logger.info(f"总耗时: {duration:.2f}秒")
        logger.info(f"数据库中共有报告: {total_reports}份")

        # Produce a statistics report
        self._generate_statistics_report()

    def _generate_statistics_report(self):
        """Generate a statistics report (Excel + JSON)."""
        try:
            # Query the stored reports
            reports = self.db_session.query(IndustryReport).all()
            if not reports:
                logger.warning("没有找到报告数据")
                return

            # Build a DataFrame
            data = []
            for report in reports:
                data.append({
                    "标题": report.title,
                    "来源": report.data_source,
                    "行业分类": report.industry_category,
                    "发布日期": report.publish_date.strftime("%Y-%m-%d") if report.publish_date else "未知",
                    "文件类型": report.file_type,
                    "关键词": ", ".join(report.keywords) if report.keywords else ""
                })
            df = pd.DataFrame(data)

            # Save as Excel
            excel_path = f"industry_reports_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
            df.to_excel(excel_path, index=False)

            # Build summary statistics
            stats = {
                "报告总数": len(df),
                "来源分布": df["来源"].value_counts().to_dict(),
                "行业分布": df["行业分类"].value_counts().to_dict(),
                "文件类型分布": df["文件类型"].value_counts().to_dict()
            }

            # Save the statistics
            stats_path = f"crawler_stats_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(stats_path, "w", encoding="utf-8") as f:
                json.dump(stats, f, ensure_ascii=False, indent=2)

            logger.info(f"统计报告已保存: {excel_path}")
            logger.info(f"统计信息已保存: {stats_path}")

            # Print a short summary
            print("\n" + "=" * 50)
            print("行业数据报告采集统计摘要")
            print("=" * 50)
            print(f"报告总数: {stats['报告总数']}")
            print("\n来源分布:")
            for source, count in stats["来源分布"].items():
                print(f"  {source}: {count}")
            print("\n行业分布:")
            for industry, count in stats["行业分布"].items():
                print(f"  {industry}: {count}")

        except Exception as e:
            logger.error(f"生成统计报告失败: {str(e)}")

    def export_to_csv(self, output_path: str = "industry_reports.csv"):
        """Export the collected data to CSV."""
        try:
            reports = self.db_session.query(IndustryReport).all()
            data = []
            for report in reports:
                data.append({
                    "id": report.id,
                    "title": report.title,
                    "source": report.data_source,
                    "url": report.source_url,
                    "publish_date": report.publish_date.isoformat() if report.publish_date else "",
                    "industry": report.industry_category,
                    "file_type": report.file_type,
                    "summary": report.content_summary,
                    "keywords": json.dumps(report.keywords, ensure_ascii=False) if report.keywords else "",
                    "created_at": report.created_at.isoformat()
                })

            df = pd.DataFrame(data)
            df.to_csv(output_path, index=False, encoding="utf-8-sig")
            logger.info(f"数据已导出到: {output_path}")
            return df
        except Exception as e:
            logger.error(f"导出CSV失败: {str(e)}")
            return None


# Usage example
async def main():
    """Entry point."""
    # Create the crawler
    crawler = IndustryDataCrawler()

    # Run it
    await crawler.run()

    # Export the data
    crawler.export_to_csv()

    # Close the database session
    if crawler.db_session:
        crawler.db_session.close()

    print("\n采集任务已完成")
# Run the async entry point (io is imported at the top of the module for
# pdfplumber's BytesIO usage)
if __name__ == "__main__":
    asyncio.run(main())
```
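The crawler reads an optional config.json next to the script; anything you leave out falls back to the defaults built into _load_config. As a sketch, and keeping in mind that the values below are only illustrative, a small Python snippet can generate such an override file:

```python
# Sketch: writing a config.json override for IndustryDataCrawler.
# Keys mirror the defaults in _load_config; the values are illustrative only.
import json

config = {
    # dict-valued keys (database, crawler, target_sites) are merged into the
    # defaults by _load_config, so this overrides only the listed fields
    "crawler": {"max_concurrent": 3, "delay_range": [2, 5]},
    # list-valued keys replace the defaults entirely
    "keywords": ["行业报告", "白皮书", "市场分析"],
    "proxies": [],
}

with open("config.json", "w", encoding="utf-8") as f:
    json.dump(config, f, ensure_ascii=False, indent=2)
```

With this file in place, IndustryDataCrawler() picks it up on construction; without it, the built-in defaults are used and a warning is logged.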