migrate wikidata datasets
Binh Vu committed Aug 29, 2023
1 parent 11eb394 commit 4fe6a3b
Showing 12 changed files with 175 additions and 112 deletions.
6 changes: 6 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,6 @@
+{
+    "[python]": {
+        "editor.defaultFormatter": "ms-python.black-formatter"
+    },
+    "python.formatting.provider": "none"
+}
17 changes: 12 additions & 5 deletions kgdata/spark/extended_rdd.py
@@ -95,25 +95,32 @@ def use(self, other: DatasetSignature) -> DatasetSignature:
         if self == other:
             return self
 
-        assert other.name != NEW_DATASET_NAME
         if self.name == NEW_DATASET_NAME:
             newone = DatasetSignature(
                 name=self.name,
                 created_at=self.created_at,
                 checksum=self.checksum,
                 dependencies=self.dependencies.copy(),
             )
-            if other.name in newone.dependencies:
-                assert other == newone.dependencies[other.name]
+            if other.name == NEW_DATASET_NAME:
+                newone.dependencies.update(other.dependencies)
             else:
-                newone.dependencies[other.name] = other
+                if other.name in newone.dependencies:
+                    assert other == newone.dependencies[other.name]
+                else:
+                    newone.dependencies[other.name] = other
             return newone
         else:
+            deps: dict[str, DatasetSignature] = {self.name: self}
+            if other.name == NEW_DATASET_NAME:
+                deps.update(other.dependencies)
+            else:
+                deps[other.name] = other
             return DatasetSignature(
                 name=NEW_DATASET_NAME,
                 created_at="",
                 checksum="",
-                dependencies={self.name: self, other.name: other},
+                dependencies=deps,
             )
 
     def without_dependencies(self) -> DatasetSignature:
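A note on the extended_rdd.py change: use no longer asserts that the right-hand signature is a named dataset; when either side is the anonymous NEW_DATASET_NAME placeholder, its dependencies are flattened into the merged signature. A minimal, self-contained sketch of the merged behaviour (a simplified stand-in for kgdata's DatasetSignature; the class name Sig and the placeholder value are illustrative, not the library's real definitions):

from __future__ import annotations

from dataclasses import dataclass, field

NEW_DATASET_NAME = "__new__"  # illustrative placeholder for a not-yet-saved dataset


@dataclass(frozen=True)
class Sig:
    """Simplified stand-in for kgdata's DatasetSignature (illustration only)."""

    name: str
    dependencies: dict[str, Sig] = field(default_factory=dict, compare=False)

    def use(self, other: Sig) -> Sig:
        # Mirrors the patched logic: an anonymous signature absorbs the other
        # side's dependencies; a named signature becomes a dependency itself.
        if self.name == NEW_DATASET_NAME:
            deps = dict(self.dependencies)
            if other.name == NEW_DATASET_NAME:
                deps.update(other.dependencies)
            elif other.name in deps:
                assert other == deps[other.name]
            else:
                deps[other.name] = other
            return Sig(self.name, deps)
        deps = {self.name: self}
        if other.name == NEW_DATASET_NAME:
            deps.update(other.dependencies)
        else:
            deps[other.name] = other
        return Sig(NEW_DATASET_NAME, deps)


entities = Sig("entities")
classes = Sig("classes")
anonymous = Sig(NEW_DATASET_NAME, {"entity-types": Sig("entity-types")})
merged = entities.use(classes).use(anonymous)
print(sorted(merged.dependencies))  # ['classes', 'entities', 'entity-types']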
47 changes: 23 additions & 24 deletions kgdata/wikidata/datasets/class_count.py
@@ -4,52 +4,51 @@
 import orjson
 
 from kgdata.dataset import Dataset
-from kgdata.spark import does_result_dir_exist, saveAsSingleTextFile
+from kgdata.spark import does_result_dir_exist
 from kgdata.wikidata.config import WikidataDirCfg
 from kgdata.wikidata.datasets.classes import classes
 from kgdata.wikidata.datasets.entities import entities
 from kgdata.wikidata.datasets.entity_types import entity_types, get_instanceof
 
 
-def class_count(lang="en") -> Dataset[Tuple[str, int]]:
+def class_count() -> Dataset[Tuple[str, int]]:
     cfg = WikidataDirCfg.get_instance()
+    ds = Dataset(
+        cfg.class_count / "*.gz",
+        deserialize=orjson.loads,
+        name="class-count",
+        dependencies=[entity_types(), classes()],
+    )
 
     if not does_result_dir_exist(cfg.class_count):
-        ds = entity_types(lang)
-        if ds.does_exist():
-            rdd = ds.get_rdd()
-        else:
-            rdd = entities(lang).get_rdd().map(get_instanceof)
-
-        class_count = rdd.flatMap(lambda x: [(c, 1) for c in x[1]]).reduceByKey(add)
+        class_count = (
+            entity_types()
+            .get_extended_rdd()
+            .flatMap(lambda x: [(c, 1) for c in x[1]])
+            .reduceByKey(add)
+        )
         (
-            classes(lang)
-            .get_rdd()
+            classes()
+            .get_extended_rdd()
             .map(lambda x: (x.id, 0))
             .leftOuterJoin(class_count)
             .map(lambda x: (x[0], x[1][1] if x[1][1] is not None else 0))
             .map(orjson.dumps)
-            .saveAsTextFile(
-                str(cfg.class_count),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
+            .save_like_dataset(
+                ds, auto_coalesce=True, shuffle=True, max_num_partitions=512
             )
         )
 
-    ds = Dataset(cfg.class_count / "*.gz", deserialize=orjson.loads)
-
     if not (cfg.class_count / "../class_count_sorted.tsv").exists():
-        rdd = (
-            classes(lang)
-            .get_rdd()
+        (
+            classes()
+            .get_extended_rdd()
             .map(lambda x: (x.id, str(x.label)))
-            .join(ds.get_rdd())
+            .join(ds.get_extended_rdd())
             .map(lambda x: (x[0], x[1][0], x[1][1]))
             .sortBy(lambda x: x[2], ascending=False)
             .map(lambda x: "\t".join([str(y) for y in x]))
-        )
-
-        saveAsSingleTextFile(
-            rdd, str(cfg.class_count / "../class_count_sorted.tsv"), shuffle=False
+            .save_as_single_text_file(cfg.class_count / "../class_count_sorted.tsv")
         )
 
     return ds
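A note on the class_count.py logic, which the migration leaves unchanged: count (class, 1) pairs emitted from entity_types, then left-join against the full class list so classes with no direct instances still appear with a count of 0. A plain-Python sketch of that aggregation (dictionaries standing in for Spark RDDs; the sample QIDs are only illustrative):

from operator import add

# (entity id, its instance-of classes), as produced by the entity_types dataset
entity_types = [
    ("Q42", ["Q5"]),       # Douglas Adams -> human
    ("Q64", ["Q515"]),     # Berlin -> city
    ("Q513", ["Q8502"]),   # Mount Everest -> mountain
]
all_classes = ["Q5", "Q515", "Q8502", "Q16521"]  # Q16521 (taxon) has no instances here

# flatMap + reduceByKey(add): number of direct instances per class
counts: dict[str, int] = {}
for _ent, types in entity_types:
    for c in types:
        counts[c] = add(counts.get(c, 0), 1)

# leftOuterJoin with the class list: classes missing from `counts` default to 0
class_count = {c: counts.get(c, 0) for c in all_classes}
print(class_count)  # {'Q5': 1, 'Q515': 1, 'Q8502': 1, 'Q16521': 0}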
2 changes: 1 addition & 1 deletion kgdata/wikidata/datasets/classes.py
@@ -31,7 +31,7 @@ def classes(lang: str = "en") -> Dataset[WDClass]:
     )
 
     get_ds = lambda subdir: Dataset(
-        cfg.classes / f"{subdir}-{lang}*.gz",
+        cfg.classes / f"{subdir}-{lang}/*.gz",
         deserialize=partial(deser_from_dict, WDClass),
         name=f"classes/{subdir}/{lang}",
         dependencies=[entities(lang)],
17 changes: 9 additions & 8 deletions kgdata/wikidata/datasets/cross_wiki_mapping.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
+from functools import lru_cache
 from typing import Iterable, Optional, Tuple
 
 import orjson
@@ -33,6 +34,8 @@ def cross_wiki_mapping(
     wd_ds = Dataset(
         cfg.cross_wiki_mapping / "from-wikidata/*.gz",
         deserialize=WikipediaWikidataMapping.deser,
+        name="cross-wiki-mapping/from-wikidata",
+        dependencies=[entities()],
     )
 
     need_verification = False
@@ -42,10 +45,8 @@
             .get_extended_rdd()
             .flatMap(extract_sitelink)
             .map(WikipediaWikidataMapping.ser)
-            .save_as_dataset(
-                wd_ds.get_data_directory(),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
-                name=f"cross-wiki-mapping/from-wikidata",
+            .save_like_dataset(
+                wd_ds, auto_coalesce=True, shuffle=True, max_num_partitions=1024
             )
         )
         need_verification = True
@@ -68,6 +69,8 @@
     wdwpds = Dataset(
         cfg.cross_wiki_mapping / f"from-wikidata-wikipedia/*.gz",
         deserialize=WikipediaWikidataMapping.deser,
+        name="cross-wiki-mapping/from-wikidata-wikipedia",
+        dependencies=[wd_ds, wiki_articles],
     )
     if not wdwpds.has_complete_data():
         (
@@ -82,10 +85,8 @@
             .map(resolve_multiple_mapping)
             .filter(lambda x: x is not None)
             .map(lambda e: WikipediaWikidataMapping.ser(assert_not_null(e)))
-            .save_as_dataset(
-                wdwpds.get_data_directory(),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
-                name="cross-wiki-mapping/from-wikidata-wikipedia",
+            .save_like_dataset(
+                wdwpds, auto_coalesce=True, shuffle=True, max_num_partitions=1024
             )
         )
         need_verification = True
29 changes: 17 additions & 12 deletions kgdata/wikidata/datasets/entity_all_types.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import math
-from functools import partial
+from functools import lru_cache, partial
 
 from kgdata.dataset import Dataset
 from kgdata.dbpedia.datasets.entity_all_types import (
@@ -21,14 +21,21 @@
 PARTITION_SIZE = 10000
 
 
-def entity_all_types(lang="en") -> Dataset[EntityAllTypes]:
+@lru_cache()
+def entity_all_types() -> Dataset[EntityAllTypes]:
     cfg = WikidataDirCfg.get_instance()
+    ds = Dataset(
+        file_pattern=cfg.entity_all_types / "*.gz",
+        deserialize=EntityAllTypes.deser,
+        name="entity-all-types",
+        dependencies=[classes(), entity_types()],
+    )
 
     unique_check = False
 
     if not does_result_dir_exist(cfg.entity_all_types):
         id2count = (
-            class_count(lang)
+            class_count()
             .get_rdd()
             .filter(lambda tup: tup[1] > PARTITION_SIZE)
             .map(lambda tup: (tup[0], math.ceil(tup[1] / PARTITION_SIZE)))
@@ -41,31 +48,29 @@ def entity_all_types(lang="en") -> Dataset[EntityAllTypes]:
 
         id2ancestors = (
             classes()
-            .get_rdd()
+            .get_extended_rdd()
             .flatMap(lambda c: extrapolate_class(c, bc_id2count.value))
         )
 
         (
-            entity_types(lang)
-            .get_rdd()
+            entity_types()
+            .get_extended_rdd()
             .flatMap(partial(flip_types, type_count=bc_id2count.value))
             .groupByKey()
             .leftOuterJoin(id2ancestors)
             .flatMap(merge_types)
             .groupByKey()
             .map(lambda x: EntityAllTypes(x[0], merge_type_dist(x[1])))
             .map(EntityAllTypes.ser)
-            .saveAsTextFile(
-                str(cfg.entity_all_types),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
+            .save_like_dataset(
+                ds,
+                auto_coalesce=True,
+                max_num_partitions=1024,
             )
         )
 
         unique_check = True
 
-    ds = Dataset(
-        file_pattern=cfg.entity_all_types / "*.gz", deserialize=EntityAllTypes.deser
-    )
     if unique_check:
         assert are_records_unique(ds.get_rdd(), lambda x: x.id)
 
54 changes: 35 additions & 19 deletions kgdata/wikidata/datasets/entity_degrees.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from functools import lru_cache
 from operator import add
 from typing import Optional
 
@@ -15,52 +16,67 @@
 from kgdata.wikipedia.misc import get_title_from_url
 
 
-def entity_degrees(lang: str = "en") -> Dataset[EntityDegree]:
+@lru_cache
+def entity_degrees() -> Dataset[EntityDegree]:
     cfg = WikidataDirCfg.get_instance()
 
-    if not does_result_dir_exist(cfg.entity_degrees / "step1"):
-        ent_rdd = entities(lang).get_rdd()
+    step1_ds = Dataset(
+        cfg.entity_degrees / "step1/*.gz",
+        deserialize=EntityDegree.deser,
+        name="entity-degrees/step1",
+        dependencies=[
+            entities(),
+        ],
+    )
+    ds = Dataset(
+        cfg.entity_degrees / "step2/*.gz",
+        deserialize=EntityDegree.deser,
+        name="entity-degrees",
+        dependencies=[
+            entities(),
+            article_degrees(),
+            cross_wiki_mapping(article_metadata()),
+        ],
+    )
+    if not step1_ds.has_complete_data():
+        ent_rdd = entities().get_extended_rdd()
 
         outdegree = ent_rdd.map(lambda e: (e.id, get_outdegree(e)))
         indegree = ent_rdd.flatMap(extract_indegree_links).reduceByKey(add)
 
         (
             outdegree.leftOuterJoin(indegree)
             .map(merge_degree)
             .map(EntityDegree.ser)
-            .coalesce(128)
-            .saveAsTextFile(
-                str(cfg.entity_degrees / "step1"),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
-            )
+            .save_like_dataset(step1_ds, checksum=False)
         )
 
     if not does_result_dir_exist(cfg.entity_degrees / "step2"):
         (
-            Dataset(cfg.entity_degrees / "step1/*.gz", deserialize=EntityDegree.deser)
-            .get_rdd()
+            step1_ds.get_extended_rdd()
            .map(lambda e: (e.id, e))
            .leftOuterJoin(
                cross_wiki_mapping(article_metadata())
-                .get_rdd()
+                .get_extended_rdd()
                .map(lambda x: (x.wikipedia_title, x))
                .join(
-                    article_degrees(lang)
-                    .get_rdd()
+                    article_degrees()
+                    .get_extended_rdd()
                    .map(lambda a: (get_title_from_url(a.url), a))
                )
                .map(lambda tup: (tup[1][0].wikidata_entityid, tup[1][1]))
            )
            .map(merge_article_degree)
            .map(EntityDegree.ser)
-            .coalesce(128)
-            .saveAsTextFile(
-                str(cfg.entity_degrees / "step2"),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
+            .save_like_dataset(
+                ds,
+                auto_coalesce=True,
+                shuffle=True,
+                max_num_partitions=1024,
+                trust_dataset_dependencies=True,
            )
        )

-    return Dataset(cfg.entity_degrees / "step2/*.gz", deserialize=EntityDegree.deser)
+    return ds
 
 
 def merge_article_degree(
21 changes: 13 additions & 8 deletions kgdata/wikidata/datasets/entity_types.py
@@ -9,23 +9,28 @@
 from kgdata.wikidata.models.wdentity import WDEntity
 
 
-def entity_types(lang="en") -> Dataset[tuple[str, list[str]]]:
+def entity_types() -> Dataset[tuple[str, list[str]]]:
     """Extract types of entities. Mapping from entity id to its type"""
     cfg = WikidataDirCfg.get_instance()
 
-    if not does_result_dir_exist(cfg.entity_types):
+    ds = Dataset(
+        cfg.entity_types / "*.gz",
+        deserialize=orjson.loads,
+        name="entity-types",
+        dependencies=[entities()],
+    )
+    if not ds.has_complete_data():
         (
-            entities(lang)
-            .get_rdd()
+            entities()
+            .get_extended_rdd()
             .map(get_instanceof)
             .map(orjson.dumps)
-            .saveAsTextFile(
-                str(cfg.entity_types),
-                compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
+            .save_like_dataset(
+                ds, auto_coalesce=True, shuffle=True, max_num_partitions=1024
            )
        )
 
-    return Dataset(cfg.entity_types / "*.gz", deserialize=orjson.loads)
+    return ds
 
 
 def get_instanceof(ent: WDEntity) -> tuple[str, list[str]]:
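A note on the overall pattern across these files: each dataset module now declares its output Dataset (with a name and its upstream dependencies) before doing any Spark work, skips the build when the output is already complete, and otherwise persists through save_like_dataset so the dependency signature is recorded alongside the data. A rough, self-contained sketch of that control flow, using toy classes rather than kgdata's real Dataset/ExtendedRDD API:

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ToyDataset:
    """Toy stand-in for kgdata.dataset.Dataset (illustration only)."""

    data_dir: Path
    name: str
    dependencies: list[ToyDataset] = field(default_factory=list)

    def has_complete_data(self) -> bool:
        # kgdata checks a signature/success marker; a single marker file here.
        return (self.data_dir / "_SUCCESS").exists()

    def save_records(self, records: list[str]) -> None:
        # Stand-in for ExtendedRDD.save_like_dataset: write the data plus a
        # marker recording which datasets this output was derived from.
        self.data_dir.mkdir(parents=True, exist_ok=True)
        (self.data_dir / "part-00000").write_text("\n".join(records))
        deps = ",".join(d.name for d in self.dependencies)
        (self.data_dir / "_SUCCESS").write_text(f"name={self.name};deps={deps}")


def entity_types(root: Path) -> ToyDataset:
    entities = ToyDataset(root / "entities", name="entities")
    ds = ToyDataset(root / "entity_types", name="entity-types", dependencies=[entities])
    if not ds.has_complete_data():
        # In kgdata this is entities().get_extended_rdd().map(get_instanceof)...
        ds.save_records(['["Q42", ["Q5"]]'])
    return ds


ds = entity_types(Path("toy_kg"))
print(ds.has_complete_data())  # True: the data and its marker were just written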