Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: proper class for field info #1730

Draft
wants to merge 6 commits into
base: fix/parallel-datapackage
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions frictionless/detector/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,31 +404,30 @@ def detect_schema(
schema.fields = fields # type: ignore

# Sync schema
if self.schema_sync:
if labels:
case_sensitive = options["header_case"]
if self.schema_sync and labels:
case_sensitive = options["header_case"]

if not case_sensitive:
labels = [label.lower() for label in labels]
if not case_sensitive:
labels = [label.lower() for label in labels]

if len(labels) != len(set(labels)):
note = '"schema_sync" requires unique labels in the header'
raise FrictionlessException(note)
if len(labels) != len(set(labels)):
note = '"schema_sync" requires unique labels in the header'
raise FrictionlessException(note)

mapped_fields = self.mapped_schema_fields_names(
schema.fields, # type: ignore
case_sensitive,
)
mapped_fields = self.map_schema_fields_by_name(
schema.fields,
case_sensitive,
)

self.rearrange_schema_fields_given_labels(
mapped_fields,
schema,
labels,
)
self.rearrange_schema_fields_given_labels(
mapped_fields,
schema,
labels,
)

self.add_missing_required_labels_to_schema_fields(
mapped_fields, schema, labels, case_sensitive
)
self.add_missing_required_labels_to_schema_fields(
mapped_fields, schema, labels, case_sensitive
)

# Patch schema
if self.schema_patch:
Expand All @@ -445,7 +444,7 @@ def detect_schema(
return schema

@staticmethod
def mapped_schema_fields_names(
def map_schema_fields_by_name(
fields: List[Field], case_sensitive: bool
) -> Dict[str, Field]:
"""Create a dictionary to map field names to schema fields"""
Expand All @@ -460,8 +459,10 @@ def rearrange_schema_fields_given_labels(
schema: Schema,
labels: List[str],
):
"""Rearrange fields according to the order of labels. All fields
missing from labels are dropped"""
"""Rearrange fields according to the order of labels.
All fields missing from labels are dropped.
Any extra-field is filled in with a default `"type": "any"` field.
"""
schema.clear_fields()

for name in labels:
Expand Down
119 changes: 86 additions & 33 deletions frictionless/resources/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,24 +263,9 @@ def __open_lookup(self):
self.__lookup[source_name][source_key].add(cells)

def __open_row_stream(self):
# TODO: we need to rework this field_info / row code
# During row streaming we create a field info structure
# This structure is optimized and detached version of schema.fields
# We create all data structures in-advance to share them between rows

# Create field info
field_number = 0
field_info: Dict[str, Any] = {"names": [], "objects": [], "mapping": {}}
for field in self.schema.fields:
field_number += 1
field_info["names"].append(field.name)
field_info["objects"].append(field.to_copy())
field_info["mapping"][field.name] = (
field,
field_number,
field.create_cell_reader(),
field.create_cell_writer(),
)
fields_info = FieldsInfo(
self.schema.fields, self.labels, self.detector.schema_sync
)

# Create state
memory_unique: Dict[str, Any] = {}
Expand Down Expand Up @@ -313,7 +298,7 @@ def row_stream():

row = Row(
cells,
field_info=field_info,
fields_info=fields_info,
row_number=row_number,
)

Expand Down Expand Up @@ -395,21 +380,20 @@ def row_stream():

if self.detector.schema_sync:
# Missing required labels are not included in the
# field_info parameter used for row creation
# fields_info parameter used for row creation
for field in self.schema.fields:
self.remove_missing_required_label_from_field_info(field, field_info)
self.remove_missing_required_label_from_field_info(field, fields_info)

# Create row stream
self.__row_stream = row_stream()

def remove_missing_required_label_from_field_info(
self, field: Field, field_info: Dict[str, Any]
self, field: Field, fields_info: FieldsInfo
):
is_case_sensitive = self.dialect.header_case
if self.label_is_missing(
field.name, field_info["names"], self.labels, is_case_sensitive
field.name, fields_info.ls(), self.labels, is_case_sensitive
):
self.remove_field_from_field_info(field.name, field_info)
fields_info.rm(field.name)

@staticmethod
def label_is_missing(
Expand All @@ -430,17 +414,12 @@ def label_is_missing(

return field_name not in table_labels and field_name in expected_field_names

@staticmethod
def remove_field_from_field_info(field_name: str, field_info: Dict[str, Any]):
field_index = field_info["names"].index(field_name)
del field_info["names"][field_index]
del field_info["objects"][field_index]
del field_info["mapping"][field_name]

def primary_key_cells(self, row: Row, case_sensitive: bool) -> Tuple[Any, ...]:
"""Create a tuple containing all cells from a given row associated to primary
keys"""
return tuple(row[label] for label in self.primary_key_labels(row, case_sensitive))
return tuple(
row[label] for label in self.primary_key_labels(row, case_sensitive)
)

def primary_key_labels(
self,
Expand Down Expand Up @@ -700,3 +679,77 @@ def write(
self, target: Optional[Union[Resource, Any]] = None, **options: Any
) -> TableResource:
return self.write_table(target, **options)


class _FieldInfo:
    """Private record bundling a schema field with its row-streaming helpers.

    Each instance keeps the field itself, its 1-indexed rank, and the cell
    reader / cell writer obtained from the field at construction time.
    """

    def __init__(self, field: Field, field_number: int):
        """Build the record for `field`.

        field: the schema field this record describes
        field_number: 1-indexed rank of the field
        """
        # The reader is created before the writer, each exactly once.
        reader = field.create_cell_reader()
        writer = field.create_cell_writer()

        self.field = field
        self.field_number = field_number
        self.cell_reader = reader
        self.cell_writer = writer


class FieldsInfo:
    """Helper class for linking columns to schema fields.

    It abstracts away the different ways of making this link. In particular, the
    reference may be the schema (`detector.schema_sync = False`), or the labels
    (`detector.schema_sync = True`).

    This class is not Public API, and should be used only in non-public
    interfaces.
    """

    def __init__(
        self, fields: List[Field], labels: Optional[List[str]], schema_sync: bool
    ):
        """Build the ordered list of expected fields.

        With `schema_sync` enabled and non-empty `labels`, one entry is created
        per label, in label order: the first field whose name equals the label
        is used, otherwise a default `"type": "any"` field is created for it.
        In every other case the entries follow `fields` as given.

        Field numbers are 1-indexed ranks in the resulting list.

        Raises:
            FrictionlessException: if `schema_sync` is enabled and `labels`
                contains duplicates
        """
        if schema_sync and labels:
            if len(labels) != len(set(labels)):
                note = '"schema_sync" requires unique labels in the header'
                raise FrictionlessException(note)

            # Index the fields by name once instead of scanning `fields` for
            # every label; `setdefault` keeps the first field of a given name.
            fields_by_name: Dict[str, Field] = {}
            for field in fields:
                fields_by_name.setdefault(field.name, field)

            self._expected_fields: List[_FieldInfo] = []
            for field_number, label in enumerate(labels, start=1):
                field = fields_by_name.get(label)
                if field is None:
                    # Label without a matching schema field
                    field = Field.from_descriptor({"name": label, "type": "any"})
                self._expected_fields.append(_FieldInfo(field, field_number))
        else:
            self._expected_fields = [
                _FieldInfo(field, field_number)
                for field_number, field in enumerate(fields, start=1)
            ]

    def ls(self) -> List[str]:
        """List all column names, in order"""
        return [fi.field.name for fi in self._expected_fields]

    def get(self, field_name: str) -> _FieldInfo:
        """Get the info of the first expected field named `field_name`

        Raises:
            ValueError: if no expected field has this name
        """
        for field_info in self._expected_fields:
            if field_info.field.name == field_name:
                return field_info
        raise ValueError(f"{field_name} is missing from expected fields")

    def get_copies(self) -> List[Field]:
        """Return field copies"""
        return [fi.field.to_copy() for fi in self._expected_fields]

    def rm(self, field_name: str):
        """Remove the first expected field named `field_name`

        The field numbers of the remaining entries are left untouched.

        Raises:
            ValueError: if no expected field has this name
        """
        try:
            index = self.ls().index(field_name)
        except ValueError:
            # Hide the internal `list.index` error, the message says it all
            raise ValueError(f"'{field_name}' is not in fields data") from None
        del self._expected_fields[index]
Loading
Loading