Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add processing signature #82

Merged
merged 6 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cshelve/_azure_blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# - The Azure SDK for Python: `pip install cshelve[azure-blob]`
# - If you want to use passwordless authentication, you also need to install the Azure CLI: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli
"""
import logging
import functools
import io
import os
Expand Down
7 changes: 4 additions & 3 deletions cshelve/_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from logging import Logger
from typing import Dict

from ._data_processing import DataProcessing
from ._data_processing import DataProcessing, SIGNATURES
from .exceptions import UnknownCompressionAlgorithmError


ALGORITHMS_NAME_KEY = "algorithm"
COMPRESSION_LEVEL_KEY = "level"
DATA_PROCESSING_NAME = SIGNATURES["COMPRESSION"]


def configure(
Expand All @@ -36,8 +37,8 @@ def configure(
if compression := supported_algorithms.get(algorithm):
logger.debug(f"Configuring compression algorithm: {algorithm}")
compression_fct, decompression_fct = compression(config)
data_processing.add_pre_processing(compression_fct)
data_processing.add_post_processing(decompression_fct)
data_processing.add_pre_processing(DATA_PROCESSING_NAME, compression_fct)
data_processing.add_post_processing(DATA_PROCESSING_NAME, decompression_fct)
logger.debug(f"Compression algorithm {algorithm} configured.")
else:
raise UnknownCompressionAlgorithmError(
Expand Down
75 changes: 52 additions & 23 deletions cshelve/_data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,38 @@

Examples:
>>> dp = DataProcessing()
>>> dp.add_pre_processing(lambda x: x + b'1')
>>> dp.add_post_processing(lambda x: x + b'2')
>>> dp.add_pre_processing('add_1', lambda x: x + b'1')
>>> dp.add_post_processing('add_2', lambda x: x + b'2')
>>> pre_processed = dp.apply_pre_processing(b'0')
>>> pre_processed
b'01'
>>> post_processed = dp.apply_post_processing(pre_processed)
>>> post_processed
b'012'
"""
from typing import Callable, Optional, List
from collections import namedtuple
from typing import Callable, List


_Process = namedtuple(
"_Process",
["binary_signature", "function"],
)


# Algorithm signatures to be applied to the data.
# Used with an XOR to ensure that the data is processed correctly.
SIGNATURES = {"COMPRESSION": 0b00000001, "ENCRYPTION": 0b00000010}


class DataProcessing:
"""
A class to handle pre-processing and post-processing of data.
"""

def __init__(
self,
pre_processing: Optional[List[Callable[[bytes], bytes]]] = None,
post_processing: Optional[List[Callable[[bytes], bytes]]] = None,
):
def __init__(self):
"""
Initializes the DataProcessing class with optional pre-processing and post-processing lists.
Initializes the DataProcessing class.

Examples:
>>> dp = DataProcessing()
Expand All @@ -35,22 +43,29 @@ def __init__(
>>> dp.post_processing
[]
"""
self.pre_processing = pre_processing if pre_processing is not None else []
self.post_processing = post_processing if post_processing is not None else []
self.pre_processing: List[_Process] = []
self.post_processing: List[_Process] = []

def add_pre_processing(self, func: Callable[[bytes], bytes]) -> None:
def add_pre_processing(
self, binary_signature: bytes, func: Callable[[bytes], bytes]
) -> None:
"""
Adds a function to the pre-processing list.

Examples:
>>> dp = DataProcessing()
>>> dp.add_pre_processing(lambda x: x + 1)
>>> signature = 0b00000001 # Please use the SIGNATURES dict
>>> dp.add_pre_processing(signature, lambda x: x + 1)
>>> len(dp.pre_processing)
1
"""
self.pre_processing.append(func)
self.pre_processing.append(
_Process(binary_signature=binary_signature, function=func)
)

def add_post_processing(self, func: Callable[[bytes], bytes]) -> None:
def add_post_processing(
self, binary_signature: bytes, func: Callable[[bytes], bytes]
) -> None:
"""
Adds a function to the post-processing list.

Expand All @@ -59,11 +74,14 @@ def add_post_processing(self, func: Callable[[bytes], bytes]) -> None:

Examples:
>>> dp = DataProcessing()
>>> dp.add_post_processing(lambda x: x * 2)
>>> signature = 0b00000001 # Please use the SIGNATURES dict
>>> dp.add_post_processing(signature, lambda x: x * 2)
>>> len(dp.post_processing)
1
"""
self.post_processing.append(func)
self.post_processing.append(
_Process(binary_signature=binary_signature, function=func)
)

def apply_pre_processing(self, data: bytes) -> bytes:
"""
Expand All @@ -76,12 +94,16 @@ def apply_pre_processing(self, data: bytes) -> bytes:
The processed data.

Examples:
>>> dp = DataProcessing(pre_processing=[lambda x: x + 1, lambda x: x * 2])
>>> dp = DataProcessing()
>>> signature_add = 0b00000001 # Please use the SIGNATURES dict
>>> signature_mult = 0b00000010 # Please use the SIGNATURES dict
>>> dp.add_pre_processing(signature_add, lambda x: x + 1)
>>> dp.add_pre_processing(signature_mult, lambda x: x * 2)
>>> dp.apply_pre_processing(1)
4
"""
for func in self.pre_processing:
data = func(data)
for p in self.pre_processing:
data = p.function(data)
return data

def apply_post_processing(self, data: bytes) -> bytes:
Expand All @@ -95,10 +117,17 @@ def apply_post_processing(self, data: bytes) -> bytes:
The processed data.

