网页抓取——requests + BeautifulSoup + Playwright
第9章:网页数据抓取——httpx + BeautifulSoup + Playwright 实战
网页里有海量数据,但手动复制粘贴是最低效的方式。本章从技术选型讲起,带你用 httpx 发起高性能 HTTP 请求,用 BeautifulSoup 解析 HTML,用 Playwright 搞定需要 JavaScript 渲染的动态页面,再配上反爬策略和持久化存储,最后用竞品价格监控系统作为综合实战。
抓取技术选型
面对一个抓取需求,选对工具能节省 80% 的时间。核心问题只有两个:**页面是静态的还是动态的?**数据量有多大?
HTTP 客户端:requests vs httpx
| 维度 | requests | httpx |
|---|---|---|
| 异步支持 | 不支持(只能同步) | 支持 async/await |
| HTTP/2 | 不支持 | 原生支持 |
| API 风格 | 极简,入门友好 | 与 requests 几乎兼容 |
| 适用场景 | 单页抓取、小批量 | 高并发、大批量 |
**结论:**新项目直接用 httpx,API 和 requests 几乎一样,迁移成本很低,但天花板高得多。需要注意两点差异:httpx 默认不跟随重定向(需显式传 `follow_redirects=True`),且默认超时为 5 秒(requests 默认不设超时)。
HTML 解析:BeautifulSoup vs lxml vs parsel
| 库 | 优势 | 劣势 | 推荐场景 |
|---|---|---|---|
| BeautifulSoup4 | 文档最多,最易上手 | 速度最慢 | 学习、小项目 |
| lxml | 速度快,支持XPath | 安装有C依赖 | 大批量解析 |
| parsel | CSS+XPath均支持,Scrapy同款 | 社区较小 | 灵活需求 |
动态页面:Playwright vs Selenium
Selenium 是老牌工具,但 Playwright 在每一个维度都全面超越它:更快的执行速度、内置等待策略、更好的异步支持、以及更简洁的 API。2024 年以后的新项目,请直接选 Playwright。
选型决策树
- 静态 HTML + 小批量 → httpx + BeautifulSoup
- 静态 HTML + 大批量并发 → httpx.AsyncClient + lxml / parsel
- 需要 JavaScript 渲染 / 登录状态 / 点击操作 → Playwright
- 大规模生产爬虫项目 → Scrapy
httpx + BeautifulSoup 基础
安装依赖
pip install httpx beautifulsoup4 lxml
基础请求:Headers 伪装与 Session 复用
import httpx
# Present the request as coming from an ordinary desktop browser so that
# simple anti-scraping checks do not block it.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

# A single Client reuses the underlying connection and carries cookies
# across requests, which is faster than one-off calls.
with httpx.Client(headers=HEADERS, timeout=15, follow_redirects=True) as session:
    response = session.get("https://example.com/news")
    # Raise instead of silently continuing on a non-2xx status code.
    response.raise_for_status()
    html = response.text
    print(f"状态码: {response.status_code}, 页面长度: {len(html)}")
CSS 选择器 vs XPath
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "lxml")

# CSS selectors (recommended: easier to read).
titles = soup.select("div.article-list h2.title a")
for tag in titles:
    # .get() instead of tag["href"]: an <a> without an href attribute would
    # otherwise abort the whole loop with a KeyError.
    print(tag.get_text(strip=True), tag.get("href", ""))

# XPath through lxml directly (handy for complex paths).
from lxml import etree

tree = etree.HTML(html)
links = tree.xpath('//div[@class="article-list"]//h2[@class="title"]/a/@href')
print(links)
完整案例:抓取新闻列表页
import httpx
from bs4 import BeautifulSoup
import csv
from datetime import datetime
def scrape_news_list(url: str) -> list[dict]:
    """Scrape a news listing page.

    Args:
        url: Address of the listing page.

    Returns:
        One dict per ``article.news-item`` element that has an ``h2 a`` link,
        with the keys ``"title"``, ``"url"`` and ``"published"``. ``"url"`` is
        resolved against the final page URL, so relative links come back
        absolute; a missing href or ``<time datetime>`` yields ``""``.

    Raises:
        httpx.HTTPStatusError: If the final response is not 2xx.
        httpx.HTTPError: On transport failures (timeouts, connection errors).
    """
    from urllib.parse import urljoin

    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; NewsBot/1.0)",
    }
    # httpx does not follow redirects by default and raise_for_status() treats
    # a 3xx as an error, so follow them explicitly.
    with httpx.Client(headers=headers, timeout=15, follow_redirects=True) as client:
        resp = client.get(url)
        resp.raise_for_status()

    # Base for resolving relative links: the URL we actually ended up on.
    base_url = str(resp.url)
    soup = BeautifulSoup(resp.text, "lxml")

    results = []
    for item in soup.select("article.news-item"):
        title_tag = item.select_one("h2 a")
        if not title_tag:
            continue
        time_tag = item.select_one("time")
        href = title_tag.get("href", "")
        results.append({
            "title": title_tag.get_text(strip=True),
            # Keep "" for a missing href rather than turning it into base_url.
            "url": urljoin(base_url, href) if href else "",
            "published": time_tag.get("datetime", "") if time_tag else "",
        })
    return results
