Skip to content

Commit

Permalink
fix(moac): serialize volume creation
Browse files Browse the repository at this point in the history
As part of that, a request cache has been implemented that is used to
detect duplicate CSI requests. All of this improves the stability of
moac when it is under load.

e2e stress tests are out of scope and will be implemented as part
of a different ticket.

A minor improvement loosely related to the changes is that workq has been
converted to TypeScript to take advantage of type checking.

Resolves: CAS-673
  • Loading branch information
Jan Kryl committed Feb 1, 2021
1 parent d23adb4 commit 647b540
Show file tree
Hide file tree
Showing 14 changed files with 431 additions and 116 deletions.
1 change: 1 addition & 0 deletions csi/moac/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
/volumes.js
/volume_operator.js
/watcher.js
/workq.js
/*.js.map
102 changes: 94 additions & 8 deletions csi/moac/csi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,36 @@ function createK8sVolumeObject (volume: Volume): K8sVolume {
return obj;
}

// Duplicate request cache entry helps to detect retransmits of the same request
//
// This may seem like a useless thing but k8s is aggressive about retransmitting
// requests. The first retransmit happens just tens of ms after the original
// request. Having many identical requests in progress creates havoc
// and forces mayastor to execute the same code repeatedly.
//
// NOTE: Assumption is that k8s doesn't submit duplicate request for the same
// volume (the same uuid) with different parameters.
//
class Request {
  // ID of the object in the operation
  uuid: string;
  // name of the operation
  op: string;
  // callbacks to call when done, kept in the order they were registered
  callbacks: CsiDoneCb[];

  // Create an entry for operation "op" on object "uuid" with its first
  // completion callback.
  constructor (uuid: string, op: string, cb: CsiDoneCb) {
    this.uuid = uuid;
    this.op = op;
    this.callbacks = [cb];
  }

  // Register one more callback to be called when the operation finishes.
  wait (cb: CsiDoneCb) {
    this.callbacks.push(cb);
  }

  // Call every registered callback with the same error and response.
  done (err: any, resp?: any) {
    // Iterate over a snapshot so that callbacks appended while we are
    // notifying are not visited in this round.
    const registered = [...this.callbacks];
    for (const notify of registered) {
      notify(err, resp);
    }
  }
}

// CSI Controller implementation.
//
// It implements Identity and Controller grpc services from csi proto file.
Expand All @@ -127,6 +157,7 @@ class CsiServer {
private sockPath: string;
private nextListContextId: number;
private listContexts: Record<string, ListContext>;
private duplicateRequestCache: Request[];

// Creates new csi server
//
Expand All @@ -139,6 +170,7 @@ class CsiServer {
this.sockPath = sockPath;
this.nextListContextId = 1;
this.listContexts = {};
this.duplicateRequestCache = [];

// The data returned by identity service should be kept in sync with
// responses for the same methods on storage node.
Expand Down Expand Up @@ -255,6 +287,32 @@ class CsiServer {
this.ready = false;
}

// Find outstanding request by uuid and operation type.
_findRequest (uuid: string, op: string): Request | undefined {
  // An entry matches only if both the object ID and the operation agree.
  const isSameRequest = (entry: Request): boolean => {
    return entry.uuid === uuid && entry.op === op;
  };
  return this.duplicateRequestCache.find(isSameRequest);
}

// Start tracking a request for the given object and operation.
//
// If the same request is already in progress, the callback is attached to it
// and nothing is returned, so the caller must not execute the operation
// again. Otherwise a new cache entry is created and returned.
_beginRequest (uuid: string, op: string, cb: CsiDoneCb): Request | undefined {
  const pending = this._findRequest(uuid, op);
  if (!pending) {
    const created = new Request(uuid, op, cb);
    this.duplicateRequestCache.push(created);
    return created;
  }
  log.debug(`Duplicate ${op} volume request detected`);
  pending.wait(cb);
  return undefined;
}

// Remove request entry from the cache and call done callbacks.
_endRequest (request: Request, err: any, resp?: any) {
  // Drop the entry first (if it is still cached) and only then notify the
  // callbacks registered on the request.
  const position = this.duplicateRequestCache.indexOf(request);
  if (position !== -1) {
    this.duplicateRequestCache.splice(position, 1);
  }
  request.done(err, resp);
}

//
// Implementation of CSI identity methods
//
Expand Down Expand Up @@ -400,6 +458,12 @@ class CsiServer {
count = 1;
}

// If this is a duplicate request then assure it is executed just once.
let request = this._beginRequest(uuid, 'create', cb);
if (!request) {
return;
}

// create the volume
let volume;
try {
Expand All @@ -412,12 +476,14 @@ class CsiServer {
protocol: protocol
});
} catch (err) {
return cb(err);
this._endRequest(request, err);
return;
}

// This was used in the old days for NBD protocol
const accessibleTopology: TopologyKeys[] = [];
cb(null, {

this._endRequest(request, null, {
volume: {
capacityBytes: volume.getSize(),
volumeId: uuid,
Expand All @@ -437,13 +503,19 @@ class CsiServer {

log.debug(`Request to destroy volume "${args.volumeId}"`);

// If this is a duplicate request then assure it is executed just once.
let request = this._beginRequest(args.volumeId, 'delete', cb);
if (!request) {
return;
}

try {
await this.volumes.destroyVolume(args.volumeId);
} catch (err) {
return cb(err);
return this._endRequest(request, err);
}
log.info(`Volume "${args.volumeId}" destroyed`);
cb(null);
this._endRequest(request, null);
}

async listVolumes (call: any, cb: CsiDoneCb) {
Expand Down Expand Up @@ -542,6 +614,12 @@ class CsiServer {
return cb(err);
}

// If this is a duplicate request then assure it is executed just once.
let request = this._beginRequest(args.volumeId, 'publish', cb);
if (!request) {
return;
}

const publishContext: any = {};
try {
publishContext.uri = await volume.publish(protocol);
Expand All @@ -551,15 +629,16 @@ class CsiServer {
} catch (err) {
if (err.code === grpc.status.ALREADY_EXISTS) {
log.debug(`Volume "${args.volumeId}" already published on this node`);
cb(null, { publishContext });
this._endRequest(request, null, { publishContext });
} else {
cb(err);
this._endRequest(request, err);
}
return;
}

log.info(`Published volume "${args.volumeId}" over ${protocol}`);
cb(null, { publishContext });
this._endRequest(request, null, { publishContext });
}

async controllerUnpublishVolume (call: any, cb: CsiDoneCb) {
Expand All @@ -580,13 +659,20 @@ class CsiServer {
} catch (err) {
return cb(err);
}

// If this is a duplicate request then assure it is executed just once.
let request = this._beginRequest(args.volumeId, 'unpublish', cb);
if (!request) {
return;
}

try {
await volume.unpublish();
} catch (err) {
return cb(err);
return this._endRequest(request, err);
}
log.info(`Unpublished volume "${args.volumeId}"`);
cb(null, {});
this._endRequest(request, null, {});
}

async validateVolumeCapabilities (call: any, cb: CsiDoneCb) {
Expand Down
39 changes: 20 additions & 19 deletions csi/moac/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ import assert from 'assert';
import { Pool } from './pool';
import { Nexus } from './nexus';
import { Replica } from './replica';
import { Workq } from './workq';

const EventEmitter = require('events');
const Workq = require('./workq');
const log = require('./logger').Logger('node');
const { GrpcClient, GrpcCode, GrpcError } = require('./grpc_client');

// Type used in workq for calling grpc
type GrpcCallArgs = {
method: string; // name of the grpc method to call
args: any; // arguments passed along with the method
}

// Object represents mayastor storage node.
//
// Node emits following events:
Expand All @@ -25,7 +31,7 @@ export class Node extends EventEmitter {
syncBadLimit: number;
endpoint: string | null;
client: any;
workq: any;
workq: Workq;
syncFailed: number;
syncTimer: NodeJS.Timeout | null;
nexus: Nexus[];
Expand All @@ -49,7 +55,7 @@ export class Node extends EventEmitter {

this.endpoint = null;
this.client = null; // grpc client handle
this.workq = new Workq(); // work queue for serializing grpc calls
this.workq = new Workq('grpc call'); // work queue for serializing grpc calls
// We don't want to switch all objects to offline state when moac starts
// just because a node is not reachable from the beginning. That's why we
// set syncFailed to syncBadLimit + 1.
Expand Down Expand Up @@ -129,17 +135,19 @@ export class Node extends EventEmitter {
// @returns A promise that evals to return value of gRPC method.
//
async call(method: string, args: any): Promise<any> {
return this.workq.push({ method, args }, this._call.bind(this));
return this.workq.push({ method, args }, (args: GrpcCallArgs) => {
return this._call(args.method, args.args);
});
}

async _call(ctx: any) {
async _call(method: string, args: any): Promise<any> {
if (!this.client) {
throw new GrpcError(
GrpcCode.INTERNAL,
`Broken connection to mayastor on node "${this.name}"`
);
}
return this.client.call(ctx.method, ctx.args);
return this.client.call(method, args);
}

// Sync triggered by the timer. It ensures that the sync does run in
Expand All @@ -149,7 +157,9 @@ export class Node extends EventEmitter {
this.syncTimer = null;

try {
await this.workq.push({}, this._sync.bind(this));
await this.workq.push(null, () => {
return this._sync();
});
nextSync = this.syncPeriod;
} catch (err) {
// We don't want to cover up unexpected errors. But it's hard to
Expand Down Expand Up @@ -180,20 +190,11 @@ export class Node extends EventEmitter {
log.debug(`Syncing the node "${this.name}"`);

// TODO: Harden checking of outputs of the methods below
let reply = await this._call({
method: 'listNexus',
args: {}
});
let reply = await this._call('listNexus', {});
const nexus = reply.nexusList;
reply = await this._call({
method: 'listPools',
args: {}
});
reply = await this._call('listPools', {});
const pools = reply.pools;
reply = await this._call({
method: 'listReplicas',
args: {}
});
reply = await this._call('listReplicas', {});
const replicas = reply.replicas;

// Move the node to online state before we attempt to merge objects
Expand Down
6 changes: 3 additions & 3 deletions csi/moac/node_operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import {
CustomResourceCache,
CustomResourceMeta,
} from './watcher';
import { Workq } from './workq';

const yaml = require('js-yaml');
const EventStream = require('./event_stream');
const log = require('./logger').Logger('node-operator');
const Workq = require('./workq');

const RESOURCE_NAME: string = 'mayastornode';
const crdNode = yaml.safeLoad(
Expand Down Expand Up @@ -75,7 +75,7 @@ export class NodeOperator {
watcher: CustomResourceCache<NodeResource>; // k8s resource watcher for nodes
registry: any;
namespace: string;
workq: any; // for serializing node operations
workq: Workq; // for serializing node operations
eventStream: any; // events from the registry

// Create node operator object.
Expand All @@ -92,7 +92,7 @@ export class NodeOperator {
) {
assert(registry);
this.namespace = namespace;
this.workq = new Workq();
this.workq = new Workq('mayastornode');
this.registry = registry;
this.watcher = new CustomResourceCache(
this.namespace,
Expand Down
4 changes: 2 additions & 2 deletions csi/moac/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
},
"scripts": {
"prepare": "./bundle_protos.sh",
"clean": "rm -f csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js *.js.map",
"purge": "rm -rf node_modules proto csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js *.js.map",
"clean": "rm -f csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js workq.js *.js.map",
"purge": "rm -rf node_modules proto csi.js node.js replica.js pool.js nexus.js watcher.js node_operator.js pool_operator.js volume.js volumes.js volume_operator.js workq.js *.js.map",
"compile": "tsc --pretty",
"start": "./index.js",
"test": "mocha test/index.js",
Expand Down
6 changes: 3 additions & 3 deletions csi/moac/pool_operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import {
CustomResourceCache,
CustomResourceMeta,
} from './watcher';
import { Workq } from './workq';

const yaml = require('js-yaml');
const log = require('./logger').Logger('pool-operator');
const EventStream = require('./event_stream');
const Workq = require('./workq');

const RESOURCE_NAME: string = 'mayastorpool';
const POOL_FINALIZER = 'finalizer.mayastor.openebs.io';
Expand Down Expand Up @@ -125,7 +125,7 @@ export class PoolOperator {
watcher: CustomResourceCache<PoolResource>; // k8s resource watcher for pools
registry: any; // registry containing info about mayastor nodes
eventStream: any; // A stream of node and pool events.
workq: any; // for serializing pool operations
workq: Workq; // for serializing pool operations

// Create pool operator.
//
Expand All @@ -142,7 +142,7 @@ export class PoolOperator {
this.namespace = namespace;
this.registry = registry; // registry containing info about mayastor nodes
this.eventStream = null; // A stream of node and pool events.
this.workq = new Workq(); // for serializing pool operations
this.workq = new Workq('mayastorpool'); // for serializing pool operations
this.watcher = new CustomResourceCache(
this.namespace,
RESOURCE_NAME,
Expand Down
Loading

0 comments on commit 647b540

Please sign in to comment.