开始
feapder 是一款上手简单,功能强大的 Python 爬虫框架,内置 AirSpider、Spider、TaskSpider、BatchSpider 四种爬虫解决不同场景的需求,支持断点续爬、监控报警、浏览器渲染、海量数据去重等功能,更有功能强大的爬虫管理系统 feaplat 为其提供方便的部署及调度
官方文档:feapder官方文档
安装完整版 feapder
1
| pip install "feapder[all]"
|
创建简单爬虫
1
| feapder create -s my_spider
|
生成的简单爬虫实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import feapder
class MySpider(feapder.AirSpider): def start_requests(self): yield feapder.Request("https://spidertools.cn")
def parse(self, request, response): print(response.xpath("//title/text()").extract_first()) print(response.xpath("//meta[@name='description']/@content").extract_first()) print("网站地址: ", response.url)
if __name__ == "__main__": MySpider().start()
|
创建爬虫
create 命令用于创建 feapder 项目,常用选项有 -p
、-s
、-i
使用 -p
选项创建一个爬虫项目
1
| feapder create -p <project_name>
|
项目结构如下
1 2 3 4 5 6 7 8 9 10 11
| my_spider │ CHECK_DATA.md │ main.py │ README.md │ setting.py │ ├─items │ └─ __init__.py │ └─spiders └─ __init__.py
|
main.py 文件中实现命令行解析来调用不同的爬虫,使用 feapder 封装的 ArgumentParser
解析,add_argument
方法中的 function
参数指定调用的回调函数,可在回调函数中启动爬虫
使用 -s
选项创建单个爬虫
1
| feapder create -s <spider_name>
|
可以选择四种爬虫模板
- AirSpider:轻量爬虫
- Spider:分布式爬虫
- TaskSpider:任务爬虫
- BatchSpider:批量爬虫
使用 -i
选项创建数据库表的映射对象
1
| feapder create -i <item_name>
|
可以选择四种 item 模板
- Item
- 字典 Item
- UpdateItem
- 字典 UpdateItem
其他命令行工具详见文档:命令行工具 - feapder官方文档
Request
Request 为 feapder 的下载器,基于 requests 进行了封装,因此支持 requests 的所有参数
常用属性
属性 |
描述 |
url |
待抓取 url |
retry_times |
当前重试次数 |
priority |
请求优先级,越小越优先,默认 300 |
parser_name |
回调函数所在的类名,默认为当前类 |
callback |
回调函数,可以是函数,也可是函数名 |
filter_repeat |
是否需要去重 |
auto_request |
是否需要自动请求下载网页,默认是 |
request_sync |
是否同步请求下载网页,默认异步 |
use_session |
是否使用 session 方式 |
download_midware |
下载中间件,默认为 parser 中的 download_midware |
render |
是否用浏览器渲染,对于动态加载页面,使用浏览器渲染后再获取源码 |
render_time |
渲染时长,即打开网页等待指定时间后再获取源码 |
method |
请求方式 |
params |
请求参数 |
data |
请求 body |
headers |
请求头 |
cookies |
字典或 CookieJar 对象 |
timeout |
等待服务器数据的超时限制 |
allow_redirects |
是否允许跟踪 POST/PUT/DELETE 方法的重定向 |
**kwargs |
自定义数据,可传递到 parse 方法中 |
发送请求
调用 get_response
方法获取响应,save_cached
参数指定是否将响应缓存到 Redis,需要在 setting.py
或在环境变量中设置 Redis
1 2 3 4 5 6 7
| def get_response(self, save_cached=False): """ 获取带有selector功能的response @param save_cached: 保存缓存,方便调试时不用每次都重新下载 @return: """ pass
|
获取缓存的响应
调用 get_response_from_cached
方法从缓存中获取响应,缓存同样依赖 redis,因此需要先配置好 redis 连接信息
1 2 3 4 5 6 7 8
| def get_response_from_cached(self, save_cached=True): """ 用于从缓存中取response 当缓存不存在时,会先下载,然后将响应存入缓存,之后再返回响应 @param save_cached: 保存缓存,方便调试时不用每次都重新下载 @return: """ pass
|
Response
Response 对 requests 返回的 response 进行了封装,因此支持 response 所有方法
响应解析
-
支持 xpath 选择器
1
| response.xpath("//a/@href")
|
-
支持 css 选择器
1
| response.css("a::attr(href)")
|
-
支持正则表达式
1
| response.re("<a.*?href='(.*?)'")
|
-
支持 BeautifulSoup
常用功能
AirSpider
AirSpider 是一款轻量爬虫,面对一些数据量较少,无需断点续爬,无需分布式采集的需求,可采用此爬虫
基本使用
基本实现及常用自定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import feapder
class AirSpiderTest(feapder.AirSpider): __custom_setting__ = dict( PROXY_EXTRACT_API="代理提取地址", ) def start_requests(self): yield feapder.Request("https://www.baidu.com") yield feapder.Request("url2", callback=self.parser_detail) yield feapder.Request("url3", download_midware=self.my_midware)
def parse(self, request, response): if response.status_code != 200: raise Exception("非法页面") print(response) def parse_detail(self, request, response): pass def download_midware(self, request): request.headers = {'User-Agent':"lalala"} return request def my_midware(self, request): return request def validate(self, request, response): pass
if __name__ == "__main__": AirSpiderTest(thread_count=10).start()
|
数据入库
AirSpider 不支持自动入库,需要借助数据库接口模块手动实现,框架内封装了 MysqlDB
、RedisDB
等模块,可以通过这些模块操作数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from feapder.db.mysqldb import MysqlDB
class AirSpiderTest(feapder.AirSpider): __custom_setting__ = dict( MYSQL_IP="localhost", MYSQL_PORT = 3306, MYSQL_DB = "feapder", MYSQL_USER_NAME = "feapder", MYSQL_USER_PASS = "feapder123" ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db = MysqlDB()
|
MysqlDB 操作方法文档:MysqlDB
Spider
Spider 是一款基于 redis 的分布式爬虫,适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能
Spider 支持 AirSpider 的所有操作,此外支持数据自动入库
Spider 爬取的数据需要经过 item 封装,在 parse 方法中通过 yield
返回 item,数据库中会自动创建对应的表,将返回的 item 插入到表中
定义 item 如下
1 2 3 4 5 6 7 8 9 10 11 12
| from feapder import Item
class SpiderDataItem(Item): """ This class was generated by feapder. command: feapder create -i spider_data. """
def __init__(self, *args, **kwargs): self.title = None
|
Spider 实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import feapder from items import *
class TestSpider(feapder.Spider): def start_requests(self): yield feapder.Request("https://www.baidu.com") def parse(self, request, response): title = response.xpath("//title/text()").extract_first() item = spider_data_item.SpiderDataItem() item.title = title yield item
|
TaskSpider
TaskSpider 是一款分布式爬虫,内部封装了取种子任务的逻辑,内置支持从 redis 或者 mysql 获取任务,也可通过自定义实现从其他来源获取任务
种子表指的是一个初始 URL 列表,通常包含 id 和 url 两个字段,每个 url 就是种子,它们是爬虫开始爬取的起点,通过从这些种子 url 中获取内容,爬虫可以提取其他链接并继续递归抓取更多页面,直到完成预定的任务
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| import feapder from feapder import ArgumentParser
class TaskSpiderTest(feapder.TaskSpider): __custom_setting__ = dict( REDISDB_IP_PORTS="localhost:6379", REDISDB_USER_PASS="", REDISDB_DB=0, MYSQL_IP="localhost", MYSQL_PORT=3306, MYSQL_DB="feapder", MYSQL_USER_NAME="feapder", MYSQL_USER_PASS="feapder123", )
def add_task(self): self._redisdb.zadd(self._task_table, {"id": 1, "url": "https://www.baidu.com"})
def start_requests(self, task): task_id, url = task yield feapder.Request(url, task_id=task_id)
def parse(self, request, response): print(response.xpath("//title/text()").extract_first()) print(response.xpath("//meta[@name='description']/@content").extract_first()) print("网站地址: ", response.url)
def start(args): """ 用mysql做种子表 """ spider = TaskSpiderTest( task_table="spider_task", task_keys=["id", "url"], redis_key="test:task_spider", keep_alive=True, ) if args == 1: spider.start_monitor_task() else: spider.start()
def start2(args): """ 用redis做种子表 """ spider = TaskSpiderTest( task_table="spider_task2", task_table_type="redis", redis_key="test:task_spider", keep_alive=True, use_mysql=False, ) if args == 1: spider.start_monitor_task() else: spider.start()
if __name__ == "__main__": parser = ArgumentParser(description="测试TaskSpider") parser.add_argument("--start", type=int, help="用mysql做种子表 (1|2)", function=start) parser.add_argument("--start2", type=int, help="用redis做种子表 (1|2)", function=start2) parser.start()
|
BatchSpider
BatchSpider 是一款分布式批次爬虫,对于需要周期性采集的数据,优先考虑使用本爬虫
使用 BatchSpider 时,种子表中需要包含 id、url、state 字段,state 字段有 4 种状态,分别是待抓取 (0)、抓取完毕 (1)、抓取中 (2)、抓取失败 (-1)
在爬取时,feapder 会分批下发状态为 0 的任务到 redis 任务队列,并更新状态为 2。当所有状态 0 任务都下发完毕且任务队列中没有任务,feapder 会检查表中是否还有状态 2 的任务,将其更新为状态 0,继续下发。当表中任务的状态只有 1 或 -1 时,爬取结束
状态 1 和 -1 的任务需要手动维护,当任务完成时需要更新状态为 1,当任务发生异常,需要更新状态为 -1,使用 update_task_batch
方法更新任务状态
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import feapder from feapder import ArgumentParser from items import *
class TestSpider(feapder.BatchSpider): def init_task(self): pass
def start_requests(self, task): id, url = task yield feapder.Request(url, task_id=id, render=True)
def parse(self, request, response): title = response.xpath('//title/text()').extract_first() item = spider_data_item.SpiderDataItem() item.title = title yield item yield self.update_task_batch(request.task_id, 1)
def exception_request(self, request, response): """ @summary: 请求或者parser里解析出异常的request @result: request / callback / None (返回值必须可迭代) """ yield request
def failed_request(self, request, response): """ @summary: 超过最大重试次数的request @result: request / item / callback / None (返回值必须可迭代) """ yield request yield self.update_task_batch(request.task_id, -1)
def crawl_test(args): spider = test_spider.TestSpider( redis_key="feapder:test_batch_spider", task_table="batch_spider_task", task_keys=["id", "url"], task_state="state", batch_record_table="batch_spider_record", batch_name="批次爬虫测试", batch_interval=7, )
if args == 1: spider.start_monitor_task() else: spider.start()
if __name__ == "__main__": parser = ArgumentParser(description="批次爬虫测试") parser.add_argument("--start", type=int, function=crawl_test) parser.start()
|
爬虫集成
feapder 支持将多个爬虫集成为一个爬虫,统一下发任务进行爬取
对于集成 Spider 爬虫,只需将继承的 Spider 类改为 BaseParser 类,BaseParser 类支持 Spider 类相同的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import feapder from items import *
class TestParser(feapder.BaseParser): def start_requests(self): yield feapder.Request("https://www.baidu.com") def parse(self, request, response): pass
spider = feapder.Spider(redis_key="feapder:test_spider_integration") spider.add_parser(TestParser) spider.start()
|
对于集成 BatchSpider,需要将继承类改为 BatchParser 类,BatchParser 类支持 BatchSpider 类相同的方法,除了 init_task
方法,所有的 BatchParser 都使用集成后 BatchSpider 的 init_task
方法
在种子表中,需要添加一个 parser_name
字段,指定该任务交由哪个解析器解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import feapder
class TencentNewsParser(feapder.BatchParser):
def start_requests(self, task): task_id = task[0] url = task[1] yield feapder.Request(url, task_id=task_id)
def parse(self, request, response): title = response.xpath("//title/text()").extract_first() print(self.name, title) yield self.update_task_batch(request.task_id, 1) spider = feapder.BatchSpider( task_table="batch_spider_integration_task", batch_record_table="batch_spider_record", batch_name="批次爬虫集成测试", batch_interval=7, task_keys=["id", "url", "parser_name"], redis_key="feapder:test_batch_spider_integration", task_state="state", ) spider.add_parser(SinaNewsParser) spider.add_parser(TencentNewsParser)
|