Skip to content

Commit

Permalink
refactor(/class/eventManagement/~): split in smaller fn for transacti…
Browse files Browse the repository at this point in the history
…on backup, improved type usage (#278)
  • Loading branch information
Rossb0b authored Aug 8, 2024
1 parent 0d1a1e0 commit 7f92b56
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 85 deletions.
227 changes: 149 additions & 78 deletions src/class/eventManagement/dispatcher/transaction-handler.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
import { EventsHandler } from "./events.class.js";
import type {
DispatcherChannelMessages,
DispatcherTransactionMetadata,
DistributedEventMessage,
GenericEvent,
IncomerChannelMessages,
PartialLogger,
Expand Down Expand Up @@ -103,6 +105,29 @@ export type TransactionHandlerOptions<T extends GenericEvent = GenericEvent> = {
idleTime?: number;
};

export interface RedistributeUnresolvedSpreadTransactionOptions {
backupIncomerTransaction: Transaction<"incomer">;
isoListener: RegisteredIncomer,
relatedDispatcherTransactionId: string,
backupTransactionId: string;
}

export interface RedistributeUnresolvedSpreadTransactionResponse<T extends GenericEvent = GenericEvent> {
dispatcherTransactionUUID: string;
event: Omit<DistributedEventMessage, "redisMetadata"> & {
redisMetadata: Omit<DispatcherTransactionMetadata, "iteration" | "transactionId">
};
redisMetadata: DispatchEventOptions<T>["redisMetadata"];
}

export interface RedistributeResolvedSpreadTransactionOptions {
isoListener: RegisteredIncomer,
backupIncomerTransaction: Transaction<"incomer">;
backupTransactionId: string;
relatedDispatcherTransaction: Transaction<"dispatcher">,
relatedDispatcherTransactionId: string;
}

export class TransactionHandler<T extends GenericEvent = GenericEvent> {
readonly prefix: string;
readonly formattedPrefix: string;
Expand Down Expand Up @@ -462,54 +487,40 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
)("Main transaction redistributed to an Incomer"));
}

