Skip to content

Commit

Permalink
Wip: fixing locking.
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Aug 10, 2022
1 parent 097f1ce commit ca44318
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 132 deletions.
2 changes: 1 addition & 1 deletion src/agent/service/nodesCrossSignClaim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function nodesCrossSignClaim({
true,
);
try {
await db.withTransactionF((tran) => {
await db.withTransactionF(async (tran) => {
const readStatus = await genClaims.read();
// If nothing to read, end and destroy
if (readStatus.done) {
Expand Down
2 changes: 1 addition & 1 deletion src/agent/service/nodesHolePunchMessageSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function nodesHolePunchMessageSend({
// Firstly, check if this node is the desired node
// If so, then we want to make this node start sending hole punching packets
// back to the source node.
await db.withTransactionF((tran) => {
await db.withTransactionF(async (tran) => {
if (keyManager.getNodeId().equals(targetId)) {
const [host, port] = networkUtils.parseAddress(
call.request.getProxyAddress(),
Expand Down
2 changes: 1 addition & 1 deletion src/agent/service/vaultsGitInfoGet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function vaultsGitInfoGet({
): Promise<void> => {
const genWritable = grpcUtils.generatorWritable(call, true);
try {
await db.withTransactionF((tran) => {
await db.withTransactionF(async (tran) => {
const vaultIdFromName = await vaultManager.getVaultId(
call.request.getVault()?.getNameOrId() as VaultName,
tran,
Expand Down
2 changes: 1 addition & 1 deletion src/agent/service/vaultsGitPackGet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function vaultsGitPackGet({
const nodeId = connectionInfo.remoteNodeId;
const nodeIdEncoded = nodesUtils.encodeNodeId(nodeId);
const nameOrId = meta.get('vaultNameOrId').pop()!.toString();
await db.withTransactionF((tran) => {
await db.withTransactionF(async (tran) => {
const vaultIdFromName = await vaultManager.getVaultId(
nameOrId as VaultName,
tran,
Expand Down
2 changes: 1 addition & 1 deletion src/agent/service/vaultsScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function vaultsScan({
}
const nodeId = connectionInfo.remoteNodeId;
try {
await db.withTransactionF((tran) => {
await db.withTransactionF(async (tran) => {
const listResponse = vaultManager.handleScanVaults(nodeId, tran);
for await (const {
vaultId,
Expand Down
67 changes: 27 additions & 40 deletions src/discovery/Discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import {
status,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { IdInternal } from '@matrixai/id';
import { Lock } from '@matrixai/async-locks';
import * as idUtils from '@matrixai/id/dist/utils';
import * as resources from '@matrixai/resources';
import * as discoveryUtils from './utils';
import * as discoveryErrors from './errors';
import * as nodesErrors from '../nodes/errors';
Expand Down Expand Up @@ -91,7 +89,6 @@ class Discovery {
protected discoveryProcess: Promise<void>;
protected queuePlug = promise<void>();
protected queueDrained = promise<void>();
protected lock: Lock = new Lock();

public constructor({
keyManager,
Expand Down Expand Up @@ -420,22 +417,19 @@ class Discovery {
}

/**
* Simple check for whether the Discovery Queue is empty. Uses a
* transaction lock to ensure consistency.
* Simple check for whether the Discovery Queue is empty.
*/
protected async queueIsEmpty(): Promise<boolean> {
return await this.lock.withF(async () => {
let nextDiscoveryQueueId: DiscoveryQueueId | undefined;
const keyIterator = this.db.iterator(this.discoveryQueueDbPath, {
limit: 1,
values: false,
});
for await (const [keyPath] of keyIterator) {
const key = keyPath[0] as Buffer;
nextDiscoveryQueueId = IdInternal.fromBuffer<DiscoveryQueueId>(key);
}
return nextDiscoveryQueueId == null;
let nextDiscoveryQueueId: DiscoveryQueueId | undefined;
const keyIterator = this.db.iterator(this.discoveryQueueDbPath, {
limit: 1,
values: false,
});
for await (const [keyPath] of keyIterator) {
const key = keyPath[0] as Buffer;
nextDiscoveryQueueId = IdInternal.fromBuffer<DiscoveryQueueId>(key);
}
return nextDiscoveryQueueId == null;
}

/**
Expand All @@ -446,25 +440,23 @@ class Discovery {
protected async pushKeyToDiscoveryQueue(
gestaltKey: GestaltKey,
): Promise<void> {
await resources.withF(
[this.db.transaction(), this.lock.lock()],
async ([tran]) => {
const valueIterator = tran.iterator<GestaltKey>(
this.discoveryQueueDbPath,
{ valueAsBuffer: false },
);
for await (const [, value] of valueIterator) {
if (value === gestaltKey) {
return;
}
await this.db.withTransactionF(async (tran) => {
await tran.lock(gestaltKey);
const valueIterator = tran.iterator<GestaltKey>(
this.discoveryQueueDbPath,
{ valueAsBuffer: false },
);
for await (const [, value] of valueIterator) {
if (value === gestaltKey) {
return;
}
const discoveryQueueId = this.discoveryQueueIdGenerator();
await tran.put(
[...this.discoveryQueueDbPath, idUtils.toBuffer(discoveryQueueId)],
gestaltKey,
);
},
);
}
const discoveryQueueId = this.discoveryQueueIdGenerator();
await tran.put(
[...this.discoveryQueueDbPath, idUtils.toBuffer(discoveryQueueId)],
gestaltKey,
);
});
this.queuePlug.resolveP();
}

Expand All @@ -476,12 +468,7 @@ class Discovery {
protected async removeKeyFromDiscoveryQueue(
keyId: DiscoveryQueueId,
): Promise<void> {
await this.lock.withF(async () => {
await this.db.del([
...this.discoveryQueueDbPath,
idUtils.toBuffer(keyId),
]);
});
await this.db.del([...this.discoveryQueueDbPath, idUtils.toBuffer(keyId)]);
}

/**
Expand Down
85 changes: 32 additions & 53 deletions src/notifications/NotificationsManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DB, DBTransaction, KeyPath, LevelPath } from '@matrixai/db';
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
import type {
NotificationId,
Notification,
Expand All @@ -12,13 +12,11 @@ import type NodeConnectionManager from '../nodes/NodeConnectionManager';
import type { NodeId } from '../nodes/types';
import Logger from '@matrixai/logger';
import { IdInternal } from '@matrixai/id';
import { Lock, LockBox } from '@matrixai/async-locks';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { utils as idUtils } from '@matrixai/id';
import { withF } from '@matrixai/resources';
import * as notificationsUtils from './utils';
import * as notificationsErrors from './errors';
import * as notificationsPB from '../proto/js/polykey/v1/notifications/notifications_pb';
Expand Down Expand Up @@ -78,7 +76,6 @@ class NotificationsManager {
protected nodeManager: NodeManager;
protected nodeConnectionManager: NodeConnectionManager;
protected messageCap: number;
protected locks: LockBox<Lock> = new LockBox();

/**
* Top level stores MESSAGE_COUNT_KEY -> number (of messages)
Expand Down Expand Up @@ -123,36 +120,30 @@ class NotificationsManager {
public async start({
fresh = false,
}: { fresh?: boolean } = {}): Promise<void> {
await withF(
[
this.db.transaction(),
this.locks.lock([
[...this.notificationsDbPath, MESSAGE_COUNT_KEY],
Lock,
]),
],
async ([tran]) => {
this.logger.info(`Starting ${this.constructor.name}`);
if (fresh) {
await tran.clear(this.notificationsDbPath);
}
await this.db.withTransactionF(async (tran) => {
await tran.lock(
[...this.notificationsDbPath, MESSAGE_COUNT_KEY].toString(),
);
this.logger.info(`Starting ${this.constructor.name}`);
if (fresh) {
await tran.clear(this.notificationsDbPath);
}

// Getting latest ID and creating ID generator
let latestId: NotificationId | undefined;
const keyIterator = tran.iterator(this.notificationsMessagesDbPath, {
limit: 1,
reverse: true,
values: false,
});
for await (const [keyPath] of keyIterator) {
const key = keyPath[0] as Buffer;
latestId = IdInternal.fromBuffer<NotificationId>(key);
}
this.notificationIdGenerator =
notificationsUtils.createNotificationIdGenerator(latestId);
this.logger.info(`Started ${this.constructor.name}`);
},
);
// Getting latest ID and creating ID generator
let latestId: NotificationId | undefined;
const keyIterator = tran.iterator(this.notificationsMessagesDbPath, {
limit: 1,
reverse: true,
values: false,
});
for await (const [keyPath] of keyIterator) {
const key = keyPath[0] as Buffer;
latestId = IdInternal.fromBuffer<NotificationId>(key);
}
this.notificationIdGenerator =
notificationsUtils.createNotificationIdGenerator(latestId);
this.logger.info(`Started ${this.constructor.name}`);
});
}

public async stop() {
Expand All @@ -168,20 +159,6 @@ class NotificationsManager {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new notificationsErrors.ErrorNotificationsNotRunning())
public async withTransactionF<T>(
...params: [...keys: Array<KeyPath>, f: (tran: DBTransaction) => Promise<T>]
): Promise<T> {
const f = params.pop() as (tran: DBTransaction) => Promise<T>;
const lockRequests = (params as Array<KeyPath>).map<[KeyPath, typeof Lock]>(
(key) => [key, Lock],
);
return withF(
[this.db.transaction(), this.locks.lock(...lockRequests)],
([tran]) => f(tran),
);
}

/**
* Send a notification to another node
* The `data` parameter must match one of the NotificationData types outlined in ./types
Expand Down Expand Up @@ -218,10 +195,12 @@ class NotificationsManager {
): Promise<void> {
const messageCountPath = [...this.notificationsDbPath, MESSAGE_COUNT_KEY];
if (tran == null) {
return this.withTransactionF(messageCountPath, (tran) =>
return this.db.withTransactionF(async (tran) =>
this.receiveNotification(notification, tran),
);
}

await tran.lock(messageCountPath.toString());
const nodePerms = await this.acl.getNodePerm(
nodesUtils.decodeNodeId(notification.senderId)!,
);
Expand Down Expand Up @@ -269,7 +248,7 @@ class NotificationsManager {
tran?: DBTransaction;
} = {}): Promise<Array<Notification>> {
if (tran == null) {
return this.withTransactionF((tran) =>
return this.db.withTransactionF((tran) =>
this.readNotifications({ unread, number, order, tran }),
);
}
Expand Down Expand Up @@ -309,7 +288,7 @@ class NotificationsManager {
tran?: DBTransaction,
): Promise<Notification | undefined> {
if (tran == null) {
return this.withTransactionF((tran) =>
return this.db.withTransactionF((tran) =>
this.findGestaltInvite(fromNode, tran),
);
}
Expand All @@ -331,10 +310,10 @@ class NotificationsManager {
public async clearNotifications(tran?: DBTransaction): Promise<void> {
const messageCountPath = [...this.notificationsDbPath, MESSAGE_COUNT_KEY];
if (tran == null) {
return this.withTransactionF(messageCountPath, (tran) =>
this.clearNotifications(tran),
);
return this.db.withTransactionF((tran) => this.clearNotifications(tran));
}

await tran.lock(messageCountPath.toString());
const notificationIds = await this.getNotificationIds('all', tran);
const numMessages = await tran.get<number>(messageCountPath);
if (numMessages !== undefined) {
Expand Down
Loading

0 comments on commit ca44318

Please sign in to comment.