# Building an Intelligent Academic Paper Abstract Collection System with Playwright, Asyncio, and Scrapy

## Introduction: The Value and Challenges of Academic Data Collection

In today's digital research era, paper abstracts distill the core of scientific output and are enormously valuable to researchers, academic institutions, and corporate R&D teams. Manual collection is slow and cannot keep up with large-scale data needs, while the spread of dynamically rendered pages and anti-crawling mechanisms makes acquisition increasingly complex. This article walks through building an efficient, stable, and intelligent academic paper abstract collection system on a modern Python crawling stack.

## Architecture Evolution: From Traditional Crawlers to a Modern Solution

### 1. Technology Choices

- **Traditional approach**: Requests + BeautifulSoup, suitable for static pages
- **The dynamic-rendering challenge**: more and more academic sites load their content with JavaScript
- **Modern approach**: Playwright + Asyncio + the Scrapy framework, well suited to these complex scenarios

### 2. Core Components

- **Playwright**: Microsoft's open-source browser automation tool, with multi-browser and headless support
- **Asyncio**: Python's native asynchronous I/O framework, enabling highly concurrent collection
- **Scrapy**: a mature crawling framework with full crawl-lifecycle management
- **Redis**: distributed task queue and de-duplication store
- **MongoDB**: schemaless storage that accommodates the varied structure of paper records

## Complete Implementation: The Abstract Collection System

### Environment and Dependencies

```text
# requirements.txt
playwright==1.40.0
scrapy==2.11.0
scrapy-playwright==0.0.33
aiohttp==3.9.1
redis==5.0.1
pymongo==4.6.0
pydantic==2.5.0
selenium==4.16.0
beautifulsoup4==4.12.2
pandas==2.1.4
numpy==1.26.2
lxml==4.9.3
fake-useragent==1.4.0
rate-limiter==1.0.0
# Note: asyncio is part of the Python standard library; the asyncio==3.4.3 pin
# in the original listing is an obsolete backport and should not be installed.
```

After installing the dependencies, fetch the Playwright browser binaries with `playwright install chromium`.

### 1. The Playwright-Based Crawler Core Classes

```python
import asyncio
import hashlib
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import redis.asyncio as redis  # async Redis client bundled with redis-py >= 4.2
from fake_useragent import UserAgent
from playwright.async_api import BrowserContext, Page, async_playwright
from pydantic import BaseModel, Field, field_validator  # pydantic v2 API
from pymongo import MongoClient


# ---------- Data model ----------
class AcademicPaper(BaseModel):
    """Academic paper data model."""

    paper_id: str = Field(..., description="Unique paper identifier")
    title: str = Field(..., description="Paper title")
    abstract: str = Field(..., description="Paper abstract")
    authors: List[str] = Field(default_factory=list, description="Author list")
    keywords: List[str] = Field(default_factory=list, description="Keywords")
    publication_date: Optional[str] = Field(None, description="Publication date")
    journal: Optional[str] = Field(None, description="Journal name")
    doi: Optional[str] = Field(None, description="DOI identifier")
    url: str = Field(..., description="Source URL")
    citations: Optional[int] = Field(None, description="Citation count")
    download_count: Optional[int] = Field(None, description="Download count")
    crawl_timestamp: datetime = Field(default_factory=datetime.now, description="Crawl time")

    @field_validator("abstract")
    @classmethod
    def validate_abstract_length(cls, v: str) -> str:
        if len(v) < 50:
            raise ValueError("Abstract too short")
        return v.strip()

    def generate_id(self) -> str:
        """Generate a stable unique ID from the title plus DOI (or URL)."""
        content = f"{self.title}{self.doi if self.doi else self.url}"
        return hashlib.md5(content.encode()).hexdigest()


# ---------- Async browser manager ----------
class AsyncBrowserManager:
    """Async context manager that owns a Playwright browser instance."""

    def __init__(self, headless: bool = True, proxy: Optional[str] = None):
        self.headless = headless
        self.proxy = proxy
        self.playwright = None
        self.browser = None
        self.context = None

    async def __aenter__(self) -> BrowserContext:
        self.playwright = await async_playwright().start()
        launch_options = {
            "headless": self.headless,
            "proxy": {"server": self.proxy} if self.proxy else None,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
            ],
        }
        self.browser = await self.playwright.chromium.launch(**launch_options)

        # Create a context that mimics a real browser profile
        self.context = await self.browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent=UserAgent().random,
            locale="en-US",
            timezone_id="America/New_York",
            permissions=["geolocation"],
            ignore_https_errors=True,
        )

        # Stealth-style init script to mask common automation fingerprints
        await self.context.add_init_script("""
            // Hide the webdriver flag
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            // Fake the chrome object
            window.chrome = { runtime: {}, loadTimes: function() {}, csi: function() {}, app: {} };
            // Fake plugins
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
            // Fake languages
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
        """)
        return self.context

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
```
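Before moving on to the page parser, a quick, hypothetical sanity check of the `AcademicPaper` model defined above shows the two behaviors that matter later: the validator rejects abstracts shorter than 50 characters, and `generate_id()` derives a stable MD5 from the title plus DOI or URL. The values below are illustrative only.

```python
paper = AcademicPaper(
    paper_id="demo",
    title="Attention Is All You Need",
    abstract=(
        "The dominant sequence transduction models are based on complex "
        "recurrent or convolutional neural networks that include an encoder "
        "and a decoder."
    ),
    url="https://arxiv.org/abs/1706.03762",
)
print(paper.generate_id())  # deterministic hash of title + URL (no DOI set)

# An abstract under 50 characters raises a pydantic ValidationError:
# AcademicPaper(paper_id="x", title="t", abstract="too short",
#               url="https://example.org")  # -> ValidationError
```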
The parser below continues the same module. It tries a series of selector fallbacks for the abstract, title, authors, and keywords, then enriches the record with DOI, publication date, and journal metadata.

```python
# ---------- Intelligent page parser ----------
class AcademicPageParser:
    """Parses academic paper pages and extracts abstract metadata."""

    @staticmethod
    async def parse_abstract_from_page(page: Page, url: str) -> Optional[AcademicPaper]:
        """Extract the abstract and related fields from a loaded page."""
        try:
            # Wait for the page to settle, plus a grace period for late JS content
            await page.wait_for_load_state("networkidle")
            await page.wait_for_timeout(2000)

            # Try several selectors commonly used for abstracts
            selectors = [
                "div.abstract",
                ".abstract-text",
                "section#abstract",
                '[class*="abstract"]',
                "div.article-abstract",
                'meta[name="description"]',
                'meta[property="og:description"]',
            ]
            abstract = None
            for selector in selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        # <meta> tags carry their text in the content attribute
                        if selector.startswith("meta"):
                            abstract = await element.get_attribute("content")
                        else:
                            abstract = await element.text_content()
                        if abstract and len(abstract) > 50:
                            break
                except Exception:
                    continue

            # Title
            title_selectors = ["h1.title", "h1.article-title", "title", 'meta[property="og:title"]']
            title = None
            for selector in title_selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        title = (await element.get_attribute("content")
                                 if selector == 'meta[property="og:title"]'
                                 else await element.text_content())
                        if title:
                            break
                except Exception:
                    continue

            # Authors
            authors = []
            author_selectors = [".authors", ".author-list", 'meta[name="citation_authors"]']
            for selector in author_selectors:
                try:
                    elements = await page.query_selector_all(selector)
                    if elements:
                        for elem in elements:
                            author_text = await elem.text_content()
                            if author_text:
                                authors.extend([a.strip() for a in author_text.split(",")])
                        if authors:
                            break
                except Exception:
                    continue

            # Keywords
            keywords = []
            keyword_selectors = [".keywords", 'meta[name="keywords"]', 'meta[name="citation_keywords"]']
            for selector in keyword_selectors:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        keyword_text = (await element.get_attribute("content")
                                        if selector.startswith("meta")
                                        else await element.text_content())
                        if keyword_text:
                            keywords = [k.strip() for k in keyword_text.split(",")]
                            break
                except Exception:
                    continue

            if abstract and title:
                paper = AcademicPaper(
                    paper_id=hashlib.md5(f"{title}{url}".encode()).hexdigest(),
                    title=title.strip(),
                    abstract=abstract.strip(),
                    authors=[a for a in authors if a],
                    keywords=keywords,
                    url=url,
                )
                # Enrich with additional metadata (DOI, date, journal)
                await AcademicPageParser.extract_metadata(page, paper)
                return paper
        except Exception as e:
            logging.error(f"Error while parsing page {url}: {e}")
        return None

    @staticmethod
    async def extract_metadata(page: Page, paper: AcademicPaper):
        """Extract DOI, publication date, and journal name when available."""
        try:
            # DOI
            doi_selectors = ['meta[name="citation_doi"]', 'meta[name="doi"]', ".doi"]
            for selector in doi_selectors:
                element = await page.query_selector(selector)
                if element:
                    doi = (await element.get_attribute("content")
                           if selector.startswith("meta")
                           else await element.text_content())
                    if doi:
                        paper.doi = doi.strip()
                        break

            # Publication date
            date_selectors = ['meta[name="citation_publication_date"]', 'meta[name="date"]', ".publication-date"]
            for selector in date_selectors:
                element = await page.query_selector(selector)
                if element:
                    date_str = (await element.get_attribute("content")
                                if selector.startswith("meta")
                                else await element.text_content())
                    if date_str:
                        paper.publication_date = date_str.strip()
                        break

            # Journal
            journal_selectors = ['meta[name="citation_journal_title"]', ".journal-title"]
            for selector in journal_selectors:
                element = await page.query_selector(selector)
                if element:
                    journal = (await element.get_attribute("content")
                               if selector.startswith("meta")
                               else await element.text_content())
                    if journal:
                        paper.journal = journal.strip()
                        break
        except Exception as e:
            logging.warning(f"Metadata extraction failed: {e}")
```
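Before wiring these pieces into the full crawler, a small smoke test helps confirm that the browser manager and the parser cooperate. This is a minimal sketch that assumes the classes defined above are importable in the same module; the arXiv URL is only a placeholder, not part of the original article.

```python
import asyncio

EXAMPLE_URL = "https://arxiv.org/abs/2301.00001"  # placeholder abstract page

async def fetch_one(url: str):
    # Open a stealth browser context, load the page, and parse it once.
    async with AsyncBrowserManager(headless=True) as context:
        page = await context.new_page()
        await page.goto(url, wait_until="networkidle")
        paper = await AcademicPageParser.parse_abstract_from_page(page, url)
        await page.close()
        return paper

if __name__ == "__main__":
    paper = asyncio.run(fetch_one(EXAMPLE_URL))
    if paper:
        print(paper.title)
        print(paper.abstract[:200])
```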
Next come the Redis-backed distributed task queue and the main crawler class. Note that the queue uses redis-py's asyncio client, while MongoDB access goes through the synchronous pymongo driver (swap in motor if you need fully non-blocking persistence).

```python
# ---------- Distributed task queue ----------
class RedisTaskQueue:
    """Redis-backed distributed task queue with de-duplication."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.queue_key = "academic:crawler:queue"
        self.visited_key = "academic:crawler:visited"
        self.failed_key = "academic:crawler:failed"

    async def add_url(self, url: str, priority: int = 0):
        """Add a URL to the task queue."""
        score = time.time() - priority * 1000  # higher priority -> smaller score -> popped first
        await self.redis_client.zadd(self.queue_key, {url: score})

    async def get_next_url(self) -> Optional[str]:
        """Atomically pop the highest-priority URL."""
        script = """
        local result = redis.call('ZRANGE', KEYS[1], 0, 0)
        if result[1] then
            redis.call('ZREM', KEYS[1], result[1])
            return result[1]
        end
        return nil
        """
        return await self.redis_client.eval(script, 1, self.queue_key)

    async def mark_visited(self, url: str):
        """Mark a URL as visited."""
        await self.redis_client.sadd(self.visited_key, url)

    async def is_visited(self, url: str) -> bool:
        """Check whether a URL has already been visited."""
        return await self.redis_client.sismember(self.visited_key, url)

    async def mark_failed(self, url: str, reason: str):
        """Record a failed URL and the failure reason."""
        await self.redis_client.hset(self.failed_key, url, reason)


# ---------- Main crawler ----------
class AcademicPaperCrawler:
    """Main crawler for academic paper abstracts."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.task_queue = RedisTaskQueue(config.get("redis_url"))
        self.mongo_client = MongoClient(config.get("mongo_url"))
        self.db = self.mongo_client[config.get("database", "academic_papers")]
        self.collection = self.db.papers

        # Runtime statistics
        self.stats = {
            "total_crawled": 0,
            "successful": 0,
            "failed": 0,
            "start_time": datetime.now(),
        }

        # Browser configuration
        self.browser_config = {
            "headless": config.get("headless", True),
            "proxy": config.get("proxy"),
            "timeout": config.get("timeout", 30000),
        }

        # Simple rate limiting via a concurrency semaphore
        self.rate_limiter = asyncio.Semaphore(config.get("concurrent_limit", 5))

        # Logging
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler("crawler.log"),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    async def crawl_url(self, url: str) -> Optional[AcademicPaper]:
        """Crawl a single URL."""
        # Skip URLs we have already visited
        if await self.task_queue.is_visited(url):
            self.logger.info(f"URL already visited: {url}")
            return None

        async with self.rate_limiter:
            # A fresh browser per URL keeps fingerprints isolated, at the cost of speed
            async with AsyncBrowserManager(
                headless=self.browser_config["headless"],
                proxy=self.browser_config["proxy"],
            ) as context:
                try:
                    page = await context.new_page()
                    page.set_default_timeout(self.browser_config["timeout"])

                    self.logger.info(f"Crawling: {url}")
                    response = await page.goto(url, wait_until="networkidle")

                    if response and response.status >= 400:
                        self.logger.error(f"HTTP error {response.status}: {url}")
                        await self.task_queue.mark_failed(url, f"HTTP {response.status}")
                        return None

                    parser = AcademicPageParser()
                    paper = await parser.parse_abstract_from_page(page, url)

                    if paper:
                        await self.save_paper(paper)
                        await self.task_queue.mark_visited(url)
                        self.stats["successful"] += 1
                        self.logger.info(f"Crawled successfully: {paper.title}")
                        return paper
                    else:
                        self.logger.warning(f"Could not parse page content: {url}")
                        await self.task_queue.mark_failed(url, "parse failure")
                        self.stats["failed"] += 1

                    await page.close()
                except Exception as e:
                    self.logger.error(f"Crawl failed {url}: {e}")
                    await self.task_queue.mark_failed(url, str(e))
                    self.stats["failed"] += 1
        return None

    async def save_paper(self, paper: AcademicPaper):
        """Persist a paper to MongoDB (insert or update by paper_id)."""
        try:
            # NOTE: pymongo is synchronous; these calls briefly block the event loop.
            existing = self.collection.find_one({"paper_id": paper.paper_id})
            if existing:
                self.collection.update_one(
                    {"paper_id": paper.paper_id},
                    {"$set": paper.model_dump()},
                )
                self.logger.debug(f"Updated paper: {paper.title}")
            else:
                self.collection.insert_one(paper.model_dump())
                self.logger.debug(f"Saved new paper: {paper.title}")
        except Exception as e:
            self.logger.error(f"Failed to save paper {paper.title}: {e}")
    async def discover_links(self, page: Page, base_url: str) -> List[str]:
        """Discover candidate paper links on the current page."""
        links = []
        try:
            link_selectors = [
                'a[href*="abstract"]',
                'a[href*="paper"]',
                'a[href*="article"]',
                'a[href*="doi.org"]',
                ".article-title a",
                ".title a",
            ]
            for selector in link_selectors:
                elements = await page.query_selector_all(selector)
                for elem in elements:
                    href = await elem.get_attribute("href")
                    if href:
                        full_url = urljoin(base_url, href)
                        # Keep only links that look academic
                        if self.is_academic_url(full_url):
                            links.append(full_url)
        except Exception as e:
            self.logger.error(f"Link discovery failed: {e}")
        return list(set(links))  # de-duplicate

    def is_academic_url(self, url: str) -> bool:
        """Heuristic check for whether a URL belongs to an academic domain."""
        academic_domains = [
            "arxiv.org", "ieee.org", "springer.com", "sciencedirect.com",
            "nature.com", "science.org", "wiley.com", "tandfonline.com",
            "aclweb.org", "acm.org", "researchgate.net", "semanticscholar.org",
            "arxiv-vanity.com", "doi.org", "pubmed.ncbi.nlm.nih.gov",
        ]
        domain = urlparse(url).netloc.lower()
        return any(academic_domain in domain for academic_domain in academic_domains)

    async def run(self, start_urls: List[str], max_pages: int = 100):
        """Main crawl loop."""
        # Seed the queue with the start URLs
        for url in start_urls:
            await self.task_queue.add_url(url, priority=10)

        self.logger.info(f"Crawler started, target: {max_pages} pages")

        while self.stats["total_crawled"] < max_pages:
            url = await self.task_queue.get_next_url()
            if not url:
                self.logger.info("Task queue is empty")
                break

            paper = await self.crawl_url(url)
            self.stats["total_crawled"] += 1

            # Progress report every 10 pages
            if self.stats["total_crawled"] % 10 == 0:
                self.print_progress()

            # Discover new links from successfully parsed pages
            if paper and self.stats["total_crawled"] < max_pages:
                async with AsyncBrowserManager() as context:
                    page = await context.new_page()
                    try:
                        await page.goto(url, wait_until="networkidle")
                        new_links = await self.discover_links(page, url)
                        for link in new_links[:5]:  # cap discoveries per page
                            if not await self.task_queue.is_visited(link):
                                await self.task_queue.add_url(link)
                    except Exception as e:
                        self.logger.error(f"Link discovery failed: {e}")
                    finally:
                        await page.close()

            # Throttle between requests
            await asyncio.sleep(self.config.get("delay", 1))

        self.print_final_stats()

    def print_progress(self):
        """Print crawl progress."""
        elapsed = datetime.now() - self.stats["start_time"]
        rate = self.stats["total_crawled"] / elapsed.total_seconds() * 60
        print(f"\n{'*' * 50}")
        print(f"Pages crawled: {self.stats['total_crawled']}")
        print(f"Successful: {self.stats['successful']}, failed: {self.stats['failed']}")
        print(f"Crawl rate: {rate:.2f} pages/minute")
        print(f"Elapsed: {elapsed}")
        print(f"{'*' * 50}")

    def print_final_stats(self):
        """Print the final statistics."""
        elapsed = datetime.now() - self.stats["start_time"]
        total = max(self.stats["total_crawled"], 1)  # guard against division by zero
        print(f"\n{'#' * 60}")
        print("Crawl finished!")
        print(f"{'#' * 60}")
        print(f"Total runtime: {elapsed}")
        print(f"Total pages crawled: {self.stats['total_crawled']}")
        print(f"Successful: {self.stats['successful']}")
        print(f"Failed: {self.stats['failed']}")
        print(f"Success rate: {self.stats['successful'] / total * 100:.2f}%")
        print(f"{'#' * 60}")
```

With the standalone crawler in place, the same pipeline can also be expressed as a Scrapy project using scrapy-playwright:

```python
# ---------- Scrapy integration ----------
import scrapy
from scrapy.http import Response
from scrapy_playwright.page import PageMethod
from pymongo import MongoClient


class AcademicSpider(scrapy.Spider):
    """Scrapy version of the academic paper crawler."""

    name = "academic_papers"

    custom_settings = {
        "DOWNLOAD_HANDLERS": {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {
            "headless": True,
            "args": ["--disable-blink-features=AutomationControlled"],
        },
        "CONCURRENT_REQUESTS": 10,
        "AUTOTHROTTLE_ENABLED": True,
        "AUTOTHROTTLE_START_DELAY": 1,
        "DOWNLOAD_DELAY": 2,
        "ITEM_PIPELINES": {
            "academic_pipeline.MongoDBPipeline": 300,
        },
    }

    def start_requests(self):
        """Seed requests."""
        urls = [
            "https://arxiv.org/list/cs.AI/recent",
            "https://dl.acm.org/",
            "https://ieeexplore.ieee.org/Xplore/home.jsp",
        ]
        for url in urls:
            yield scrapy.Request(
                url,
                callback=self.parse,
                meta={
                    "playwright": True,
                    "playwright_page_methods": [
                        PageMethod("wait_for_selector", "div.abstract"),
                    ],
                    "playwright_include_page": True,
                },
            )

    async def parse(self, response: Response):
        """Parse a listing page using the Playwright page object."""
        page = response.meta["playwright_page"]

        papers = await page.query_selector_all(".paper-item")
        for paper_elem in papers:
            title = await paper_elem.query_selector(".title")
            abstract = await paper_elem.query_selector(".abstract")
            if title and abstract:
                yield {
                    "title": await title.text_content(),
                    "abstract": await abstract.text_content(),
                    "url": response.url,
                }

        # Follow pagination
        next_page = await page.query_selector(".next-page")
        if next_page:
            next_url = await next_page.get_attribute("href")
            if next_url:
                yield scrapy.Request(
                    url=response.urljoin(next_url),
                    callback=self.parse,
                    meta={
                        "playwright": True,
                        "playwright_include_page": True,
                        "playwright_page_methods": [
                            PageMethod("wait_for_selector", "div.abstract"),
                        ],
                    },
                )

        await page.close()


# ---------- MongoDB item pipeline ----------
class MongoDBPipeline:
    """Stores scraped items in MongoDB."""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "academic"),
        )

    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Upsert by title to avoid duplicates
        self.db.papers.update_one(
            {"title": item["title"]},
            {"$set": dict(item)},
            upsert=True,
        )
        return item
```
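Inside a Scrapy project the spider can be launched with `scrapy crawl academic_papers`, or programmatically as sketched below. Note that the `ITEM_PIPELINES` entry assumes the pipeline class lives in a module named `academic_pipeline`, and the Mongo settings here are placeholders; adjust both to match your project layout.

```python
# Minimal programmatic runner for the Scrapy spider (a sketch; dotted paths and
# connection strings are assumptions, not part of the original article).
from scrapy.crawler import CrawlerProcess

# AcademicSpider is the spider class defined above.
process = CrawlerProcess(settings={
    # The asyncio reactor is required by scrapy-playwright.
    "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
    "MONGO_URI": "mongodb://localhost:27017/",
    "MONGO_DATABASE": "academic",
})
process.crawl(AcademicSpider)
process.start()  # blocks until the crawl finishes
```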
Finally, the entry point ties everything together and exports the collected data to CSV:

```python
# ---------- Main entry point ----------
async def main():
    """Program entry point."""
    config = {
        "redis_url": "redis://localhost:6379/0",
        "mongo_url": "mongodb://localhost:27017/",
        "database": "academic_papers",
        "headless": True,
        "concurrent_limit": 5,
        "delay": 1,
        "timeout": 30000,
        "proxy": None,  # optional proxy, e.g. "http://user:pass@proxy:port"
    }

    start_urls = [
        "https://arxiv.org/list/cs.AI/recent",
        "https://dl.acm.org/",
        "https://scholar.google.com/",
        "https://www.semanticscholar.org/",
        "https://pubmed.ncbi.nlm.nih.gov/",
    ]

    crawler = AcademicPaperCrawler(config)

    try:
        await crawler.run(start_urls, max_pages=50)
        await export_data_to_csv(config)
    except KeyboardInterrupt:
        print("\nInterrupted by user, saving progress...")
    except Exception as e:
        print(f"Crawler failed: {e}")
    finally:
        print("Crawler finished")


async def export_data_to_csv(config):
    """Export the collected papers to a CSV file."""
    import pandas as pd

    client = MongoClient(config["mongo_url"])
    db = client[config["database"]]
    collection = db.papers

    papers = list(collection.find({}))
    if papers:
        df = pd.DataFrame(papers)
        # Drop MongoDB's internal _id column
        if "_id" in df.columns:
            df.drop("_id", axis=1, inplace=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"academic_papers_{timestamp}.csv"
        df.to_csv(filename, index=False, encoding="utf-8-sig")
        print(f"Data exported to: {filename}")
        print(f"Exported {len(df)} records in total")

    client.close()


if __name__ == "__main__":
    asyncio.run(main())
```

## System Optimization and Advanced Features

### 1. Anti-Anti-Crawling Strategies

```python
import random
import time

from fake_useragent import UserAgent
from playwright.async_api import Page


class AntiAntiCrawler:
    """Utilities for evading common anti-crawling countermeasures."""

    def __init__(self):
        self.user_agents = UserAgent()
        self.proxy_pool = [
            "http://proxy1:port",
            "http://proxy2:port",
            # ... extend with your own proxy pool
        ]

    def rotate_user_agent(self):
        """Return a random User-Agent string."""
        return self.user_agents.random

    def get_random_proxy(self):
        """Pick a random proxy from the pool."""
        return random.choice(self.proxy_pool) if self.proxy_pool else None

    def human_like_mouse_movement(self, page: Page):
        """Simulate human-like mouse movement."""
        # Implement a randomized mouse-movement pattern here
        pass

    def random_delay(self, min_seconds=1, max_seconds=3):
        """Sleep for a random interval."""
        time.sleep(random.uniform(min_seconds, max_seconds))
```

### 2. Distributed Deployment

```yaml
# docker-compose.yml
version: "3.8"

services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  mongodb:
    image: mongo:latest
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

  crawler:
    build: .
    depends_on:
      - redis
      - mongodb
    environment:
      - REDIS_URL=redis://redis:6379/0
      - MONGO_URL=mongodb://mongodb:27017/
    deploy:
      replicas: 3  # run three crawler instances

volumes:
  mongo_data:  # named volume referenced by the mongodb service
```
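Because every replica shares the same Redis queue and visited-set, scaling out mostly means pointing each container at the services defined above. The entrypoint sketch below is an assumption about how a replica might be wired up (the file name, the environment variable names from the compose file, and the seed URL are illustrative); it reuses `AcademicPaperCrawler` from the main listing.

```python
# entrypoint.py -- a minimal sketch of what each crawler replica could run.
import asyncio
import os

# from crawler import AcademicPaperCrawler  # module name is an assumption

async def run_worker():
    # REDIS_URL and MONGO_URL come from the docker-compose environment above;
    # the remaining keys mirror the config used in main().
    config = {
        "redis_url": os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        "mongo_url": os.environ.get("MONGO_URL", "mongodb://localhost:27017/"),
        "database": "academic_papers",
        "headless": True,
        "concurrent_limit": 5,
        "delay": 1,
        "timeout": 30000,
        "proxy": None,
    }
    crawler = AcademicPaperCrawler(config)
    # Every replica pulls from the shared Redis queue, so the seed URLs only
    # need to be enqueued once; the shared visited-set filters duplicates.
    await crawler.run(
        start_urls=["https://arxiv.org/list/cs.AI/recent"],
        max_pages=50,
    )

if __name__ == "__main__":
    asyncio.run(run_worker())
```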