private async handleBackupIncomerTransactions(options: ResolveTransactions) {
private async handleBackupIncomerTransactions(options: ResolveTransactions): Promise<ResolveTransactions> {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];

for (const [backupTransactionId, backupIncomerTransaction] of backupIncomerTransactions.entries()) {
if (backupIncomerTransaction.redisMetadata.mainTransaction) {
const isoPublisherIncomer = findISOIncomer({
const isoPublisher = findISOIncomer({
incomers: [...incomers.values()],
incomerName: backupIncomerTransaction.redisMetadata.incomerName,
eventName: backupIncomerTransaction.name,
key: "eventsCast"
});

if (!isoPublisherIncomer) {
if (!isoPublisher) {
continue;
}

const concernedIncomerStore = new TransactionStore({
prefix: `${isoPublisherIncomer.prefix ? `${isoPublisherIncomer.prefix}-` : ""}${isoPublisherIncomer.providedUUID}`,
instance: "incomer"
});

toResolve.push(
concernedIncomerStore.setTransaction({
...backupIncomerTransaction,
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: isoPublisherIncomer.providedUUID
}
}, backupTransactionId),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId)
);
await this.redistributeMainTransaction(isoPublisher, backupIncomerTransaction, backupTransactionId);

backupIncomerTransactions.delete(backupTransactionId);

continue;
}

if (backupIncomerTransaction.redisMetadata.relatedTransaction) {
const isoListenerIncomer = findISOIncomer({
const isoListener = findISOIncomer({
incomers: [...incomers.values()],
incomerName: backupIncomerTransaction.redisMetadata.incomerName,
eventName: backupIncomerTransaction.name,
key: "eventsSubscribe"
});

if (!isoListenerIncomer) {
if (!isoListener) {
continue;
}

Expand All @@ -523,82 +534,39 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
}

if (!backupIncomerTransaction.redisMetadata.resolved) {
const { providedUUID, prefix } = isoListenerIncomer;

const concernedIncomerChannel = this.incomerChannelHandler.get(providedUUID) ??
this.incomerChannelHandler.set({ uuid: providedUUID, prefix });

const dispatcherTransactionUUID = randomUUID();
const event = {
...backupIncomerTransaction as IncomerHandlerTransaction["incomerDistributedEventTransaction"],
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: this.privateUUID,
to: isoListenerIncomer.providedUUID
}
} as any;

const redisMetadata = {
mainTransaction: backupIncomerTransaction.redisMetadata.mainTransaction,
relatedTransaction: backupIncomerTransaction.redisMetadata.relatedTransaction,
eventTransactionId: null,
resolved: backupIncomerTransaction.redisMetadata.resolved
};

toResolve.push([
this.eventsHandler.dispatch({
channel: concernedIncomerChannel,
store: this.dispatcherTransactionStore,
redisMetadata,
event,
dispatcherTransactionUUID
}),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId),
this.dispatcherTransactionStore.deleteTransaction(relatedDispatcherTransactionId)
]);
const { dispatcherTransactionUUID, event, redisMetadata } = await this.redistributeUnresolvedSpreadTransaction({
backupIncomerTransaction,
isoListener,
relatedDispatcherTransactionId,
backupTransactionId
});

dispatcherTransactions.set(dispatcherTransactionUUID, {
...event,
redisMetadata: {
...event.redisMetadata,
...redisMetadata
}
});
} as any);
backupIncomerTransactions.delete(backupTransactionId);
dispatcherTransactions.delete(relatedDispatcherTransactionId);

continue;
}

const concernedIncomerStore = new TransactionStore({
prefix: `${isoListenerIncomer.prefix ? `${isoListenerIncomer.prefix}-` : ""}${isoListenerIncomer.providedUUID}`,
instance: "incomer"
await this.redistributeResolvedSpreadTransaction({
isoListener,
backupIncomerTransaction,
backupTransactionId,
relatedDispatcherTransaction,
relatedDispatcherTransactionId
});

toResolve.push(
concernedIncomerStore.setTransaction({
...backupIncomerTransaction,
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: this.privateUUID,
to: isoListenerIncomer.providedUUID
}
}, backupIncomerTransaction.redisMetadata.transactionId),
this.dispatcherTransactionStore.updateTransaction(relatedDispatcherTransactionId, {
...relatedDispatcherTransaction,
redisMetadata: {
...relatedDispatcherTransaction.redisMetadata,
to: isoListenerIncomer.providedUUID
}
} as Transaction<"dispatcher">),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId)
);

dispatcherTransactions.set(relatedDispatcherTransactionId, {
...relatedDispatcherTransaction,
redisMetadata: {
...relatedDispatcherTransaction.redisMetadata,
to: isoListenerIncomer.providedUUID
to: isoListener.providedUUID
}
} as Transaction<"dispatcher">);
}
Expand All @@ -609,7 +577,110 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
return { incomers, backupIncomerTransactions, dispatcherTransactions };
}

private async resolveSpreadTransactions(options: ResolveTransactions) {
private async redistributeMainTransaction(
isoPublisher: RegisteredIncomer,
backupIncomerTransaction: Transaction<"incomer">,
backupTransactionId: string
): Promise<void> {
const concernedIncomerStore = new TransactionStore({
prefix: `${isoPublisher.prefix ? `${isoPublisher.prefix}-` : ""}${isoPublisher.providedUUID}`,
instance: "incomer"
});

await Promise.all([
concernedIncomerStore.setTransaction({
...backupIncomerTransaction,
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: isoPublisher.providedUUID
}
}, backupTransactionId),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId)
]);
}