def save_to_csv(data: list[dict], filename: str):
    """Write a list of row dicts to a CSV file.

    Args:
        data: Rows to write. Nothing is written when the list is empty.
        filename: Destination path; an existing file is overwritten.

    The header is the union of all row keys in first-seen order, so rows with
    differing keys are accepted; absent values are written as empty cells.
    The file is encoded as UTF-8 with a BOM (``utf-8-sig``).
    """
    if not data:
        print("无数据可保存")
        return

    # Using only data[0].keys() makes DictWriter raise ValueError as soon as a
    # later row carries an extra key; dict.fromkeys keeps order and de-dupes.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))

    with open(filename, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
    print(f"已保存 {len(data)} 条到 {filename}")
if __name__ == "__main__":
    # Scrape first, then name the output file after today's date.
    articles = scrape_news_list("https://example.com/news")
    output_name = f"news_{datetime.now():%Y%m%d}.csv"
    save_to_csv(articles, output_name)
异步爬虫:asyncio + httpx.AsyncClient
同步爬虫抓 100 个页面需要等上一个请求完成才发下一个,就像单线排队。异步爬虫可以同时发出多个请求:在不限并发的理想情况下,总时间从"100次单次耗时之和"缩短到接近"最慢那次的耗时";实际中受并发上限(见下文 Semaphore)约束,总耗时大致按"总请求数 ÷ 并发数"个批次计算。
并发控制:Semaphore
import asyncio
import httpx
from bs4 import BeautifulSoup
async def fetch_page(client: httpx.AsyncClient, url: str, sem: asyncio.Semaphore) -> dict:
    """Fetch one page under a shared concurrency limit and read its first <h1>.

    Returns a dict with the keys ``"url"``, ``"title"`` and ``"status"``.
    On success ``"status"`` is the integer HTTP status code. If httpx raises
    an ``HTTPError``, ``"title"`` is ``""`` and ``"status"`` is the error text.
    """
    async with sem:  # at most `sem`'s initial value requests run at once
        try:
            response = await client.get(url, timeout=15)
            response.raise_for_status()
            heading = BeautifulSoup(response.text, "lxml").select_one("h1")
            if heading:
                heading_text = heading.get_text(strip=True)
            else:
                heading_text = ""
            return {"url": url, "title": heading_text, "status": response.status_code}
        except httpx.HTTPError as exc:
            return {"url": url, "title": "", "status": str(exc)}
async def scrape_many(urls: list[str], concurrency: int = 10) -> list[dict]:
    """Fetch many pages concurrently, at most ``concurrency`` at a time.

    Args:
        urls: Pages to fetch.
        concurrency: Maximum number of in-flight requests; must be >= 1.

    Returns:
        One result dict per URL (as produced by ``fetch_page``), in the same
        order as ``urls``.

    Raises:
        ValueError: If ``concurrency`` is smaller than 1.
    """
    # Semaphore(0) would never admit a task, so every fetch would wait forever.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    sem = asyncio.Semaphore(concurrency)
    headers = {"User-Agent": "Mozilla/5.0 (compatible; AsyncBot/1.0)"}
    async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
        tasks = [fetch_page(client, url, sem) for url in urls]
        # gather() preserves the order of its arguments.
        results = await asyncio.gather(*tasks)
    return list(results)
# Usage example: fetch 100 pages.
if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(1, 101)]
    results = asyncio.run(scrape_many(urls, concurrency=10))
    # A result counts as successful when a non-empty title was extracted.
    success = list(filter(lambda r: r["title"], results))
    print(f"成功: {len(success)}/{len(urls)}")
