## Introduction
In today's internet era, timely access to and aggregation of job postings matters to job seekers, recruiters, and human-resources researchers alike. A single recruitment platform can no longer satisfy the demand for diverse information, which makes building a job-posting aggregation crawler system especially worthwhile. This article explores how to build an efficient, stable, and extensible job-posting aggregation crawler using a modern Python technology stack.

## Technology Selection and Architecture Design

### Core Tech Stack

- **Playwright**: Microsoft's new-generation browser automation tool; supports multiple browsers and is faster and more stable than Selenium
- **Asyncio**: Python's native asynchronous I/O framework, used for highly concurrent data collection
- **FastAPI**: a modern, fast web framework for building the API layer
- **MongoDB**: a NoSQL database, well suited to storing unstructured job data
- **Redis**: caching and deduplication
- **Docker**: containerized deployment

### System Architecture

```text
Data Collection Layer → Data Processing Layer → Data Storage Layer → API Service Layer
        ↑                        ↑                      ↑                   ↑
    Playwright                PySpark               MongoDB             FastAPI
    Asyncio                   Pandas                Redis               Uvicorn
```

## Project Implementation

### 1. Environment Setup and Dependencies

```text
# requirements.txt
playwright==1.40.0
asyncio==3.4.3
aiohttp==3.9.1
fastapi==0.104.1
uvicorn[standard]==0.24.0
pymongo==4.5.0
redis==5.0.1
pandas==2.1.3
pydantic==2.5.0
beautifulsoup4==4.12.2
lxml==4.9.3
celery==5.3.4
docker==6.1.3
```

### 2. Core Crawler Class

```python
import asyncio
import aiohttp
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import json
from urllib.parse import urljoin, urlparse
import hashlib
from contextlib import asynccontextmanager

from playwright.async_api import async_playwright, Page, Browser
from pydantic import BaseModel, Field
from motor.motor_asyncio import AsyncIOMotorClient
import redis.asyncio as redis
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


# Data model definitions
class JobPosition(BaseModel):
    """Job position data model."""
    # Seed the hash with the current timestamp so each default id is unique
    id: str = Field(default_factory=lambda: hashlib.md5(datetime.now().isoformat().encode()).hexdigest())
    title: str
    company: str
    location: str
    salary: Optional[str] = None
    experience: Optional[str] = None
    education: Optional[str] = None
    job_type: Optional[str] = None
    description: str
    requirements: List[str] = Field(default_factory=list)
    benefits: List[str] = Field(default_factory=list)
    source: str  # source site
    source_url: str
    published_date: datetime
    crawl_time: datetime = Field(default_factory=datetime.now)
    tags: List[str] = Field(default_factory=list)

    class Config:
        json_encoders = {
            datetime: lambda dt: dt.isoformat()
        }


@dataclass
class CrawlerConfig:
    """Crawler configuration."""
    user_agent: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    timeout: int = 30000
    headless: bool = True
    max_concurrent: int = 5
    retry_count: int = 3
    proxy: Optional[str] = None


class AsyncJobCrawler:
    """Asynchronous job-posting crawler."""

    def __init__(self, config: CrawlerConfig = None):
        self.config = config or CrawlerConfig()
        self.session = None
        self.browser = None
        self.context = None
        self.redis_client = None
        self.mongo_client = None
        self.db = None

    async def init_resources(self):
        """Initialize resources."""
        # Initialize the Redis connection
        self.redis_client = await redis.Redis(
            host="localhost",
            port=6379,
            db=0,
            decode_responses=True
        )

        # Initialize the MongoDB connection
        self.mongo_client = AsyncIOMotorClient("mongodb://localhost:27017")
        self.db = self.mongo_client.job_aggregator

        # Initialize Playwright
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.config.headless,
            args=["--disable-blink-features=AutomationControlled"]
        )

        # Create a browser context
        self.context = await self.browser.new_context(
            user_agent=self.config.user_agent,
            viewport={"width": 1920, "height": 1080}
        )

    async def close_resources(self):
        """Release resources."""
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
        if self.redis_client:
            await self.redis_client.close()

    @asynccontextmanager
    async def get_page(self):
        """Context manager that yields a fresh page."""
        page = await self.context.new_page()
        try:
            yield page
        finally:
            await page.close()

    async def crawl_boss_zhipin(self, keyword: str, city: str = "北京") -> List[JobPosition]:
        """Crawl BOSS直聘."""
        positions = []
        base_url = "https://www.zhipin.com/web/geek/job"

        async with self.get_page() as page:
            # Set up request interception to avoid detection
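            # Note: continue_() simply passes every intercepted request through
            # unchanged; extending this hook (e.g. to block images or adjust headers)
            # is one possible optimization, not something implemented here.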
            await page.route("**/*", lambda route: route.continue_())

            # Build the query parameters
            params = {
                "query": keyword,
                "city": city,
                "page": 1
            }

            try:
                await page.goto(
                    f"{base_url}?{self._dict_to_query(params)}",
                    timeout=self.config.timeout
                )

                # Wait for the content to load
                await page.wait_for_selector(".job-list-box", timeout=10000)

                # Simulate scrolling to trigger lazy loading
                for _ in range(3):
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await asyncio.sleep(1)

                # Extract job items
                job_items = await page.query_selector_all(".job-card-wrapper")

                for item in job_items[:10]:  # limit the count to avoid getting blocked
                    try:
                        title_elem = await item.query_selector(".job-title")
                        company_elem = await item.query_selector(".company-name")
                        salary_elem = await item.query_selector(".salary")

                        if all([title_elem, company_elem]):
                            title = await title_elem.text_content()
                            company = await company_elem.text_content()
                            salary = await salary_elem.text_content() if salary_elem else None

                            # Get the detail-page link
                            detail_link = await item.get_attribute("href")
                            if detail_link:
                                detail_url = urljoin("https://www.zhipin.com", detail_link)

                                # Crawl the detail page
                                position_detail = await self._crawl_detail_page(detail_url)

                                position = JobPosition(
                                    title=title.strip(),
                                    company=company.strip(),
                                    salary=salary.strip() if salary else None,
                                    location=city,
                                    source="BOSS直聘",
                                    source_url=detail_url,
                                    published_date=datetime.now(),
                                    **position_detail
                                )

                                # Deduplication check
                                if await self._is_duplicate(position):
                                    continue

                                positions.append(position)

                                # Persist to the database
                                await self.save_position(position)
                    except Exception as e:
                        logger.error(f"解析职位项失败: {e}")
                        continue

            except Exception as e:
                logger.error(f"爬取BOSS直聘失败: {e}")

        return positions

    async def _crawl_detail_page(self, url: str) -> Dict[str, Any]:
        """Crawl the detail page for a posting."""
        detail_info = {
            "description": "",
            "requirements": [],
            "benefits": []
        }

        try:
            async with self.get_page() as page:
                await page.goto(url, timeout=self.config.timeout)

                # Wait for the content to load
                await page.wait_for_selector(".job-detail", timeout=5000)

                # Extract the job description
                desc_elem = await page.query_selector(".job-sec-text")
                if desc_elem:
                    detail_info["description"] = await desc_elem.text_content()

                # Extract the job requirements
                req_elems = await page.query_selector_all(".job-requirement li")
                detail_info["requirements"] = [
                    await elem.text_content() for elem in req_elems
                ]

                # Extract the benefits
                benefit_elems = await page.query_selector_all(".job-benefits span")
                detail_info["benefits"] = [
                    await elem.text_content() for elem in benefit_elems
                ]
        except Exception as e:
            logger.error(f"爬取详情页失败 {url}: {e}")

        return detail_info

    async def crawl_lagou(self, keyword: str, city: str = "北京") -> List[JobPosition]:
        """Crawl 拉勾网."""
        positions = []
        base_url = "https://www.lagou.com/jobs/list_"

        async with self.get_page() as page:
            # Set a cookie to get past the anti-crawling check
            await page.context.add_cookies([
                {
                    "name": "user_trace_token",
                    "value": "test_token",
                    "domain": ".lagou.com",
                    "path": "/"
                }
            ])

            url = f"{base_url}{keyword}?city={city}"

            try:
                await page.goto(url, timeout=self.config.timeout)

                # Dismiss any popup
                try:
                    close_btn = await page.wait_for_selector(".popup-close", timeout=3000)
                    if close_btn:
                        await close_btn.click()
                except Exception:
                    pass

                # Extract the job list
                await page.wait_for_selector(".item__10RTO", timeout=10000)
                job_items = await page.query_selector_all(".item__10RTO")

                for item in job_items[:10]:
                    try:
                        title_elem = await item.query_selector(".p-top__1F7CL a")
                        company_elem = await item.query_selector(".company-name__2-SjF")
                        salary_elem = await item.query_selector(".money__3Lkgq")

                        if all([title_elem, company_elem]):
                            title = await title_elem.text_content()
                            company = await company_elem.text_content()
                            salary = await salary_elem.text_content() if salary_elem else None

                            detail_link = await title_elem.get_attribute("href")

                            position = JobPosition(
                                title=title.strip(),
                                company=company.strip(),
                                salary=salary.strip() if salary else None,
                                location=city,
                                source="拉勾网",
                                source_url=detail_link or url,
                                published_date=datetime.now(),
                                description=""
                            )

                            if await self._is_duplicate(position):
                                continue

                            positions.append(position)
                            await self.save_position(position)
                    except Exception as e:
                        logger.error(f"解析拉勾职位失败: {e}")

            except Exception as e:
                logger.error(f"爬取拉勾网失败: {e}")

        return positions

    async def _is_duplicate(self, position: JobPosition) -> bool:
        """Check whether a position has already been seen."""
        # Use MD5 to build a unique fingerprint
        position_hash = hashlib.md5(
            f"{position.title}_{position.company}_{position.source}".encode()
        ).hexdigest()

        # Check whether it already exists in Redis
        exists = await self.redis_client.exists(f"job:{position_hash}")
        if exists:
            return True

        # Expire the fingerprint after 24 hours
        await self.redis_client.setex(f"job:{position_hash}", 86400, 1)
        return False

    async def save_position(self, position: JobPosition):
        """Save a position to the database."""
        try:
            # Upsert into MongoDB
            await self.db.positions.update_one(
                {"id": position.id},
                {"$set": position.dict()},
                upsert=True
            )
            logger.info(f"保存职位成功: {position.title}")
        except Exception as e:
            logger.error(f"保存职位失败: {e}")

    def _dict_to_query(self, params: Dict) -> str:
        """Convert a dict into a query string."""
        return "&".join([f"{k}={v}" for k, v in params.items()])

    async def crawl_multiple_sources(self, keyword: str, sources: List[str] = None) -> List[JobPosition]:
        """Crawl multiple sources concurrently."""
        if sources is None:
            sources = ["boss", "lagou"]

        tasks = []
        if "boss" in sources:
            tasks.append(self.crawl_boss_zhipin(keyword))
        if "lagou" in sources:
            tasks.append(self.crawl_lagou(keyword))
        # More platforms can be added here

        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_positions = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"爬取任务失败: {result}")
            elif isinstance(result, list):
                all_positions.extend(result)

        return all_positions
```

### 3. Distributed Task Queue

```python
from celery import Celery
from pydantic import BaseModel
from typing import List
import asyncio

# Celery configuration
celery_app = Celery(
    "job_crawler",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
)


class CrawlTask(BaseModel):
    """Crawl task model."""
    keywords: List[str]
    sources: List[str]
    cities: List[str]
    max_results: int = 50


@celery_app.task(bind=True, max_retries=3)
def start_crawl_task(self, task_data: dict):
    """Run a crawl task."""
    task = CrawlTask(**task_data)

    # Create an event loop and run the async crawler inside it
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        crawler = AsyncJobCrawler()
        loop.run_until_complete(crawler.init_resources())

        all_results = []
        for keyword in task.keywords:
            for city in task.cities:
                positions = loop.run_until_complete(
                    crawler.crawl_multiple_sources(keyword, task.sources)
                )
                all_results.extend(positions[:task.max_results])

        loop.run_until_complete(crawler.close_resources())

        return {
            "status": "success",
            "count": len(all_results),
            "data": [pos.dict() for pos in all_results]
        }
    except Exception as e:
        self.retry(exc=e, countdown=60)
    finally:
        loop.close()
```
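A task defined this way is enqueued from application code rather than called directly; the FastAPI endpoint in the next section does exactly that via `delay()`. For reference, a minimal usage sketch, assuming the task and `celery_app` live in a `main` module (that module layout is an assumption for illustration):

```python
# Minimal usage sketch; assumes start_crawl_task is importable from main.py
from main import start_crawl_task  # hypothetical module layout

result = start_crawl_task.delay({
    "keywords": ["Python"],
    "sources": ["boss", "lagou"],
    "cities": ["北京"],
    "max_results": 20,
})
print(result.id)                    # Celery task id
# print(result.get(timeout=600))    # block until the worker returns its summary dict
```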
### 4. FastAPI Web Service

```python
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from motor.motor_asyncio import AsyncIOMotorClient
from typing import List, Optional
from datetime import datetime, timedelta

app = FastAPI(
    title="招聘信息聚合API",
    description="多源招聘信息聚合爬虫系统",
    version="1.0.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def root():
    return {"message": "招聘信息聚合爬虫API服务"}


@app.post("/api/v1/crawl")
async def start_crawl(
    keywords: List[str] = Query(..., description="搜索关键词"),
    sources: List[str] = Query(["boss", "lagou"], description="数据来源"),
    cities: List[str] = Query(["北京"], description="城市"),
    max_results: int = Query(50, description="每个关键词最大结果数"),
    background_tasks: BackgroundTasks = None
):
    """Kick off a crawl task."""
    task_data = {
        "keywords": keywords,
        "sources": sources,
        "cities": cities,
        "max_results": max_results
    }

    # Run the crawl asynchronously via Celery
    task = start_crawl_task.delay(task_data)

    return {
        "task_id": task.id,
        "status": "started",
        "message": "爬虫任务已启动"
    }


@app.get("/api/v1/positions")
async def get_positions(
    keyword: Optional[str] = None,
    city: Optional[str] = None,
    source: Optional[str] = None,
    page: int = 1,
    limit: int = 20,
    days: int = 7
):
    """Query stored positions."""
    # Connect to MongoDB
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.job_aggregator

    # Build the query
    query = {}
    if keyword:
        query["$or"] = [
            {"title": {"$regex": keyword, "$options": "i"}},
            {"company": {"$regex": keyword, "$options": "i"}},
            {"description": {"$regex": keyword, "$options": "i"}}
        ]
    if city:
        query["location"] = {"$regex": city, "$options": "i"}
    if source:
        query["source"] = source

    # Time filter
    time_threshold = datetime.now() - timedelta(days=days)
    query["crawl_time"] = {"$gte": time_threshold}

    # Run the query
    cursor = db.positions.find(query).sort("published_date", -1)
    cursor.skip((page - 1) * limit).limit(limit)

    positions = await cursor.to_list(length=limit)
    total = await db.positions.count_documents(query)

    # Convert ObjectId to string
    for pos in positions:
        pos["_id"] = str(pos["_id"])

    return {
        "data": positions,
        "total": total,
        "page": page,
        "limit": limit,
        "total_pages": (total + limit - 1) // limit
    }


@app.get("/api/v1/statistics")
async def get_statistics(days: int = 30):
    """Return aggregate statistics."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.job_aggregator

    time_threshold = datetime.now() - timedelta(days=days)

    # Number of positions per platform
    pipeline = [
        {"$match": {"crawl_time": {"$gte": time_threshold}}},
        {"$group": {"_id": "$source", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}}
    ]
    source_stats = await db.positions.aggregate(pipeline).to_list(None)

    # Hottest job titles
    title_pipeline = [
        {"$match": {"crawl_time": {"$gte": time_threshold}}},
        {"$group": {"_id": "$title", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10}
    ]
    hot_positions = await db.positions.aggregate(title_pipeline).to_list(None)

    # City distribution
    city_pipeline = [
        {"$match": {"crawl_time": {"$gte": time_threshold}}},
        {"$group": {"_id": "$location", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10}
    ]
    city_distribution = await db.positions.aggregate(city_pipeline).to_list(None)

    return {
        "source_distribution": source_stats,
        "hot_positions": hot_positions,
        "city_distribution": city_distribution,
        "period_days": days
    }
```

### 5. Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    unzip \
    && wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" > /etc/apt/sources.list.d/google.list \
    && apt-get update && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
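# Copying requirements.txt on its own before the rest of the source lets Docker
# cache the dependency-installation layer below, so it is only rebuilt when
# requirements.txt itself changes.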
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright browser
RUN playwright install chromium

# Copy the application code
COPY . .

# Create a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
```

```yaml
# docker-compose.yml
version: "3.8"

services:
  mongodb:
    image: mongo:latest
    container_name: job_mongodb
    restart: always
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password

  redis:
    image: redis:alpine
    container_name: job_redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  celery_worker:
    build: .
    container_name: celery_worker
    restart: always
    command: celery -A main.celery_app worker --loglevel=info
    depends_on:
      - redis
      - mongodb
    environment:
      - REDIS_HOST=redis
      - MONGO_HOST=mongodb

  celery_beat:
    build: .
    container_name: celery_beat
    restart: always
    command: celery -A main.celery_app beat --loglevel=info
    depends_on:
      - redis
      - mongodb
    environment:
      - REDIS_HOST=redis
      - MONGO_HOST=mongodb

  web:
    build: .
    container_name: job_crawler_web
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - mongodb
      - redis
      - celery_worker
    environment:
      - REDIS_HOST=redis
      - MONGO_HOST=mongodb
      - MONGO_USERNAME=admin
      - MONGO_PASSWORD=password

volumes:
  mongodb_data:
  redis_data:
```

### 6. Advanced Feature Extensions

```python
class AdvancedJobAnalyzer:
    """Advanced job-posting analyzer."""

    def __init__(self):
        import nltk
        from sklearn.feature_extraction.text import TfidfVectorizer
        import jieba

        # Initialize NLP tools
        nltk.download("stopwords")
        self.stopwords = set(nltk.corpus.stopwords.words("chinese"))
        self.vectorizer = TfidfVectorizer(max_features=100)

    async def analyze_salary_trend(self, positions: List[JobPosition]):
        """Analyze salary trends."""
        import pandas as pd
        import numpy as np

        df = pd.DataFrame([pos.dict() for pos in positions])

        # Extract numeric salary values
        df["salary_numeric"] = df["salary"].apply(self._extract_salary)

        # Group by job title
        salary_by_title = df.groupby("title")["salary_numeric"].agg(["mean", "count"])
        return salary_by_title.to_dict()

    def _extract_salary(self, salary_str: str) -> float:
        """Extract a numeric value from a salary string."""
        import re

        if not salary_str:
            return 0

        # Match the numeric pattern, e.g. "15K-25K"
        pattern = r"(\d+\.?\d*)K?-\d+\.?\d*K"
        match = re.search(pattern, salary_str)
        if match:
            numbers = re.findall(r"\d+\.?\d*", match.group())
            if numbers:
                return float(numbers[0])
        return 0

    async def extract_skills(self, positions: List[JobPosition]) -> Dict[str, List[str]]:
        """Extract skill keywords from job descriptions."""
        skills_dict = {}

        # Predefined skill vocabulary
        tech_skills = {
            "Python", "Java", "JavaScript", "C++", "Go", "Rust",
            "Django", "Flask", "FastAPI", "Spring", "React", "Vue",
            "MySQL", "PostgreSQL", "MongoDB", "Redis", "Elasticsearch",
            "Docker", "Kubernetes", "AWS", "Azure", "GCP",
            "TensorFlow", "PyTorch", "机器学习", "深度学习"
        }

        for position in positions:
            text = f"{position.title} {position.description}"
            found_skills = [skill for skill in tech_skills if skill in text]
            if found_skills:
                skills_dict[position.title] = found_skills

        return skills_dict

    async def generate_industry_report(self, positions: List[JobPosition]) -> Dict:
        """Generate an industry breakdown report."""
        from collections import Counter
        import pandas as pd

        # Extract company names
        companies = [pos.company for pos in positions]

        # Simple industry classification by keyword
        industry_keywords = {
            "互联网": ["科技", "网络", "互联网", "信息", "软件"],
            "金融": ["银行", "证券", "保险", "金融", "投资"],
            "教育": ["教育", "培训", "学校", "学院"],
            "医疗": ["医疗", "医院", "健康", "医药"],
            "制造": ["制造", "工厂", "工业", "生产"]
        }

        industry_counts = Counter()
        for company in companies:
            for industry, keywords in industry_keywords.items():
                if any(keyword in company for keyword in keywords):
                    industry_counts[industry] += 1
                    break
            else:
                industry_counts["其他"] += 1

        return dict(industry_counts)
```

## Anti-Crawling Strategies and Countermeasures
### 1. Dynamic User-Agent Rotation

```python
class UserAgentManager:
    """User-Agent manager."""

    def __init__(self):
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15"
        ]

    def get_random_agent(self):
        import random
        return random.choice(self.user_agents)
```

### 2. Proxy IP Pool

```python
class ProxyManager:
    """Proxy IP manager."""

    def __init__(self):
        self.proxy_list = []

    async def refresh_proxies(self):
        """Refresh the proxy pool."""
        async with aiohttp.ClientSession() as session:
            async with session.get("https://api.proxy-provider.com/proxies") as resp:
                data = await resp.json()
                self.proxy_list = data.get("proxies", [])

    def get_proxy(self):
        import random
        return random.choice(self.proxy_list) if self.proxy_list else None
```

### 3. Request Rate Limiting

```python
class RateLimiter:
    """Request rate limiter."""

    def __init__(self, max_requests: int = 10, period: int = 60):
        self.max_requests = max_requests
        self.period = period
        self.requests = []

    async def wait_if_needed(self):
        import asyncio
        from datetime import datetime, timedelta

        now = datetime.now()
        cutoff = now - timedelta(seconds=self.period)

        # Drop request records that have expired
        self.requests = [req for req in self.requests if req > cutoff]

        if len(self.requests) >= self.max_requests:
            # Wait until the oldest request falls out of the window
            wait_time = (self.requests[0] - cutoff).total_seconds()
            await asyncio.sleep(wait_time)

        self.requests.append(now)
```

## Performance Optimization Tips

- Connection pool management: reuse connections through aiohttp's connection pool
- Asynchronous database operations: use an asynchronous MongoDB driver
- Data caching: keep hot data in Redis
- Incremental crawling: record the last crawl time and only fetch new postings
- Distributed crawling: use Celery for distributed task scheduling

## Monitoring and Logging

```python
class MonitoringSystem:
    """Crawler monitoring system."""

    def __init__(self):
        import prometheus_client
        self.request_counter = prometheus_client.Counter(
            "crawler_requests_total", "Total number of requests"
        )
        self.error_counter = prometheus_client.Counter(
            "crawler_errors_total", "Total number of errors"
        )

    def record_request(self, url: str, success: bool):
        self.request_counter.inc()
        if not success:
            self.error_counter.inc()

    def generate_report(self):
        """Generate a monitoring report."""
        import psutil
        import time

        return {
            "timestamp": time.time(),
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage("/").percent,
            "requests_total": self.request_counter._value.get(),
            "errors_total": self.error_counter._value.get()
        }
```

## Conclusion

This article has walked through the design and implementation of a complete job-posting aggregation crawler system. Using modern Python technologies such as Playwright, Asyncio, and FastAPI, we built an efficient, scalable, distributed crawler. Beyond basic data collection, the system also covers data storage, an API service, task scheduling, and monitoring and alerting, rounding out the features an enterprise-grade deployment needs. In real-world use, keep the following in mind:

- Respect robots.txt: honor each site's crawling policy (a minimal check is sketched below)
- Data privacy: handle any collected personal information responsibly
- Legal compliance: make sure the crawling complies with applicable laws and regulations
- Resource consumption: throttle the crawl rate to avoid putting pressure on target sites
- Data quality: build data cleaning and validation mechanisms
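To make the first point above concrete, here is a minimal sketch of a robots.txt check using Python's standard `urllib.robotparser`. The helper name `is_allowed` and the example URL are illustrative assumptions, not part of the system described in this article.

```python
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


def is_allowed(url: str, user_agent: str = "*") -> bool:
    """Return True if robots.txt permits crawling `url` (hypothetical helper)."""
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

    parser = RobotFileParser()
    parser.set_url(robots_url)
    try:
        parser.read()  # fetch and parse robots.txt
    except Exception:
        return False   # be conservative if robots.txt cannot be read
    return parser.can_fetch(user_agent, url)


# Example (illustrative URL): only schedule the crawl if it is permitted
# if is_allowed("https://www.zhipin.com/web/geek/job"):
#     ...
```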