Skip to content

Commit

Permalink
feat: in memory cache for drives and transmitters
Browse files Browse the repository at this point in the history
  • Loading branch information
froid1911 committed Mar 29, 2024
1 parent e30a177 commit 1e1211b
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 19 deletions.
3 changes: 3 additions & 0 deletions api/src/graphql/generated/drive/nexus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ export interface NexusGenObjects {
purchasePrice: number; // Float!
purchaseProceeds: number; // Float!
realizedSurplus: number; // Float!
salesProceeds?: number | null; // Float
spvId: string; // ID!
totalDiscount: number; // Float!
}
Expand Down Expand Up @@ -737,6 +738,7 @@ export interface NexusGenFieldTypes {
purchasePrice: number; // Float!
purchaseProceeds: number; // Float!
realizedSurplus: number; // Float!
salesProceeds: number | null; // Float
spvId: string; // ID!
totalDiscount: number; // Float!
}
Expand Down Expand Up @@ -1227,6 +1229,7 @@ export interface NexusGenFieldTypeNames {
purchasePrice: 'Float'
purchaseProceeds: 'Float'
realizedSurplus: 'Float'
salesProceeds: 'Float'
spvId: 'ID'
totalDiscount: 'Float'
}
Expand Down
1 change: 1 addition & 0 deletions api/src/graphql/generated/drive/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type FixedIncome {
purchasePrice: Float!
purchaseProceeds: Float!
realizedSurplus: Float!
salesProceeds: Float
spvId: ID!
totalDiscount: Float!
}
Expand Down
2 changes: 1 addition & 1 deletion api/src/modules/document-drive/drive-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export const getDrive = queryField('drive', {
resolve: async (_parent, args, ctx) => {
try {
const drive = await ctx.prisma.document.getDrive(ctx.driveId ?? '1');
return drive.global;
return drive;
} catch (e) {
return null;
}
Expand Down
120 changes: 102 additions & 18 deletions api/src/modules/document/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,39 @@ import {
PullResponderTransmitter,
IReceiver,
InternalTransmitter,


} from 'document-drive';

import { PrismaStorage } from 'document-drive/storage/prisma';
import * as DocumentModelsLibs from 'document-model-libs/document-models';
import { module as DocumentModelLib } from 'document-model/document-model';
import { DocumentModel, Operation } from 'document-model/document';
import { DocumentModel, Operation, State } from 'document-model/document';
import {
Listener,
ListenerFilter,
actions,
reducer,
DocumentDriveState,
DocumentDriveAction
} from 'document-model-libs/document-drive';


import { init } from './listenerManager';
import { getChildLogger } from '../../logger';

const logger = getChildLogger({ msgPrefix: 'Document Model' });

export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
const documentModels = [
DocumentModelLib,
...Object.values(DocumentModelsLibs),
] as DocumentModel[];

let transmitters: Record<string, Record<string, PullResponderTransmitter>> = {};
let lastAcceessedTransmitter: Record<string, Record<string, number>> = {};
let drives: Record<string, DocumentDriveState> = {};

const driveServer = new DocumentDriveServer(
documentModels,
new PrismaStorage(prisma as PrismaClient),
Expand All @@ -42,13 +51,49 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
await init(driveServer, prisma);
}

function clearDriveCache() {
drives = {};
}

function clearTransmitterCache() {
transmitters = {};
lastAcceessedTransmitter = {};
}

function getTransmitter(driveId: string, transmitterId: string) {
if (!transmitters[driveId]) {
return undefined;
}
const transmitter = transmitters[driveId][transmitterId];
return transmitter
}

function setTransmitter(driveId: string, transmitterId: string, transmitter: PullResponderTransmitter) {
if (!transmitters[driveId]) {
transmitters[driveId] = {};
}
transmitters[driveId][transmitterId] = transmitter;
}

function updateLastAccessedTransmitter(driveId: string, transmitterId: string) {
if (!lastAcceessedTransmitter[driveId]) {
lastAcceessedTransmitter[driveId] = {};
}
lastAcceessedTransmitter[driveId][transmitterId] = Date.now();
}
initialize();

setInterval(() => {
clearDriveCache();
clearTransmitterCache();
}, 1000 * 60 * 15);

return {
addDrive: async (args: DriveInput) => {
try {
await driveServer.addDrive(args);
await initialize();
clearDriveCache();
} catch (e) {
throw new Error("Couldn't add drive");
}
Expand All @@ -59,6 +104,7 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
deleteDrive: async (id: string) => {
try {
await driveServer.deleteDrive(id);
clearDriveCache();
} catch (e) {
throw new Error("Couldn't delete drive");
}
Expand All @@ -67,24 +113,41 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
},
getDrive: async (id: string) => {
try {
const { state } = await driveServer.getDrive(id);
return state;
let drive = drives[id];
console.log(drive);
if (!drive) {
const { state } = await driveServer.getDrive(id);
drives[id] = state.global;
return state.global;
} else {
return drive;
}


} catch (e) {
throw new Error("Couldn't get drive");
}
},
getDrives: async () => {
try {
const drives = await driveServer.getDrives();
return drives;
let driveIds = Object.keys(drives);
if (driveIds.length > 0) {
return driveIds;
}
driveIds = await driveServer.getDrives()
driveIds.forEach((driveId) => {
transmitters[driveId] = {};
})

return driveIds;
} catch (e) {
throw new Error("Couldn't get drives");
}
},

pushUpdates: async (
driveId: string,
operations: Operation[],
operations: Operation<DocumentDriveAction>[],
documentId?: string,
) => {
if (!documentId) {
Expand All @@ -111,13 +174,22 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
listenerId: string,
since?: string,
): Promise<StrandUpdate[]> => {
const transmitter = (await driveServer.getTransmitter(
driveId,
listenerId,
)) as PullResponderTransmitter;

console.log(transmitters)
let transmitter: PullResponderTransmitter | undefined = getTransmitter(driveId, listenerId);
if (!transmitter) {
throw new Error(`Transmitter with id ${listenerId} not found`);
transmitter = await driveServer.getTransmitter(
driveId,
listenerId,
) as PullResponderTransmitter;

if (!transmitter) {
throw new Error(`Transmitter with id ${listenerId} not found`);
}
setTransmitter(driveId, listenerId, transmitter);
updateLastAccessedTransmitter(driveId, listenerId);
}

if (transmitter.getStrands) {
const result = await transmitter.getStrands(since || undefined);
return result;
Expand All @@ -131,12 +203,17 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
listenerId: string,
revisions: ListenerRevision[],
) => {
const transmitter = (await driveServer.getTransmitter(
driveId,
listenerId,
)) as PullResponderTransmitter;
let transmitter: PullResponderTransmitter | undefined = getTransmitter(driveId, listenerId);
if (!transmitter) {
throw new Error(`Transmitter with id ${listenerId} not found`);
transmitter = await driveServer.getTransmitter(
driveId,
listenerId,
) as PullResponderTransmitter;
if (!transmitter) {
throw new Error(`Transmitter with id ${listenerId} not found`);
}
setTransmitter(driveId, listenerId, transmitter);
updateLastAccessedTransmitter(driveId, listenerId);
}
const result = await transmitter.processAcknowledge(
driveId,
Expand Down Expand Up @@ -172,7 +249,9 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
let drive = await driveServer.getDrive(driveId);
drive = reducer(drive, actions.addListener({ listener }));
const operation = drive.operations.local.slice().pop();

if (!operation) {
throw new Error("Operation couldnt be applied")
}
await driveServer.addDriveOperations(driveId, [operation]);
return listener;
},
Expand All @@ -184,8 +263,13 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
let drive = await driveServer.getDrive(driveId);
drive = reducer(drive, actions.removeListener({ listenerId }));
const operation = drive.operations.local.slice().pop();
if (!operation) {
throw new Error("Operation couldnt be applied")
}

await driveServer.addDriveOperations(driveId, [operation]);
delete transmitters[driveId][listenerId];
delete lastAcceessedTransmitter[driveId][listenerId];
return listenerId;
},

Expand All @@ -202,5 +286,5 @@ export function getDocumentDriveCRUD(prisma: Prisma.TransactionClient) {
};
return response;
},
};
}
}

0 comments on commit 1e1211b

Please sign in to comment.