-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
164 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
class PipelineContext(object):
    """ Context of parameters utilized in pipeline.

    A thin wrapper over a dictionary that is shared between pipeline items.
    The dictionary passed to the constructor is stored by reference (it is
    not copied), so updates made through the context are visible to the
    owner of the original dictionary.
    """

    def __init__(self, d):
        """ d: dict
                storage of the named parameters of the pipeline.
        """
        assert(isinstance(d, dict))
        self._d = d

    def __provide(self, param):
        """ Strict lookup: returns the value stored under `param` or raises
            an Exception that mentions the missing key and the whole content.
        """
        # The membership test is intentionally explicit (instead of catching
        # KeyError): for dict subclasses that define `__missing__`, such as
        # defaultdict, indexing an absent key would silently create it.
        if param not in self._d:
            raise Exception(f"Key `{param}` is not in dictionary.\n{self._d}")
        return self._d[param]

    # region public

    def provide(self, param):
        """ Returns the value stored under `param`.
            Raises an Exception when the key is absent.
        """
        return self.__provide(param)

    def provide_or_none(self, param):
        """ Returns the value stored under `param`, or None when the key
            is absent.
        """
        # Single lookup; `dict.get` never triggers `__missing__`, therefore
        # an absent key is not created as a side effect.
        return self._d.get(param)

    def update(self, param, value, is_new_key=False):
        """ Stores `value` under `param`.

            is_new_key: bool
                when True, the key is expected to be absent and an Exception
                is raised if it already exists; when False (default) an
                existing value is overwritten.
        """
        if is_new_key and param in self._d:
            raise Exception(f"Key `{param}` is already presented in pipeline context dictionary.")
        self._d[param] = value

    # endregion

    # region base methods

    def __contains__(self, item):
        """ Supports the `key in context` syntax. """
        return item in self._d

    # endregion
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from bulk_translate.src.pipeline.context import PipelineContext | ||
|
||
|
||
class BasePipelineItem(object):
    """ Base class of a single pipeline step.

    An item knows the context key its input is read from (`src_key`), an
    optional function applied to that input (`src_func`) and the key under
    which its result is expected to be stored (`result_key`).
    """

    def __init__(self, src_key="result", result_key="result", src_func=None):
        """ src_key: str or None
                context key of the input data; None means "no source".
            result_key:
                context key for the output of this item.
            src_func: callable or None
                optional transformation applied to the extracted input.
        """
        assert src_key is None or isinstance(src_key, str)
        assert src_func is None or callable(src_func)
        self.__src_key = src_key
        self.__result_key = result_key
        # Kept with a single underscore: it is read outside of this class.
        self._src_func = src_func

    @property
    def ResultKey(self):
        """ Context key that corresponds to the result of this item. """
        return self.__result_key

    @property
    def SupportBatching(self):
        """ Whether the item is able to process a whole batch at once.
            The base implementation always reports False.
        """
        return False

    def get_source(self, src_ctx, call_func=True, force_key=None):
        """ Fetches the input of this item from the given context.

            src_ctx: PipelineContext
                context to read from.
            call_func: bool
                when True and `src_func` was provided, the fetched value is
                passed through `src_func` before being returned.
            force_key:
                when not None, it is used instead of the `src_key` of the item.

            Returns None when the item was created with `src_key=None`
            (this check precedes the `force_key` handling).
        """
        assert isinstance(src_ctx, PipelineContext)

        # An item without a source key has nothing to extract.
        if self.__src_key is None:
            return None

        lookup_key = force_key if force_key is not None else self.__src_key
        value = src_ctx.provide(lookup_key)

        if self._src_func is not None and call_func:
            return self._src_func(value)

        return value

    def apply_core(self, input_data, pipeline_ctx):
        """ Actual processing; expected to be overridden by subclasses.
            The base implementation does nothing and returns None.
        """
        return None

    def apply(self, input_data, pipeline_ctx=None):
        """ Processes the input and returns the data for further pipeline items.
            When `apply_core` yields None, the original input is passed through.
        """
        result = self.apply_core(input_data=input_data, pipeline_ctx=pipeline_ctx)
        if result is None:
            return input_data
        return result
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from bulk_translate.src.pipeline.items.base import BasePipelineItem | ||
|
||
|
||
class MapPipelineItem(BasePipelineItem):
    """ Pipeline item that maps a function over every element of its input. """

    def __init__(self, map_func=None, **kwargs):
        """ map_func: callable
                function applied to each element of the input; despite the
                None default, a callable is mandatory (asserted below).
            kwargs:
                forwarded to the BasePipelineItem constructor.
        """
        assert callable(map_func)
        super().__init__(**kwargs)
        self._map_func = map_func

    def apply_core(self, input_data, pipeline_ctx):
        """ Returns a lazy `map` object over `input_data`;
            `pipeline_ctx` is not used.
        """
        mapper = self._map_func
        return map(mapper, input_data)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from bulk_translate.src.pipeline.context import PipelineContext | ||
