开始

feapder 是一款上手简单,功能强大的 Python 爬虫框架,内置 AirSpider、Spider、TaskSpider、BatchSpider 四种爬虫解决不同场景的需求,支持断点续爬、监控报警、浏览器渲染、海量数据去重等功能,更有功能强大的爬虫管理系统 feaplat 为其提供方便的部署及调度

官方文档:feapder官方文档

安装完整版 feapder

1
pip install "feapder[all]"

创建简单爬虫

1
feapder create -s my_spider

生成的简单爬虫实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import feapder


class MySpider(feapder.AirSpider):

# 发送请求方法
def start_requests(self):
yield feapder.Request("https://spidertools.cn")

# 解析响应方法
def parse(self, request, response):
# 提取网站title
print(response.xpath("//title/text()").extract_first())
# 提取网站描述
print(response.xpath("//meta[@name='description']/@content").extract_first())
print("网站地址: ", response.url)


if __name__ == "__main__":
# 构造对象后,调用start方法启动爬虫
MySpider().start()

创建爬虫

create 命令用于创建 feapder 项目,常用选项有 -p-s-i


使用 -p 选项创建一个爬虫项目

1
feapder create -p <project_name>

项目结构如下

1
2
3
4
5
6
7
8
9
10
11
my_spider
│ CHECK_DATA.md
│ main.py
│ README.md
│ setting.py

├─items
│ └─ __init__.py

└─spiders
└─ __init__.py
  • items:文件夹存放与数据库表映射的 item
  • spiders:文件夹存放爬虫脚本
  • main.py:运行入口
  • setting.py:爬虫配置文件

main.py 文件中实现命令行解析来调用不同的爬虫,使用 feapder 封装的 ArgumentParser 解析,add_argument 方法中的 function 参数指定调用的回调函数,可在回调函数中启动爬虫


使用 -s 选项创建单个爬虫

1
feapder create -s <spider_name>

可以选择四种爬虫模板

  • AirSpider:轻量爬虫
  • Spider:分布式爬虫
  • TaskSpider:任务爬虫
  • BatchSpider:批量爬虫

使用 -i 选项创建数据库表的映射对象

1
feapder create -i <item_name>

可以选择四种 item 模板

  • Item
  • 字典 Item
  • UpdateItem
  • 字典 UpdateItem

其他命令行工具详见文档:命令行工具 - feapder官方文档

Request

Request 为 feapder 的下载器,基于 requests 进行了封装,因此支持 requests 的所有参数

常用属性

属性 描述
url 待抓取 url
retry_times 当前重试次数
priority 请求优先级,越小越优先,默认 300
parser_name 回调函数所在的类名,默认为当前类
callback 回调函数,可以是函数,也可是函数名
filter_repeat 是否需要去重
auto_request 是否需要自动请求下载网页,默认是
request_sync 是否同步请求下载网页,默认异步
use_session 是否使用 session 方式
download_midware 下载中间件,默认为 parser 中的 download_midware
render 是否用浏览器渲染,对于动态加载页面,使用浏览器渲染后再获取源码
render_time 渲染时长,即打开网页等待指定时间后再获取源码
method 请求方式
params 请求参数
data 请求 body
headers 请求头
cookies 字典或 CookieJar 对象
timeout 等待服务器数据的超时限制
allow_redirects 是否允许跟踪 POST/PUT/DELETE 方法的重定向
**kwargs 自定义数据,可传递到 parse 方法中

发送请求

调用 get_response 方法获取响应,save_cached 参数指定是否将响应缓存到 Redis,需要在 setting.py 或在环境变量中设置 Redis

1
2
3
4
5
6
7
def get_response(self, save_cached=False):
"""
获取带有selector功能的response
@param save_cached: 保存缓存,方便调试时不用每次都重新下载
@return:
"""
pass

获取缓存的响应

调用 get_response_from_cached 方法从缓存中获取响应,缓存同样依赖 redis,因此需要先配置好 redis 连接信息

1
2
3
4
5
6
7
8
def get_response_from_cached(self, save_cached=True):
"""
用于从缓存中取response
当缓存不存在时,会先下载,然后将响应存入缓存,之后再返回响应
@param save_cached: 保存缓存,方便调试时不用每次都重新下载
@return:
"""
pass

