文章

feapder基础

开始

feapder是一款上手简单,功能强大的Python爬虫框架,内置AirSpider、Spider、TaskSpider、BatchSpider四种爬虫解决不同场景的需求,支持断点续爬、监控报警、浏览器渲染、海量数据去重等功能,更有功能强大的爬虫管理系统feaplat为其提供方便的部署及调度

官方文档:feapder官方文档

安装完整版feapder

1
pip install "feapder[all]"

创建简单爬虫

1
feapder create -s my_spider

生成的简单爬虫实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import feapder


class MySpider(feapder.AirSpider):
    
    # 发送请求方法
    def start_requests(self):
        yield feapder.Request("https://spidertools.cn")

    # 解析响应方法
    def parse(self, request, response):
        # 提取网站title
        print(response.xpath("//title/text()").extract_first())
        # 提取网站描述
        print(response.xpath("//meta[@name='description']/@content").extract_first())
        print("网站地址: ", response.url)


if __name__ == "__main__":
    # 构造对象后,调用start方法启动爬虫
    MySpider().start()

创建爬虫

create命令用于创建feapder项目,常用选项有-p-s-i


使用-p选项创建一个爬虫项目

1
feapder create -p <project_name>

项目结构如下

1
2
3
4
5
6
7
8
9
10
11
my_spider
  │  CHECK_DATA.md
  │  main.py
  │  README.md
  │  setting.py
  │
  ├─items
  │  └─ __init__.py
  │
  └─spiders
     └─ __init__.py
  • items:文件夹存放与数据库表映射的item
  • spiders:文件夹存放爬虫脚本
  • main.py:运行入口
  • setting.py:爬虫配置文件

main.py文件中实现命令行解析来调用不同的爬虫,使用feapder封装的ArgumentParser解析,add_argument方法中的function参数指定调用的回调函数,可在回调函数中启动爬虫


使用-s选项创建单个爬虫

1
feapder create -s <spider_name>

可以选择四种爬虫模板

  • AirSpider:轻量爬虫
  • Spider:分布式爬虫
  • TaskSpider:任务爬虫
  • BatchSpider:批量爬虫

使用-i选项创建数据库表的映射对象

1
feapder create -i <item_name>

可以选择四种item模板

  • Item
  • 字典Item
  • UpdateItem
  • 字典UpdateItem

其他命令行工具详见文档:命令行工具 - feapder官方文档

Request

Request为feapder的下载器,基于requests进行了封装,因此支持requests的所有参数

常用属性

属性描述
url待抓取url
retry_times当前重试次数
priority请求优先级,越小越优先,默认300
parser_name回调函数所在的类名,默认为当前类
callback回调函数,可以是函数,也可是函数名
filter_repeat是否需要去重
auto_request是否需要自动请求下载网页,默认是
request_sync是否同步请求下载网页,默认异步
use_session是否使用session方式
download_midware下载中间件,默认为parser中的download_midware
render是否用浏览器渲染,对于动态加载页面,使用浏览器渲染后再获取源码
render_time渲染时长,即打开网页等待指定时间后再获取源码
method请求方式
params请求参数
data请求body
headers请求头
cookies字典或CookieJar对象
timeout等待服务器数据的超时限制
allow_redirects是否允许跟踪POST/PUT/DELETE方法的重定向
**kwargs自定义数据,可传递到parse方法中

发送请求

调用get_response方法获取响应,save_cached参数指定是否将响应缓存到Redis,需要在setting.py或在环境变量中设置Redis

1
2
3
4
5
6
7
def get_response(self, save_cached=False):
    """
    获取带有selector功能的response
    @param save_cached: 保存缓存,方便调试时不用每次都重新下载
    @return:
    """
    pass

获取缓存的响应

调用get_response_from_cached方法从缓存中获取响应,缓存同样依赖redis,因此需要先配置好redis连接信息

1
2
3
4
5
6
7
8
def get_response_from_cached(self, save_cached=True):
    """
    用于从缓存中取response
    当缓存不存在时,会先下载,然后将响应存入缓存,之后再返回响应
    @param save_cached: 保存缓存,方便调试时不用每次都重新下载
    @return:
    """
    pass

Response

Response对requests返回的response进行了封装,因此支持response所有方法

