Adds: mechanism for crawling only once #162

Merged
26 commits, merged Sep 20, 2017

Commits
b97cd4b
middlewares: indentation fix
spirosdelviniotis Aug 17, 2017
15ab37d
setup: add `scrapy-crawl-once` plug-in
spirosdelviniotis Aug 17, 2017
7694fe3
middlewares: add support for crawling only once
spirosdelviniotis Aug 17, 2017
a283b21
tests: support for `scrapy-crawl-once`
spirosdelviniotis Aug 17, 2017
1301d25
tests: add tests for `scrapy-crawl-once`
spirosdelviniotis Aug 17, 2017
98a7796
tests: delete unused function for `arxiv` tests
spirosdelviniotis Aug 18, 2017
a758819
wsp: add temporary folder to the crawlings
spirosdelviniotis Aug 18, 2017
28a2603
testlib: refactored `clean_dir` default arguments
spirosdelviniotis Aug 18, 2017
026bdfb
tests: add `clean_dir` to tests that use the DB
spirosdelviniotis Aug 18, 2017
e7ff520
wsp: minor fix
spirosdelviniotis Aug 18, 2017
b509c82
pipelines: minor fix
spirosdelviniotis Aug 18, 2017
a82a158
global: add `CRAWL_ONCE_PATH` to settings
spirosdelviniotis Aug 18, 2017
5a358e1
middlewares: refactor crawl once middleware
david-caro Sep 4, 2017
16ae688
pep8: wsp_spider
david-caro Sep 4, 2017
1ac8785
setup: pin scrapy-crawl-once to the major version
david-caro Sep 4, 2017
6dffdc8
tests: remove unneeded var dir chown
david-caro Sep 4, 2017
6ffa2df
unit: small pep8 refactor
david-caro Sep 20, 2017
ddda124
gitignore: add some test product files
david-caro Sep 20, 2017
610fce4
docker-compose: swap sleep with healthcheck
david-caro Sep 20, 2017
ff1eb86
utils: nicer ParsedItem string representation
david-caro Sep 20, 2017
f111993
celery_monitor: small refactor and default event fix
david-caro Sep 20, 2017
bcf8f76
tohep: add some useful debug logs
david-caro Sep 20, 2017
5aacb1c
middlewares: use better key for crawl-once
david-caro Sep 20, 2017
1a53584
pipelines: added extra debug log
david-caro Sep 20, 2017
3110b0f
desy: adapt to the new middleware
david-caro Sep 20, 2017
bd72a4f
wsp: adapt to new middleware and refactor
david-caro Sep 20, 2017
4 changes: 4 additions & 0 deletions .gitignore
@@ -32,6 +32,8 @@ nosetests.xml
coverage.xml
twistd.pid
.coverage.*
tests/unit/responses/edp/test_gz
tests/unit/responses/edp/test_rich

# Translations
*.mo
@@ -57,6 +59,8 @@ jobs
dbs
items
logs
.scrapy
scrapy_feed_uri

# Local settings
local_settings.py
2 changes: 1 addition & 1 deletion .travis.yml
@@ -42,7 +42,7 @@ before_install:

install:
- travis_retry docker-compose -f docker-compose.deps.yml run --rm pip
- travis_retry docker-compose -f docker-compose.test.yml run --rm scrapyd_deploy
- travis_retry docker-compose -f docker-compose.test.yml run --rm scrapyd-deploy

script:
- travis_retry docker-compose -f docker-compose.test.yml run --rm ${SUITE}
12 changes: 10 additions & 2 deletions docker-compose.test.yml
@@ -18,6 +18,7 @@ services:
- APP_CRAWLER_HOST_URL=http://scrapyd:6800
- APP_API_PIPELINE_TASK_ENDPOINT_DEFAULT=hepcrawl.testlib.tasks.submit_results
- APP_FILES_STORE=/tmp/file_urls
- APP_CRAWL_ONCE_PATH=/code/.scrapy
- COVERAGE_PROCESS_START=/code/.coveragerc
- BASE_USER_UID=${BASE_USER_UID:-1000}
- BASE_USER_GIT=${BASE_USER_GIT:-1000}
@@ -65,10 +66,17 @@ services:
command: bash -c "rm -f twistd.pid && exec scrapyd"
links:
- celery
healthcheck:
timeout: 5s
interval: 5s
retries: 5
test:
- "CMD-SHELL"
- "curl http://localhost:6800/listprojects.json"

scrapyd_deploy:
scrapyd-deploy:
<<: *service_base
command: bash -c "sleep 8 && scrapyd-deploy" # make sure that the scrapyd is up
command: bash -c "scrapyd-deploy"
links:
- scrapyd

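For reference, the same probe the healthcheck runs can be fired by hand with a short Python snippet. This is only a sketch: it assumes scrapyd's port 6800 is reachable from wherever the snippet runs (e.g. via the compose port mapping); ``listprojects.json`` is scrapyd's standard status endpoint.

import requests

# Same probe as the compose healthcheck above; scrapyd answers with a JSON
# document listing the deployed projects once it is ready to accept jobs.
response = requests.get('http://localhost:6800/listprojects.json', timeout=5)
response.raise_for_status()
print(response.json())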
154 changes: 148 additions & 6 deletions hepcrawl/middlewares.py
@@ -11,10 +11,23 @@

from __future__ import absolute_import, division, print_function

class ErrorHandlingMiddleware(object):
import os
import time
import logging

from ftplib import FTP
from six.moves.urllib.parse import urlparse

from scrapy.exceptions import IgnoreRequest
from scrapy_crawl_once.middlewares import CrawlOnceMiddleware

from hepcrawl.utils import ftp_connection_info


"""Log errors."""
LOGGER = logging.getLogger(__name__)


class ErrorHandlingMiddleware(object):
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
@@ -24,13 +37,142 @@ def __init__(self, settings):

def process_spider_exception(self, response, exception, spider):
"""Register the error in the spider and continue."""
self.process_exception(response, exception, spider)
return self.process_exception(response, exception, spider)

def process_exception(self, request, exception, spider):
"""Register the error in the spider and continue."""
if 'errors' not in spider.state:
spider.state['errors'] = []
spider.state['errors'].append({
spider.state.setdefault('errors', []).append({
'exception': exception,
'sender': request,
})


class HepcrawlCrawlOnceMiddleware(CrawlOnceMiddleware):
"""
This spider and downloader middleware avoids re-crawling pages that were
already downloaded in previous crawls.

To enable it, modify ``settings.py``::

SPIDER_MIDDLEWARES = {
# ...
'scrapy_crawl_once.CrawlOnceMiddleware': 100,
# ...
}

DOWNLOADER_MIDDLEWARES = {
# ...
'scrapy_crawl_once.CrawlOnceMiddleware': 50,
# ...
}

By default it does nothing. To avoid crawling a particular page
multiple times, set ``request.meta['crawl_once'] = True``. Other
``request.meta`` keys that modify its behavior:

* ``crawl_once_value`` - a value to store in the DB. By default, a
  timestamp is stored for HTTP/HTTPS requests and the last-modified time
  is stored for FTP/file requests.
* ``crawl_once_key`` - the request key; by default a unique file name
  derived from the request URL is used.

Settings:

* ``CRAWL_ONCE_ENABLED``: set it to False to disable the middleware. Default
is True.
* ``CRAWL_ONCE_PATH``: a path to a folder with crawled requests database.
By default ``.scrapy/crawl_once/`` path is used; this folder contains
``<spider_name>.sqlite`` files with databases of seen requests.
* ``CRAWL_ONCE_DEFAULT``: default value for ``crawl_once`` meta key (False
by default). When True, all requests are handled by this middleware
unless disabled explicitly using
``request.meta['crawl_once'] = False``.


For more info see: https://github.com/TeamHG-Memex/scrapy-crawl-once
"""
def process_request(self, request, spider):
if not request.meta.get('crawl_once', self.default):
if 'crawl_once' in request.meta:
LOGGER.info('Crawl-Once: skipping by explicit crawl_once meta')
else:
LOGGER.info('Crawl-Once: skipping by default crawl_once meta')
return

request.meta['crawl_once_key'] = self._get_key(request)
request.meta['crawl_once_value'] = self._get_timestamp(request, spider)

if not self._has_to_be_crawled(request, spider):
LOGGER.info(
'Crawl-Once: Skipping due to `has_to_be_crawled`, %s' % request
)
self.stats.inc_value('crawl_once/ignored')
raise IgnoreRequest()

LOGGER.info(
'Crawl-Once: Not skipping: %s' % request
)

def _has_to_be_crawled(self, request, spider):
request_db_key = self._get_key(request)

if request_db_key not in self.db:
return True

new_file_timestamp = self._get_timestamp(request, spider)
old_file_timestamp = self.db.get(key=request_db_key)
return new_file_timestamp > old_file_timestamp

def _get_key(self, request):
parsed_url = urlparse(request.url)
fname = os.path.basename(parsed_url.path)
if parsed_url.scheme == 'file':
prefix = 'local'
else:
prefix = 'remote'

return prefix + '::' + fname

@classmethod
def _get_timestamp(cls, request, spider):
parsed_url = urlparse(request.url)
full_url = request.url
if parsed_url.scheme == 'ftp':
last_modified = cls._get_ftp_timestamp(spider, full_url)
elif parsed_url.scheme == 'file':
last_modified = cls._get_file_timestamp(full_url)
else:
last_modified = time.time()

return last_modified

@classmethod
def _get_ftp_timestamp(cls, spider, url):
ftp_host, params = ftp_connection_info(
spider.ftp_host,
spider.ftp_netrc,
)
ftp = FTP(
host=ftp_host,
user=params['ftp_user'],
passwd=params['ftp_password'],
)
return ftp.sendcmd(
'MDTM {}'.format(
cls._get_ftp_relative_path(
url=url,
host=ftp_host
)
)
)

@staticmethod
def _get_ftp_relative_path(url, host):
return url.replace(
'ftp://{0}/'.format(host),
'',
)

@staticmethod
def _get_file_timestamp(url):
file_path = url.replace('file://', '')
return os.stat(file_path).st_mtime
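To make the key scheme above more concrete, here is a minimal standalone sketch (not part of the PR) that mirrors the ``_get_key`` logic; the helper name and the URLs are made up for illustration only.

import os

from six.moves.urllib.parse import urlparse


def crawl_once_key(url):
    # Mirrors the _get_key logic above: 'local' prefix for file:// URLs,
    # 'remote' for everything else, followed by the base name of the path.
    parsed_url = urlparse(url)
    prefix = 'local' if parsed_url.scheme == 'file' else 'remote'
    return prefix + '::' + os.path.basename(parsed_url.path)


print(crawl_once_key('file:///tmp/WSP/CERN-2017-01.zip'))
# local::CERN-2017-01.zip
print(crawl_once_key('ftp://ftp.example.org/WSP/CERN-2017-01.zip'))
# remote::CERN-2017-01.zip

The stored value is then either the file's last-modified time (FTP and file URLs) or the current timestamp (HTTP), and ``_has_to_be_crawled`` re-crawls a request only when the new value is greater than the one already in the database.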
17 changes: 15 additions & 2 deletions hepcrawl/pipelines.py
@@ -16,6 +16,8 @@
from __future__ import absolute_import, division, print_function

import os
import shutil
import pprint

import requests

@@ -92,10 +94,16 @@ def open_spider(self, spider):
def _post_enhance_item(self, item, spider):
source = spider.name

return item_to_hep(
enhanced_record = item_to_hep(
item=item,
source=source,
)
spider.logger.debug(
'Got post-enhanced hep record:\n%s' % pprint.pformat(
enhanced_record
)
)
return enhanced_record

def process_item(self, item, spider):
"""Convert internal format to INSPIRE data model."""
@@ -124,7 +132,8 @@ def _prepare_payload(self, spider):
]
return payload

def _cleanup(self, spider):
@staticmethod
def _cleanup(spider):
"""Run cleanup."""
# Cleanup errors
if 'errors' in spider.state:
@@ -175,6 +184,10 @@ def close_spider(self, spider):
"""Post results to BROKER API."""
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

if hasattr(spider, 'tmp_dir'):
shutil.rmtree(path=spider.tmp_dir, ignore_errors=True)

if 'SCRAPY_JOB' in os.environ and self.count > 0:
task_endpoint = spider.settings[
'API_PIPELINE_TASK_ENDPOINT_MAPPING'
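The new ``close_spider`` cleanup only fires when the spider exposes a ``tmp_dir`` attribute. A minimal sketch of a spider that would benefit from it; the class name, spider name, and prefix are hypothetical, and the WSP spider in this PR follows a similar pattern with its temporary crawl folder.

import tempfile

import scrapy


class TmpDirSpider(scrapy.Spider):
    """Hypothetical spider, for illustration only."""

    name = 'tmp_dir_example'

    def __init__(self, *args, **kwargs):
        super(TmpDirSpider, self).__init__(*args, **kwargs)
        # Downloaded and unpacked files go here; the close_spider hook above
        # removes the whole directory with shutil.rmtree(ignore_errors=True).
        self.tmp_dir = tempfile.mkdtemp(prefix='hepcrawl-')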
9 changes: 9 additions & 0 deletions hepcrawl/settings.py
@@ -62,14 +62,23 @@
# See http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
SPIDER_MIDDLEWARES = {
'hepcrawl.middlewares.ErrorHandlingMiddleware': 543,
'hepcrawl.middlewares.HepcrawlCrawlOnceMiddleware': 100,
}

# Enable or disable downloader middlewares
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
'hepcrawl.middlewares.ErrorHandlingMiddleware': 543,
'hepcrawl.middlewares.HepcrawlCrawlOnceMiddleware': 100,
}

CRAWL_ONCE_ENABLED = True
CRAWL_ONCE_DEFAULT = True
CRAWL_ONCE_PATH = os.environ.get(
'APP_CRAWL_ONCE_PATH',
'/var/lib/scrapy/crawl_once/',
)

# Enable or disable extensions
# See http://scrapy.readthedocs.org/en/latest/topics/extensions.html
EXTENSIONS = {
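With ``CRAWL_ONCE_DEFAULT = True`` every request is deduplicated unless a spider explicitly opts out. A hedged sketch of how a single request would bypass the middleware while the rest of the crawl keeps the once-only behavior; the spider class, spider name, and URL are made up for illustration.

import scrapy


class ListingSpider(scrapy.Spider):
    """Hypothetical spider, for illustration only."""

    name = 'listing_example'

    def start_requests(self):
        # The listing page should be fetched on every crawl, so crawl-once
        # is disabled for this request only; any record requests yielded
        # from parse() keep the project-wide default of True.
        yield scrapy.Request(
            'http://export.example.org/listing',
            callback=self.parse,
            meta={'crawl_once': False},
        )

    def parse(self, response):
        pass  # yield one request per record here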
15 changes: 13 additions & 2 deletions hepcrawl/spiders/desy_spider.py
@@ -178,13 +178,18 @@ def start_requests(self):
yield request

@staticmethod
def _get_full_uri(current_path, base_url, schema, hostname=''):
def _get_full_uri(current_path, base_url, schema, hostname=None):
hostname = hostname or ''
if os.path.isabs(current_path):
full_path = current_path
else:
full_path = os.path.join(base_url, current_path)

return '{schema}://{hostname}{full_path}'.format(**vars())
return '{schema}://{hostname}{full_path}'.format(
schema=schema,
hostname=hostname,
full_path=full_path,
)

def parse(self, response):
"""Parse a ``Desy`` XML file into a :class:`hepcrawl.utils.ParsedItem`.
@@ -208,8 +213,12 @@ def parse(self, response):
url_schema = 'file'
hostname = None

self.log('Getting marc xml records...')
marcxml_records = self._get_marcxml_records(response.body)
self.log('Got %d marc xml records' % len(marcxml_records))
self.log('Getting hep records...')
hep_records = self._hep_records_from_marcxml(marcxml_records)
self.log('Got %d hep records' % len(hep_records))

for hep_record in hep_records:
list_file_urls = [
@@ -222,12 +231,14 @@
for fft_path in hep_record['_fft']
]

self.log('Got the following fft urls: %s' % list_file_urls)
parsed_item = ParsedItem(
record=hep_record,
file_urls=list_file_urls,
ftp_params=ftp_params,
record_format='hep',
)
self.log('Got item: %s' % parsed_item)

yield parsed_item

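For clarity, a standalone re-statement of the refactored ``_get_full_uri`` helper with made-up paths and hostname, showing the absolute- versus relative-path branches; this is an illustration, not code from the PR.

import os


def get_full_uri(current_path, base_url, schema, hostname=None):
    # Same branching as the refactored helper above, restated standalone so
    # both branches can be exercised directly.
    hostname = hostname or ''
    if os.path.isabs(current_path):
        full_path = current_path
    else:
        full_path = os.path.join(base_url, current_path)
    return '{schema}://{hostname}{full_path}'.format(
        schema=schema,
        hostname=hostname,
        full_path=full_path,
    )


print(get_full_uri('FFT/thesis.pdf', '/code/tmp', 'file'))
# file:///code/tmp/FFT/thesis.pdf
print(get_full_uri('/DESY/FFT/thesis.pdf', '/code/tmp', 'ftp', 'ftp.example.org'))
# ftp://ftp.example.org/DESY/FFT/thesis.pdf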