irpas技术客

【Scrapy】Item Pipeline_zzy979481894

网络投稿 7431

项目管道(Item Pipeline)用于处理Spider返回的Item对象,如果定义了多个项目管道,则按优先级顺序执行

官方文档:https://docs.scrapy.org/en/latest/topics/item-pipeline.html

项目管道就是实现了process_item()方法的Python类,用于处理Spider返回的Item对象 注意:Scrapy并没有提供所谓的ItemPipeline基类,自定义的项目管道类只要有process_item()方法即可,这是利用了Python的协议(protocol)或鸭子类型(duck-typing)的特性,即只关心一个类是否有所需的属性或方法,而不是(像Java那样)通过基类或接口来限制对象的类型

自定义项目管道

项目管道类必须实现以下方法:

process_item(self, item, spider)

对接收到的Item对象执行某些操作(例如保存到数据库),并决定该Item对象继续被后续的项目管道处理,还是不再继续处理 item是要处理的Item对象(可能是字典、Item类或其他),spider是对应的爬虫对象 该方法要么返回一个Item对象,给其他项目管道继续处理;要么产生一个DropItem异常,表示不再继续处理

以下方法是可选的:

open_spider(self, spider)

当spider开始时该方法被调用

close_spider(self, spider)

当spider关闭时该方法被调用

from_crawler(cls, crawler)

这是一个类方法,用于从一个Crawler对象创建项目管道实例

项目管道的典型用途 清理HTML数据验证爬取到的数据(是否包含特定的字段)去重将爬取到的数据保存到数据库

定义项目管道后要在设置文件中激活:

ITEM_PIPELINES = { 'myproject.pipelines.MyPipeline1': 300, 'myproject.pipelines.MyPipeline2': 800, }

数字为优先级,范围0~1000,数字越小优先级越高

示例 验证字段 from itemadapter import ItemAdapter from scrapy.exceptions import DropItem class PricePipeline: vat_factor = 1.15 def process_item(self, item, spider): adapter = ItemAdapter(item) if adapter.get('price'): if adapter.get('price_excludes_vat'): adapter['price'] *= self.vat_factor return item else: raise DropItem('Missing price in {}'.format(item)) 写入JSON文件 import json from itemadapter import ItemAdapter class JsonWriterPipeline: def open_spider(self, spider): self.file = open('items.jsonl', 'w') def close_spider(self, spider): self.file.close() def process_item(self, item, spider): line = json.dumps(ItemAdapter(item).asdict()) + '\n' self.file.write(line) return item 保存到MongoDB import pymongo from itemadapter import ItemAdapter class MongoPipeline: collection_name = 'scrapy_items' def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db @classmethod def from_crawler(cls, crawler): return cls( mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE', 'items') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): self.db[self.collection_name].insert_one(ItemAdapter(item).asdict()) return item 去重 from itemadapter import ItemAdapter from scrapy.exceptions import DropItem class DuplicatesPipeline: def __init__(self): self.ids_seen = set() def process_item(self, item, spider): adapter = ItemAdapter(item) if adapter['id'] in self.ids_seen: raise DropItem('Duplicate item found: {}'.format(item)) else: self.ids_seen.add(adapter['id']) return item


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #scrapyitem #pipeline #项目管道Item