响应解析

  • 支持xpath选择器

    1
    
    response.xpath("//a/@href")
    
  • 支持css选择器

    1
    
    response.css("a::attr(href)")
    
  • 支持正则表达式

    1
    
    response.re("<a.*?href='(.*?)'")
    
  • 支持BeautifulSoup

    1
    
    response.bs4().title
    

常用功能

  • 获取响应源码

    1
    
    response.text
    
  • 获取json数据

    1
    
    response.json
    
  • 查看下载内容:打开浏览器,渲染下载内容

    1
    
    response.open()
    
  • requests.Response转换为feapder.Response

    1
    
    response = feapder.Response(response)
    
  • 序列化

    1
    
    response_dict = response.to_dict
    
  • 反序列化

    1
    
    feapder.Response.from_dict(response_dict)
    

AirSpider

AirSpider是一款轻量爬虫,面对一些数据量较少,无需断点续爬,无需分布式采集的需求,可采用此爬虫

基本使用

基本实现及常用自定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import feapder


class AirSpiderTest(feapder.AirSpider):
    
    # 爬虫自定义配置,仅对当前爬虫有效,优先级大于配置文件
    __custom_setting__ = dict(
        PROXY_EXTRACT_API="代理提取地址",
    )
    
    # 分发请求任务函数
    def start_requests(self):
        yield feapder.Request("https://www.baidu.com")
        # 设置自定义解析函数
        yield feapder.Request("url2", callback=self.parser_detail)
        # 设置自定义下载中间件
        yield feapder.Request("url3", download_midware=self.my_midware)

    # 默认响应解析函数
    def parse(self, request, response):
        # 抛出异常即可自动重试
        if response.status_code != 200:
        	raise Exception("非法页面")
        # 在parse方法中使用yield返回Request,实现递归爬取
        print(response)
        
    # 自定义解析函数
    def parse_detail(self, request, response):
        pass
    
    # 默认下载中间件,在parse函数之前调用
    def download_midware(self, request):
        request.headers = {'User-Agent':"lalala"}
        return request
    
    # 自定义下载中间件
    def my_midware(self, request):
        return request
    
    # 校验函数, 可用于校验response是否正确
    # 若函数内抛出异常,则重试请求
    # 若返回True 或 None,则进入解析函数
    # 若返回False,则抛弃当前请求
    # 可通过request.callback_name 区分不同的回调函数,编写不同的校验逻辑
    def validate(self, request, response):
        pass


if __name__ == "__main__":
    # 使用多线程
    AirSpiderTest(thread_count=10).start()

数据入库

AirSpider不支持自动入库,需要借助数据库接口模块手动实现,框架内封装了MysqlDBRedisDB等模块,可以通过这些模块操作数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from feapder.db.mysqldb import MysqlDB

class AirSpiderTest(feapder.AirSpider):
    __custom_setting__ = dict(
        MYSQL_IP="localhost",
        MYSQL_PORT = 3306,
        MYSQL_DB = "feapder",
        MYSQL_USER_NAME = "feapder",
        MYSQL_USER_PASS = "feapder123"
    )
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.db = MysqlDB()
    # MysqlDB操作方法详见官方文档

MysqlDB操作方法文档:MysqlDB

Spider

Spider是一款基于redis的分布式爬虫,适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能

Spider支持AirSpider的所有操作,此外支持数据自动入库

Spider爬取的数据需要经过item封装,在parse方法中通过yield返回item,数据库中会自动创建对应的表,将返回的item插入到表中

定义item如下

1
2
3
4
5
6
7
8
9
10
11
12
from feapder import Item


class SpiderDataItem(Item):
    """
    This class was generated by feapder.
    command: feapder create -i spider_data.
    """

    def __init__(self, *args, **kwargs):
        # self.id = None  # type : int(10) unsigned | allow_null : NO | key : PRI | default_value : None | extra : auto_increment | column_comment : 
        self.title = None  # type : varchar(255) | allow_null : YES | key :  | default_value : None | extra : | column_comment :

Spider实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import feapder
from items import *


class TestSpider(feapder.Spider):
    def start_requests(self):
        yield feapder.Request("https://www.baidu.com")
        
    def parse(self, request, response):
        title = response.xpath("//title/text()").extract_first()
        item = spider_data_item.SpiderDataItem()
        item.title = title
        # 在parse中使用yield返回item,实现批量入库
        yield item