**并发数不是越大越好:**过高并发容易触发目标网站的限流甚至封 IP。建议从 5~10 开始测试,根据目标网站的响应速度和容忍度调整。请求间也可以加随机延迟(`await asyncio.sleep(random.uniform(0.5, 2.0))`,需先 `import random`)。
Playwright 动态页面
安装
pip install playwright
playwright install chromium # 安装 Chromium 浏览器内核
基础用法与等待策略
from playwright.sync_api import sync_playwright
with sync_playwright() as pw:
    # headless=True: no visible browser window; use this in production.
    # headless=False: shows the browser while it works; use this to debug.
    browser = pw.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://example.com/dynamic-page")

    # Choosing a wait strategy:
    # 1. Wait for a specific element to appear (the most common choice).
    page.wait_for_selector("div.data-table", timeout=10000)
    # 2. Wait until network traffic goes quiet (suits single-page apps).
    # page.wait_for_load_state("networkidle")
    # 3. Fixed sleep (discouraged; last resort only).
    # page.wait_for_timeout(2000)

    html = page.content()
    browser.close()
处理登录
from playwright.sync_api import sync_playwright
def login_and_scrape(
    username: str,
    password: str,
    target_url: str,
    *,
    headless: bool = False,
    state_path: str = "auth_state.json",
) -> str:
    """Log in through the web form, then return the HTML of ``target_url``.

    Args:
        username: Value typed into the ``#username`` field.
        password: Value typed into the ``#password`` field.
        target_url: Page to open once the login has succeeded.
        headless: Run without a visible window. Defaults to ``False`` (the
            original debugging behaviour); pass ``True`` on servers or in
            scheduled jobs where no display is available.
        state_path: File that receives the logged-in storage state so that a
            later run can reuse it instead of logging in again.

    Returns:
        The rendered HTML of ``target_url`` after ``div.data-container``
        has appeared.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context()
            page = context.new_page()

            # Log in.
            page.goto("https://example.com/login")
            page.fill("#username", username)
            page.fill("#password", password)
            page.click("button[type='submit']")
            page.wait_for_url("**/dashboard**")  # wait for the post-login redirect

            # Persist the login state for later reuse.
            context.storage_state(path=state_path)

            # Visit the page we actually want.
            page.goto(target_url)
            page.wait_for_selector("div.data-container")
            html = page.content()
        finally:
            # Close the browser even when a wait above times out.
            browser.close()
    return html
处理无限滚动
import time
from playwright.sync_api import sync_playwright
def scrape_infinite_scroll(url: str, max_scrolls: int = 10) -> list[str]:
    """Collect the text of every ``div.feed-item`` on an infinite-scroll page.

    Args:
        url: Page to open.
        max_scrolls: Upper bound on scroll-to-bottom operations.

    Returns:
        The distinct, non-empty item texts in the order they were first seen.
    """
    items: list[str] = []
    seen: set[str] = set()  # O(1) duplicate check instead of scanning `items`

    def collect(page) -> int:
        """Add unseen item texts to `items`; return how many elements are loaded."""
        elements = page.query_selector_all("div.feed-item")
        for el in elements:
            text = el.inner_text().strip()
            if text and text not in seen:
                seen.add(text)
                items.append(text)
        return len(elements)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            page = browser.new_page()
            page.goto(url)
            page.wait_for_selector("div.feed-item")

            prev_count = collect(page)
            for _ in range(max_scrolls):
                # Scroll to the bottom to trigger the next batch.
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                page.wait_for_timeout(1500)  # give new content time to load

                # Collect right after every scroll so the batch loaded by the
                # last permitted scroll is not lost.
                new_count = collect(page)
                if new_count == prev_count:
                    # Nothing new appeared: we have reached the end.
                    break
                prev_count = new_count
        finally:
            browser.close()
    return items
反爬处理
User-Agent 轮换
import random
# Pool of User-Agent strings to rotate through.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/119.0.0.0 Safari/537.36",
]


def get_random_headers() -> dict:
    """Return request headers with a randomly chosen User-Agent.

    Only ``gzip`` and ``deflate`` are advertised. httpx can decode Brotli
    (``br``) only when an optional Brotli package is installed; advertising it
    unconditionally lets the server answer with a body that stays compressed
    and turns ``resp.text`` into garbage.
    """
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Accept-Encoding": "gzip, deflate",
    }
请求间隔与指数退避
import time
import random
import httpx
def fetch_with_retry(url: str, max_retries: int = 3) -> str:
    """GET ``url``, retrying with exponential back-off when rate-limited.

    Args:
        url: Address to fetch.
        max_retries: Total number of attempts.

    Returns:
        The response body as text.

    Raises:
        httpx.HTTPStatusError: Immediately, for any non-2xx status other
            than 429.
        RuntimeError: If every attempt was answered with 429; the last
            429 error is attached as the cause.
    """
    last_error = None
    for attempt in range(max_retries):
        # Random 1-3 s pause before every request.
        time.sleep(random.uniform(1.0, 3.0))
        try:
            with httpx.Client(headers=get_random_headers(), timeout=15) as client:
                resp = client.get(url)
                resp.raise_for_status()
                return resp.text
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:  # only "Too Many Requests" is retried
                raise
            last_error = e

        # Back off only if another attempt follows; sleeping after the final
        # attempt would just delay the failure.
        if attempt < max_retries - 1:
            wait = (2 ** attempt) * 5  # 5s, 10s, 20s, ...
            print(f"被限流,等待 {wait}s 后重试...")
            time.sleep(wait)
    raise RuntimeError(f"重试 {max_retries} 次后仍失败: {url}") from last_error
**代理 IP 接入:**当单一 IP 被封时,可以接入代理池。httpx 支持在创建 Client 时传入 `proxy="http://proxy:port"` 参数(httpx 0.26 及以上;旧写法 `proxies={"all://": "http://proxy:port"}` 已被弃用并在 0.28 中移除),或使用付费代理服务商提供的 API 动态获取 IP。免费代理质量差,生产环境建议用付费代理。
数据存储
存入 SQLite
import sqlite3
from datetime import datetime
def init_db(db_path: str) -> sqlite3.Connection:
    """Open the SQLite database at ``db_path`` and ensure ``products`` exists.

    Returns the open connection; closing it is the caller's job.
    """
    create_products_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL,
            url TEXT UNIQUE, -- UNIQUE 保证同一 URL 不重复插入
            scraped_at TEXT
        )
    """
    connection = sqlite3.connect(db_path)
    connection.execute(create_products_sql)
    connection.commit()
    return connection
