1. 引言
在数据采集过程中,爬虫经常需要面对 重复数据 的问题。如果每次爬取都全量抓取,不仅浪费资源,还可能导致数据冗余。增量爬取(Incremental Crawling) 是一种高效策略,它仅抓取 新增或更新 的数据,而跳过已采集的旧数据。
本文将详细介绍 Python爬虫的增量爬取与历史数据比对 策略,涵盖以下内容:
增量爬取的核心思路
去重方案对比(数据库、文件、内存)
基于时间戳、哈希、数据库比对的实现方法
完整代码示例(Scrapy + MySQL 增量爬取)
2. 增量爬取的核心思路
增量爬取的核心是 识别哪些数据是新的或已更新的,通常采用以下方式:
基于时间戳(Last-Modified / Update-Time)
基于内容哈希(MD5/SHA1)
基于数据库比对(MySQL/Redis/MongoDB)
2.1 基于时间戳的增量爬取
适用于数据源带有 发布时间(如新闻、博客) 的场景:
记录上次爬取的 最新时间戳
下次爬取时,只抓取 晚于该时间戳的数据
优点:简单高效,适用于结构化数据
缺点:依赖数据源的时间字段,不适用于无时间戳的网页
2.2 基于内容哈希的去重
适用于 内容可能更新但URL不变 的页面(如电商价格):
计算页面内容的 哈希值(如MD5)
比对哈希值,若变化则视为更新
优点:适用于动态内容
缺点:计算开销较大
2.3 基于数据库比对的增量爬取
适用于 大规模数据管理:
将已爬取的 URL 或关键字段 存入数据库(MySQL/Redis)
每次爬取前查询数据库,判断是否已存在
优点:支持分布式去重
缺点:需要额外存储
3. 去重方案对比
方案 | 适用场景 | 优点 | 缺点 |
内存去重 | 单机小规模爬虫 | 速度快( ) | 重启后数据丢失 |
文件存储 | 中小规模爬虫 | 简单(CSV/JSON) | 性能较低 |
SQL数据库 | 结构化数据管理 | 支持复杂查询(MySQL) | 需要数据库维护 |
NoSQL数据库 | 高并发分布式爬虫 | 高性能(Redis/MongoDB) | 内存占用较高 |
4. 增量爬取实现方法
4.1 基于时间戳的增量爬取(示例)
import scrapy from datetime import datetime class NewsSpider(scrapy.Spider): name = "news_spider" last_crawl_time = None # 上次爬取的最新时间 def start_requests(self): # 从文件/DB加载上次爬取时间 self.last_crawl_time = self.load_last_crawl_time() # 设置代理信息 proxy = "http://www.16yun.cn:5445" proxy_auth = "16QMSOML:280651" # 添加代理到请求中 yield scrapy.Request( url="https://news.example.com/latest", meta={ 'proxy': proxy, 'proxy_user_pass': proxy_auth } ) def parse(self, response): # 检查响应状态码,判断是否成功获取数据 if response.status != 200: self.logger.error(f"Failed to fetch data from {response.url}. Status code: {response.status}") self.logger.error("This might be due to network issues or an invalid URL. Please check the URL and try again.") return for article in response.css(".article"): pub_time = datetime.strptime( article.css(".time::text").get(), "%Y-%m-%d %H:%M:%S" ) if self.last_crawl_time and pub_time <= self.last_crawl_time: continue # 跳过旧文章 yield { "title": article.css("h2::text").get(), "time": pub_time, } # 更新最新爬取时间 self.save_last_crawl_time(datetime.now()) def load_last_crawl_time(self): try: with open("last_crawl.txt", "r") as f: return datetime.strptime(f.read(), "%Y-%m-%d %H:%M:%S") except FileNotFoundError: return None def save_last_crawl_time(self, time): with open("last_crawl.txt", "w") as f: f.write(time.strftime("%Y-%m-%d %H:%M:%S"))
4.2 基于内容哈希的去重(示例)
import hashlib class ContentHashSpider(scrapy.Spider): name = "hash_spider" seen_hashes = set() # 存储已爬取的哈希 def parse(self, response): content = response.css("body").get() content_hash = hashlib.md5(content.encode()).hexdigest() if content_hash in self.seen_hashes: return # 跳过重复内容 self.seen_hashes.add(content_hash) yield {"url": response.url, "content": content}
4.3 基于MySQL的增量爬取(完整示例)
(1)MySQL 表结构
CREATE TABLE crawled_data ( id INT AUTO_INCREMENT PRIMARY KEY, url VARCHAR(255) UNIQUE, content_hash CHAR(32), last_updated TIMESTAMP );
(2)Scrapy 爬虫代码
import pymysql import hashlib from scrapy import Spider, Request class MySQLIncrementalSpider(Spider): name = "mysql_incremental" start_urls = ["https://example.com"] def __init__(self): self.conn = pymysql.connect( host="localhost", user="root", password="123456", db="crawler_db" ) self.cursor = self.conn.cursor() def parse(self, response): url = response.url content = response.text content_hash = hashlib.md5(content.encode()).hexdigest() # 检查是否已爬取 self.cursor.execute( "SELECT content_hash FROM crawled_data WHERE url=%s", (url,) ) result = self.cursor.fetchone() if result and result[0] == content_hash: return # 内容未更新 # 插入或更新数据库 self.cursor.execute( """INSERT INTO crawled_data (url, content_hash, last_updated) VALUES (%s, %s, NOW()) ON DUPLICATE KEY UPDATE content_hash=%s, last_updated=NOW()""", (url, content_hash, content_hash) ) self.conn.commit() yield {"url": url, "content": content} def close(self, reason): self.cursor.close() self.conn.close()
5. 结论
策略 | 适用场景 | 推荐存储方案 |
时间戳比对 | 新闻、博客等带时间的数据 | 文件/MySQL |
内容哈希 | 动态内容(如商品价格) | Redis/内存 |
数据库去重 | 结构化数据管理 | MySQL/MongoDB |
最佳实践:
小型爬虫 → 内存去重(
set()
)中型爬虫 → 文件存储(JSON/CSV)
大型分布式爬虫 → Redis + MySQL
通过合理选择增量爬取策略,可以显著提升爬虫效率,减少资源浪费。