Skip to content

Commit

Permalink
Implement detection of rotor ciphers
Browse files Browse the repository at this point in the history
In addition to the detection of ACA ciphers, this commit provides
the facilities to train, evaluate and predict 5 rotor ciphers.
These ciphers are: Enigma, M209, Purple, Sigaba and Typex.

This commit includes the following improvements as well:
- Fix plaintext dataset download
- Rework input pipeline
- Set the start method of the `multiprocessing` library
- Improve code structure
  • Loading branch information
MaikBastian committed Jun 14, 2024
1 parent bb83922 commit d0bd889
Show file tree
Hide file tree
Showing 24 changed files with 2,951 additions and 995 deletions.
12 changes: 11 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
.idea/*
venv/*
*.pyc
data/logs/
data/gutenberg_en/*
data/gutenberg_en.zip
data/validation_data/
data/rotor_ciphertexts/
*.h5
weights.zip
cipherTypeDetection/*_logs/**
cipherTypeDetection/correlations.pickle
cipherTypeDetection/run_trainings.sh
cipherTypeDetection/run_trainings.sh
.DS_Store
*~
.vscode/launch.json

data/*tensorboard_logs/
data/_parameters.txt

277 changes: 142 additions & 135 deletions api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@

import cipherTypeDetection.eval as cipherEval
import cipherTypeDetection.config as config
from cipherTypeDetection.rotorDifferentiationEnsemble import RotorDifferentiationEnsemble
from cipherTypeDetection.transformer import MultiHeadSelfAttention, TransformerBlock, TokenAndPositionEmbedding
from cipherTypeDetection.ensembleModel import EnsembleModel

import pandas as pd


# init fast api
app = FastAPI()
Expand All @@ -38,22 +41,83 @@
async def startup_event():
"""The models are loaded with hardcoded names. Change in future if multiple models are available."""
model_path = "data/models"
models["Transformer"] = (tf.keras.models.load_model(os.path.join(model_path, "t96_transformer_final_100.h5"), custom_objects={
'TokenAndPositionEmbedding': TokenAndPositionEmbedding, 'MultiHeadSelfAttention': MultiHeadSelfAttention,
'TransformerBlock': TransformerBlock}), False, True)
models["FFNN"] = (tf.keras.models.load_model(os.path.join(model_path, "t128_ffnn_final_100.h5")), True, False)
models["LSTM"] = (tf.keras.models.load_model(os.path.join(model_path, "t129_lstm_final_100.h5")), False, True)
optimizer = Adam(learning_rate=config.learning_rate, beta_1=config.beta_1, beta_2=config.beta_2, epsilon=config.epsilon,
amsgrad=config.amsgrad)
models["Transformer"] = (
tf.keras.models.load_model(
os.path.join(model_path, "transformer_var_10000000.h5"),
custom_objects={
'TokenAndPositionEmbedding': TokenAndPositionEmbedding,
'TransformerBlock': TransformerBlock}),
False,
True)
models["FFNN"] = (
tf.keras.models.load_model(
os.path.join(model_path, "ffnn_var_10000000.h5")
),
True,
False)
models["LSTM"] = (
tf.keras.models.load_model(
os.path.join(model_path, "lstm_var_10000000.h5")),
False,
True)
optimizer = Adam(
learning_rate=config.learning_rate,
beta_1=config.beta_1,
beta_2=config.beta_2,
epsilon=config.epsilon,
amsgrad=config.amsgrad)
for _, item in models.items():
item[0].compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=[
"accuracy", SparseTopKCategoricalAccuracy(k=3, name="k3_accuracy")])
# TODO add this in production when having at least 32 GB RAM
with open(os.path.join(model_path, "t99_rf_final_100.h5"), "rb") as f:
item[0].compile(
optimizer=optimizer,
loss="sparse_categorical_crossentropy",
metrics=[
"accuracy",
SparseTopKCategoricalAccuracy(k=3, name="k3_accuracy")])
with open(os.path.join(model_path, "rf_var_1000000.h5"), "rb") as f:
models["RF"] = (pickle.load(f), True, False)
with open(os.path.join(model_path, "t128_nb_final_100.h5"), "rb") as f:
with open(os.path.join(model_path, "nb_var_10000000.h5"), "rb") as f:
models["NB"] = (pickle.load(f), True, False)

with open(os.path.join(model_path, "svm_rotor_only_1000_16000.h5"), "rb") as f:
models["Rotor-SVM"] = pickle.load(f)

class ArchitectureError(Exception):
def __init__(self, response):
Exception.__init__(self)
self.response = response

def validate_architectures(architectures):
max_architectures = len(models.keys())
# Warn if the provided number of architectures is out of the expected bounds
if not 0 < len(architectures) <= max_architectures:
response = JSONResponse(
{
"success": False,
"payload": None,
"error_msg": "The number of architectures must be between 1 and %d." %
max_architectures},
status_code=status.HTTP_400_BAD_REQUEST)
raise ArchitectureError(response)

# Warn about duplicate architectures
if len(set(architectures)) != len(architectures):
response = JSONResponse({"success": False,
"payload": None,
"error_msg": "Multiple architectures of the same type are not "
"allowed!"},
status_code=status.HTTP_400_BAD_REQUEST)
raise ArchitectureError(response)

# Check if the provided architectures are known
for architecture in architectures:
if architecture not in models.keys():
response = JSONResponse(
{
"success": False,
"payload": None,
"error_msg": "The architecture '%s' does not exist!" %
architecture},
status_code=status.HTTP_400_BAD_REQUEST)
raise ArchitectureError(response)

class APIResponse(BaseModel):
"""Define api response model."""
Expand All @@ -65,135 +129,78 @@ class APIResponse(BaseModel):
@app.exception_handler(Exception)
async def exception_handler(request, exc):
"""Define exception response format."""
return JSONResponse({"success": False, "payload": None, "error_msg": str(exc)}, status_code=status.HTTP_400_BAD_REQUEST)
# TODO: does not work (exceptions are still thrown), specific exceptions work -- todo: FIX ;D

# todo: custom error handling for unified error responses and no closing on error
# https://fastapi.tiangolo.com/tutorial/handling-errors/#override-the-default-exception-handlers
return JSONResponse({"success": False, "payload": None, "error_msg": str(
exc)}, status_code=status.HTTP_400_BAD_REQUEST)


@app.get("/get_available_architectures", response_model=APIResponse)
async def get_available_architectures():
return {"success": True, "payload": list(models.keys())}


visible_models = [model for model in list(models.keys()) if model is not "Rotor-SVM"]
return {"success": True, "payload": visible_models}
@app.get("/evaluate/single_line/ciphertext", response_model=APIResponse)
async def evaluate_single_line_ciphertext(ciphertext: str, architecture: List[str] = Query([])):
max_architectures = len(models.keys())
if not 0 < len(architecture) <= max_architectures:
return JSONResponse({"success": False, "payload": None, "error_msg": "The number of architectures must be between 1 and %d." % max_architectures},
status_code=status.HTTP_400_BAD_REQUEST)
cipher_types = get_cipher_types_to_use(["aca"]) # aca stands for all implemented ciphers
if len(architecture) == 1:
if architecture[0] not in models.keys():
return JSONResponse({"success": False, "payload": None, "error_msg": "The architecture '%s' does not exist!" % architecture[0]},
status_code=status.HTTP_400_BAD_REQUEST)
model, feature_engineering, pad_input = models[architecture[0]]
cipherEval.architecture = architecture[0]
config.FEATURE_ENGINEERING = feature_engineering
config.PAD_INPUT = pad_input
else:
if len(set(architecture)) != len(architecture):
return JSONResponse({"success": False, "payload": None, "error_msg": "Multiple architectures of the same type are not "
"allowed!"}, status_code=status.HTTP_400_BAD_REQUEST)
cipher_indices = []
for cipher_type in cipher_types:
cipher_indices.append(config.CIPHER_TYPES.index(cipher_type))
model_list = []
architecture_list = []
for val in architecture:
if val not in models.keys():
return JSONResponse({"success": False, "payload": None, "error_msg": "The architecture '%s' does not exist!" % val},
status_code=status.HTTP_400_BAD_REQUEST)
model_list.append(models[val][0])
architecture_list.append(val)
cipherEval.architecture = "Ensemble"
model = EnsembleModel(model_list, architecture_list, "weighted", cipher_indices)

rotor_only_model = models["Rotor-SVM"]

# use plural inside function
architectures = architecture
try:
validate_architectures(architectures)
except ArchitectureError as error:
return error.response

try:
# call prediction method
result = cipherEval.predict_single_line(SimpleNamespace(
ciphertext = ciphertext,
file = None, # todo: needs fileupload first (either set ciphertext OR file, never both)
ciphers = cipher_types,
batch_size = 128,
verbose = False
), model)
# only use architectures that can predict aca ciphers
architectures = [architecture
for architecture in architectures
if architecture in ("Transformer", "FFNN", "LSTM", "RF", "NB")]

aca_cipher_types = list(range(56))
rotor_cipher_types = list(range(56, 61))

if len(architectures) == 0:
return {}
elif len(architectures) == 1:
architecture = architectures[0]
model, feature_engineering, pad_input = models[architecture]
config.FEATURE_ENGINEERING = feature_engineering
config.PAD_INPUT = pad_input
else:
model_list = []
architecture_list = []
for architecture in architectures:
model_list.append(models[architecture][0])
architecture_list.append(architecture)
architecture = "Ensemble"
model = EnsembleModel(
model_list,
architecture_list,
"weighted",
aca_cipher_types + rotor_cipher_types)

# Embed all models in RotorDifferentiationEnsemble to improve recognition of rotor ciphers
model = RotorDifferentiationEnsemble(architecture, model, rotor_only_model)
architecture = "Ensemble"

all_cipher_types = aca_cipher_types + rotor_cipher_types
cipher_names = [config.CIPHER_TYPES[cipher_index] for cipher_index in all_cipher_types]

eval_args = SimpleNamespace(
ciphertext=ciphertext,
# todo: needs fileupload first (either set ciphertext OR file, never both)
file=None,
ciphers=cipher_names,
batch_size=128,
verbose=False
)
prediction = cipherEval.predict_single_line(eval_args, model, architecture)

return {"success": True, "payload": prediction}
except BaseException as e:
# only use these lines for debugging. Never in production environment due to security reasons!
# only use these lines for debugging. Never in production environment due
# to security reasons!
import traceback
traceback.print_exc()
return JSONResponse({"success": False, "payload": None, "error_msg": repr(e)}, status_code=500)
# return JSONResponse(None, status_code=500)
return {"success": True, "payload": result}


###########################################################

def get_cipher_types_to_use(cipher_types):
if config.MTC3 in cipher_types:
del cipher_types[cipher_types.index(config.MTC3)]
cipher_types.append(config.CIPHER_TYPES[0])
cipher_types.append(config.CIPHER_TYPES[1])
cipher_types.append(config.CIPHER_TYPES[2])
cipher_types.append(config.CIPHER_TYPES[3])
cipher_types.append(config.CIPHER_TYPES[4])
if config.ACA in cipher_types:
del cipher_types[cipher_types.index(config.ACA)]
cipher_types.append(config.CIPHER_TYPES[0])
cipher_types.append(config.CIPHER_TYPES[1])
cipher_types.append(config.CIPHER_TYPES[2])
cipher_types.append(config.CIPHER_TYPES[3])
cipher_types.append(config.CIPHER_TYPES[4])
cipher_types.append(config.CIPHER_TYPES[5])
cipher_types.append(config.CIPHER_TYPES[6])
cipher_types.append(config.CIPHER_TYPES[7])
cipher_types.append(config.CIPHER_TYPES[8])
cipher_types.append(config.CIPHER_TYPES[9])
cipher_types.append(config.CIPHER_TYPES[10])
cipher_types.append(config.CIPHER_TYPES[11])
cipher_types.append(config.CIPHER_TYPES[12])
cipher_types.append(config.CIPHER_TYPES[13])
cipher_types.append(config.CIPHER_TYPES[14])
cipher_types.append(config.CIPHER_TYPES[15])
cipher_types.append(config.CIPHER_TYPES[16])
cipher_types.append(config.CIPHER_TYPES[17])
cipher_types.append(config.CIPHER_TYPES[18])
cipher_types.append(config.CIPHER_TYPES[19])
cipher_types.append(config.CIPHER_TYPES[20])
cipher_types.append(config.CIPHER_TYPES[21])
cipher_types.append(config.CIPHER_TYPES[22])
cipher_types.append(config.CIPHER_TYPES[23])
cipher_types.append(config.CIPHER_TYPES[24])
cipher_types.append(config.CIPHER_TYPES[25])
cipher_types.append(config.CIPHER_TYPES[26])
cipher_types.append(config.CIPHER_TYPES[27])
cipher_types.append(config.CIPHER_TYPES[28])
cipher_types.append(config.CIPHER_TYPES[29])
cipher_types.append(config.CIPHER_TYPES[30])
cipher_types.append(config.CIPHER_TYPES[31])
cipher_types.append(config.CIPHER_TYPES[32])
cipher_types.append(config.CIPHER_TYPES[33])
cipher_types.append(config.CIPHER_TYPES[34])
cipher_types.append(config.CIPHER_TYPES[35])
cipher_types.append(config.CIPHER_TYPES[36])
cipher_types.append(config.CIPHER_TYPES[37])
cipher_types.append(config.CIPHER_TYPES[38])
cipher_types.append(config.CIPHER_TYPES[39])
cipher_types.append(config.CIPHER_TYPES[40])
cipher_types.append(config.CIPHER_TYPES[41])
cipher_types.append(config.CIPHER_TYPES[42])
cipher_types.append(config.CIPHER_TYPES[43])
cipher_types.append(config.CIPHER_TYPES[44])
cipher_types.append(config.CIPHER_TYPES[45])
cipher_types.append(config.CIPHER_TYPES[46])
cipher_types.append(config.CIPHER_TYPES[47])
cipher_types.append(config.CIPHER_TYPES[48])
cipher_types.append(config.CIPHER_TYPES[49])
cipher_types.append(config.CIPHER_TYPES[50])
cipher_types.append(config.CIPHER_TYPES[51])
cipher_types.append(config.CIPHER_TYPES[52])
cipher_types.append(config.CIPHER_TYPES[53])
cipher_types.append(config.CIPHER_TYPES[54])
cipher_types.append(config.CIPHER_TYPES[55])
return cipher_types
return JSONResponse({"success": False, "payload": None,
"error_msg": repr(e)}, status_code=500)

7 changes: 7 additions & 0 deletions cipherImplementations/cadenus.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,10 @@ def decrypt(self, ciphertext, key):
for i in range(len(pt_table)):
plaintext += pt_table[i]
return np.array(plaintext)

@property
def needs_plaintext_of_specific_length(self):
return True

def truncate_plaintext(self, plaintext, key_length):
return plaintext[:key_length * 25]
11 changes: 11 additions & 0 deletions cipherImplementations/cipher.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,14 @@ def filter(self, plaintext, keep_unknown_symbols=False):
if not keep_unknown_symbols:
return remove_unknown_symbols(plaintext, self.alphabet)
return plaintext

@property
def needs_plaintext_of_specific_length(self):
"""Indicates whether the cipher needs a plaintext of a specific length. Should
be overridden by subclasses that need their plaintext input to be of specific length."""
return False

def truncate_plaintext(self, plaintext, key_length):
"""Returns the plaintext truncated to a specific length, possibly depending
on `key_length`. Should be overridden by subclasses."""
raise Exception('Interface method called')
10 changes: 10 additions & 0 deletions cipherImplementations/headlines.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,13 @@ def decrypt(self, ciphertext, key):
plaintext.append(space_index)
cnt += 1
return np.array(plaintext)

@property
def needs_plaintext_of_specific_length(self):
return True

def truncate_plaintext(self, plaintext, key_length):
space_index = self.alphabet.index(b' ')
spaces = np.count_nonzero(plaintext == space_index)
remainder = (len(plaintext) - spaces) % 5
return plaintext[:len(plaintext) - remainder]
7 changes: 7 additions & 0 deletions cipherImplementations/nihilistTransposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,10 @@ def decrypt(self, ciphertext, key):
plaintext[position + np.where(key == i)[0][0]] = columnar_ct[cntr]
cntr += 1
return np.array(plaintext)

@property
def needs_plaintext_of_specific_length(self):
return True

def truncate_plaintext(self, plaintext, key_length):
return plaintext[:key_length * key_length]
8 changes: 8 additions & 0 deletions cipherImplementations/routeTransposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,11 @@ def encrypt(self, plaintext, key):

def decrypt(self, ciphertext, key):
raise Exception('not implemented yet...')

@property
def needs_plaintext_of_specific_length(self):
return True

def truncate_plaintext(self, plaintext, key_length):
remainder = len(plaintext) % key_length
return plaintext[:len(plaintext) - remainder]
Loading

0 comments on commit d0bd889

Please sign in to comment.