Examples:
>>> dp = DataProcessing(post_processing=[lambda x: x / 2, lambda x: x - 1])
>>> dp = DataProcessing()
>>> signature_minus = 0b00000001 # Please use the SIGNATURES dict
>>> signature_div = 0b00000010 # Please use the SIGNATURES dict
>>> dp.add_post_processing(signature_div, lambda x: x / 2)
>>> dp.add_post_processing(signature_minus, lambda x: x - 1)
>>> dp.apply_post_processing(4)
1.0
"""
for func in self.post_processing:
data = func(data)
for p in self.post_processing:
data = p.function(data)
return data

def pre_processing_signature(self):
return [p.binary_signature for p in self.pre_processing]
7 changes: 4 additions & 3 deletions cshelve/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from logging import Logger
from typing import Dict

from ._data_processing import DataProcessing
from ._data_processing import DataProcessing, SIGNATURES
from .exceptions import (
UnknownEncryptionAlgorithmError,
MissingEncryptionKeyError,
Expand All @@ -21,6 +21,7 @@
# User can provide the key via the INI file or environment variable.
KEY_KEY = "key"
ENVIRONMENT_KEY = "environment_key"
DATA_PROCESSING_NAME = SIGNATURES["ENCRYPTION"]


# Normally the 'tag' uses 16 bytes and the 'nonce' 12 bytes.
Expand Down Expand Up @@ -60,8 +61,8 @@ def configure(
fct, algo_signature = supported_algorithms[algorithm]
logger.debug(f"Configuring encryption algorithm: {algorithm}")
crypt_fct, decrypt_fct = fct(algo_signature, config, key)
data_processing.add_pre_processing(crypt_fct)
data_processing.add_post_processing(decrypt_fct)
data_processing.add_pre_processing(DATA_PROCESSING_NAME, crypt_fct)
data_processing.add_post_processing(DATA_PROCESSING_NAME, decrypt_fct)
logger.debug(f"Encryption algorithm {algorithm} configured.")
else:
raise UnknownEncryptionAlgorithmError(
Expand Down
13 changes: 8 additions & 5 deletions tests/units/test_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ def test_default_zlib_config(data_processing):

assert len(data_processing.post_processing) == 1
assert len(data_processing.pre_processing) == 1
assert data_processing.pre_processing[0].func == zlib.compress
assert data_processing.post_processing[0].func == zlib.decompress
assert data_processing.pre_processing[0].function.func == zlib.compress
assert data_processing.post_processing[0].function.func == zlib.decompress
assert (
data_processing.pre_processing[0].keywords["level"]
data_processing.pre_processing[0].function.keywords["level"]
== zlib.Z_DEFAULT_COMPRESSION
)
assert data_processing.post_processing[0].keywords == {}
assert data_processing.post_processing[0].function.keywords == {}

first_pre_processing_applied = id(data_processing.pre_processing[0])
first_post_processing_applied = id(data_processing.post_processing[0])
Expand All @@ -72,7 +72,10 @@ def test_zlib_level(data_processing):

configure(logger, data_processing, config)

assert data_processing.pre_processing[0].keywords["level"] == compression_level
assert (
data_processing.pre_processing[0].function.keywords["level"]
== compression_level
)


def test_unknowned_algorithm(data_processing):
Expand Down
29 changes: 19 additions & 10 deletions tests/units/test_data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,45 @@


def test_add_pre_processing():
binary_signature = 0b00000001
dp = DataProcessing()
dp.add_pre_processing(lambda x: x + 1)
dp.add_pre_processing(binary_signature, lambda x: x + 1)
assert len(dp.pre_processing) == 1
assert dp.pre_processing[0](1) == 2
assert dp.pre_processing[0].binary_signature == binary_signature
assert dp.pre_processing[0].function(1) == 2


def test_add_post_processing():
binary_signature = 0b00000010
dp = DataProcessing()
dp.add_post_processing(lambda x: x * 2)
dp.add_post_processing(binary_signature, lambda x: x * 2)
assert len(dp.post_processing) == 1
assert dp.post_processing[0](2) == 4
assert dp.post_processing[0].binary_signature == binary_signature
assert dp.post_processing[0].function(2) == 4


def test_apply_pre_processing():
dp = DataProcessing(pre_processing=[lambda x: x + 1, lambda x: x * 2])
dp = DataProcessing()
dp.add_pre_processing(0b00000001, lambda x: x + 1)
dp.add_pre_processing(0b00000010, lambda x: x * 2)
result = dp.apply_pre_processing(1)
assert result == 4 # (1 + 1) * 2


def test_apply_post_processing():
dp = DataProcessing(post_processing=[lambda x: x / 2, lambda x: x - 1])
dp = DataProcessing()
dp.add_post_processing(0b00000001, lambda x: x / 2)
dp.add_post_processing(0b00000010, lambda x: x - 1)
result = dp.apply_post_processing(4)
assert result == 1 # (4 / 2) - 1


def test_full_processing():
dp = DataProcessing(
pre_processing=[lambda x: x + 1, lambda x: x * 2],
post_processing=[lambda x: x / 2, lambda x: x - 1],
)
dp = DataProcessing()
dp.add_pre_processing(0b00000001, lambda x: x + 1)
dp.add_pre_processing(0b00000010, lambda x: x * 2)
dp.add_post_processing(0b00000100, lambda x: x / 2)
dp.add_post_processing(0b00001000, lambda x: x - 1)

data = 1
pre_processed_data = dp.apply_pre_processing(data)
Expand Down
Loading