
Merge pull request #162 from spirosdelviniotis/hepcrawl_crawl_once
Adds: mechanism for crawling only once

Sem-Ver: feature
david-caro authored Sep 20, 2017
2 parents 9b96510 + bd72a4f commit d98cb2d
Showing 26 changed files with 679 additions and 152 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -32,6 +32,8 @@ nosetests.xml
coverage.xml
twistd.pid
.coverage.*
tests/unit/responses/edp/test_gz
tests/unit/responses/edp/test_rich

# Translations
*.mo
@@ -57,6 +59,8 @@ jobs
dbs
items
logs
.scrapy
scrapy_feed_uri

# Local settings
local_settings.py
2 changes: 1 addition & 1 deletion .travis.yml
@@ -42,7 +42,7 @@ before_install:

install:
- travis_retry docker-compose -f docker-compose.deps.yml run --rm pip
- travis_retry docker-compose -f docker-compose.test.yml run --rm scrapyd_deploy
- travis_retry docker-compose -f docker-compose.test.yml run --rm scrapyd-deploy

script:
- travis_retry docker-compose -f docker-compose.test.yml run --rm ${SUITE}
12 changes: 10 additions & 2 deletions docker-compose.test.yml
@@ -18,6 +18,7 @@ services:
- APP_CRAWLER_HOST_URL=http://scrapyd:6800
- APP_API_PIPELINE_TASK_ENDPOINT_DEFAULT=hepcrawl.testlib.tasks.submit_results
- APP_FILES_STORE=/tmp/file_urls
- APP_CRAWL_ONCE_PATH=/code/.scrapy
- COVERAGE_PROCESS_START=/code/.coveragerc
- BASE_USER_UID=${BASE_USER_UID:-1000}
- BASE_USER_GIT=${BASE_USER_GIT:-1000}
@@ -65,10 +66,17 @@ services:
command: bash -c "rm -f twistd.pid && exec scrapyd"
links:
- celery
healthcheck:
timeout: 5s
interval: 5s
retries: 5
test:
- "CMD-SHELL"
- "curl http://localhost:6800/listprojects.json"

scrapyd_deploy:
scrapyd-deploy:
<<: *service_base
command: bash -c "sleep 8 && scrapyd-deploy" # make sure that the scrapyd is up
command: bash -c "scrapyd-deploy"
links:
- scrapyd

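
For context on the new healthcheck (not part of the diff): the scrapyd-deploy service used to sleep a fixed 8 seconds and hope scrapyd was up; the compose file now lets Docker poll scrapyd's listprojects.json endpoint instead. A minimal Python sketch of the same readiness probe, assuming the default scrapyd port and the requests library:

import requests


def scrapyd_is_ready(base_url='http://localhost:6800'):
    """Return True once scrapyd answers its listprojects.json endpoint."""
    try:
        response = requests.get(base_url + '/listprojects.json', timeout=5)
    except requests.RequestException:
        return False
    return response.ok


if __name__ == '__main__':
    print('scrapyd ready:', scrapyd_is_ready())
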
154 changes: 148 additions & 6 deletions hepcrawl/middlewares.py
@@ -11,10 +11,23 @@

from __future__ import absolute_import, division, print_function

class ErrorHandlingMiddleware(object):
import os
import time
import logging

from ftplib import FTP
from six.moves.urllib.parse import urlparse

from scrapy.exceptions import IgnoreRequest
from scrapy_crawl_once.middlewares import CrawlOnceMiddleware

from hepcrawl.utils import ftp_connection_info


"""Log errors."""
LOGGER = logging.getLogger(__name__)


class ErrorHandlingMiddleware(object):
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
@@ -24,13 +37,142 @@ def __init__(self, settings):

def process_spider_exception(self, response, exception, spider):
"""Register the error in the spider and continue."""
self.process_exception(response, exception, spider)
return self.process_exception(response, exception, spider)

def process_exception(self, request, exception, spider):
"""Register the error in the spider and continue."""
if 'errors' not in spider.state:
spider.state['errors'] = []
spider.state['errors'].append({
spider.state.setdefault('errors', []).append({
'exception': exception,
'sender': request,
})


class HepcrawlCrawlOnceMiddleware(CrawlOnceMiddleware):
"""
This spider and downloader middleware lets the crawler skip pages that were
already downloaded in previous crawls.

To enable it, modify ``settings.py``::

    SPIDER_MIDDLEWARES = {
        # ...
        'scrapy_crawl_once.CrawlOnceMiddleware': 100,
        # ...
    }

    DOWNLOADER_MIDDLEWARES = {
        # ...
        'scrapy_crawl_once.CrawlOnceMiddleware': 50,
        # ...
    }

By default it does nothing. To avoid crawling a particular page multiple
times, set ``request.meta['crawl_once'] = True``. Other ``request.meta`` keys
that modify its behavior:

* ``crawl_once_value`` - a value to store in the DB. By default, the crawl
  timestamp is stored for HTTP/HTTPS requests and the last-modified time for
  FTP/file requests.
* ``crawl_once_key`` - the deduplication key; a unique file name derived from
  the request URL is used.

Settings:

* ``CRAWL_ONCE_ENABLED``: set it to False to disable the middleware. Default
  is True.
* ``CRAWL_ONCE_PATH``: path to a folder with the crawled-requests database.
  By default ``.scrapy/crawl_once/`` is used; this folder contains
  ``<spider_name>.sqlite`` files with databases of seen requests.
* ``CRAWL_ONCE_DEFAULT``: default value for the ``crawl_once`` meta key (False
  by default). When True, all requests are handled by this middleware unless
  disabled explicitly using ``request.meta['crawl_once'] = False``.

For more info see: https://github.com/TeamHG-Memex/scrapy-crawl-once
"""
def process_request(self, request, spider):
if not request.meta.get('crawl_once', self.default):
if 'crawl_once' in request.meta:
LOGGER.info('Crawl-Once: skipping by explicit crawl_once meta')
else:
LOGGER.info('Crawl-Once: skipping by default crawl_once meta')
return

request.meta['crawl_once_key'] = self._get_key(request)
request.meta['crawl_once_value'] = self._get_timestamp(request, spider)

if not self._has_to_be_crawled(request, spider):
LOGGER.info(
'Crawl-Once: Skipping due to `has_to_be_crawled`, %s' % request
)
self.stats.inc_value('crawl_once/ignored')
raise IgnoreRequest()

LOGGER.info(
'Crawl-Once: Not skipping: %s' % request
)

def _has_to_be_crawled(self, request, spider):
request_db_key = self._get_key(request)

if request_db_key not in self.db:
return True

new_file_timestamp = self._get_timestamp(request, spider)
old_file_timestamp = self.db.get(key=request_db_key)
return new_file_timestamp > old_file_timestamp

def _get_key(self, request):
parsed_url = urlparse(request.url)
fname = os.path.basename(parsed_url.path)
if parsed_url.scheme == 'file':
prefix = 'local'
else:
prefix = 'remote'

return prefix + '::' + fname

@classmethod
def _get_timestamp(cls, request, spider):
parsed_url = urlparse(request.url)
full_url = request.url
if parsed_url.scheme == 'ftp':
last_modified = cls._get_ftp_timestamp(spider, full_url)
elif parsed_url.scheme == 'file':
last_modified = cls._get_file_timestamp(full_url)
else:
last_modified = time.time()

return last_modified

@classmethod
def _get_ftp_timestamp(cls, spider, url):
ftp_host, params = ftp_connection_info(
spider.ftp_host,
spider.ftp_netrc,
)
ftp = FTP(
host=ftp_host,
user=params['ftp_user'],
passwd=params['ftp_password'],
)
return ftp.sendcmd(
'MDTM {}'.format(
cls._get_ftp_relative_path(
url=url,
host=ftp_host
)
)
)

@staticmethod
def _get_ftp_relative_path(url, host):
return url.replace(
'ftp://{0}/'.format(host),
'',
)

@staticmethod
def _get_file_timestamp(url):
file_path = url.replace('file://', '')
return os.stat(file_path).st_mtime
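
The docstring above describes the generic scrapy-crawl-once knobs; since this project turns ``CRAWL_ONCE_DEFAULT`` on (see settings.py below), the practical question for a spider author is how to opt a request out. A minimal sketch, with a hypothetical spider name and URL:

import scrapy


class ExampleSpider(scrapy.Spider):
    """Illustrative only; not one of the hepcrawl spiders."""
    name = 'example'

    def start_requests(self):
        # With CRAWL_ONCE_DEFAULT = True every request is deduplicated,
        # so a request that must always be fetched opts out explicitly.
        yield scrapy.Request(
            'file:///tmp/records/batch-001.xml',
            meta={'crawl_once': False},
            callback=self.parse,
        )

    def parse(self, response):
        self.logger.info('Parsed %s', response.url)
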
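And a standalone sketch of the key scheme ``_get_key`` applies (a ``local::`` or ``remote::`` prefix plus the file name), which is what ends up as the key in the per-spider sqlite database; the URLs below are made up:

import os

from six.moves.urllib.parse import urlparse


def crawl_once_key(url):
    """Mirror HepcrawlCrawlOnceMiddleware._get_key, for quick inspection."""
    parsed_url = urlparse(url)
    prefix = 'local' if parsed_url.scheme == 'file' else 'remote'
    return prefix + '::' + os.path.basename(parsed_url.path)


print(crawl_once_key('file:///tmp/records/batch-001.xml'))
# local::batch-001.xml
print(crawl_once_key('ftp://ftp.example.org/pub/batch-001.xml'))
# remote::batch-001.xml
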
17 changes: 15 additions & 2 deletions hepcrawl/pipelines.py
@@ -16,6 +16,8 @@
from __future__ import absolute_import, division, print_function

import os
import shutil
import pprint

import requests

@@ -92,10 +94,16 @@ def open_spider(self, spider):
def _post_enhance_item(self, item, spider):
source = spider.name

return item_to_hep(
enhanced_record = item_to_hep(
item=item,
source=source,
)
spider.logger.debug(
'Got post-enhanced hep record:\n%s' % pprint.pformat(
enhanced_record
)
)
return enhanced_record

def process_item(self, item, spider):
"""Convert internal format to INSPIRE data model."""
@@ -124,7 +132,8 @@ def _prepare_payload(self, spider):
]
return payload

def _cleanup(self, spider):
@staticmethod
def _cleanup(spider):
"""Run cleanup."""
# Cleanup errors
if 'errors' in spider.state:
@@ -175,6 +184,10 @@ def close_spider(self, spider):
"""Post results to BROKER API."""
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

if hasattr(spider, 'tmp_dir'):
shutil.rmtree(path=spider.tmp_dir, ignore_errors=True)

if 'SCRAPY_JOB' in os.environ and self.count > 0:
task_endpoint = spider.settings[
'API_PIPELINE_TASK_ENDPOINT_MAPPING'
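
One of the pipeline changes above removes a spider's temporary directory when the spider closes. A small sketch of that cleanup pattern, using a throwaway directory rather than a real spider attribute:

import os
import shutil
import tempfile

tmp_dir = tempfile.mkdtemp(prefix='hepcrawl-example-')
# ... a spider would stage downloaded files here ...

# ignore_errors=True keeps spider shutdown from failing if the
# directory was never created or was already removed.
shutil.rmtree(path=tmp_dir, ignore_errors=True)
print(os.path.exists(tmp_dir))  # False
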
9 changes: 9 additions & 0 deletions hepcrawl/settings.py
@@ -62,14 +62,23 @@
# See http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
SPIDER_MIDDLEWARES = {
'hepcrawl.middlewares.ErrorHandlingMiddleware': 543,
'hepcrawl.middlewares.HepcrawlCrawlOnceMiddleware': 100,
}

# Enable or disable downloader middlewares
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
'hepcrawl.middlewares.ErrorHandlingMiddleware': 543,
'hepcrawl.middlewares.HepcrawlCrawlOnceMiddleware': 100,
}

CRAWL_ONCE_ENABLED = True
CRAWL_ONCE_DEFAULT = True
CRAWL_ONCE_PATH = os.environ.get(
'APP_CRAWL_ONCE_PATH',
'/var/lib/scrapy/crawl_once/',
)

# Enable or disable extensions
# See http://scrapy.readthedocs.org/en/latest/topics/extensions.html
EXTENSIONS = {
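
A short sketch of how the new crawl-once settings resolve in practice: the ``APP_CRAWL_ONCE_PATH`` environment variable (set to /code/.scrapy in docker-compose.test.yml) overrides the baked-in default, and scrapy-crawl-once keeps one ``<spider_name>.sqlite`` file per spider inside that folder. The spider name below is only an example:

import os

crawl_once_path = os.environ.get(
    'APP_CRAWL_ONCE_PATH',
    '/var/lib/scrapy/crawl_once/',
)
# e.g. /code/.scrapy/desy.sqlite inside the test containers,
# /var/lib/scrapy/crawl_once/desy.sqlite otherwise.
print(os.path.join(crawl_once_path, 'desy.sqlite'))
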
15 changes: 13 additions & 2 deletions hepcrawl/spiders/desy_spider.py
@@ -178,13 +178,18 @@ def start_requests(self):
yield request

@staticmethod
def _get_full_uri(current_path, base_url, schema, hostname=''):
def _get_full_uri(current_path, base_url, schema, hostname=None):
hostname = hostname or ''
if os.path.isabs(current_path):
full_path = current_path
else:
full_path = os.path.join(base_url, current_path)

return '{schema}://{hostname}{full_path}'.format(**vars())
return '{schema}://{hostname}{full_path}'.format(
schema=schema,
hostname=hostname,
full_path=full_path,
)

def parse(self, response):
"""Parse a ``Desy`` XML file into a :class:`hepcrawl.utils.ParsedItem`.
@@ -208,8 +213,12 @@ def parse(self, response):
url_schema = 'file'
hostname = None

self.log('Getting marc xml records...')
marcxml_records = self._get_marcxml_records(response.body)
self.log('Got %d marc xml records' % len(marcxml_records))
self.log('Getting hep records...')
hep_records = self._hep_records_from_marcxml(marcxml_records)
self.log('Got %d hep records' % len(hep_records))

for hep_record in hep_records:
list_file_urls = [
@@ -222,12 +231,14 @@
for fft_path in hep_record['_fft']
]

self.log('Got the following fft urls: %s' % list_file_urls)
parsed_item = ParsedItem(
record=hep_record,
file_urls=list_file_urls,
ftp_params=ftp_params,
record_format='hep',
)
self.log('Got item: %s' % parsed_item)

yield parsed_item

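
To illustrate the reworked ``_get_full_uri`` (behavior is unchanged, except that ``hostname`` may now be passed as None): absolute paths are kept as-is, relative paths are joined onto the base URL. A standalone copy with made-up values:

import os


def get_full_uri(current_path, base_url, schema, hostname=None):
    """Standalone copy of the desy spider's _get_full_uri, for illustration."""
    hostname = hostname or ''
    if os.path.isabs(current_path):
        full_path = current_path
    else:
        full_path = os.path.join(base_url, current_path)
    return '{schema}://{hostname}{full_path}'.format(
        schema=schema,
        hostname=hostname,
        full_path=full_path,
    )


print(get_full_uri('/eos/dir/file.pdf', '/eos/base', 'file'))
# file:///eos/dir/file.pdf
print(get_full_uri('sub/file.pdf', '/afs/base', 'ftp', 'ftp.example.org'))
# ftp://ftp.example.org/afs/base/sub/file.pdf
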