def upsert_product(conn: sqlite3.Connection, name: str, price: float, url: str):
    """Insert a product, or refresh the existing row that has the same URL.

    This is an ``INSERT ... ON CONFLICT(url) DO UPDATE`` upsert, not
    ``INSERT OR REPLACE``: the existing row keeps its ``id`` and only the
    listed columns are overwritten. ``scraped_at`` is set to the current
    local time in ISO format. The change is committed before returning.

    Args:
        conn: Connection to a database that has the ``products`` table.
        name: Product name; also refreshed on conflict so a renamed listing
            does not keep its stale name.
        price: Current price.
        url: Product URL; the conflict key (``UNIQUE`` column).
    """
    conn.execute(
        """INSERT INTO products (name, price, url, scraped_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
        name=excluded.name,
        price=excluded.price,
        scraped_at=excluded.scraped_at""",
        (name, price, url, datetime.now().isoformat()),
    )
    conn.commit()
实战项目:竞品价格监控系统
每天自动抓取竞品商品价格,价格变动超过阈值时发送告警。
import httpx
import sqlite3
import smtplib
from email.message import EmailMessage
from bs4 import BeautifulSoup
from datetime import datetime
import time, random
# ---- 配置 ----
# Products to watch: display name plus the product page to scrape.
TARGETS = [
    {"name": "竞品A 蓝牙耳机", "url": "https://competitor-a.com/product/123"},
    {"name": "竞品B 蓝牙耳机", "url": "https://competitor-b.com/product/456"},
]
DB_PATH = "prices.db"
ALERT_THRESHOLD = 0.05  # alert when the price moves by at least 5%
# Recipient of alert mails. The published value was the scrubbed placeholder
# "[email protected]", which is not an address; replace this with a real one.
ALERT_EMAIL = "alerts@example.com"
# ---- 数据库 ----
def init_db():
    """Open the database at ``DB_PATH`` and ensure ``price_history`` exists.

    Returns the open connection; closing it is the caller's job.
    """
    create_history_sql = """CREATE TABLE IF NOT EXISTS price_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT, url TEXT, price REAL, scraped_at TEXT
    )"""
    connection = sqlite3.connect(DB_PATH)
    connection.execute(create_history_sql)
    connection.commit()
    return connection
def get_last_price(conn, url: str) -> float | None:
row = conn.execute(
"SELECT price FROM price_history WHERE url=? ORDER BY scraped_at DESC LIMIT 1",
(url,)
).fetchone()
return row[0] if row else None
def save_price(conn, name: str, url: str, price: float):
    """Append one price observation to ``price_history`` and commit it.

    The row is stamped with the current local time in ISO format.
    """
    insert_sql = "INSERT INTO price_history (name, url, price, scraped_at) VALUES (?,?,?,?)"
    observed_at = datetime.now().isoformat()
    conn.execute(insert_sql, (name, url, price, observed_at))
    conn.commit()