Response

Response 对 requests 返回的 response 进行了封装,因此支持 response 所有方法

响应解析

  • 支持 xpath 选择器

    1
    response.xpath("//a/@href")
  • 支持 css 选择器

    1
    response.css("a::attr(href)")
  • 支持正则表达式

    1
    response.re("<a.*?href='(.*?)'")
  • 支持 BeautifulSoup

    1
    response.bs4().title

常用功能

  • 获取响应源码

    1
    response.text
  • 获取 json 数据

    1
    response.json
  • 查看下载内容:打开浏览器,渲染下载内容

    1
    response.open()
  • requests.Response 转换为 feapder.Response

    1
    response = feapder.Response(response)
  • 序列化

    1
    response_dict = response.to_dict
  • 反序列化

    1
    feapder.Response.from_dict(response_dict)

AirSpider

AirSpider 是一款轻量爬虫,面对一些数据量较少,无需断点续爬,无需分布式采集的需求,可采用此爬虫

基本使用

基本实现及常用自定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import feapder


class AirSpiderTest(feapder.AirSpider):

# 爬虫自定义配置,仅对当前爬虫有效,优先级大于配置文件
__custom_setting__ = dict(
PROXY_EXTRACT_API="代理提取地址",
)

# 分发请求任务函数
def start_requests(self):
yield feapder.Request("https://www.baidu.com")
# 设置自定义解析函数
yield feapder.Request("url2", callback=self.parser_detail)
# 设置自定义下载中间件
yield feapder.Request("url3", download_midware=self.my_midware)

# 默认响应解析函数
def parse(self, request, response):
# 抛出异常即可自动重试
if response.status_code != 200:
raise Exception("非法页面")
# 在parse方法中使用yield返回Request,实现递归爬取
print(response)

# 自定义解析函数
def parse_detail(self, request, response):
pass

# 默认下载中间件,在parse函数之前调用
def download_midware(self, request):
request.headers = {'User-Agent':"lalala"}
return request

# 自定义下载中间件
def my_midware(self, request):
return request

# 校验函数, 可用于校验response是否正确
# 若函数内抛出异常,则重试请求
# 若返回True 或 None,则进入解析函数
# 若返回False,则抛弃当前请求
# 可通过request.callback_name 区分不同的回调函数,编写不同的校验逻辑
def validate(self, request, response):
pass


if __name__ == "__main__":
# 使用多线程
AirSpiderTest(thread_count=10).start()

数据入库

AirSpider 不支持自动入库,需要借助数据库接口模块手动实现,框架内封装了 MysqlDBRedisDB 等模块,可以通过这些模块操作数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from feapder.db.mysqldb import MysqlDB

