feapder基础
开始
feapder是一款上手简单,功能强大的Python爬虫框架,内置AirSpider、Spider、TaskSpider、BatchSpider四种爬虫解决不同场景的需求,支持断点续爬、监控报警、浏览器渲染、海量数据去重等功能,更有功能强大的爬虫管理系统feaplat为其提供方便的部署及调度
官方文档:feapder官方文档
安装完整版feapder
1
pip install "feapder[all]"
创建简单爬虫
1
feapder create -s my_spider
生成的简单爬虫实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import feapder
class MySpider(feapder.AirSpider):
# 发送请求方法
def start_requests(self):
yield feapder.Request("https://spidertools.cn")
# 解析响应方法
def parse(self, request, response):
# 提取网站title
print(response.xpath("//title/text()").extract_first())
# 提取网站描述
print(response.xpath("//meta[@name='description']/@content").extract_first())
print("网站地址: ", response.url)
if __name__ == "__main__":
# 构造对象后,调用start方法启动爬虫
MySpider().start()
创建爬虫
create命令用于创建feapder项目,常用选项有-p
、-s
、-i
使用-p
选项创建一个爬虫项目
1
feapder create -p <project_name>
项目结构如下
1
2
3
4
5
6
7
8
9
10
11
my_spider
│ CHECK_DATA.md
│ main.py
│ README.md
│ setting.py
│
├─items
│ └─ __init__.py
│
└─spiders
└─ __init__.py
- items:文件夹存放与数据库表映射的item
- spiders:文件夹存放爬虫脚本
- main.py:运行入口
- setting.py:爬虫配置文件
main.py文件中实现命令行解析来调用不同的爬虫,使用feapder封装的ArgumentParser
解析,add_argument
方法中的function
参数指定调用的回调函数,可在回调函数中启动爬虫
使用-s
选项创建单个爬虫
1
feapder create -s <spider_name>
可以选择四种爬虫模板
- AirSpider:轻量爬虫
- Spider:分布式爬虫
- TaskSpider:任务爬虫
- BatchSpider:批量爬虫
使用-i
选项创建数据库表的映射对象
1
feapder create -i <item_name>
可以选择四种item模板
- Item
- 字典Item
- UpdateItem
- 字典UpdateItem
其他命令行工具详见文档:命令行工具 - feapder官方文档
Request
Request为feapder的下载器,基于requests进行了封装,因此支持requests的所有参数
常用属性
属性 | 描述 |
---|---|
url | 待抓取url |
retry_times | 当前重试次数 |
priority | 请求优先级,越小越优先,默认300 |
parser_name | 回调函数所在的类名,默认为当前类 |
callback | 回调函数,可以是函数,也可是函数名 |
filter_repeat | 是否需要去重 |
auto_request | 是否需要自动请求下载网页,默认是 |
request_sync | 是否同步请求下载网页,默认异步 |
use_session | 是否使用session方式 |
download_midware | 下载中间件,默认为parser中的download_midware |
render | 是否用浏览器渲染,对于动态加载页面,使用浏览器渲染后再获取源码 |
render_time | 渲染时长,即打开网页等待指定时间后再获取源码 |
method | 请求方式 |
params | 请求参数 |
data | 请求body |
headers | 请求头 |
cookies | 字典或CookieJar对象 |
timeout | 等待服务器数据的超时限制 |
allow_redirects | 是否允许跟踪POST/PUT/DELETE方法的重定向 |
**kwargs | 自定义数据,可传递到parse方法中 |
发送请求
调用get_response
方法获取响应,save_cached
参数指定是否将响应缓存到Redis,需要在setting.py
或在环境变量中设置Redis
1
2
3
4
5
6
7
def get_response(self, save_cached=False):
"""
获取带有selector功能的response
@param save_cached: 保存缓存,方便调试时不用每次都重新下载
@return:
"""
pass
获取缓存的响应
调用get_response_from_cached
方法从缓存中获取响应,缓存同样依赖redis,因此需要先配置好redis连接信息
1
2
3
4
5
6
7
8
def get_response_from_cached(self, save_cached=True):
"""
用于从缓存中取response
当缓存不存在时,会先下载,然后将响应存入缓存,之后再返回响应
@param save_cached: 保存缓存,方便调试时不用每次都重新下载
@return:
"""
pass
Response
Response对requests返回的response进行了封装,因此支持response所有方法
响应解析
支持xpath选择器
1
response.xpath("//a/@href")
支持css选择器
1
response.css("a::attr(href)")
支持正则表达式
1
response.re("<a.*?href='(.*?)'")
支持BeautifulSoup
1
response.bs4().title
常用功能
获取响应源码
1
response.text
获取json数据
1
response.json
查看下载内容:打开浏览器,渲染下载内容
1
response.open()
将
requests.Response
转换为feapder.Response
1
response = feapder.Response(response)
序列化
1
response_dict = response.to_dict
反序列化
1
feapder.Response.from_dict(response_dict)
AirSpider
AirSpider是一款轻量爬虫,面对一些数据量较少,无需断点续爬,无需分布式采集的需求,可采用此爬虫
基本使用
基本实现及常用自定义如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import feapder
class AirSpiderTest(feapder.AirSpider):
# 爬虫自定义配置,仅对当前爬虫有效,优先级大于配置文件
__custom_setting__ = dict(
PROXY_EXTRACT_API="代理提取地址",
)
# 分发请求任务函数
def start_requests(self):
yield feapder.Request("https://www.baidu.com")
# 设置自定义解析函数
yield feapder.Request("url2", callback=self.parser_detail)
# 设置自定义下载中间件
yield feapder.Request("url3", download_midware=self.my_midware)
# 默认响应解析函数
def parse(self, request, response):
# 抛出异常即可自动重试
if response.status_code != 200:
raise Exception("非法页面")
# 在parse方法中使用yield返回Request,实现递归爬取
print(response)
# 自定义解析函数
def parse_detail(self, request, response):
pass
# 默认下载中间件,在parse函数之前调用
def download_midware(self, request):
request.headers = {'User-Agent':"lalala"}
return request
# 自定义下载中间件
def my_midware(self, request):
return request
# 校验函数, 可用于校验response是否正确
# 若函数内抛出异常,则重试请求
# 若返回True 或 None,则进入解析函数
# 若返回False,则抛弃当前请求
# 可通过request.callback_name 区分不同的回调函数,编写不同的校验逻辑
def validate(self, request, response):
pass
if __name__ == "__main__":
# 使用多线程
AirSpiderTest(thread_count=10).start()
数据入库
AirSpider不支持自动入库,需要借助数据库接口模块手动实现,框架内封装了MysqlDB
、RedisDB
等模块,可以通过这些模块操作数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from feapder.db.mysqldb import MysqlDB
class AirSpiderTest(feapder.AirSpider):
__custom_setting__ = dict(
MYSQL_IP="localhost",
MYSQL_PORT = 3306,
MYSQL_DB = "feapder",
MYSQL_USER_NAME = "feapder",
MYSQL_USER_PASS = "feapder123"
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.db = MysqlDB()
# MysqlDB操作方法详见官方文档
MysqlDB操作方法文档:MysqlDB
Spider
Spider是一款基于redis的分布式爬虫,适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能
Spider支持AirSpider的所有操作,此外支持数据自动入库
Spider爬取的数据需要经过item封装,在parse方法中通过yield
返回item,数据库中会自动创建对应的表,将返回的item插入到表中
定义item如下
1
2
3
4
5
6
7
8
9
10
11
12
from feapder import Item
class SpiderDataItem(Item):
"""
This class was generated by feapder.
command: feapder create -i spider_data.
"""
def __init__(self, *args, **kwargs):
# self.id = None # type : int(10) unsigned | allow_null : NO | key : PRI | default_value : None | extra : auto_increment | column_comment :
self.title = None # type : varchar(255) | allow_null : YES | key : | default_value : None | extra : | column_comment :
Spider实现如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import feapder
from items import *
class TestSpider(feapder.Spider):
def start_requests(self):
yield feapder.Request("https://www.baidu.com")
def parse(self, request, response):
title = response.xpath("//title/text()").extract_first()
item = spider_data_item.SpiderDataItem()
item.title = title
# 在parse中使用yield返回item,实现批量入库
yield item
TaskSpider
TaskSpider是一款分布式爬虫,内部封装了取种子任务的逻辑,内置支持从redis或者mysql获取任务,也可通过自定义实现从其他来源获取任务
种子表指的是一个初始URL列表,通常包含id和url两个字段,每个url就是种子,它们是爬虫开始爬取的起点,通过从这些种子url中获取内容,爬虫可以提取其他链接并继续递归抓取更多页面,直到完成预定的任务
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import feapder
from feapder import ArgumentParser
class TaskSpiderTest(feapder.TaskSpider):
# 自定义数据库,若项目中有setting.py文件,此自定义可删除
# redis必选,mysql可选
__custom_setting__ = dict(
REDISDB_IP_PORTS="localhost:6379",
REDISDB_USER_PASS="",
REDISDB_DB=0,
MYSQL_IP="localhost",
MYSQL_PORT=3306,
MYSQL_DB="feapder",
MYSQL_USER_NAME="feapder",
MYSQL_USER_PASS="feapder123",
)
def add_task(self):
# 添加种子任务:在调用start_monitor_task()时会调用这个函数用于初始化任务,即数据库中添加任务的url
self._redisdb.zadd(self._task_table, {"id": 1, "url": "https://www.baidu.com"})
def start_requests(self, task):
task_id, url = task
yield feapder.Request(url, task_id=task_id)
def parse(self, request, response):
print(response.xpath("//title/text()").extract_first())
print(response.xpath("//meta[@name='description']/@content").extract_first())
print("网站地址: ", response.url)
# mysql 需要更新任务状态为做完 即 state=1
# yield self.update_task_batch(request.task_id)
def start(args):
"""
用mysql做种子表
"""
spider = TaskSpiderTest(
task_table="spider_task", # 任务表名
task_keys=["id", "url"], # 表里查询的字段
redis_key="test:task_spider", # redis里做任务队列的key
keep_alive=True, # 是否常驻,适用于master/work模式,任务完成后爬虫不会退出,继续等待任务
)
if args == 1:
spider.start_monitor_task()
else:
spider.start()
def start2(args):
"""
用redis做种子表
"""
spider = TaskSpiderTest(
task_table="spider_task2", # 任务表名
task_table_type="redis", # 任务表类型为redis
redis_key="test:task_spider", # redis里做任务队列的key
keep_alive=True, # 是否常驻
use_mysql=False, # 若用不到mysql,可以不使用
)
# TaskSpider的爬取分为两步,分别是master和work
# master负责初始化任务,监控批次进度,下发批次等,通过start_monitor_task()启动
# work负责实际地爬取,通过start()启动
if args == 1:
spider.start_monitor_task()
else:
spider.start()
if __name__ == "__main__":
parser = ArgumentParser(description="测试TaskSpider")
parser.add_argument("--start", type=int, help="用mysql做种子表 (1|2)", function=start)
parser.add_argument("--start2", type=int, help="用redis做种子表 (1|2)", function=start2)
parser.start()
# 下发任务 python3 task_spider_test.py --start 1
# 采集 python3 task_spider_test.py --start 2
BatchSpider
BatchSpider是一款分布式批次爬虫,对于需要周期性采集的数据,优先考虑使用本爬虫
使用BatchSpider时,种子表中需要包含id、url、state字段,state字段有4种状态,分别是待抓取(0)、抓取完毕(1)、抓取中(2)、抓取失败(-1)
在爬取时,feapder会分批下发状态为0的任务到redis任务队列,并更新状态为2。当所有状态0任务都下发完毕且任务队列中没有任务,feapder会检查表中是否还有状态2的任务,将其更新为状态0,继续下发。当表中任务的状态只有1或-1时,爬取结束
状态1和-1的任务需要手动维护,当任务完成时需要更新状态为1,当任务发生异常,需要更新状态为-1,使用update_task_batch
方法更新任务状态
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import feapder
from feapder import ArgumentParser
from items import *
class TestSpider(feapder.BatchSpider):
# 该方法用于重置任务,每个批次开始时,默认重置非状态-1的任务为状态0
# 将该方法重写置空,可以实现增量爬取
def init_task(self):
pass
def start_requests(self, task):
# task为任务表中取出的一条任务
id, url = task # id、url为所取的字段,在main函数中指定
# id, url = task.id, task.url
yield feapder.Request(url, task_id=id, render=True) # task_id为任务id,用于更新任务状态
def parse(self, request, response):
title = response.xpath('//title/text()').extract_first() # 取标题
item = spider_data_item.SpiderDataItem() # 声明一个item
item.title = title # 给item属性赋值
yield item # 返回item, item会自动批量入库
yield self.update_task_batch(request.task_id, 1) # 更新任务状态为1
def exception_request(self, request, response):
"""
@summary: 请求或者parser里解析出异常的request
@result: request / callback / None (返回值必须可迭代)
"""
yield request
def failed_request(self, request, response):
"""
@summary: 超过最大重试次数的request
@result: request / item / callback / None (返回值必须可迭代)
"""
yield request
yield self.update_task_batch(request.task_id, -1) # 更新任务状态为-1
def crawl_test(args):
spider = test_spider.TestSpider(
redis_key="feapder:test_batch_spider",
task_table="batch_spider_task", # mysql中的任务表
task_keys=["id", "url"], # 需要获取任务表里的字段名,可添加多个
task_state="state", # mysql中任务状态字段
batch_record_table="batch_spider_record", # mysql中的批次记录表,feapder自动创建
batch_name="批次爬虫测试", # 批次名字
batch_interval=7, # 批次周期,以天为单位,若以小时为单位,可写1 / 24
)
if args == 1:
spider.start_monitor_task() # 下发及监控任务
else:
spider.start() # 采集
if __name__ == "__main__":
parser = ArgumentParser(description="批次爬虫测试")
parser.add_argument("--start", type=int, function=crawl_test)
parser.start()
爬虫集成
feapder支持将多个爬虫集成为一个爬虫,统一下发任务进行爬取
对于集成Spider爬虫,只需将继承的Spider类改为BaseParser类,BaseParser类支持Spider类相同的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import feapder
from items import *
# 将feapder.Spider改为feapder.BaseParser
class TestParser(feapder.BaseParser):
def start_requests(self):
yield feapder.Request("https://www.baidu.com")
def parse(self, request, response):
pass
spider = feapder.Spider(redis_key="feapder:test_spider_integration")
spider.add_parser(TestParser) # 注意此处传入类名
spider.start()
对于集成BatchSpider,需要将继承类改为BatchParser类,BatchParser类支持BatchSpider类相同的方法,除了init_task
方法,所有的BatchParser都使用集成后BatchSpider的init_task
方法
在种子表中,需要添加一个parser_name
字段,指定该任务交由哪个解析器解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import feapder
# 将feapder.BatchSpider改为feapder.BatchParser
class TencentNewsParser(feapder.BatchParser):
def start_requests(self, task):
task_id = task[0]
url = task[1]
yield feapder.Request(url, task_id=task_id)
def parse(self, request, response):
title = response.xpath("//title/text()").extract_first()
print(self.name, title)
yield self.update_task_batch(request.task_id, 1)
spider = feapder.BatchSpider(
task_table="batch_spider_integration_task", # mysql中的任务表
batch_record_table="batch_spider_record", # mysql中的批次记录表
batch_name="批次爬虫集成测试", # 批次名字
batch_interval=7, # 批次时间,以天为单位,若以小时为单位,可写1 / 24
task_keys=["id", "url", "parser_name"], # 集成BatchSpider,需要将BatchSpider的名字取出
redis_key="feapder:test_batch_spider_integration",
task_state="state", # mysql中任务状态字段
)
spider.add_parser(SinaNewsParser)
spider.add_parser(TencentNewsParser)
# spider.start_monitor_task()
# spider.start()