Skip to content

Commit

Permalink
feat: Integrate recordless (temp) inst feature limits
Browse files Browse the repository at this point in the history
  • Loading branch information
KallynGowdy committed Sep 27, 2023
1 parent e4308e7 commit 3ce908c
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 8 deletions.
176 changes: 176 additions & 0 deletions src/aux-records/websockets/WebsocketController.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,98 @@ describe('WebsocketController', () => {
expect(await instStore.getInstByName(null, inst)).toBe(null);
});

it('should return a not_authorized error if recordless insts are not allowed', async () => {
store.subscriptionConfiguration = merge(
createTestSubConfiguration(),
{
defaultFeatures: {
tempInsts: {
allowed: false,
},
},
} as Partial<SubscriptionConfiguration>
);

await connectionStore.saveConnection(device1Info);

await server.watchBranch(device1Info.serverConnectionId, {
type: 'repo/watch_branch',
recordName: null,
inst,
branch: 'doesNotExist',
});

expect(
messenger.getEvents(device1Info.serverConnectionId)
).toEqual([
[
WebsocketEventTypes.Error,
-1,
{
success: false,
errorCode: 'not_authorized',
errorMessage: 'Temporary insts are not allowed.',
},
],
]);

expect(
await instStore.getBranchByName(null, inst, 'doesNotExist')
).toEqual(null);
// Should not create an inst when the record name is null.
expect(await instStore.getInstByName(null, inst)).toBe(null);
});

it('should return a not_authorized error if the maximum number of connections for recordless insts is reached', async () => {
store.subscriptionConfiguration = merge(
createTestSubConfiguration(),
{
defaultFeatures: {
tempInsts: {
allowed: true,
maxActiveConnectionsPerInst: 1,
},
},
} as Partial<SubscriptionConfiguration>
);

await connectionStore.saveConnection(device1Info);
await connectionStore.saveConnection(device2Info);

await server.watchBranch(device1Info.serverConnectionId, {
type: 'repo/watch_branch',
recordName: null,
inst,
branch: 'doesNotExist',
});

expect(
messenger.getEvents(device1Info.serverConnectionId)
).toEqual([]);

await server.watchBranch(device2Info.serverConnectionId, {
type: 'repo/watch_branch',
recordName: null,
inst,
branch: 'doesNotExist',
});

expect(
messenger.getEvents(device2Info.serverConnectionId)
).toEqual([
[
WebsocketEventTypes.Error,
-1,
{
success: false,
errorCode: 'not_authorized',
errorMessage:
'The maximum number of active connections to this inst has been reached.',
},
],
]);
});

describe('temp', () => {
it('should load the branch like normal if the branch is temporary', async () => {
await connectionStore.saveConnection(device1Info);
Expand Down Expand Up @@ -2205,6 +2297,90 @@ describe('WebsocketController', () => {
instSizeInBytes: 3,
});
});

it('should return a not_authorized error if recordless insts are not allowed', async () => {
store.subscriptionConfiguration = merge(
createTestSubConfiguration(),
{
defaultFeatures: {
tempInsts: {
allowed: false,
},
},
} as Partial<SubscriptionConfiguration>
);

await connectionStore.saveConnection(device1Info);

await server.addUpdates(device1Info.serverConnectionId, {
type: 'repo/add_updates',
recordName: null,
inst,
branch: 'doesNotExist',
updates: ['111', '222'],
updateId: 0,
});

expect(
messenger.getEvents(device1Info.serverConnectionId)
).toEqual([
[
WebsocketEventTypes.Error,
-1,
{
success: false,
errorCode: 'not_authorized',
errorMessage: 'Temporary insts are not allowed.',
},
],
]);

expect(
await instStore.getBranchByName(null, inst, 'doesNotExist')
).toEqual(null);
// Should not create an inst when the record name is null.
expect(await instStore.getInstByName(null, inst)).toBe(null);
});

it('should return a not_authorized error if it would exceed the maximum inst size for recordless insts', async () => {
store.subscriptionConfiguration = merge(
createTestSubConfiguration(),
{
defaultFeatures: {
tempInsts: {
allowed: true,
maxBytesPerInst: 1,
},
},
} as Partial<SubscriptionConfiguration>
);

await connectionStore.saveConnection(device1Info);

await server.addUpdates(device1Info.serverConnectionId, {
type: 'repo/add_updates',
recordName: null,
inst,
branch: 'doesNotExist',
updates: ['111', '222'],
updateId: 0,
});

expect(
messenger.getEvents(device1Info.serverConnectionId)
).toEqual([
[
WebsocketEventTypes.Error,
-1,
{
success: false,
errorCode: 'not_authorized',
errorMessage:
'The maximum number of bytes per inst has been reached.',
},
],
]);
});
});

