Skip to content

Commit

Permalink
refactor(moac): drop support for NBD in CSI
Browse files Browse the repository at this point in the history
NBD does not work well with new nexus feature when nexus is created
only if it is used (mounted). For NBD we need to know which node the
nexus is located and the location must not change throughout the
lifetime of the volume. Apparently that does not work now when nexus
is created on whatever node is available and the location can change
between mounts of the volume.

And as part of that convert csi.js to typescript.

Resolves: CAS-672
  • Loading branch information
Jan Kryl committed Jan 28, 2021
1 parent a7eb859 commit da73b74
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 236 deletions.
3 changes: 2 additions & 1 deletion csi/moac/.gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/node_modules/
/proto/
/result
/watcher.js
/csi.js
/nexus.js
/node.js
/node_operator.js
Expand All @@ -11,4 +11,5 @@
/volume.js
/volumes.js
/volume_operator.js
/watcher.js
/*.js.map
146 changes: 83 additions & 63 deletions csi/moac/csi.js → csi/moac/csi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@

'use strict';

const assert = require('assert');
import assert from 'assert';
import * as _ from 'lodash';
import * as path from 'path';
import { Volume } from './volume';
import { Volumes } from './volumes';

const fs = require('fs').promises;
const path = require('path');
const protoLoader = require('@grpc/proto-loader');
const grpc = require('grpc-uds');
const log = require('./logger').Logger('csi');
Expand All @@ -30,8 +34,33 @@ const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
});
const csi = grpc.loadPackageDefinition(packageDefinition).csi.v1;

// Done callback in CSI methods
type CsiDoneCb = (err: any, resp?: any) => void;
// CSI method signature
type CsiMethod = (args: any, cb: CsiDoneCb) => void;

// Limited definition of topology key from CSI spec.
type TopologyKeys = {
segments: Record<string, string>
};

// Simplified definition of K8s object as defined in the CSI spec.
type K8sVolume = {
volumeId: string,
capacityBytes: number,
accessibleTopology: TopologyKeys[],
};

// When list volumes method does not fit into one reply we store the context
// for the next retrieval.
type ListContext = {
volumes: {
volume: K8sVolume
}[]
};

// Parse mayastor node ID (i.e. mayastor://node-name) and return the node name.
function parseMayastorNodeId (nodeId) {
function parseMayastorNodeId (nodeId: string) {
const parts = nodeId.split('/');

if (
Expand All @@ -51,8 +80,8 @@ function parseMayastorNodeId (nodeId) {
// Check that the list of volume capabilities does not contain unsupported
// capability. Throws grpc error if a capability is not supported.
//
// @param {string[]} caps Volume capabilities as described in CSI spec.
function checkCapabilities (caps) {
// @param caps Volume capabilities as described in CSI spec.
function checkCapabilities (caps: any[]) {
if (!caps) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
Expand All @@ -76,17 +105,12 @@ function checkCapabilities (caps) {
//
// @param {object} volume Volume object.
// @returns {object} K8s CSI volume object.
function createK8sVolumeObject (volume) {
const obj = {
function createK8sVolumeObject (volume: Volume): K8sVolume {
const obj: K8sVolume = {
volumeId: volume.uuid,
capacityBytes: volume.getSize(),
accessibleTopology: []
};
if (volume.protocol.toLowerCase() === 'nbd') {
obj.accessibleTopology.push({
segments: { 'kubernetes.io/hostname': volume.getNodeName() }
});
}
return obj;
}

Expand All @@ -96,11 +120,18 @@ function createK8sVolumeObject (volume) {
// It relies on volume manager, when serving incoming CSI requests, that holds
// information about volumes and provides methods to manipulate them.
class CsiServer {
private server: any;
private ready: boolean;
private registry: any;
private volumes: Volumes | null;
private sockPath: string;
private nextListContextId: number;
private listContexts: Record<string, ListContext>;

// Creates new csi server
//
// @param {string} sockPath Unix domain socket for csi server to listen on.
constructor (sockPath) {
assert.strictEqual(typeof sockPath, 'string');
// @param sockPath Unix domain socket for csi server to listen on.
constructor (sockPath: string) {
this.server = new grpc.Server();
this.ready = false;
this.registry = null;
Expand All @@ -119,8 +150,7 @@ class CsiServer {

// Wrap all controller methods by a check for readiness of the csi server
// and request/response logging to avoid repeating code.
const self = this;
const controllerMethods = {};
const controllerMethods: Record<string, CsiMethod> = {};
let methodNames = [
'createVolume',
'deleteVolume',
Expand All @@ -131,19 +161,22 @@ class CsiServer {
'getCapacity',
'controllerGetCapabilities'
];
// Note: what used to be elegant in JS is a type disaster in TS.
// Dynamic wrapper for calling methods defined on an object.
methodNames.forEach((name) => {
controllerMethods[name] = function checkReady (args, cb) {
controllerMethods[name] = (args, cb) => {
log.trace(`CSI ${name} request: ${JSON.stringify(args)}`);

if (!self.ready) {
if (!this.ready) {
return cb(
new GrpcError(
grpc.status.UNAVAILABLE,
'Not ready for serving requests'
)
);
}
return self[name](args, (err, resp) => {
let csiMethod = <CsiMethod> this[name as keyof CsiServer].bind(this);
return csiMethod(args, (err: any, resp: any) => {
if (err) {
if (!(err instanceof GrpcError)) {
err = new GrpcError(
Expand Down Expand Up @@ -199,19 +232,18 @@ class CsiServer {

// Stop the grpc server.
async stop () {
const self = this;
return new Promise((resolve, reject) => {
log.info('Shutting down grpc server');
self.server.tryShutdown(resolve);
this.server.tryShutdown(resolve);
});
}

// Switch csi server to ready state (returned by identity.probe() method).
// This will enable serving grpc controller service requests.
//
// @param {object} registry Object holding node, replica, pool and nexus objects.
// @param {object} volumes Volume manager.
makeReady (registry, volumes) {
// @param registry Object holding node, replica, pool and nexus objects.
// @param volumes Volume manager.
makeReady (registry: any, volumes: Volumes) {
this.ready = true;
this.registry = registry;
this.volumes = volumes;
Expand All @@ -227,7 +259,7 @@ class CsiServer {
// Implementation of CSI identity methods
//

getPluginInfo (_, cb) {
getPluginInfo (_: any, cb: CsiDoneCb) {
log.debug(
`getPluginInfo request (name=${PLUGIN_NAME}, version=${VERSION})`
);
Expand All @@ -238,7 +270,7 @@ class CsiServer {
});
}

getPluginCapabilities (_, cb) {
getPluginCapabilities (_: any, cb: CsiDoneCb) {
const caps = ['CONTROLLER_SERVICE', 'VOLUME_ACCESSIBILITY_CONSTRAINTS'];
log.debug('getPluginCapabilities request: ' + caps.join(', '));
cb(null, {
Expand All @@ -248,7 +280,7 @@ class CsiServer {
});
}

probe (_, cb) {
probe (_: any, cb: CsiDoneCb) {
log.debug(`probe request (ready=${this.ready})`);
cb(null, { ready: { value: this.ready } });
}
Expand All @@ -257,7 +289,7 @@ class CsiServer {
// Implementation of CSI controller methods
//

async controllerGetCapabilities (_, cb) {
async controllerGetCapabilities (_: any, cb: CsiDoneCb) {
const caps = [
'CREATE_DELETE_VOLUME',
'PUBLISH_UNPUBLISH_VOLUME',
Expand All @@ -272,8 +304,9 @@ class CsiServer {
});
}

async createVolume (call, cb) {
async createVolume (call: any, cb: CsiDoneCb) {
const args = call.request;
assert(this.volumes);

log.debug(
`Request to create volume "${args.name}" with size ` +
Expand Down Expand Up @@ -382,13 +415,8 @@ class CsiServer {
return cb(err);
}

// Enforce local access to the volume for NBD protocol
const accessibleTopology = [];
if (protocol.toLowerCase() === 'nbd') {
accessibleTopology.push({
segments: { 'kubernetes.io/hostname': volume.getNodeName() }
});
}
// This was used in the old days for NBD protocol
const accessibleTopology: TopologyKeys[] = [];
cb(null, {
volume: {
capacityBytes: volume.getSize(),
Expand All @@ -403,8 +431,9 @@ class CsiServer {
});
}

async deleteVolume (call, cb) {
async deleteVolume (call: any, cb: CsiDoneCb) {
const args = call.request;
assert(this.volumes);

log.debug(`Request to destroy volume "${args.volumeId}"`);

Expand All @@ -414,12 +443,13 @@ class CsiServer {
return cb(err);
}
log.info(`Volume "${args.volumeId}" destroyed`);
cb();
cb(null);
}

async listVolumes (call, cb) {
async listVolumes (call: any, cb: CsiDoneCb) {
assert(this.volumes);
const args = call.request;
let ctx = {};
let ctx: ListContext;

if (args.startingToken) {
ctx = this.listContexts[args.startingToken];
Expand Down Expand Up @@ -452,18 +482,19 @@ class CsiServer {

// TODO: purge list contexts older than .. (1 min)
if (ctx.volumes.length > 0) {
const ctxId = this.nextListContextId++;
const ctxId = (this.nextListContextId++).toString();
this.listContexts[ctxId] = ctx;
cb(null, {
entries: entries,
nextToken: ctxId.toString()
nextToken: ctxId,
});
} else {
cb(null, { entries: entries });
}
}

async controllerPublishVolume (call, cb) {
async controllerPublishVolume (call: any, cb: CsiDoneCb) {
assert(this.volumes);
const args = call.request;

log.debug(
Expand Down Expand Up @@ -492,19 +523,6 @@ class CsiServer {
new GrpcError(grpc.status.INVALID_ARGUMENT, 'missing storage protocol')
);
}
if (protocol.toLowerCase() === 'nbd') {
const nodeName = volume.getNodeName();
if (nodeId !== nodeName) {
return cb(
new GrpcError(
grpc.status.INVALID_ARGUMENT,
`Cannot publish the volume "${args.volumeId}" on a different ` +
`node "${nodeId}" than it was created "${nodeName}" when using ` +
`local access protocol ${protocol}`
)
);
}
}
if (args.readonly) {
return cb(
new GrpcError(
Expand All @@ -524,7 +542,7 @@ class CsiServer {
return cb(err);
}

const publishContext = {};
const publishContext: any = {};
try {
publishContext.uri = await volume.publish(protocol);
log.debug(
Expand All @@ -544,7 +562,8 @@ class CsiServer {
cb(null, { publishContext });
}

async controllerUnpublishVolume (call, cb) {
async controllerUnpublishVolume (call: any, cb: CsiDoneCb) {
assert(this.volumes);
const args = call.request;

log.debug(`Request to unpublish volume "${args.volumeId}"`);
Expand All @@ -570,7 +589,8 @@ class CsiServer {
cb(null, {});
}

async validateVolumeCapabilities (call, cb) {
async validateVolumeCapabilities (call: any, cb: CsiDoneCb) {
assert(this.volumes);
const args = call.request;

log.debug(`Request to validate volume capabilities for "${args.volumeId}"`);
Expand All @@ -584,9 +604,9 @@ class CsiServer {
);
}
const caps = args.volumeCapabilities.filter(
(cap) => cap.accessMode.mode === 'SINGLE_NODE_WRITER'
(cap: any) => cap.accessMode.mode === 'SINGLE_NODE_WRITER'
);
const resp = {};
const resp: any = {};
if (caps.length > 0) {
resp.confirmed = { volumeCapabilities: caps };
} else {
Expand All @@ -601,7 +621,7 @@ class CsiServer {
//
// XXX Is the caller interested in total capacity (sum of all pools) or
// a capacity usable by a single volume?
async getCapacity (call, cb) {
async getCapacity (call: any, cb: CsiDoneCb) {
let nodeName;
const args = call.request;

Expand Down
10 changes: 4 additions & 6 deletions csi/moac/nexus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Nexus object implementation.

const _ = require('lodash');
const assert = require('assert');
import assert from 'assert';
import * as _ from 'lodash';

const { GrpcCode, GrpcError, mayastor } = require('./grpc_client');
const log = require('./logger').Logger('nexus');

Expand All @@ -10,15 +11,12 @@ import { Replica } from './replica';
// Protocol used to export nexus (volume)
export enum Protocol {
Unknown = 'unknown',
Nbd = 'nbd',
Iscsi = 'iscsi',
Nvmf = 'nvmf',
}

export function protocolFromString(val: string): Protocol {
if (val == Protocol.Nbd) {
return Protocol.Nbd;
} else if (val == Protocol.Iscsi) {
if (val == Protocol.Iscsi) {
return Protocol.Iscsi;
} else if (val == Protocol.Nvmf) {
return Protocol.Nvmf;
Expand Down
Loading

0 comments on commit da73b74

Please sign in to comment.