# ---- 抓取 ----
def scrape_price(url: str) -> float | None:
    """Fetch a product page and extract its price.

    Args:
        url: Product page to fetch.

    Returns:
        The price as a float, or ``None`` when the request fails, no price
        element is found, or the element holds no number. Failures are
        printed rather than raised so one bad target cannot stop the run.
    """
    headers = {"User-Agent": "Mozilla/5.0 (compatible; PriceBot/1.0)"}
    try:
        time.sleep(random.uniform(1, 3))  # polite random delay before each request
        with httpx.Client(headers=headers, timeout=15) as client:
            resp = client.get(url)
            resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "lxml")
        price_tag = soup.select_one("span.price, div.current-price, [data-price]")
        if price_tag:
            # The selector also matches [data-price] elements, whose price is
            # in the attribute; prefer it and fall back to the visible text.
            raw = price_tag.get("data-price") or price_tag.get_text(strip=True)
            return _parse_price(raw)
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary for one target.
        print(f"抓取失败 {url}: {e}")
    return None


def _parse_price(text: str) -> float | None:
    """Return the first decimal number in ``text``, or ``None`` if there is none.

    Thousands separators are removed first, so inputs such as ``"¥1,299.00"``
    or ``"1299元起"`` parse without listing every possible currency mark.
    """
    import re

    cleaned = text.replace(",", "").replace(",", "")
    match = re.search(r"\d+(?:\.\d+)?", cleaned)
    return float(match.group()) if match else None
# ---- 告警 ----
def send_alert(name: str, old_price: float, new_price: float, url: str):
    """Build a price-change alert mail and log the alert to stdout.

    The mail is only composed, not sent: delivery needs an SMTP configuration
    (covered in chapter 10).

    Args:
        name: Product display name.
        old_price: Previously recorded price.
        new_price: Price just scraped.
        url: Product page, included in the mail body.
    """
    # A previous price of 0 has no meaningful percentage change; avoid the
    # ZeroDivisionError and say so instead.
    if old_price:
        change_text = f"{(new_price - old_price) / old_price * 100:+.1f}%"
    else:
        change_text = "N/A"

    msg = EmailMessage()
    msg["Subject"] = f"价格变动告警:{name}"
    # Placeholder sender; the published value was a scrubbed "[email protected]".
    msg["From"] = "monitor@example.com"
    msg["To"] = ALERT_EMAIL
    msg.set_content(
        f"商品:{name}\n"
        f"原价:¥{old_price:.2f}\n"
        f"现价:¥{new_price:.2f}(变动 {change_text})\n"
        f"链接:{url}"
    )
    # Hook up smtplib here to actually send `msg`; until then log the alert.
    # The original print line was truncated mid f-string and did not parse.
    print(f"[告警] {name}: ¥{old_price:.2f} -> ¥{new_price:.2f}({change_text})")
# ---- 主流程 ----
def main():
    """Scrape every target once, record its price, and alert on large moves.

    For each entry in ``TARGETS`` the current price is scraped and stored.
    When an earlier price exists and the relative change reaches
    ``ALERT_THRESHOLD``, ``send_alert`` is called; otherwise the result is
    printed. Targets whose scrape fails are skipped.
    """
    conn = init_db()
    try:
        for target in TARGETS:
            name, url = target["name"], target["url"]
            price = scrape_price(url)
            if price is None:
                print(f"[跳过] {name} 抓取失败")
                continue

            # Read the previous price before saving, or we would compare the
            # new price with itself.
            last_price = get_last_price(conn, url)
            save_price(conn, name, url, price)

            if last_price is None:
                print(f"[首次] {name}: ¥{price:.2f} 已记录")
                continue

            if last_price == 0:
                # No relative change is defined against 0: treat any non-zero
                # price as an unbounded move instead of dividing by zero.
                change = 0.0 if price == 0 else float("inf")
            else:
                change = abs(price - last_price) / last_price

            if change >= ALERT_THRESHOLD:
                send_alert(name, last_price, price, url)
            else:
                print(f"[正常] {name}: ¥{price:.2f}(昨日 ¥{last_price:.2f})")
    finally:
        # Release the database even if a target raises unexpectedly.
        conn.close()


if __name__ == "__main__":
    main()
**部署建议:**用 cron(Linux/macOS)或 Windows 任务计划程序每天定时运行此脚本。配合第15章的调度工具可以实现更完善的错误重试和日志记录。价格告警的发送方式请参考第10章(邮件)和第11章(飞书/企业微信)。
上一章
下一章
第10章:邮件自动化