TaskSpider

TaskSpider是一款分布式爬虫,内部封装了取种子任务的逻辑,内置支持从redis或者mysql获取任务,也可通过自定义实现从其他来源获取任务

种子表指的是一个初始URL列表,通常包含id和url两个字段,每个url就是种子,它们是爬虫开始爬取的起点,通过从这些种子url中获取内容,爬虫可以提取其他链接并继续递归抓取更多页面,直到完成预定的任务

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import feapder
from feapder import ArgumentParser


class TaskSpiderTest(feapder.TaskSpider):
    # 自定义数据库,若项目中有setting.py文件,此自定义可删除
    # redis必选,mysql可选
    __custom_setting__ = dict(
        REDISDB_IP_PORTS="localhost:6379",
        REDISDB_USER_PASS="",
        REDISDB_DB=0,
        MYSQL_IP="localhost",
        MYSQL_PORT=3306,
        MYSQL_DB="feapder",
        MYSQL_USER_NAME="feapder",
        MYSQL_USER_PASS="feapder123",
    )

    def add_task(self):
        # 添加种子任务:在调用start_monitor_task()时会调用这个函数用于初始化任务,即数据库中添加任务的url
        self._redisdb.zadd(self._task_table, {"id": 1, "url": "https://www.baidu.com"})

    def start_requests(self, task):
        task_id, url = task
        yield feapder.Request(url, task_id=task_id)

    def parse(self, request, response):
        print(response.xpath("//title/text()").extract_first())
        print(response.xpath("//meta[@name='description']/@content").extract_first())
        print("网站地址: ", response.url)

        # mysql 需要更新任务状态为做完 即 state=1
        # yield self.update_task_batch(request.task_id)

def start(args):
    """
    用mysql做种子表
    """
    spider = TaskSpiderTest(
        task_table="spider_task", # 任务表名
        task_keys=["id", "url"], # 表里查询的字段
        redis_key="test:task_spider", # redis里做任务队列的key
        keep_alive=True, # 是否常驻,适用于master/work模式,任务完成后爬虫不会退出,继续等待任务
    )
    if args == 1:
        spider.start_monitor_task()
    else:
        spider.start()


def start2(args):
    """
    用redis做种子表
    """
    spider = TaskSpiderTest(
        task_table="spider_task2", # 任务表名
        task_table_type="redis", # 任务表类型为redis
        redis_key="test:task_spider", # redis里做任务队列的key
        keep_alive=True, # 是否常驻
        use_mysql=False, # 若用不到mysql,可以不使用
    )
    # TaskSpider的爬取分为两步,分别是master和work
    # master负责初始化任务,监控批次进度,下发批次等,通过start_monitor_task()启动
    # work负责实际地爬取,通过start()启动
    if args == 1:
        spider.start_monitor_task()
    else:
        spider.start()


if __name__ == "__main__":
    parser = ArgumentParser(description="测试TaskSpider")
    parser.add_argument("--start", type=int, help="用mysql做种子表 (1|2)", function=start)
    parser.add_argument("--start2", type=int, help="用redis做种子表 (1|2)", function=start2)
    parser.start()

    # 下发任务  python3 task_spider_test.py --start 1
    # 采集  python3 task_spider_test.py --start 2

BatchSpider

BatchSpider是一款分布式批次爬虫,对于需要周期性采集的数据,优先考虑使用本爬虫

使用BatchSpider时,种子表中需要包含id、url、state字段,state字段有4种状态,分别是待抓取(0)、抓取完毕(1)、抓取中(2)、抓取失败(-1)

在爬取时,feapder会分批下发状态为0的任务到redis任务队列,并更新状态为2。当所有状态0任务都下发完毕且任务队列中没有任务,feapder会检查表中是否还有状态2的任务,将其更新为状态0,继续下发。当表中任务的状态只有1或-1时,爬取结束

状态1和-1的任务需要手动维护,当任务完成时需要更新状态为1,当任务发生异常,需要更新状态为-1,使用update_task_batch方法更新任务状态

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import feapder
from feapder import ArgumentParser
from items import *