private async redistributeUnresolvedSpreadTransaction(options: RedistributeUnresolvedSpreadTransactionOptions):
Promise<RedistributeUnresolvedSpreadTransactionResponse<T>> {
const { backupIncomerTransaction, isoListener, relatedDispatcherTransactionId, backupTransactionId } = options;
const { providedUUID, prefix } = isoListener;

const relatedChannel = this.incomerChannelHandler.get(providedUUID) ??
this.incomerChannelHandler.set({ uuid: providedUUID, prefix });

const dispatcherTransactionUUID = randomUUID();
const event: Omit<DistributedEventMessage, "redisMetadata"> & {
redisMetadata: Omit<DispatcherTransactionMetadata, "iteration" | "transactionId">
} = {
...backupIncomerTransaction as IncomerHandlerTransaction["incomerDistributedEventTransaction"],
redisMetadata: {
...backupIncomerTransaction.redisMetadata as
IncomerHandlerTransaction["incomerDistributedEventTransaction"]["redisMetadata"],
origin: this.privateUUID,
to: isoListener.providedUUID
}
};

const redisMetadata = {
mainTransaction: backupIncomerTransaction.redisMetadata.mainTransaction,
relatedTransaction: backupIncomerTransaction.redisMetadata.relatedTransaction,
eventTransactionId: null,
resolved: backupIncomerTransaction.redisMetadata.resolved
};

await Promise.all([
this.eventsHandler.dispatch({
channel: relatedChannel,
store: this.dispatcherTransactionStore,
redisMetadata,
event,
dispatcherTransactionUUID
}),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId),
this.dispatcherTransactionStore.deleteTransaction(relatedDispatcherTransactionId)
]);

return {
dispatcherTransactionUUID,
event,
redisMetadata
};
}

private async redistributeResolvedSpreadTransaction(options: RedistributeResolvedSpreadTransactionOptions) {
const {
isoListener,
backupIncomerTransaction,
backupTransactionId,
relatedDispatcherTransaction,
relatedDispatcherTransactionId
} = options;

const relatedStore = new TransactionStore({
prefix: `${isoListener.prefix ? `${isoListener.prefix}-` : ""}${isoListener.providedUUID}`,
instance: "incomer"
});

await Promise.all([
relatedStore.setTransaction({
...backupIncomerTransaction,
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: this.privateUUID,
to: isoListener.providedUUID
}
}, backupIncomerTransaction.redisMetadata.transactionId),
this.dispatcherTransactionStore.updateTransaction(relatedDispatcherTransactionId, {
...relatedDispatcherTransaction,
redisMetadata: {
...relatedDispatcherTransaction.redisMetadata,
to: isoListener.providedUUID
}
} as Transaction<"dispatcher">),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId)
]);
}

private async resolveSpreadTransactions(options: ResolveTransactions): Promise<ResolveTransactions> {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];
Expand Down
14 changes: 7 additions & 7 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,6 @@ export class Incomer <

public async close() {
try {
await this.externals?.close();

await this.subscriber.unsubscribe(this.dispatcherChannelName, this.incomerChannelName);
this.subscriber.removeAllListeners("message");

clearInterval(this.checkTransactionsStateInterval);
this.checkTransactionsStateInterval = undefined;

Expand All @@ -430,7 +425,12 @@ export class Incomer <
this.checkDispatcherStateTimeout = undefined;
}

await this.cleaupTransactions();
await this.externals?.close();

await this.subscriber.unsubscribe(this.dispatcherChannelName, this.incomerChannelName);
this.subscriber.removeAllListeners("message");

await this.cleanupTransactions();

this.logger.info("Incomer closed successfully");
}
Expand All @@ -441,7 +441,7 @@ export class Incomer <
}
}

private async cleaupTransactions() {
private async cleanupTransactions() {
if (this.incomerChannel) {
await this.incomerChannel.publish({
name: "CLOSE",
Expand Down

0 comments on commit 7f92b56

Please sign in to comment.