Changeset 1827:61f26fb8b3b0
- Timestamp:
- 11/06/09 13:46:36 (9 months ago)
- Author:
- Pablo Hoffman <pablo@…>
- Branch:
- default
- Message:
-
Changed item pipeline API to pass spider references (instead of domain names) to process_item() method
- Files:
-
Legend:
- Unmodified
- Added
- Removed
-
|
r1751
|
r1827
|
|
| 154 | 154 | |
| 155 | 155 | class StoreItemPipeline(object): |
| 156 | | def process_item(self, domain, response, item): |
| | 156 | def process_item(self, spider, item): |
| 157 | 157 | torrent_id = item['url'].split('/')[-1] |
| 158 | 158 | f = open("torrent-%s.pickle" % torrent_id, "w") |
-
|
r1783
|
r1827
|
|
| 157 | 157 | [dmoz] INFO: Enabled spider middlewares: ... |
| 158 | 158 | [dmoz] INFO: Enabled item pipelines: ... |
| 159 | | [dmoz.org] INFO: Domain opened |
| | 159 | [dmoz.org] INFO: Spider opened |
| 160 | 160 | [dmoz.org] DEBUG: Crawled <http://www.dmoz.org/Computers/Programming/Languages/Python/Resources/> from <None> |
| 161 | 161 | [dmoz.org] DEBUG: Crawled <http://www.dmoz.org/Computers/Programming/Languages/Python/Books/> from <None> |
| 162 | | [dmoz.org] INFO: Domain closed (finished) |
| | 162 | [dmoz.org] INFO: Spider closed (finished) |
| 163 | 163 | [-] Main loop terminated. |
| 164 | 164 | |
| 165 | 165 | Pay attention to the lines containing ``[dmoz.org]``, which correspond to |
| 166 | | our spider (identified by the domain "dmoz.org"). You can see a log line for each |
| 167 | | URL defined in ``start_urls``. Because these URLs are the starting ones, they |
| 168 | | have no referrers, which is shown at the end of the log line, where it says |
| 169 | | ``from <None>``. |
| | 166 | our spider (identified by the domain ``"dmoz.org"``). You can see a log line |
| | 167 | for each URL defined in ``start_urls``. Because these URLs are the starting |
| | 168 | ones, they have no referrers, which is shown at the end of the log line, |
| | 169 | where it says ``from <None>``. |
| 170 | 170 | |
| 171 | 171 | But more interesting, as our ``parse`` method instructs, two files have been |
| … |
… |
|
| 446 | 446 | |
| 447 | 447 | class DmozPipeline(object): |
| 448 | | def process_item(self, domain, item): |
| | 448 | def process_item(self, spider, item): |
| 449 | 449 | return item |
| 450 | 450 | |
| … |
… |
|
| 462 | 462 | self.csvwriter = csv.writer(open('items.csv', 'wb')) |
| 463 | 463 | |
| 464 | | def process_item(self, domain, item): |
| | 464 | def process_item(self, spider, item): |
| 465 | 465 | self.csvwriter.writerow([item['title'][0], item['link'][0], item['desc'][0]]) |
| 466 | 466 | return item |
-
|
r1822
|
r1827
|
|
| 52 | 52 | |
| 53 | 53 | def spider_opened(self, spider): |
| 54 | | domain = spider.domain_name |
| 55 | | file = open('%s_products.xml' % domain, 'w+b') |
| 56 | | self.files[domain] = file |
| | 54 | file = open('%s_products.xml' % spider.domain_name, 'w+b') |
| | 55 | self.files[spider] = file |
| 57 | 56 | self.exporter = XmlItemExporter(file) |
| 58 | 57 | self.exporter.start_exporting() |
| 59 | 58 | |
| 60 | 59 | def spider_closed(self, spider): |
| 61 | | domain = spider.domain_name |
| 62 | 60 | self.exporter.finish_exporting() |
| 63 | | file = self.files.pop(domain) |
| | 61 | file = self.files.pop(spider) |
| 64 | 62 | file.close() |
| 65 | 63 | |
| 66 | | def process_item(self, domain, item): |
| | 64 | def process_item(self, spider, item): |
| 67 | 65 | self.exporter.export_item(item) |
| 68 | 66 | return item |
-
|
r1822
|
r1827
|
|
| 25 | 25 | single Python class that must define the following method: |
| 26 | 26 | |
| 27 | | .. method:: process_item(domain, item) |
| 28 | | |
| 29 | | ``domain`` is a string with the domain of the spider which scraped the item |
| 30 | | |
| 31 | | ``item`` is a :class:`~scrapy.item.Item` with the item scraped |
| | 27 | .. method:: process_item(spider, item) |
| | 28 | |
| | 29 | :param spider: the spider which scraped the item |
| | 30 | :type spider: :class:`~scrapy.spider.BaseSpider` object |
| | 31 | |
| | 32 | :param item: the item scraped |
| | 33 | :type item: :class:`~scrapy.item.Item` object |
| 32 | 34 | |
| 33 | 35 | This method is called for every item pipeline component and must either return |
| … |
… |
|
| 50 | 52 | vat_factor = 1.15 |
| 51 | 53 | |
| 52 | | def process_item(self, domain, item): |
| | 54 | def process_item(self, spider, item): |
| 53 | 55 | if item['price']: |
| 54 | 56 | if item['price_excludes_vat']: |
| … |
… |
|
| 69 | 71 | ] |
| 70 | 72 | |
| 71 | | Item pipeline example with resources per domain |
| | 73 | Item pipeline example with resources per spider |
| 72 | 74 | =============================================== |
| 73 | 75 | |
| 74 | 76 | Sometimes you need to keep resources about the items processed grouped per |
| 75 | | domain, and delete those resource when a domain finish. |
| | 77 | spider, and delete those resources when a spider finishes. |
| 76 | 78 | |
| 77 | 79 | An example is a filter that looks for duplicate items, and drops those items |
| … |
… |
|
| 91 | 93 | |
| 92 | 94 | def spider_opened(self, spider): |
| 93 | | self.duplicates[spider.domain_name] = set() |
| | 95 | self.duplicates[spider] = set() |
| 94 | 96 | |
| 95 | 97 | def spider_closed(self, spider): |
| 96 | | del self.duplicates[spider.domain_name] |
| 97 | | |
| 98 | | def process_item(self, domain, item): |
| 99 | | if item.id in self.duplicates[domain]: |
| | 98 | del self.duplicates[spider] |
| | 99 | |
| | 100 | def process_item(self, spider, item): |
| | 101 | if item.id in self.duplicates[spider]: |
| 100 | 102 | raise DropItem("Duplicate item found: %s" % item) |
| 101 | 103 | else: |
| 102 | | self.duplicates[domain].add(item.id) |
| | 104 | self.duplicates[spider].add(item.id) |
| 103 | 105 | return item |
| 104 | 106 | |
-
|
r1822
|
r1827
|
|
| 37 | 37 | from scrapy.conf import settings |
| 38 | 38 | |
| 39 | | items_per_domain = settings.getint('ITEMSAMPLER_COUNT', 1) |
| 40 | | close_domain = settings.getbool('ITEMSAMPLER_CLOSE_DOMAIN', False) |
| | 39 | items_per_spider = settings.getint('ITEMSAMPLER_COUNT', 1) |
| | 40 | close_spider = settings.getbool('ITEMSAMPLER_CLOSE_SPIDER', False) |
| 41 | 41 | max_response_size = settings.getbool('ITEMSAMPLER_MAX_RESPONSE_SIZE', ) |
| 42 | 42 | |
| … |
… |
|
| 48 | 48 | raise NotConfigured |
| 49 | 49 | self.items = {} |
| 50 | | self.domains_count = 0 |
| | 50 | self.spiders_count = 0 |
| 51 | 51 | self.empty_domains = set() |
| 52 | 52 | dispatcher.connect(self.spider_closed, signal=signals.spider_closed) |
| 53 | 53 | dispatcher.connect(self.engine_stopped, signal=signals.engine_stopped) |
| 54 | 54 | |
| 55 | | def process_item(self, item, spider): |
| 56 | | domain = spider.domain_name |
| 57 | | sampled = stats.get_value("items_sampled", 0, domain=domain) |
| 58 | | if sampled < items_per_domain: |
| | 55 | def process_item(self, spider, item): |
| | 56 | sampled = stats.get_value("items_sampled", 0, domain=spider.domain_name) |
| | 57 | if sampled < items_per_spider: |
| 59 | 58 | self.items[item.guid] = item |
| 60 | 59 | sampled += 1 |
| 61 | | stats.set_value("items_sampled", sampled, domain=domain) |
| 62 | | log.msg("Sampled %s" % item, domain=domain, level=log.INFO) |
| 63 | | if close_domain and sampled == items_per_domain: |
| | 60 | stats.set_value("items_sampled", sampled, domain=spider.domain_name) |
| | 61 | log.msg("Sampled %s" % item, spider=spider, level=log.INFO) |
| | 62 | if close_spider and sampled == items_per_spider: |
| 64 | 63 | scrapyengine.close_spider(spider) |
| 65 | 64 | return item |
| … |
… |
|
| 69 | 68 | pickle.dump(self.items, f) |
| 70 | 69 | if self.empty_domains: |
| 71 | | log.msg("No products sampled for: %s" % " ".join(self.empty_domains), level=log.WARNING) |
| | 70 | log.msg("No products sampled for: %s" % " ".join(self.empty_domains), \ |
| | 71 | level=log.WARNING) |
| 72 | 72 | |
| 73 | 73 | def spider_closed(self, spider, reason): |
| 74 | | domain = spider.domain_name |
| 75 | | if reason == 'finished' and not stats.get_value("items_sampled", domain=domain): |
| 76 | | self.empty_domains.add(domain) |
| 77 | | self.domains_count += 1 |
| 78 | | log.msg("Sampled %d domains so far (%d empty)" % (self.domains_count, len(self.empty_domains)), level=log.INFO) |
| | 74 | if reason == 'finished' and not stats.get_value("items_sampled", domain=spider.domain_name): |
| | 75 | self.empty_domains.add(spider.domain_name) |
| | 76 | self.spiders_count += 1 |
| | 77 | log.msg("Sampled %d domains so far (%d empty)" % (self.spiders_count, \ |
| | 78 | len(self.empty_domains)), level=log.INFO) |
| 79 | 79 | |
| 80 | 80 | |
| … |
… |
|
| 88 | 88 | |
| 89 | 89 | def process_spider_input(self, response, spider): |
| 90 | | if stats.get_value("items_sampled", domain=spider.domain_name) >= items_per_domain: |
| | 90 | if stats.get_value("items_sampled", domain=spider.domain_name) >= items_per_spider: |
| 91 | 91 | return [] |
| 92 | 92 | elif max_response_size and max_response_size > len(response_httprepr(response)): |
| … |
… |
|
| 101 | 101 | items.append(r) |
| 102 | 102 | |
| 103 | | if stats.get_value("items_sampled", domain=spider.domain_name) >= items_per_domain: |
| | 103 | if stats.get_value("items_sampled", domain=spider.domain_name) >= items_per_spider: |
| 104 | 104 | return [] |
| 105 | 105 | else: |
-
|
r1713
|
r1827
|
|
| 58 | 58 | return item |
| 59 | 59 | current_stage = stages_left.pop(0) |
| 60 | | d = mustbe_deferred(current_stage.process_item, spider.domain_name, item) |
| | 60 | d = mustbe_deferred(current_stage.process_item, spider, item) |
| 61 | 61 | d.addCallback(next_stage, stages_left) |
| 62 | 62 | return d |
-
|
r1632
|
r1827
|
|
| 18 | 18 | dispatcher.connect(self.engine_stopped, signals.engine_stopped) |
| 19 | 19 | |
| 20 | | def process_item(self, domain, item): |
| | 20 | def process_item(self, spider, item): |
| 21 | 21 | self.exporter.export_item(item) |
| 22 | 22 | return item |
-
|
r1822
|
r1827
|
|
| 13 | 13 | DOWNLOAD_PRIORITY = 1000 |
| 14 | 14 | |
| 15 | | class DomainInfo(object): |
| | 15 | class SpiderInfo(object): |
| 16 | 16 | def __init__(self, spider): |
| 17 | | self.domain = spider.domain_name |
| 18 | 17 | self.spider = spider |
| 19 | 18 | self.downloading = {} |
| … |
… |
|
| 22 | 21 | |
| 23 | 22 | def __init__(self): |
| 24 | | self.domaininfo = {} |
| | 23 | self.spiderinfo = {} |
| 25 | 24 | dispatcher.connect(self.spider_opened, signals.spider_opened) |
| 26 | 25 | dispatcher.connect(self.spider_closed, signals.spider_closed) |
| 27 | 26 | |
| 28 | 27 | def spider_opened(self, spider): |
| 29 | | self.domaininfo[spider.domain_name] = self.DomainInfo(spider) |
| | 28 | self.spiderinfo[spider] = self.SpiderInfo(spider) |
| 30 | 29 | |
| 31 | 30 | def spider_closed(self, spider): |
| 32 | | del self.domaininfo[spider.domain_name] |
| | 31 | del self.spiderinfo[spider] |
| 33 | 32 | |
| 34 | | def process_item(self, domain, item): |
| 35 | | info = self.domaininfo[domain] |
| | 33 | def process_item(self, spider, item): |
| | 34 | info = self.spiderinfo[spider] |
| 36 | 35 | requests = arg_to_iter(self.get_media_requests(item, info)) |
| 37 | 36 | dlist = [] |
| … |
… |
|
| 84 | 83 | info.downloading[fp] = (request, dwld) # fill downloading state data |
| 85 | 84 | dwld.addBoth(_downloaded) # append post-download hook |
| 86 | | dwld.addErrback(log.err, domain=info.domain) |
| | 85 | dwld.addErrback(log.err, spider=info.spider) |
| 87 | 86 | |
| 88 | 87 | # declare request in downloading state (None is used as place holder) |
-
|
r1822
|
r1827
|
|
| 28 | 28 | dispatcher.connect(self.spider_closed, signal=signals.spider_closed) |
| 29 | 29 | |
| 30 | | def process_item(self, domain, item): |
| | 30 | def process_item(self, spider, item): |
| 31 | 31 | guid = str(item.guid) |
| 32 | 32 | |
| 33 | | if guid in self.stores[domain]: |
| 34 | | if self.stores[domain][guid] == item: |
| | 33 | if guid in self.stores[spider]: |
| | 34 | if self.stores[spider][guid] == item: |
| 35 | 35 | status = 'old' |
| 36 | 36 | else: |
| … |
… |
|
| 40 | 40 | |
| 41 | 41 | if not status == 'old': |
| 42 | | self.stores[domain][guid] = item |
| 43 | | self.log(domain, item, status) |
| | 42 | self.stores[spider][guid] = item |
| | 43 | self.log(spider, item, status) |
| 44 | 44 | return item |
| 45 | 45 | |
| 46 | 46 | def spider_opened(self, spider): |
| 47 | | domain = spider.domain_name |
| 48 | | uri = Template(self.uritpl).substitute(domain=domain) |
| 49 | | self.stores[domain] = Shove(uri, **self.opts) |
| | 47 | uri = Template(self.uritpl).substitute(domain=spider.domain_name) |
| | 48 | self.stores[spider] = Shove(uri, **self.opts) |
| 50 | 49 | |
| 51 | 50 | def spider_closed(self, spider): |
| 52 | | self.stores[spider.domain_name].sync() |
| | 51 | self.stores[spider].sync() |
| 53 | 52 | |
| 54 | | def log(self, domain, item, status): |
| 55 | | log.msg("Shove (%s): Item guid=%s" % (status, item.guid), level=log.DEBUG, domain=domain) |
| | 53 | def log(self, spider, item, status): |
| | 54 | log.msg("Shove (%s): Item guid=%s" % (status, item.guid), level=log.DEBUG, \ |
| | 55 | spider=spider) |