第 9 章

网页抓取——httpx + BeautifulSoup + Playwright

第9章:网页数据抓取——httpx + BeautifulSoup + Playwright 实战

网页里有海量数据,但手动复制粘贴是最低效的方式。本章从技术选型讲起,带你用 httpx 发起高性能 HTTP 请求,用 BeautifulSoup 解析 HTML,用 Playwright 搞定需要 JavaScript 渲染的动态页面,再配上反爬策略和持久化存储,最后用竞品价格监控系统作为综合实战。

抓取技术选型

面对一个抓取需求,选对工具能节省 80% 的时间。核心问题只有两个:**页面是静态的还是动态的?**数据量有多大?

HTTP 客户端:requests vs httpx

维度 requests httpx
异步支持 不支持(只能同步) 支持 async/await
HTTP/2 不支持 支持(需 pip install "httpx[http2]" 并传入 http2=True)
API 风格 极简,入门友好 与 requests 几乎兼容
适用场景 单页抓取、小批量 高并发、大批量

**结论:**新项目直接用 httpx,API 和 requests 几乎一样,迁移成本很低,但天花板高得多。迁移时需要留意两处默认行为的差异:httpx 默认不跟随重定向(需显式传 follow_redirects=True),并且默认带 5 秒超时。

HTML 解析:BeautifulSoup vs lxml vs parsel

解析库 优势 劣势 推荐场景
BeautifulSoup4 文档最多,最易上手 速度最慢 学习、小项目
lxml 速度快,支持XPath 安装有C依赖 大批量解析
parsel CSS+XPath均支持,Scrapy同款 社区较小 灵活需求

动态页面:Playwright vs Selenium

Selenium 是老牌工具,但 Playwright 在每一个维度都全面超越它:更快的执行速度、内置等待策略、更好的异步支持、以及更简洁的 API。2024 年以后的新项目,请直接选 Playwright。

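选型决策树

- 数据直接出现在 HTML 源码里(静态页面)→ httpx + BeautifulSoup;页面数量多时改用 httpx.AsyncClient 并发抓取。
- 数据由 JavaScript 渲染,或需要登录、点击、滚动加载(动态页面)→ Playwright。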

httpx + BeautifulSoup 基础

安装依赖

pip install httpx beautifulsoup4 lxml

基础请求:Headers 伪装与 Session 复用

import httpx

# Present the request as an ordinary desktop browser so that simple
# anti-scraping filters do not reject it.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

# A single Client is used for the request so the connection can be reused
# and cookies are carried along automatically.
with httpx.Client(follow_redirects=True, timeout=15, headers=HEADERS) as client:
    resp = client.get("https://example.com/news")
    resp.raise_for_status()  # raises on a non-success status code
    html = resp.text
    status, size = resp.status_code, len(html)
    print(f"状态码: {status}, 页面长度: {size}")

CSS 选择器 vs XPath

from bs4 import BeautifulSoup

soup = BeautifulSoup(html, "lxml")

# CSS selectors (recommended: they read more naturally)
titles = soup.select("div.article-list h2.title a")
for anchor in titles:
    link_text = anchor.get_text(strip=True)
    print(link_text, anchor["href"])

# XPath, evaluated directly through lxml (handy for more complex paths)
from lxml import etree
tree = etree.HTML(html)
href_query = '//div[@class="article-list"]//h2[@class="title"]/a/@href'
links = tree.xpath(href_query)
print(links)

完整案例:抓取新闻列表页

import httpx
from bs4 import BeautifulSoup
import csv
from datetime import datetime

def scrape_news_list(url: str) -> list[dict]:
    """Fetch a news list page and extract one record per article.

    Args:
        url: Address of the list page to download.

    Returns:
        A list of dicts with the keys "title", "url" and "published".
        "url" is the raw ``href`` of the headline link and "published" is the
        ``datetime`` attribute of the article's ``<time>`` tag; either may be
        an empty string when the page does not provide it.

    Raises:
        httpx.HTTPStatusError: The server answered with a non-success status.
        httpx.HTTPError: The request failed at the transport level.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; NewsBot/1.0)",
    }
    # httpx does not follow redirects unless asked to. Without this flag a
    # 301/302 (for example http -> https) reaches raise_for_status() and the
    # whole scrape fails even though the page exists.
    with httpx.Client(headers=headers, timeout=15, follow_redirects=True) as client:
        resp = client.get(url)
        resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "lxml")
    results = []

    for item in soup.select("article.news-item"):
        title_tag = item.select_one("h2 a")
        time_tag = item.select_one("time")
        if not title_tag:
            # An article card without a headline link carries nothing usable.
            continue
        results.append({
            "title": title_tag.get_text(strip=True),
            "url": title_tag.get("href", ""),
            "published": time_tag.get("datetime", "") if time_tag else "",
        })
    return results

def save_to_csv(data: list[dict], filename: str):
    """Write the records to ``filename`` as CSV, using the first record's keys as columns.

    Nothing is written when ``data`` is empty; a notice is printed instead.
    The file is encoded as UTF-8 with a BOM ("utf-8-sig").
    """
    if data:
        columns = data[0].keys()
        with open(filename, "w", newline="", encoding="utf-8-sig") as out:
            sink = csv.DictWriter(out, fieldnames=columns)
            sink.writeheader()
            for record in data:
                sink.writerow(record)
        print(f"已保存 {len(data)} 条到 {filename}")
    else:
        print("无数据可保存")

if __name__ == "__main__":
    # Scrape first, then stamp the output file with today's date (YYYYMMDD).
    articles = scrape_news_list("https://example.com/news")
    stamp = datetime.now().strftime("%Y%m%d")
    save_to_csv(articles, "news_" + stamp + ".csv")

异步爬虫:asyncio + httpx.AsyncClient

同步爬虫抓 100 个页面需要等上一个请求完成才发下一个,就像单线排队。异步爬虫可以在等待响应的同时发出其他请求:如果完全不限并发,总时间会从"100 次单次耗时之和"缩短到接近"最慢那次的耗时";像下面这样用 Semaphore 限制并发后,总时间大约是"页面数 ÷ 并发数 × 单次耗时"。

并发控制:Semaphore

import asyncio
import httpx
from bs4 import BeautifulSoup

async def fetch_page(client: httpx.AsyncClient, url: str, sem: asyncio.Semaphore) -> dict:
    """Download one page while holding ``sem`` and return its ``<h1>`` text.

    Returns a dict with "url", "title" and "status". On success "status" is
    the numeric HTTP status; when an httpx error occurs "title" is empty and
    "status" holds the error message instead.
    """
    async with sem:  # caps how many requests are in flight at once
        try:
            response = await client.get(url, timeout=15)
            response.raise_for_status()
            heading = BeautifulSoup(response.text, "lxml").select_one("h1")
            if heading:
                heading_text = heading.get_text(strip=True)
            else:
                heading_text = ""
            return dict(url=url, title=heading_text, status=response.status_code)
        except httpx.HTTPError as exc:
            return dict(url=url, title="", status=str(exc))

async def scrape_many(urls: list[str], concurrency: int = 10) -> list[dict]:
    """Fetch all ``urls`` concurrently, with at most ``concurrency`` requests at a time.

    Results are returned in the same order as ``urls``.
    """
    limiter = asyncio.Semaphore(concurrency)
    bot_headers = {"User-Agent": "Mozilla/5.0 (compatible; AsyncBot/1.0)"}
    async with httpx.AsyncClient(follow_redirects=True, headers=bot_headers) as client:
        pending = (fetch_page(client, address, limiter) for address in urls)
        pages = await asyncio.gather(*pending)
    return list(pages)

# Usage example: fetch 100 pages
if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(1, 101)]
    results = asyncio.run(scrape_many(urls, concurrency=10))
    # fetch_page stores the integer HTTP status on success and the error text
    # (a str) on failure, so the status type tells the two apart. Judging by a
    # non-empty title would count a fetched page without <h1> as a failure.
    success = [r for r in results if isinstance(r["status"], int)]
    print(f"成功: {len(success)}/{len(urls)}")

**并发数不是越大越好:**过高并发容易触发目标网站的限流甚至封 IP。建议从 5~10 开始测试,根据目标网站的响应速度和容忍度调整。请求间也可以加随机延迟(await asyncio.sleep(random.uniform(0.5, 2.0)))。

Playwright 动态页面

安装

pip install playwright
playwright install chromium  # 安装 Chromium 浏览器内核

基础用法与等待策略

from playwright.sync_api import sync_playwright

with sync_playwright() as pw:
    # headless=True runs without a visible window (use this in production);
    # headless=False shows the browser, which helps while debugging.
    browser = pw.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://example.com/dynamic-page")

    # Choosing a wait strategy:
    # 1. Wait for a specific element to appear (the most common choice).
    page.wait_for_selector("div.data-table", timeout=10_000)

    # 2. Wait until the network goes idle (suits single-page apps):
    #    page.wait_for_load_state("networkidle")
    # 3. Fixed delay (discouraged, last resort only):
    #    page.wait_for_timeout(2000)

    html = page.content()
    browser.close()

处理登录

from playwright.sync_api import sync_playwright

def login_and_scrape(
    username: str,
    password: str,
    target_url: str,
    *,
    headless: bool = False,
    state_path: str = "auth_state.json",
) -> str:
    """Log in through the site's form, then return the HTML of ``target_url``.

    Args:
        username: Value typed into the ``#username`` field.
        password: Value typed into the ``#password`` field.
        target_url: Page to open once the login has succeeded.
        headless: Run the browser without a window. Defaults to False so the
            interaction is visible while debugging; pass True on servers.
        state_path: File that receives the saved login state. It contains
            session data, so keep it out of version control.

    Returns:
        The rendered HTML of ``target_url`` after ``div.data-container``
        has appeared.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=headless)
        try:
            context = browser.new_context()
            page = context.new_page()

            # Log in
            page.goto("https://example.com/login")
            page.fill("#username", username)
            page.fill("#password", password)
            page.click("button[type='submit']")
            page.wait_for_url("**/dashboard**")  # wait for the redirect to the dashboard

            # Persist the login state so that a later run can reuse it
            # instead of logging in again.
            context.storage_state(path=state_path)

            # Open the target page
            page.goto(target_url)
            page.wait_for_selector("div.data-container")
            html = page.content()
        finally:
            # Close the browser even when a step above times out.
            browser.close()
    return html

处理无限滚动

import time
from playwright.sync_api import sync_playwright

def _collect_feed_items(page, items: list[str], seen: set[str]) -> int:
    """Append not-yet-seen feed item texts to ``items``; return the number of elements on the page."""
    elements = page.query_selector_all("div.feed-item")
    for el in elements:
        text = el.inner_text().strip()
        if text and text not in seen:
            seen.add(text)
            items.append(text)
    return len(elements)


def scrape_infinite_scroll(url: str, max_scrolls: int = 10) -> list[str]:
    """Scrape an infinite-scroll page and return the text of every feed item.

    Args:
        url: Page to open.
        max_scrolls: Upper bound on the number of scroll-to-bottom steps.

    Returns:
        Distinct, non-empty item texts in the order they were first seen.
        Items with identical text are reported once.
    """
    items: list[str] = []
    seen: set[str] = set()  # O(1) duplicate check instead of scanning the list
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url)
        page.wait_for_selector("div.feed-item")

        # Collect what is already on screen, then collect again after every
        # scroll. This way the items loaded by the final scroll are kept too,
        # and max_scrolls=0 still returns the initially visible items.
        prev_count = _collect_feed_items(page, items, seen)
        for _ in range(max_scrolls):
            # Scroll to the bottom to trigger the next batch
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            page.wait_for_timeout(1500)  # give the new content time to load

            new_count = _collect_feed_items(page, items, seen)
            if new_count == prev_count:
                # No new elements appeared: the end of the feed was reached.
                break
            prev_count = new_count

        browser.close()
    return items

反爬处理

User-Agent 轮换

import random

# Pool of desktop browser User-Agent strings to rotate through. Each one
# follows the full format real browsers send (including "(KHTML, like Gecko)"),
# since a truncated string is itself an easy signal for bot detection.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
]


def get_random_headers() -> dict:
    """Return request headers with a randomly chosen User-Agent.

    Returns:
        A dict with the keys "User-Agent", "Accept-Language" and
        "Accept-Encoding".
    """
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept-Language": "zh-CN,zh;q=0.9",
        # "br" is deliberately not advertised: httpx can only decode Brotli
        # when an optional brotli package is installed, and a response it
        # cannot decode would come back as unreadable compressed bytes.
        "Accept-Encoding": "gzip, deflate",
    }

请求间隔与指数退避

import time
import random
import httpx

def fetch_with_retry(url: str, max_retries: int = 3) -> str:
    """Fetch ``url``, retrying with exponential backoff when rate-limited.

    Args:
        url: Address to download.
        max_retries: Maximum number of attempts.

    Returns:
        The response body as text.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status other
            than 429; such errors are not retried.
        RuntimeError: Every attempt was answered with 429 (Too Many Requests).
            The last HTTP error is attached as the cause.
    """
    last_error = None
    for attempt in range(max_retries):
        # Random 1-3 second pause before every request
        time.sleep(random.uniform(1.0, 3.0))
        try:
            with httpx.Client(headers=get_random_headers(), timeout=15) as client:
                resp = client.get(url)
                resp.raise_for_status()
                return resp.text
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:  # only Too Many Requests is retried
                raise
            last_error = e
            # Back off only when another attempt follows; sleeping after the
            # final attempt would just delay the error below.
            if attempt < max_retries - 1:
                wait = (2 ** attempt) * 5  # 5s, 10s, 20s, ...
                print(f"被限流,等待 {wait}s 后重试...")
                time.sleep(wait)
    raise RuntimeError(f"重试 {max_retries} 次后仍失败: {url}") from last_error

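**代理 IP 接入:**当单一 IP 被封时,可以接入代理池。httpx 通过 proxy 参数接入代理,例如 httpx.Client(proxy="http://proxy:port");需要按协议或域名走不同代理时使用 mounts 参数。旧写法 proxies={"all://": "http://proxy:port"} 自 0.26 起已弃用,并在 0.28 中移除。也可以使用付费代理服务商提供的 API 动态获取 IP。免费代理质量差,生产环境建议用付费代理。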

数据存储

存入 SQLite

import sqlite3
from datetime import datetime

def init_db(db_path: str) -> sqlite3.Connection:
    """Open the SQLite database at ``db_path`` and make sure the ``products`` table exists.

    The ``url`` column is UNIQUE, so one URL can be stored only once.
    Returns the open connection.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL,
            url TEXT UNIQUE,  -- UNIQUE 保证同一 URL 不重复插入
            scraped_at TEXT
        )
    """
    connection = sqlite3.connect(db_path)
    connection.execute(schema)
    connection.commit()
    return connection

def upsert_product(conn: sqlite3.Connection, name: str, price: float, url: str):
    """Insert a product, or refresh the existing row that has the same ``url``.

    This is an ``INSERT ... ON CONFLICT(url) DO UPDATE`` upsert, not
    ``INSERT OR REPLACE``: the existing row and its ``id`` are kept, and its
    name, price and timestamp are overwritten with the new values.

    Args:
        conn: Open connection to a database containing the ``products`` table.
        name: Product name; also updated on conflict so a rename is picked up.
        price: Current price.
        url: Product URL, the unique key used for conflict detection.
    """
    conn.execute(
        """INSERT INTO products (name, price, url, scraped_at)
           VALUES (?, ?, ?, ?)
           ON CONFLICT(url) DO UPDATE SET
               name=excluded.name,
               price=excluded.price,
               scraped_at=excluded.scraped_at""",
        (name, price, url, datetime.now().isoformat()),
    )
    conn.commit()

实战项目:竞品价格监控系统

每天自动抓取竞品商品价格,价格变动超过阈值时发送告警。

import httpx
import sqlite3
import smtplib
from email.message import EmailMessage
from bs4 import BeautifulSoup
from datetime import datetime
import time, random

# ---- 配置 ----
# Products to monitor: display name plus product page URL.
TARGETS = [
    {"name": "竞品A 蓝牙耳机", "url": "https://competitor-a.com/product/123"},
    {"name": "竞品B 蓝牙耳机", "url": "https://competitor-b.com/product/456"},
]
DB_PATH = "prices.db"
ALERT_THRESHOLD = 0.05   # alert when the price moves by more than 5%
# Placeholder recipient: replace with the mailbox that should receive alerts.
ALERT_EMAIL = "alerts@example.com"

# ---- 数据库 ----
def init_db():
    """Open the database at ``DB_PATH``, create ``price_history`` if needed, and return the connection."""
    create_table = """CREATE TABLE IF NOT EXISTS price_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT, url TEXT, price REAL, scraped_at TEXT
    )"""
    connection = sqlite3.connect(DB_PATH)
    connection.execute(create_table)
    connection.commit()
    return connection

def get_last_price(conn, url: str) -> float | None:
    row = conn.execute(
        "SELECT price FROM price_history WHERE url=? ORDER BY scraped_at DESC LIMIT 1",
        (url,)
    ).fetchone()
    return row[0] if row else None

def save_price(conn, name: str, url: str, price: float):
    """Append one price observation, stamped with the current local time, and commit it."""
    observed_at = datetime.now().isoformat()
    record = (name, url, price, observed_at)
    statement = "INSERT INTO price_history (name, url, price, scraped_at) VALUES (?,?,?,?)"
    conn.execute(statement, record)
    conn.commit()

# ---- 抓取 ----
def _parse_price(text: str) -> float | None:
    """Convert a scraped price string to a float, or return None when it holds no number."""
    import re  # function-scope import: this script's import list does not include re

    # Drop half-width and full-width yen signs and thousands separators.
    cleaned = (
        text.replace("¥", "").replace("¥", "").replace(",", "").replace(",", "").strip()
    )
    try:
        return float(cleaned)
    except ValueError:
        pass
    # Fall back to the first number in the text, e.g. "199.00 起" -> 199.0.
    match = re.search(r"\d+(?:\.\d+)?", cleaned)
    return float(match.group()) if match else None


def scrape_price(url: str) -> float | None:
    """Download a product page and extract its price.

    The price is read from the text of the first element matching
    ``span.price``, ``div.current-price`` or ``[data-price]``. When that text
    holds no number, the element's ``data-price`` attribute is tried.

    Args:
        url: Product page to fetch.

    Returns:
        The price as a float, or None when the request fails or no price can
        be found. Failures are printed, never raised, so that one bad target
        does not stop the whole monitoring run.
    """
    headers = {"User-Agent": "Mozilla/5.0 (compatible; PriceBot/1.0)"}
    try:
        time.sleep(random.uniform(1, 3))  # random pause to stay polite
        # httpx does not follow redirects by default; product URLs often do.
        with httpx.Client(headers=headers, timeout=15, follow_redirects=True) as client:
            resp = client.get(url)
            resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "lxml")
        price_tag = soup.select_one("span.price, div.current-price, [data-price]")
        if price_tag is None:
            print(f"抓取失败 {url}: 未找到价格元素")
            return None
        price = _parse_price(price_tag.get_text(strip=True))
        if price is None and price_tag.get("data-price"):
            price = _parse_price(price_tag["data-price"])
        if price is None:
            print(f"抓取失败 {url}: 无法解析价格")
        return price
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary for one target.
        print(f"抓取失败 {url}: {e}")
    return None

# ---- 告警 ----
def send_alert(name: str, old_price: float, new_price: float, url: str):
    """Build the price-change alert message and print a one-line summary.

    The e-mail is only assembled here; it is not sent, because delivering it
    requires an SMTP configuration.

    Args:
        name: Product display name.
        old_price: Previously recorded price.
        new_price: Price found in the current run.
        url: Product page URL included in the message body.
    """
    if old_price:
        change_pct = (new_price - old_price) / old_price * 100
    else:
        # A relative change from 0 is undefined; avoid ZeroDivisionError.
        change_pct = float("inf") if new_price else 0.0
    msg = EmailMessage()
    msg["Subject"] = f"价格变动告警:{name}"
    # Placeholder sender: replace with a real address before sending mail.
    msg["From"] = "noreply@example.com"
    msg["To"] = ALERT_EMAIL
    msg.set_content(
        f"商品:{name}\n"
        f"原价:¥{old_price:.2f}\n"
        f"现价:¥{new_price:.2f}(变动 {change_pct:+.1f}%)\n"
        f"链接:{url}"
    )
    # Sending requires an SMTP server to be configured (covered in chapter 10).
    print(f"[告警] {name}: ¥{old_price} → ¥{new_price}({change_pct:+.1f}%)")
# ---- 主流程 ----
def main():
    """Scrape every target once, record the price, and alert on large changes.

    For each entry in ``TARGETS`` the current price is stored, then compared
    with the previously stored one. A relative change of at least
    ``ALERT_THRESHOLD`` triggers ``send_alert``.
    """
    conn = init_db()
    try:
        for target in TARGETS:
            name, url = target["name"], target["url"]
            price = scrape_price(url)
            if price is None:
                print(f"[跳过] {name} 抓取失败")
                continue

            # Read the previous price before saving the new one, otherwise
            # the new row would be returned as the "last" price.
            last_price = get_last_price(conn, url)
            save_price(conn, name, url, price)

            if last_price is None:
                print(f"[首次] {name}: ¥{price:.2f} 已记录")
                continue
            if last_price == 0:
                # A relative change from 0 cannot be computed.
                print(f"[跳过] {name}: 上次价格为 0,无法计算变动比例")
                continue

            change = abs(price - last_price) / last_price
            if change >= ALERT_THRESHOLD:
                send_alert(name, last_price, price, url)
            else:
                print(f"[正常] {name}: ¥{price:.2f}(昨日 ¥{last_price:.2f})")
    finally:
        # Release the database handle even when a target raises.
        conn.close()

# Run the price check only when this file is executed as a script.
if __name__ == "__main__":
    main()

**部署建议:**用 cron(Linux/macOS)或 Windows 任务计划程序每天定时运行此脚本。配合第15章的调度工具可以实现更完善的错误重试和日志记录。价格告警的发送方式请参考第10章(邮件)和第11章(飞书/企业微信)。

上一章

下一章
第10章:邮件自动化
本章评分
4.7  / 5  (34 评分)

💬 留言讨论