- 引言 在数据采集过程中,爬虫经常需要面对 重复数据 的问题。如果每次爬取都全量抓取,不仅浪费资源,还可能导致数据冗余。增量爬取(Incremental Crawling) 是一种高效策略,它仅抓取 新增或更新 的数据,而跳过已采集的旧数据。 本文将详细介绍 Python爬虫的增量爬取与历史数据比对 策略,涵盖以下内容:
- 增量爬取的核心思路
- 去重方案对比(数据库、文件、内存)
- 基于时间戳、哈希、数据库比对的实现方法
- 完整代码示例(Scrapy + MySQL 增量爬取)
- 增量爬取的核心思路 增量爬取的核心是 识别哪些数据是新的或已更新的,通常采用以下方式: ● 基于时间戳(Last-Modified / Update-Time) ● 基于内容哈希(MD5/SHA1) ● 基于数据库比对(MySQL/Redis/MongoDB)
- 1 基于时间戳的增量爬取 适用于数据源带有 发布时间(如新闻、博客) 的场景:
- 记录上次爬取的 最新时间戳
- 下次爬取时,只抓取 晚于该时间戳的数据 优点:简单高效,适用于结构化数据 缺点:依赖数据源的时间字段,不适用于无时间戳的网页
- 2 基于内容哈希的去重 适用于 内容可能更新但URL不变 的页面(如电商价格):
- 计算页面内容的 哈希值(如MD5)
- 比对哈希值,若变化则视为更新 优点:适用于动态内容 缺点:计算开销较大
- 3 基于数据库比对的增量爬取 适用于 大规模数据管理:
- 将已爬取的 URL 或关键字段 存入数据库(MySQL/Redis)
- 每次爬取前查询数据库,判断是否已存在 优点:支持分布式去重 缺点:需要额外存储
- 去重方案对比 方案 适用场景 优点 缺点 内存去重 单机小规模爬虫 速度快(set() ) 重启后数据丢失 文件存储 中小规模爬虫 简单(CSV/JSON) 性能较低 SQL数据库 结构化数据管理 支持复杂查询(MySQL) 需要数据库维护 NoSQL数据库 高并发分布式爬虫 高性能(Redis/MongoDB) 内存占用较高
- 增量爬取实现方法
- 1 基于时间戳的增量爬取(示例)
from datetime import datetime
class NewsSpider(scrapy.Spider): name = "news_spider" last_crawl_time = None # 上次爬取的最新时间
def start_requests(self):
# 从文件/DB加载上次爬取时间
self.last_crawl_time = self.load_last_crawl_time()
# 设置代理信息
proxy = "http://www.16yun.cn:5445"
proxy_auth = "16QMSOML:280651"
# 添加代理到请求中
yield scrapy.Request(
url="https://news.example.com/latest",
meta={
'proxy': proxy,
'proxy_user_pass': proxy_auth
}
)
def parse(self, response):
# 检查响应状态码,判断是否成功获取数据
if response.status != 200:
self.logger.error(f"Failed to fetch data from {response.url}. Status code: {response.status}")
self.logger.error("This might be due to network issues or an invalid URL. Please check the URL and try again.")
return
for article in response.css(".article"):
pub_time = datetime.strptime(
article.css(".time::text").get(),
"%Y-%m-%d %H:%M:%S"
)
if self.last_crawl_time and pub_time <= self.last_crawl_time:
continue # 跳过旧文章
yield {
"title": article.css("h2::text").get(),
"time": pub_time,
}
# 更新最新爬取时间
self.save_last_crawl_time(datetime.now())
def load_last_crawl_time(self):
try:
with open("last_crawl.txt", "r") as f:
return datetime.strptime(f.read(), "%Y-%m-%d %H:%M:%S")
except FileNotFoundError:
return None
def save_last_crawl_time(self, time):
with open("last_crawl.txt", "w") as f:
f.write(time.strftime("%Y-%m-%d %H:%M:%S"))
4.2 基于内容哈希的去重(示例)
``` import hashlib
class ContentHashSpider(scrapy.Spider):
name = "hash_spider"
seen_hashes = set() # 存储已爬取的哈希
def parse(self, response):
content = response.css("body").get()
content_hash = hashlib.md5(content.encode()).hexdigest()
if content_hash in self.seen_hashes:
return # 跳过重复内容
self.seen_hashes.add(content_hash)
yield {"url": response.url, "content": content}
4.3 基于MySQL的增量爬取(完整示例) (1)MySQL 表结构
id INT AUTO_INCREMENT PRIMARY KEY,
url VARCHAR(255) UNIQUE,
content_hash CHAR(32),
last_updated TIMESTAMP
);
(2)Scrapy 爬虫代码
import hashlib
from scrapy import Spider, Request
class MySQLIncrementalSpider(Spider):
name = "mysql_incremental"
start_urls = ["https://example.com"]
def __init__(self):
self.conn = pymysql.connect(
host="localhost",
user="root",
password="123456",
db="crawler_db"
)
self.cursor = self.conn.cursor()
def parse(self, response):
url = response.url
content = response.text
content_hash = hashlib.md5(content.encode()).hexdigest()
# 检查是否已爬取
self.cursor.execute(
"SELECT content_hash FROM crawled_data WHERE url=%s",
(url,)
)
result = self.cursor.fetchone()
if result and result[0] == content_hash:
return # 内容未更新
# 插入或更新数据库
self.cursor.execute(
"""INSERT INTO crawled_data (url, content_hash, last_updated)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE
content_hash=%s, last_updated=NOW()""",
(url, content_hash, content_hash)
)
self.conn.commit()
yield {"url": url, "content": content}
def close(self, reason):
self.cursor.close()
self.conn.close()
- 结论 策略 适用场景 推荐存储方案 时间戳比对 新闻、博客等带时间的数据 文件/MySQL 内容哈希 动态内容(如商品价格) Redis/内存 数据库去重 结构化数据管理 MySQL/MongoDB 最佳实践: ● 小型爬虫 → 内存去重(set()) ● 中型爬虫 → 文件存储(JSON/CSV) ● 大型分布式爬虫 → Redis + MySQL 通过合理选择增量爬取策略,可以显著提升爬虫效率,减少资源浪费。