WIP for desy spider
Signed-off-by: Spiros Delviniotis <[email protected]>
spirosdelviniotis committed Jul 12, 2017
1 parent ad88862 commit 54970bf
Showing 26 changed files with 1,267 additions and 56 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -28,6 +28,7 @@ env:
- SUITE=unit
- SUITE=functional_wsp
- SUITE=functional_arxiv
- SUITE=functional_desy

matrix:
fast_finish: true
8 changes: 8 additions & 0 deletions docker-compose.test.yml
@@ -34,6 +34,13 @@ services:
- scrapyd
- ftp_server

functional_desy:
<<: *service_base
command: py.test -vv tests/functional/desy
links:
- scrapyd
- ftp_server

functional_arxiv:
<<: *service_base
command: py.test -vv tests/functional/arxiv
@@ -68,6 +75,7 @@ services:
environment:
- PUBLICHOST=localhost
volumes:
- ${PWD}/tests/functional/desy/fixtures/ftp_server/DESY:/home/ftpusers/bob/DESY
- ${PWD}/tests/functional/wsp/fixtures/ftp_server/WSP:/home/ftpusers/bob/WSP
- ${PWD}/tests/functional/wsp/fixtures/ftp_server/pureftpd.passwd:/etc/pure-ftpd/passwd/pureftpd.passwd

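Assuming the same workflow as the existing suites, the new ``functional_desy`` service can presumably be run on its own with something like the following (the service name comes from the compose file above; the exact invocation is not part of this change):

    $ docker-compose -f docker-compose.test.yml run --rm functional_desy
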
2 changes: 1 addition & 1 deletion hepcrawl/crawler2hep.py
@@ -98,7 +98,7 @@ def _filter_affiliation(affiliations):
acquisition_source = crawler_record.get('acquisition_source', {})
builder.add_acquisition_source(
method=acquisition_source['method'],
date=acquisition_source['date'],
date=acquisition_source['datetime'],
source=acquisition_source['source'],
submission_number=acquisition_source['submission_number'],
)
61 changes: 49 additions & 12 deletions hepcrawl/pipelines.py
@@ -20,6 +20,12 @@

import requests

from lxml import etree
from dojson.contrib.marc21.utils import create_record

from inspire_dojson.hep import hep
from inspire_schemas.utils import validate

from .crawler2hep import crawler2hep


@@ -50,20 +56,12 @@ def __init__(self):
def open_spider(self, spider):
self.results_data = []

def process_item(self, item, spider):
"""Convert internal format to INSPIRE data model."""
self.count += 1
def _post_enhance_item(self, item, spider):
item = self._generate_record_meta(item, spider)
if 'related_article_doi' in item:
item['dois'] += item.pop('related_article_doi', [])

source = spider.name
item['acquisition_source'] = {
'source': source,
'method': 'hepcrawl',
'date': datetime.datetime.now().isoformat(),
'submission_number': os.environ.get('SCRAPY_JOB', ''),
}

item['titles'] = [{
'title': item.pop('title', ''),
'subtitle': item.pop('subtitle', ''),
@@ -115,10 +113,49 @@ def process_item(self, item, spider):
])

item = crawler2hep(dict(item))
spider.logger.debug('Validated item.')
self.results_data.append(item)
spider.logger.debug('Validated item by Builder.')
return item

def _read_item_from_marcxml(self, item, spider): # change names and split
def _create_valid_hep_record(str_xml_record):
object_record = create_record(etree.XML(str_xml_record))
dojson_record = hep.do(object_record)
dojson_record = self._generate_record_meta(dojson_record, spider)
validate(dojson_record, 'hep')
return dojson_record

list_hep_records = []
for str_xml_record in item['marcxml']:
hep_record = _create_valid_hep_record(str_xml_record)
spider.logger.debug('Validated hep-record.')
list_hep_records.append(hep_record)

return list_hep_records

def _generate_record_meta(self, item, spider):
item['acquisition_source'] = {
'source': spider.name,
'method': 'hepcrawl',
'datetime': datetime.datetime.now().isoformat(),
'submission_number': os.environ.get('SCRAPY_JOB', ''),
}
return item

def process_item(self, item, spider):
"""Convert internal format to INSPIRE data model."""
self.count += 1

if item.get('marcxml'):
item = self._read_item_from_marcxml(item, spider)
self.results_data.extend(item)
else:
item = self._post_enhance_item(item, spider)
self.results_data.append(item)

return {
'dict': item
}

def _prepare_payload(self, spider):
"""Return payload for push."""
payload = dict(
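
The new ``marcxml`` branch in ``process_item`` hands each serialized record to inspire-dojson and then to schema validation. A minimal sketch of that conversion path, using a hypothetical one-field record (a real DESY export carries far more fields, so a full ``validate`` call would reject something this sparse):

    from lxml import etree
    from dojson.contrib.marc21.utils import create_record
    from inspire_dojson.hep import hep

    SAMPLE_MARCXML = (
        '<record xmlns="http://www.loc.gov/MARC21/slim">'
        '<datafield tag="245" ind1=" " ind2=" ">'
        '<subfield code="a">A hypothetical DESY title</subfield>'
        '</datafield>'
        '</record>'
    )

    marc_dict = create_record(etree.XML(SAMPLE_MARCXML))  # MARC tags -> plain dict
    hep_record = hep.do(marc_dict)                        # dict -> HEP JSON via inspire-dojson
    print(hep_record.get('titles'))                       # e.g. [{'title': 'A hypothetical DESY title'}]
    # inspire_schemas.utils.validate(hep_record, 'hep') would then enforce the HEP schema
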
129 changes: 129 additions & 0 deletions hepcrawl/spiders/desy_spider.py
@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
#
# This file is part of hepcrawl.
# Copyright (C) 2017 CERN.
#
# hepcrawl is a free software; you can redistribute it and/or modify it
# under the terms of the Revised BSD License; see LICENSE file for
# more details.

"""Spider for DESY."""

from __future__ import absolute_import, division, print_function

import os

from lxml import etree


from scrapy import Request
from scrapy.spiders import Spider

from ..utils import (
ftp_list_files,
ftp_connection_info,
)


class DesySpider(Spider):
"""Desy spider.
This spider connects to a given FTP host and downloads XML files
for extraction into HEP records.
Examples:
To run a crawl, you need to pass FTP connection information via
``ftp_host`` and ``ftp_netrc``::
$ scrapy crawl desy -a 'ftp_host=ftp.example.com' -a 'ftp_netrc=/path/to/netrc'
To run a crawl on a local folder, you need to pass the absolute ``package_path``::
$ scrapy crawl desy -a 'package_path=/path/to/package_dir'
"""
name = 'desy'
custom_settings = {}
start_urls = []

def __init__(
self,
package_path=None,
ftp_folder='DESY',
ftp_host=None,
ftp_netrc=None,
*args,
**kwargs
):
"""Constructor of ``Desy`` spider."""
super(DesySpider, self).__init__(*args, **kwargs)
self.ftp_folder = ftp_folder
self.ftp_host = ftp_host
self.ftp_netrc = ftp_netrc
self.package_path = package_path
self.target_folder = '/tmp/DESY'
if not os.path.exists(self.target_folder):
os.makedirs(self.target_folder)

def start_requests(self):
"""List selected folder on remote FTP and yield files."""
if self.package_path:
file_names = os.listdir(self.package_path)

for file_name in file_names:
file_path = os.path.join(self.package_path, file_name)
yield Request(
'file://{0}'.format(file_path),
callback=self.parse,
)
else:
ftp_host, ftp_params = ftp_connection_info(self.ftp_host, self.ftp_netrc)

remote_files_paths = ftp_list_files(
self.ftp_folder,
target_folder=self.target_folder,
server=ftp_host,
user=ftp_params['ftp_user'],
password=ftp_params['ftp_password'],
lst_missing_files=False,
)

for remote_file in remote_files_paths:
self.log('Trying to crawl file from FTP: {0}'.format(remote_file))
remote_file = str(remote_file)
ftp_params['ftp_local_filename'] = os.path.join(
self.target_folder,
os.path.basename(remote_file),
)
remote_url = 'ftp://{0}/{1}'.format(ftp_host, remote_file)
yield Request(
str(remote_url),
meta=ftp_params,
callback=self.handle_package_ftp,
)

def parse(self, response):
"""Parse a ``Desy`` XML file into a HEP record."""
self.log('Got record from url/path: {0}'.format(response.url))

list_marcxml_records = self._get_records(response.body)

return {
'marcxml': list_marcxml_records,
}

def handle_package_ftp(self, response):
"""Yield every XML file found."""
self.log('Visited url {}'.format(response.url))
file_path = response.body
yield Request(
'file://{0}'.format(file_path),
meta={'package_path': file_path}
)

def _get_records(self, response_body):
root = etree.fromstring(response_body)
list_items = root.findall('.//{http://www.loc.gov/MARC21/slim}record')
if not list_items:
list_items = root.findall('.//record')

return [etree.tostring(item) for item in list_items]
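
For reference, a minimal sketch of what ``_get_records`` does with a hypothetical two-record MARCXML collection: it serializes each ``<record>`` element separately, trying the MARC21 slim namespace first and falling back to un-namespaced records:

    from lxml import etree

    COLLECTION = (
        '<collection xmlns="http://www.loc.gov/MARC21/slim">'
        '<record><controlfield tag="001">111</controlfield></record>'
        '<record><controlfield tag="001">222</controlfield></record>'
        '</collection>'
    )

    root = etree.fromstring(COLLECTION)
    records = (root.findall('.//{http://www.loc.gov/MARC21/slim}record')
               or root.findall('.//record'))
    marcxml_strings = [etree.tostring(record) for record in records]  # one serialized string per record
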
2 changes: 1 addition & 1 deletion hepcrawl/spiders/wsp_spider.py
@@ -97,7 +97,7 @@ def start_requests(self):

new_files_paths = ftp_list_files(
self.ftp_folder,
self.target_folder,
target_folder=self.target_folder,
server=ftp_host,
user=ftp_params['ftp_user'],
password=ftp_params['ftp_password']
21 changes: 17 additions & 4 deletions hepcrawl/testlib/celery_monitor.py
@@ -19,13 +19,14 @@


class CeleryMonitor(object):
def __init__(self, app, monitor_timeout=3, monitor_iter_limit=100):
def __init__(self, app, monitor_timeout=3, monitor_iter_limit=100, events_limit=2):
self.results = []
self.recv = None
self.app = app
self.connection = None
self.monitor_timeout = monitor_timeout
self.monitor_iter_limit = monitor_iter_limit
self.events_limit = events_limit

def __enter__(self):
state = self.app.events.State()
@@ -61,17 +62,24 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.connection.__exit__()

def _wait_for_results(self, events_iter):
any(islice(
generator_events = islice(
events_iter, # iterable
self.monitor_iter_limit # stop
))
)
counter = 0
for dummy in generator_events:
if dummy:
counter += 1
if counter == self.events_limit:
break

@classmethod
def do_crawl(
cls,
app,
monitor_timeout,
monitor_iter_limit,
events_limit,
crawler_instance,
project='hepcrawl',
spider='WSP',
@@ -80,7 +88,12 @@ def do_crawl(
):
settings = settings or {}

with cls(app, monitor_timeout=monitor_timeout, monitor_iter_limit=monitor_iter_limit) as my_monitor:
with cls(
app,
monitor_timeout=monitor_timeout,
monitor_iter_limit=monitor_iter_limit,
events_limit=events_limit
) as my_monitor:
crawler_instance.schedule(
project=project,
spider=spider,
23 changes: 20 additions & 3 deletions hepcrawl/utils.py
@@ -57,17 +57,34 @@ def ftp_connection_info(ftp_host, netrc_file, passive_mode=False):
return ftp_host, connection_params


def ftp_list_files(server_folder, target_folder, server, user, password, passive_mode=False):
def ftp_list_files(
server_folder,
server,
user,
password,
target_folder=None,
passive_mode=False,
lst_missing_files=True,
):
"""List files from given FTP's server folder to target folder."""
session_factory = ftputil.session.session_factory(
base_class=ftplib.FTP,
port=21,
use_passive_mode=passive_mode,
encrypt_data_channel=True)
encrypt_data_channel=True,
)

with ftputil.FTPHost(server, user, password, session_factory=session_factory) as host:
file_names = host.listdir(os.path.join(host.curdir, '/', server_folder))
return list_missing_files(server_folder, target_folder, file_names)
if lst_missing_files:
return list_missing_files(server_folder, target_folder, file_names)
else:
return [
os.path.join(
server_folder,
file_name
) for file_name in file_names
]


def local_list_files(local_folder, target_folder):
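
With the new ``lst_missing_files`` flag, ``ftp_list_files`` can also return the full remote listing instead of only the files missing from ``target_folder``, which is how ``DesySpider.start_requests`` uses it. A sketch with hypothetical host and credentials:

    # Host, user and password are placeholders; only the keyword usage mirrors the new signature.
    remote_paths = ftp_list_files(
        'DESY',
        server='ftp.example.com',
        user='bob',
        password='secret',
        lst_missing_files=False,
    )
    # -> ['DESY/file1.xml', 'DESY/file2.xml', ...]
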
1 change: 1 addition & 0 deletions setup.py
@@ -18,6 +18,7 @@
install_requires = [
'autosemver~=0.2',
'inspire-schemas~=41.0',
'inspire-dojson~=41.0',
'Scrapy>=1.1.0',
# TODO: unpin once they support wheel building again
'scrapyd==1.1.0',
1 change: 1 addition & 0 deletions tests/functional/arxiv/test_arxiv.py
@@ -72,6 +72,7 @@ def test_arxiv(set_up_local_environment, expected_results):
app=celery_app,
monitor_timeout=5,
monitor_iter_limit=100,
events_limit=1,
crawler_instance=crawler,
project=set_up_local_environment.get('CRAWLER_PROJECT'),
spider='arXiv',