describe('records', () => {
Expand Down
69 changes: 61 additions & 8 deletions src/aux-records/websockets/WebsocketController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,18 @@ export class WebsocketController {
}

const config = await this._config.getSubscriptionConfiguration();

if (!event.recordName) {
if (config.defaultFeatures?.tempInsts?.allowed === false) {
await this.sendError(connectionId, -1, {
success: false,
errorCode: 'not_authorized',
errorMessage: 'Temporary insts are not allowed.',
});
return;
}
}

const instResult = await this._getOrCreateInst(
event.recordName,
event.inst,
Expand All @@ -322,20 +334,35 @@ export class WebsocketController {
const inst = instResult.inst;
const features = instResult.features;

let maxConnections: number = null;

if (
features &&
typeof features.insts.maxActiveConnectionsPerInst === 'number'
) {
maxConnections = features.insts.maxActiveConnectionsPerInst;
} else if (
!event.recordName &&
typeof config.defaultFeatures?.tempInsts
?.maxActiveConnectionsPerInst === 'number'
) {
maxConnections =
config.defaultFeatures.tempInsts.maxActiveConnectionsPerInst;
}

if (maxConnections) {
const count = await this._connectionStore.countConnectionsByBranch(
'branch',
event.recordName,
event.inst,
event.branch
);
if (count >= features.insts.maxActiveConnectionsPerInst) {
if (count >= maxConnections) {
await this.sendError(connectionId, -1, {
success: false,
errorCode: 'subscription_limit_reached',
errorCode: features
? 'subscription_limit_reached'
: 'not_authorized',
errorMessage:
'The maximum number of active connections to this inst has been reached.',
});
Expand Down Expand Up @@ -553,6 +580,17 @@ export class WebsocketController {
const config = await this._config.getSubscriptionConfiguration();
let features: FeaturesConfiguration = null;

if (!event.recordName) {
if (config.defaultFeatures?.tempInsts?.allowed === false) {
await this.sendError(connectionId, -1, {
success: false,
errorCode: 'not_authorized',
errorMessage: 'Temporary insts are not allowed.',
});
return;
}
}

if (!branch) {
console.log(
`[CausalRepoServer] [namespace: ${event.recordName}/${event.inst}/${event.branch}, connectionId: ${connectionId}] Branch not found!`
Expand Down Expand Up @@ -700,7 +738,23 @@ export class WebsocketController {
branch.linkedInst.subscriptionType
);
}
if (features) {

let maxInstSize: number = null;

if (
features &&
typeof features.insts.maxBytesPerInst === 'number'
) {
maxInstSize = features.insts.maxBytesPerInst;
} else if (
!event.recordName &&
typeof config.defaultFeatures?.tempInsts?.maxBytesPerInst ===
'number'
) {
maxInstSize = config.defaultFeatures.tempInsts.maxBytesPerInst;
}

if (maxInstSize) {
const currentSize = branch.temporary
? await this._temporaryStore.getInstSize(
event.recordName,
Expand All @@ -710,13 +764,12 @@ export class WebsocketController {
event.recordName,
event.inst
);
if (
typeof features.insts.maxBytesPerInst === 'number' &&
currentSize + updateSize > features.insts.maxBytesPerInst
) {
if (currentSize + updateSize > maxInstSize) {
await this.sendError(connectionId, -1, {
success: false,
errorCode: 'subscription_limit_reached',
errorCode: features
? 'subscription_limit_reached'
: 'not_authorized',
errorMessage:
'The maximum number of bytes per inst has been reached.',
});
Expand Down

0 comments on commit 3ce908c

Please sign in to comment.