class TestSpider(feapder.BatchSpider):
    
    # 该方法用于重置任务,每个批次开始时,默认重置非状态-1的任务为状态0
    # 将该方法重写置空,可以实现增量爬取
    def init_task(self):
        pass

    def start_requests(self, task):
        # task为任务表中取出的一条任务
        id, url = task  # id、url为所取的字段,在main函数中指定
        # id, url = task.id, task.url
        yield feapder.Request(url, task_id=id, render=True)  # task_id为任务id,用于更新任务状态

    def parse(self, request, response):
        title = response.xpath('//title/text()').extract_first()  # 取标题
        item = spider_data_item.SpiderDataItem()  # 声明一个item
        item.title = title  # 给item属性赋值
        yield item  # 返回item, item会自动批量入库
        yield self.update_task_batch(request.task_id, 1) # 更新任务状态为1

    def exception_request(self, request, response):
        """
        @summary: 请求或者parser里解析出异常的request
        @result: request / callback / None (返回值必须可迭代)
        """
        yield request

    def failed_request(self, request, response):
        """
        @summary: 超过最大重试次数的request
        @result: request / item / callback / None (返回值必须可迭代)
        """
        yield request
        yield self.update_task_batch(request.task_id, -1)  # 更新任务状态为-1

def crawl_test(args):
    spider = test_spider.TestSpider(
        redis_key="feapder:test_batch_spider",
        task_table="batch_spider_task",  # mysql中的任务表
        task_keys=["id", "url"],  # 需要获取任务表里的字段名,可添加多个
        task_state="state",  # mysql中任务状态字段
        batch_record_table="batch_spider_record",  # mysql中的批次记录表,feapder自动创建
        batch_name="批次爬虫测试",  # 批次名字
        batch_interval=7,  # 批次周期,以天为单位,若以小时为单位,可写1 / 24
    )

    if args == 1:
        spider.start_monitor_task()  # 下发及监控任务
    else:
        spider.start()  # 采集

if __name__ == "__main__":
    parser = ArgumentParser(description="批次爬虫测试")
    parser.add_argument("--start", type=int, function=crawl_test)
    parser.start()

爬虫集成

feapder支持将多个爬虫集成为一个爬虫,统一下发任务进行爬取

对于集成Spider爬虫,只需将继承的Spider类改为BaseParser类,BaseParser类支持Spider类相同的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import feapder
from items import *

# 将feapder.Spider改为feapder.BaseParser
class TestParser(feapder.BaseParser):
    def start_requests(self):
        yield feapder.Request("https://www.baidu.com")
        
    def parse(self, request, response):
        pass

spider = feapder.Spider(redis_key="feapder:test_spider_integration")
spider.add_parser(TestParser)  # 注意此处传入类名
spider.start()

对于集成BatchSpider,需要将继承类改为BatchParser类,BatchParser类支持BatchSpider类相同的方法,除了init_task方法,所有的BatchParser都使用集成后BatchSpider的init_task方法

在种子表中,需要添加一个parser_name字段,指定该任务交由哪个解析器解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import feapder

# 将feapder.BatchSpider改为feapder.BatchParser
class TencentNewsParser(feapder.BatchParser):

    def start_requests(self, task):
        task_id = task[0]
        url = task[1]
        yield feapder.Request(url, task_id=task_id)

    def parse(self, request, response):
        title = response.xpath("//title/text()").extract_first()
        print(self.name, title)
        yield self.update_task_batch(request.task_id, 1)
        
spider = feapder.BatchSpider(
    task_table="batch_spider_integration_task",  # mysql中的任务表
    batch_record_table="batch_spider_record",  # mysql中的批次记录表
    batch_name="批次爬虫集成测试",  # 批次名字
    batch_interval=7,  # 批次时间,以天为单位,若以小时为单位,可写1 / 24
    task_keys=["id", "url", "parser_name"],  # 集成BatchSpider,需要将BatchSpider的名字取出
    redis_key="feapder:test_batch_spider_integration",
    task_state="state",  # mysql中任务状态字段
)
spider.add_parser(SinaNewsParser)
spider.add_parser(TencentNewsParser)
# spider.start_monitor_task()
# spider.start()
本文由作者按照 CC BY 4.0 进行授权