Skip to content

Commit

Permalink
Update brooklyn config to support new paging options
Browse files Browse the repository at this point in the history
Remove extra list flattening

Update tests after removing aub/aco collection

WIP

Remove debugging print statements
  • Loading branch information
aaron-collier committed Jan 8, 2025
1 parent 13dce28 commit 8e14577
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 220 deletions.
2 changes: 2 additions & 0 deletions bin/get
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def main(opts):
if collection is None:
sys.exit(f'💥 Provider "{opts.provider}" does not have a collection "{opts.collection}"')

print(f"Harvesting collection: {opts.collection}")

# set driver record limit if it is allowed by the driver
if opts.limit:
if hasattr(collection.catalog, 'record_limit'):
Expand Down
109 changes: 46 additions & 63 deletions catalogs/aub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,6 @@ metadata:
data_path: aub
schedule: "30 13 15 Jan,Apr,Jul,Oct *"
sources:
aco:
driver: oai_xml
args:
collection_url: https://libraries.aub.edu.lb/xtf/oai
metadata_prefix: oai_dc
set: "aco"
allow_expiration: true
full_harvest: true
metadata:
data_path: aub/aco
config: aub
fields:
id:
path: "//header:identifier"
namespace:
header: "http://www.openarchives.org/OAI/2.0/"
optional: true
aladab:
driver: iiif_json_v3
args:
Expand All @@ -30,78 +13,78 @@ sources:
page_fields:
- id
- thumbnail.id
limit: 1000
limit: 1010
metadata:
data_path: aub/aladab
config: aub
fields:
id:
path: "id"
postcards:
driver: oai_xml
driver: iiif_json_v3
args:
collection_url: https://libraries.aub.edu.lb/xtf/oai
metadata_prefix: oai_dc
set: "postcards"
allow_expiration: true
full_harvest: true
collection_url: https://libraries.aub.edu.lb/iiifservices/collection/Postcards
paging:
pages_url: https://libraries.aub.edu.lb/iiifservices/collection/Postcards/{offset}/{limit}
page_data: items
page_fields:
- id
- thumbnail.id
limit: 1010
metadata:
data_path: aub/postcards
config: aub
fields:
id:
path: "//header:identifier"
namespace:
header: "http://www.openarchives.org/OAI/2.0/"
optional: true
path: "id"
posters:
driver: oai_xml
driver: iiif_json_v3
args:
collection_url: https://libraries.aub.edu.lb/xtf/oai
metadata_prefix: oai_dc
set: "posters"
allow_expiration: true
full_harvest: true
collection_url: https://libraries.aub.edu.lb/iiifservices/collection/Posters
paging:
pages_url: https://libraries.aub.edu.lb/iiifservices/collection/Posters/{offset}/{limit}
page_data: items
page_fields:
- id
- thumbnail.id
limit: 1000
metadata:
data_path: aub/posters
data_path: aub/posters
config: aub
fields:
id:
path: "//header:identifier"
namespace:
header: "http://www.openarchives.org/OAI/2.0/"
optional: true
thamarat_al_funun:
driver: oai_xml
path: "id"
travelbooks:
driver: iiif_json_v3
args:
collection_url: https://libraries.aub.edu.lb/xtf/oai
metadata_prefix: oai_dc
set: "thf"
allow_expiration: true
full_harvest: true
collection_url: https://libraries.aub.edu.lb/iiifservices/collection/Travel%20Books
paging:
pages_url: https://libraries.aub.edu.lb/iiifservices/collection/Travel%20Books/{offset}/{limit}
page_data: items
page_fields:
- id
- thumbnail.id
limit: 1000
metadata:
data_path: aub/thamarat_al_funun
data_path: aub/travelbooks
config: aub
fields:
id:
path: "//header:identifier"
namespace:
header: "http://www.openarchives.org/OAI/2.0/"
optional: true
travelbooks:
driver: oai_xml
path: "id"
manuscripts:
driver: iiif_json_v3
args:
collection_url: https://libraries.aub.edu.lb/xtf/oai
metadata_prefix: oai_dc
set: "travelbooks"
allow_expiration: true
full_harvest: true
collection_url: https://libraries.aub.edu.lb/iiifservices/collection/Manuscripts
paging:
pages_url: https://libraries.aub.edu.lb/iiifservices/collection/Manuscripts/{offset}/{limit}
page_data: items
page_fields:
- id
- thumbnail.id
limit: 1000
metadata:
data_path: aub/travelbooks
data_path: aub/manuscripts
config: aub
fields:
id:
path: "//header:identifier"
namespace:
header: "http://www.openarchives.org/OAI/2.0/"
optional: true
path: "id"
3 changes: 2 additions & 1 deletion catalogs/brooklyn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ sources:
args:
collection_url: https://www.brooklynmuseum.org/api/v2/object/
paging:
pages_url: https://www.brooklynmuseum.org/api/v2/collection/5/object
pages_url: https://www.brooklynmuseum.org/api/v2/collection/5/object?limit={limit}&offset={offset}
urls: data.id
limit: 25
page_data: data
record_selector: "data"
api_key: "0IzFpBiUksT8LMVGLUxovj9IR0ltlSH1"
metadata:
Expand Down
89 changes: 34 additions & 55 deletions dlme_airflow/drivers/iiif_json_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,27 @@ def __init__(
super(IiifV3JsonSource, self).__init__(metadata=metadata)
self.collection_url = collection_url
self.paging = paging
self._manifest_urls = []
self._manifests = []
self._path_expressions = {}
self.record_count = 0
self.record_limit = self.metadata.get("record_limit")
self.partition_builder = None


def _open_collection(self):
self._manifest_urls = self._get_manifest_urls()

def _get_manifest_urls(self):
if self.paging:
return PartitionBuilder(self.collection_url, self.paging).urls()
else:
return self._get_manifest_urls_from_items()
self.partition_builder = PartitionBuilder(self.collection_url, self.paging)


def _get_manifest_urls_from_items(self):
resp = self._get(self.collection_url)
collection_result = resp.json()
urls = []
if "items" in collection_result: # IIIF v3
manifests = collection_result["items"]
else:
raise Exception(
f"Unknown collection manifest format: {self.collection_url}"
)
def _open_collection(self):
self._manifests = self._get_manifests()

for manifest in manifests:
if "@id" in manifest:
url = manifest["@id"] # valid in IIIF v2 or v3
elif "id" in manifest:
url = manifest["id"] # valid in IIIF v3 only
else:
raise Exception(f"Unknown URL in manifest: {manifest}")
urls.append(url)

return urls

def _get_manifests(self):
if self.paging:
return self.partition_builder.records()


def _open_manifest(self, manifest_url: str) -> Optional[dict]:
def _open_manifest(self, manifest: dict) -> Optional[dict]:
manifest_url = manifest["id"]
resp = self._get(manifest_url)
if resp.status_code == 200:
manifest_result = resp.json()
Expand All @@ -77,6 +58,9 @@ def _open_manifest(self, manifest_url: str) -> Optional[dict]:
record.update(
self._extract_manifest_metadata(manifest_result.get("metadata", []))
)

# Handles the thumbnail field provided in the collection manifest
record.update({"thumbnail": manifest.get("thumbnail")})
return record

def _extract_specified_fields(self, iiif_manifest: dict) -> dict:
Expand Down Expand Up @@ -106,7 +90,7 @@ def _optional_field_warning(self, id, field, expression, optional):
if optional is True:
logging.debug(f"{id} missing optional field: '{field}'; searched path: '{expression}'")
return

logging.warning(f"{id} missing required field: '{field}'; searched path: '{expression}'")


Expand All @@ -116,33 +100,28 @@ def _extract_manifest_metadata(
output: dict[str, list[str]] = {}

for row in iiif_manifest_metadata:
for key in row.get("label").keys():
name = (
row.get("label")[key][0]
.replace(" ", "-")
.lower()
.replace("(", "")
.replace(")", "")
)
# initialize or append to output[name] based on whether we've seen the label
metadata_value = row.get("value")[key]
if not metadata_value:
continue

if name in output:
output.update({name: _flatten_list([output[name], metadata_value])})
else:
output[name] = metadata_value

# flatten any nested lists into a single list
return {k: list(_flatten_list(v)) for (k, v) in output.items()}
(label, values) = self._extract_metadata_for_row(row)
output.setdefault(label, []).extend(values)

return output

def _extract_metadata_for_row(self, row):
values = []
lang = next(iter(row.get("label")))
label = row.get("label")[lang][0].replace(" ", "-").lower().replace("(", "").replace(")", "")
for key in row.get("label").keys():
# initialize or append to output[name] based on whether we've seen the label
values += row.get("value")[key]

return label, values


def _get_partition(self, i) -> pd.DataFrame:
# if we are over the defined limit return an empty DataFrame right away
if self.record_limit is not None and self.record_count > self.record_limit:
return pd.DataFrame()

result = self._open_manifest(self._manifest_urls[i])
result = self._open_manifest(self._manifests[i])

# If the dictionary has AT LEAST one value that is not None return a
# DataFrame with the keys as columns, and the values as a row.
Expand All @@ -165,7 +144,7 @@ def _get_schema(self):
datashape=None,
dtype=self.dtype,
shape=None,
npartitions=len(self._manifest_urls),
npartitions=len(self._manifests),
extra_metadata={},
)

Expand All @@ -188,10 +167,10 @@ def _stringify_and_strip_if_list(record) -> list[str]:
result_list = []
for data in record:
result_list.append(_stringify_and_strip_if_list(data))

if len(result_list) == 1:
return result_list[0]

return result_list


Expand Down
30 changes: 30 additions & 0 deletions dlme_airflow/utils/partition_url_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(
self.paging_config = paging_config
self.provider_data = None
self.api_key = api_key
self.data = []

def urls(self):
if self.paging_config.get("pages_url"):
Expand All @@ -30,6 +31,13 @@ def urls(self):

return []

def records(self):
    """Return prefetched record data for every page of the collection.

    Mirrors ``urls()``: when no ``pages_url`` is configured there is
    nothing to page through, so return an empty list instead of an
    implicit ``None`` (which callers iterating or ``len()``-ing the
    result would choke on).
    """
    if self.paging_config.get("pages_url"):
        return self._prefetch_page_data()

    return []

def _urls_from_provider(self):
urls = [self.collection_url]
expression = jsonpath_ng.parse(self.paging_config["urls"])
Expand Down Expand Up @@ -71,12 +79,29 @@ def _prefetch_page_urls(self):
harvested = len(data)

ids += self._extract_ids(data)
if self.paging_config.get("page_fields"):
self.data += self._extract_data(data)

if harvested < self.paging_config["limit"]:
break

return ids

def _prefetch_page_data(self):
offset = 0
harvested = 0
data = []
while True:
api_endpoint = self.paging_config['pages_url'].format(offset=offset,limit=self.paging_config['limit'])
data += self._fetch_provider_data(api_endpoint)[self.paging_config['page_data']]
offset += self.paging_config["limit"]
harvested = len(data)

if harvested < self.paging_config["limit"]:
break

return data

def _fetch_provider_data(self, url):
headers = {}
if self.api_key:
Expand All @@ -89,6 +114,11 @@ def _fetch_provider_data(self, url):
def _extract_ids(self, data):
return [self._format_id(i['id']) for i in data]

def _extract_data(self, data):
return [{
self._format_id(i['id']): i['thumbnail'][0]['id']
} for i in data]

def _format_id(self, id):
if validators.url(id):
return id
Expand Down
Loading

0 comments on commit 8e14577

Please sign in to comment.