from bulk_translate.src.pipeline.items.base import BasePipelineItem | ||
|
||
|
||
class BatchingPipelineLauncher:
    """ Runs a list of pipeline items over batches stored in a context. """

    @staticmethod
    def run(pipeline, pipeline_ctx, src_key=None):
        """ Sequentially applies every non-None item of `pipeline`.

            pipeline: list
                pipeline items; None entries are skipped.
            pipeline_ctx: PipelineContext
                context that provides the inputs and receives the results.
            src_key: str or None
                key forced as the source of the first (non-None) item only.

            Returns the same `pipeline_ctx`, updated in place: the result of
            each item is stored under its `ResultKey`.
        """
        assert isinstance(pipeline, list)
        assert isinstance(pipeline_ctx, PipelineContext)
        assert src_key is None or isinstance(src_key, str)

        active_items = (entry for entry in pipeline if entry is not None)

        for position, item in enumerate(active_items):
            assert isinstance(item, BasePipelineItem)

            # Only the very first item may have its source key overridden.
            forced_key = src_key if position == 0 else None

            # The source function is applied per element below, hence the
            # raw batch content is requested here (call_func=False).
            content = item.get_source(pipeline_ctx, call_func=False, force_key=forced_key)
            handled_batch = [
                element if item._src_func is None else item._src_func(element)
                for element in content
            ]

            if item.SupportBatching:
                # The item consumes the whole batch at once.
                batch_result = list(item.apply(input_data=handled_batch, pipeline_ctx=pipeline_ctx))
            else:
                # The item is applied to every element of the batch separately.
                batch_result = [
                    item.apply(input_data=element, pipeline_ctx=pipeline_ctx)
                    for element in handled_batch
                ]

            pipeline_ctx.update(param=item.ResultKey, value=batch_result, is_new_key=False)

        return pipeline_ctx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
class BatchIterator:
    """ Iterator that groups elements of `data_iter` into lists (batches)
        of at most `batch_size` elements.

        The last batch may be shorter. Once the data is exhausted:
          * if `end_value` is None, iteration stops (StopIteration);
          * otherwise every further `next()` call returns `end_value()`,
            i.e. the iterator never raises StopIteration by itself.
    """

    def __init__(self, data_iter, batch_size, end_value=None):
        """ data_iter: iterable
                source of elements; any iterable is accepted (iterators and
                generators as well as lists, tuples, ranges, ...).
            batch_size: int
                positive maximal amount of elements per batch.
            end_value: callable or None
                factory of the value returned after the data is exhausted.
        """
        assert(isinstance(batch_size, int) and batch_size > 0)
        assert(callable(end_value) or end_value is None)
        # `iter` returns an iterator unchanged, while it lets plain
        # iterables (for which `next` is not defined) to be batched as well.
        self.__data_iter = iter(data_iter)
        self.__index = 0
        self.__batch_size = batch_size
        self.__end_value = end_value

    def __iter__(self):
        return self

    def __next__(self):
        buffer = []
        # Breaking out of the loop leaves the remaining elements untouched,
        # so the next call continues from where this one stopped.
        for data in self.__data_iter:
            buffer.append(data)
            if len(buffer) == self.__batch_size:
                break

        if buffer:
            # Amount of batches provided so far.
            self.__index += 1
            return buffer

        if self.__end_value is None:
            raise StopIteration

        return self.__end_value()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1 @@ | ||
arekit>=0.25.1 | ||
source_iter>=0.24.2 |