Skip to content

Commit

Permalink
Merge pull request #356 add comments about parallel calls of encoders…
Browse files Browse the repository at this point in the history
…/decoders
  • Loading branch information
rekby authored Oct 5, 2023
2 parents 5da10be + 6177adb commit 8aad508
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added comments about calls encoders/decoders in parallel from multiply threads

## 3.5.1 ##
* Fixed access to connection if connection cannot be found by node id

Expand Down
18 changes: 16 additions & 2 deletions ydb/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ def reader(
consumer: str,
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
# the func will be called from multiply threads in parallel
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
decoder_executor: Optional[concurrent.futures.Executor] = None,
) -> TopicReaderAsyncIO:

if not decoder_executor:
Expand All @@ -194,8 +197,12 @@ def writer(
auto_seqno: bool = True,
auto_created_at: bool = True,
codec: Optional[TopicCodec] = None, # default mean auto-select
# encoders: map[codec_code] func(encoded_bytes)->decoded_bytes
# the func will be called from multiply threads in parallel.
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
# custom encoder executor for call builtin and custom decoders. If None - use shared executor pool.
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
encoder_executor: Optional[concurrent.futures.Executor] = None,
) -> TopicWriterAsyncIO:
args = locals().copy()
del args["self"]
Expand Down Expand Up @@ -319,7 +326,10 @@ def reader(
consumer: str,
buffer_size_bytes: int = 50 * 1024 * 1024,
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
# the func will be called from multiply threads in parallel
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicReader:
if not decoder_executor:
Expand All @@ -343,7 +353,11 @@ def writer(
auto_seqno: bool = True,
auto_created_at: bool = True,
codec: Optional[TopicCodec] = None, # default mean auto-select
# encoders: map[codec_code] func(encoded_bytes)->decoded_bytes
# the func will be called from multiply threads in parallel.
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
# custom encoder executor for call builtin and custom decoders. If None - use shared executor pool.
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
) -> TopicWriter:
args = locals().copy()
Expand Down

0 comments on commit 8aad508

Please sign in to comment.