Skip to content

Commit

Permalink
feat: encrypted database
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed Dec 23, 2024
1 parent c955eb9 commit f9d7cdd
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 0 deletions.
50 changes: 50 additions & 0 deletions json_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
jsonify_recursively, get_key_recursively, get_key_recursively_fuzzy, \
get_value_recursively_fuzzy, get_value_recursively
from json_database.xdg_utils import xdg_cache_home, xdg_data_home, xdg_config_home
from json_database.crypto import decrypt_from_json, encrypt_as_json

LOG = logging.getLogger("JsonDatabase")
LOG.setLevel("INFO")
Expand Down Expand Up @@ -104,6 +105,39 @@ def __exit__(self, _type, value, traceback):
raise SessionError


class EncryptedJsonStorage(JsonStorage):
"""persistent python dict, stored AES encrypted to file"""

def __init__(self, encrypt_key: str, path: str, disable_lock=False):
self.encrypt_key = encrypt_key
super().__init__(path, disable_lock)

def load_local(self, path):
"""
Load local json file into self.
Args:
path (str): file to load
"""
super().load_local(path)
# decrypt after load
decrypted = decrypt_from_json(self.encrypt_key, dict(self))
self.clear()
self.update(decrypted)

def store(self, path=None):
"""
store the json db locally.
"""
decrypted = dict(self)
encrypted = encrypt_as_json(self.encrypt_key, dict(self))
self.clear()
self.update(encrypted) # encrypt before storage
super().store()
self.clear()
self.update(decrypted) # keep it decrypted in memory


class JsonDatabase(dict):
""" searchable persistent dict """

Expand Down Expand Up @@ -295,6 +329,22 @@ def __init__(self,
super().__init__(path, disable_lock=disable_lock)


class EncryptedJsonStorageXDG(EncryptedJsonStorage):
""" xdg respectful persistent dicts """

def __init__(self,
encrypt_key: str,
name: str,
xdg_folder=xdg_data_home(),
disable_lock=False,
subfolder="json_database",
extension="ejson"):
self.name = name
path = join(xdg_folder, subfolder, f"{name}.{extension}")
super().__init__(encrypt_key=encrypt_key, path=path,
disable_lock=disable_lock)


class JsonDatabaseXDG(JsonDatabase):
""" xdg respectful json database """

Expand Down
112 changes: 112 additions & 0 deletions json_database/crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import json
import zlib
from binascii import hexlify
from binascii import unhexlify
from json_database.exceptions import EncryptionKeyError, DecryptionKeyError

try:
# pycryptodomex
from Cryptodome.Cipher import AES
except ImportError:
# pycrypto + pycryptodome
try:
from Crypto.Cipher import AES
except:
AES = None


def encrypt(key, text, nonce=None):
if AES is None:
raise ImportError("run pip install pycryptodomex")
if not isinstance(text, bytes):
text = bytes(text, encoding="utf-8")
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
ciphertext, tag = cipher.encrypt_and_digest(text)
return ciphertext, tag, cipher.nonce


def decrypt(key, ciphertext, tag, nonce):
if AES is None:
raise ImportError("run pip install pycryptodomex")
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")
cipher = AES.new(key, AES.MODE_GCM, nonce)
data = cipher.decrypt_and_verify(ciphertext, tag)
text = data.decode(encoding="utf-8")
return text


def encrypt_as_json(key, data):
if isinstance(data, dict):
data = json.dumps(data)
if len(key) > 16:
key = key[0:16]
ciphertext = encrypt_bin(key, data)
nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:]
return json.dumps({"ciphertext": hexlify(ciphertext).decode('utf-8'),
"tag": hexlify(tag).decode('utf-8'),
"nonce": hexlify(nonce).decode('utf-8')})


def decrypt_from_json(key, data):
if isinstance(data, str):
data = json.loads(data)
if len(key) > 16:
key = key[0:16]
ciphertext = unhexlify(data["ciphertext"])
if data.get("tag") is None: # web crypto
ciphertext, tag = ciphertext[:-16], ciphertext[-16:]
else:
tag = unhexlify(data["tag"])
nonce = unhexlify(data["nonce"])
try:
return decrypt(key, ciphertext, tag, nonce)
except:
raise DecryptionKeyError


def encrypt_bin(key, data):
if len(key) > 16:
key = key[0:16]
try:
data = compress_payload(data)
ciphertext, tag, nonce = encrypt(key, data)
except:
raise EncryptionKeyError
return nonce + ciphertext + tag


def decrypt_bin(key, ciphertext):
if len(key) > 16:
key = key[0:16]

nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:]

try:
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")
cipher = AES.new(key, AES.MODE_GCM, nonce)
data = cipher.decrypt_and_verify(ciphertext, tag)
return decompress_payload(data)
except:
raise DecryptionKeyError


def compress_payload(text):
# Compressing text
if isinstance(text, str):
decompressed = text.encode("utf-8")
else:
decompressed = text
return zlib.compress(decompressed)


def decompress_payload(compressed):
# Decompressing text
if isinstance(compressed, str):
# assume hex
compressed = unhexlify(compressed)
return zlib.decompress(compressed)

8 changes: 8 additions & 0 deletions json_database/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@ class SessionError(RuntimeError):

class MatchError(ValueError):
""" could not match an item in db """


class DecryptionKeyError(KeyError):
""" Could not decrypt payload """


class EncryptionKeyError(KeyError):
""" Could not encrypt payload """

0 comments on commit f9d7cdd

Please sign in to comment.