class AirSpiderTest(feapder.AirSpider):
__custom_setting__ = dict(
MYSQL_IP="localhost",
MYSQL_PORT = 3306,
MYSQL_DB = "feapder",
MYSQL_USER_NAME = "feapder",
MYSQL_USER_PASS = "feapder123"
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.db = MysqlDB()
# MysqlDB操作方法详见官方文档

MysqlDB 操作方法文档:MysqlDB

Spider

Spider 是一款基于 redis 的分布式爬虫,适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能

Spider 支持 AirSpider 的所有操作,此外支持数据自动入库

Spider 爬取的数据需要经过 item 封装,在 parse 方法中通过 yield 返回 item,数据库中会自动创建对应的表,将返回的 item 插入到表中

定义 item 如下

1
2
3
4
5
6
7
8
9
10
11
12
from feapder import Item


class SpiderDataItem(Item):
"""
This class was generated by feapder.
command: feapder create -i spider_data.
"""

def __init__(self, *args, **kwargs):
# self.id = None # type : int(10) unsigned | allow_null : NO | key : PRI | default_value : None | extra : auto_increment | column_comment :
self.title = None # type : varchar(255) | allow_null : YES | key : | default_value : None | extra : | column_comment :

Spider 实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import feapder
from items import *


class TestSpider(feapder.Spider):
def start_requests(self):
yield feapder.Request("https://www.baidu.com")

def parse(self, request, response):
title = response.xpath("//title/text()").extract_first()
item = spider_data_item.SpiderDataItem()
item.title = title
# 在parse中使用yield返回item,实现批量入库
yield item

TaskSpider

TaskSpider 是一款分布式爬虫,内部封装了取种子任务的逻辑,内置支持从 redis 或者 mysql 获取任务,也可通过自定义实现从其他来源获取任务

种子表指的是一个初始 URL 列表,通常包含 id 和 url 两个字段,每个 url 就是种子,它们是爬虫开始爬取的起点,通过从这些种子 url 中获取内容,爬虫可以提取其他链接并继续递归抓取更多页面,直到完成预定的任务

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import feapder
from feapder import ArgumentParser


class TaskSpiderTest(feapder.TaskSpider):
# 自定义数据库,若项目中有setting.py文件,此自定义可删除
# redis必选,mysql可选
__custom_setting__ = dict(
REDISDB_IP_PORTS="localhost:6379",
REDISDB_USER_PASS="",
REDISDB_DB=0,
MYSQL_IP="localhost",
MYSQL_PORT=3306,
MYSQL_DB="feapder",
MYSQL_USER_NAME="feapder",
MYSQL_USER_PASS="feapder123",
)

def add_task(self):
# 添加种子任务:在调用start_monitor_task()时会调用这个函数用于初始化任务,即数据库中添加任务的url
self._redisdb.zadd(self._task_table, {"id": 1, "url": "https://www.baidu.com"})

def start_requests(self, task):
task_id, url = task
yield feapder.Request(url, task_id=task_id)

def parse(self, request, response):
print(response.xpath("//title/text()").extract_first())
print(response.xpath("//meta[@name='description']/@content").extract_first())
print("网站地址: ", response.url)

# mysql 需要更新任务状态为做完 即 state=1
# yield self.update_task_batch(request.task_id)

def start(args):
"""
用mysql做种子表
"""
spider = TaskSpiderTest(
task_table="spider_task", # 任务表名
task_keys=["id", "url"], # 表里查询的字段
redis_key="test:task_spider", # redis里做任务队列的key
keep_alive=True, # 是否常驻,适用于master/work模式,任务完成后爬虫不会退出,继续等待任务
)
if args == 1:
spider.start_monitor_task()
else:
spider.start()


def start2(args):
"""
用redis做种子表
"""
spider = TaskSpiderTest(
task_table="spider_task2", # 任务表名
task_table_type="redis", # 任务表类型为redis
redis_key="test:task_spider", # redis里做任务队列的key
keep_alive=True, # 是否常驻
use_mysql=False, # 若用不到mysql,可以不使用
)
# TaskSpider的爬取分为两步,分别是master和work
# master负责初始化任务,监控批次进度,下发批次等,通过start_monitor_task()启动
# work负责实际地爬取,通过start()启动
if args == 1:
spider.start_monitor_task()
else:
spider.start()


if __name__ == "__main__":
parser = ArgumentParser(description="测试TaskSpider")
parser.add_argument("--start", type=int, help="用mysql做种子表 (1|2)", function=start)
parser.add_argument("--start2", type=int, help="用redis做种子表 (1|2)", function=start2)
parser.start()

# 下发任务 python3 task_spider_test.py --start 1
# 采集 python3 task_spider_test.py --start 2

BatchSpider

BatchSpider 是一款分布式批次爬虫,对于需要周期性采集的数据,优先考虑使用本爬虫

使用 BatchSpider 时,种子表中需要包含 id、url、state 字段,state 字段有 4 种状态,分别是待抓取 (0)、抓取完毕 (1)、抓取中 (2)、抓取失败 (-1)

在爬取时,feapder 会分批下发状态为 0 的任务到 redis 任务队列,并更新状态为 2。当所有状态 0 任务都下发完毕且任务队列中没有任务,feapder 会检查表中是否还有状态 2 的任务,将其更新为状态 0,继续下发。当表中任务的状态只有 1 或 -1 时,爬取结束

状态 1 和 -1 的任务需要手动维护,当任务完成时需要更新状态为 1,当任务发生异常,需要更新状态为 -1,使用 update_task_batch 方法更新任务状态

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import feapder
from feapder import ArgumentParser
from items import *

class TestSpider(feapder.BatchSpider):

# 该方法用于重置任务,每个批次开始时,默认重置非状态-1的任务为状态0
# 将该方法重写置空,可以实现增量爬取
def init_task(self):
pass

def start_requests(self, task):
# task为任务表中取出的一条任务
id, url = task # id、url为所取的字段,在main函数中指定
# id, url = task.id, task.url
yield feapder.Request(url, task_id=id, render=True) # task_id为任务id,用于更新任务状态

def parse(self, request, response):
title = response.xpath('//title/text()').extract_first() # 取标题
item = spider_data_item.SpiderDataItem() # 声明一个item
item.title = title # 给item属性赋值
yield item # 返回item, item会自动批量入库
yield self.update_task_batch(request.task_id, 1) # 更新任务状态为1

def exception_request(self, request, response):
"""
@summary: 请求或者parser里解析出异常的request
@result: request / callback / None (返回值必须可迭代)
"""
yield request

def failed_request(self, request, response):
"""
@summary: 超过最大重试次数的request
@result: request / item / callback / None (返回值必须可迭代)
"""
yield request
yield self.update_task_batch(request.task_id, -1) # 更新任务状态为-1

def crawl_test(args):
spider = test_spider.TestSpider(
redis_key="feapder:test_batch_spider",
task_table="batch_spider_task", # mysql中的任务表
task_keys=["id", "url"], # 需要获取任务表里的字段名,可添加多个
task_state="state", # mysql中任务状态字段
batch_record_table="batch_spider_record", # mysql中的批次记录表,feapder自动创建
batch_name="批次爬虫测试", # 批次名字
batch_interval=7, # 批次周期,以天为单位,若以小时为单位,可写1 / 24
)

if args == 1:
spider.start_monitor_task() # 下发及监控任务
else:
spider.start() # 采集

if __name__ == "__main__":
parser = ArgumentParser(description="批次爬虫测试")
parser.add_argument("--start", type=int, function=crawl_test)
parser.start()

爬虫集成

feapder 支持将多个爬虫集成为一个爬虫,统一下发任务进行爬取

对于集成 Spider 爬虫,只需将继承的 Spider 类改为 BaseParser 类,BaseParser 类支持 Spider 类相同的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import feapder
from items import *

# 将feapder.Spider改为feapder.BaseParser
class TestParser(feapder.BaseParser):
def start_requests(self):
yield feapder.Request("https://www.baidu.com")

def parse(self, request, response):
pass

spider = feapder.Spider(redis_key="feapder:test_spider_integration")
spider.add_parser(TestParser) # 注意此处传入类名
spider.start()

对于集成 BatchSpider,需要将继承类改为 BatchParser 类,BatchParser 类支持 BatchSpider 类相同的方法,除了 init_task 方法,所有的 BatchParser 都使用集成后 BatchSpider 的 init_task 方法

在种子表中,需要添加一个 parser_name 字段,指定该任务交由哪个解析器解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import feapder

# 将feapder.BatchSpider改为feapder.BatchParser
class TencentNewsParser(feapder.BatchParser):

def start_requests(self, task):
task_id = task[0]
url = task[1]
yield feapder.Request(url, task_id=task_id)

def parse(self, request, response):
title = response.xpath("//title/text()").extract_first()
print(self.name, title)
yield self.update_task_batch(request.task_id, 1)

spider = feapder.BatchSpider(
task_table="batch_spider_integration_task", # mysql中的任务表
batch_record_table="batch_spider_record", # mysql中的批次记录表
batch_name="批次爬虫集成测试", # 批次名字
batch_interval=7, # 批次时间,以天为单位,若以小时为单位,可写1 / 24
task_keys=["id", "url", "parser_name"], # 集成BatchSpider,需要将BatchSpider的名字取出
redis_key="feapder:test_batch_spider_integration",
task_state="state", # mysql中任务状态字段
)
spider.add_parser(SinaNewsParser)
spider.add_parser(TencentNewsParser)
# spider.start_monitor_task()
# spider.start()