[sample data] handle index aliases when installing/uninstalling datasets #122689

Merged · 9 commits · Jan 17, 2022
13 changes: 13 additions & 0 deletions src/plugins/home/server/services/sample_data/errors.ts
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export class SampleDataInstallError extends Error {
  constructor(message: string, public readonly httpCode: number) {
    super(message);
  }
}
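
The httpCode travels with the error so the routes below can turn installer failures into customError responses without string matching. A minimal sketch of how a caller might raise it — the call site, index name, and message here are illustrative, not taken from the PR:

```ts
import { SampleDataInstallError } from './errors';

// Hypothetical call site: wrap an Elasticsearch failure into an error that the
// install/uninstall routes can translate directly into an HTTP response.
function failIndexCreation(index: string, cause: Error): never {
  throw new SampleDataInstallError(
    `Unable to create sample data index "${index}": ${cause.message}`,
    500
  );
}
```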
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { IScopedClusterClient, Logger } from 'kibana/server';
import type { DataIndexSchema } from './sample_dataset_registry_types';
import {
  translateTimeRelativeToDifference,
  translateTimeRelativeToWeek,
} from './translate_timestamp';
import { loadData } from './load_data';

export const insertDataIntoIndex = ({
Contributor Author:
This was extracted from src/plugins/home/server/services/sample_data/routes/install.ts

  dataIndexConfig,
  logger,
  esClient,
  index,
  nowReference,
}: {
  dataIndexConfig: DataIndexSchema;
  index: string;
  nowReference: string;
  esClient: IScopedClusterClient;
  logger: Logger;
}) => {
  const updateTimestamps = (doc: any) => {
    dataIndexConfig.timeFields
      .filter((timeFieldName: string) => doc[timeFieldName])
      .forEach((timeFieldName: string) => {
        doc[timeFieldName] = dataIndexConfig.preserveDayOfWeekTimeOfDay
          ? translateTimeRelativeToWeek(
              doc[timeFieldName],
              dataIndexConfig.currentTimeMarker,
              nowReference
            )
          : translateTimeRelativeToDifference(
              doc[timeFieldName],
              dataIndexConfig.currentTimeMarker,
              nowReference
            );
      });
    return doc;
  };

  const bulkInsert = async (docs: unknown[]) => {
    const insertCmd = { index: { _index: index } };
    const bulk: unknown[] = [];
    docs.forEach((doc: unknown) => {
      bulk.push(insertCmd);
      bulk.push(updateTimestamps(doc));
    });

    const { body: resp } = await esClient.asCurrentUser.bulk({
      body: bulk,
    });

    if (resp.errors) {
      const errMsg = `sample_data install errors while bulk inserting. Elasticsearch response: ${JSON.stringify(
        resp,
        null,
        ''
      )}`;
      logger.warn(errMsg);
      return Promise.reject(
        new Error(`Unable to load sample data into index "${index}", see kibana logs for details`)
      );
    }
  };
  return loadData(dataIndexConfig.dataPath, bulkInsert); // this returns a Promise
};
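
A rough sketch of how a caller (in practice, the new SampleDataInstaller) might drive this helper per data index, mirroring what the old inline install route did. The wrapper function and import paths are hypothetical — the new helper's file name is not shown in this excerpt:

```ts
import type { IScopedClusterClient, Logger } from 'kibana/server';
// Illustrative import paths; only the identifiers come from the surrounding diff.
import { insertDataIntoIndex } from './insert_data_into_index';
import { dateToIso8601IgnoringTime } from './translate_timestamp';
import { createIndexName } from './create_index_name';
import type { SampleDatasetSchema } from './sample_dataset_registry_types';

// Hypothetical caller: load every data index of a dataset and collect per-index
// document counts, the way the previous inline install route did.
export async function loadDatasetDocs(
  sampleDataset: SampleDatasetSchema,
  esClient: IScopedClusterClient,
  logger: Logger
): Promise<Record<string, number>> {
  const nowReference = dateToIso8601IgnoringTime(new Date());
  const counts: Record<string, number> = {};
  for (const dataIndexConfig of sampleDataset.dataIndices) {
    const index = createIndexName(sampleDataset.id, dataIndexConfig.id);
    counts[index] = await insertDataIntoIndex({
      dataIndexConfig,
      index,
      nowReference,
      esClient,
      logger,
    });
  }
  return counts;
}
```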
@@ -12,7 +12,10 @@ import { createUnzip } from 'zlib';

const BULK_INSERT_SIZE = 500;

export function loadData(path: any, bulkInsert: (docs: any[]) => Promise<void>) {
export function loadData(
  path: string,
  bulkInsert: (docs: unknown[]) => Promise<void>
): Promise<number> {
  return new Promise((resolve, reject) => {
    let count: number = 0;
    let docs: any[] = [];
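
For reference, the only change to loadData itself is the stricter signature. The behaviour it keeps — streaming the gzipped newline-delimited docs file and flushing parsed documents to bulkInsert in batches of BULK_INSERT_SIZE — is roughly the following. This is a simplified sketch inferred from the imports and constants visible above, not the file's actual implementation:

```ts
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
import { createUnzip } from 'zlib';

const BULK_INSERT_SIZE = 500;

// Simplified sketch: stream a gzipped ndjson file, batch documents, and
// resolve with the total number of documents handed to `bulkInsert`.
export function loadData(
  path: string,
  bulkInsert: (docs: unknown[]) => Promise<void>
): Promise<number> {
  return new Promise((resolve, reject) => {
    let count = 0;
    let docs: unknown[] = [];

    const unzipped = createReadStream(path).pipe(createUnzip());
    unzipped.on('error', reject);

    const lineReader = createInterface({ input: unzipped });

    lineReader.on('line', (line) => {
      if (!line) {
        return;
      }
      docs.push(JSON.parse(line));
      count++;
      if (docs.length >= BULK_INSERT_SIZE) {
        // Pause reading while the current batch is inserted.
        lineReader.pause();
        const batch = docs;
        docs = [];
        bulkInsert(batch).then(() => lineReader.resume(), reject);
      }
    });

    lineReader.on('close', () => {
      if (docs.length === 0) {
        resolve(count);
        return;
      }
      bulkInsert(docs).then(() => resolve(count), reject);
    });
  });
}
```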
161 changes: 26 additions & 135 deletions src/plugins/home/server/services/sample_data/routes/install.ts
@@ -6,73 +6,12 @@
* Side Public License, v 1.
*/

import { Readable } from 'stream';
import { schema } from '@kbn/config-schema';
import { IRouter, Logger, IScopedClusterClient } from 'src/core/server';
import { IRouter, Logger } from 'src/core/server';
import { SampleDatasetSchema } from '../lib/sample_dataset_registry_types';
import { createIndexName } from '../lib/create_index_name';
import {
dateToIso8601IgnoringTime,
translateTimeRelativeToDifference,
translateTimeRelativeToWeek,
} from '../lib/translate_timestamp';
import { loadData } from '../lib/load_data';
import { SampleDataUsageTracker } from '../usage/usage';
import { getSavedObjectsClient } from './utils';
import { getUniqueObjectTypes } from '../lib/utils';

const insertDataIntoIndex = (
dataIndexConfig: any,
index: string,
nowReference: string,
esClient: IScopedClusterClient,
logger: Logger
) => {
function updateTimestamps(doc: any) {
dataIndexConfig.timeFields
.filter((timeFieldName: string) => doc[timeFieldName])
.forEach((timeFieldName: string) => {
doc[timeFieldName] = dataIndexConfig.preserveDayOfWeekTimeOfDay
? translateTimeRelativeToWeek(
doc[timeFieldName],
dataIndexConfig.currentTimeMarker,
nowReference
)
: translateTimeRelativeToDifference(
doc[timeFieldName],
dataIndexConfig.currentTimeMarker,
nowReference
);
});
return doc;
}

const bulkInsert = async (docs: any) => {
const insertCmd = { index: { _index: index } };
const bulk: any[] = [];
docs.forEach((doc: any) => {
bulk.push(insertCmd);
bulk.push(updateTimestamps(doc));
});

const { body: resp } = await esClient.asCurrentUser.bulk({
body: bulk,
});

if (resp.errors) {
const errMsg = `sample_data install errors while bulk inserting. Elasticsearch response: ${JSON.stringify(
resp,
null,
''
)}`;
logger.warn(errMsg);
return Promise.reject(
new Error(`Unable to load sample data into index "${index}", see kibana logs for details`)
);
}
};
return loadData(dataIndexConfig.dataPath, bulkInsert); // this returns a Promise
};
import { getSampleDataInstaller } from './utils';
import { SampleDataInstallError } from '../errors';

export function createInstallRoute(
router: IRouter,
@@ -95,86 +34,38 @@ export function createInstallRoute(
if (!sampleDataset) {
return res.notFound();
}

// @ts-ignore Custom query validation used
const now = query.now ? new Date(query.now) : new Date();
const nowReference = dateToIso8601IgnoringTime(now);
const counts = {};
for (let i = 0; i < sampleDataset.dataIndices.length; i++) {
const dataIndexConfig = sampleDataset.dataIndices[i];
const index = createIndexName(sampleDataset.id, dataIndexConfig.id);

// clean up any old installation of dataset
try {
await context.core.elasticsearch.client.asCurrentUser.indices.delete({
index,
});
} catch (err) {
// ignore delete errors
}
const sampleDataInstaller = getSampleDataInstaller({
datasetId: sampleDataset.id,
sampleDatasets,
logger,
context,
});

try {
await context.core.elasticsearch.client.asCurrentUser.indices.create({
index,
try {
const installResult = await sampleDataInstaller.install(params.id, now);
// track the usage operation in a non-blocking way
usageTracker.addInstall(params.id);
return res.ok({
body: {
elasticsearchIndicesCreated: installResult.createdDocsPerIndex,
kibanaSavedObjectsLoaded: installResult.createdSavedObjects,
},
});
} catch (e) {
if (e instanceof SampleDataInstallError) {
return res.customError({
body: {
settings: { index: { number_of_shards: 1, auto_expand_replicas: '0-1' } },
mappings: { properties: dataIndexConfig.fields },
message: e.message,
},
statusCode: e.httpCode,
});
} catch (err) {
const errMsg = `Unable to create sample data index "${index}", error: ${err.message}`;
logger.warn(errMsg);
return res.customError({ body: errMsg, statusCode: err.status });
}

try {
const count = await insertDataIntoIndex(
dataIndexConfig,
index,
nowReference,
context.core.elasticsearch.client,
logger
);
(counts as any)[index] = count;
} catch (err) {
const errMsg = `sample_data install errors while loading data. Error: ${err}`;
throw new Error(errMsg);
}
throw e;
}

const { getImporter } = context.core.savedObjects;
const objectTypes = getUniqueObjectTypes(sampleDataset.savedObjects);
const savedObjectsClient = getSavedObjectsClient(context, objectTypes);
const importer = getImporter(savedObjectsClient);

const savedObjects = sampleDataset.savedObjects.map(({ version, ...obj }) => obj);
const readStream = Readable.from(savedObjects);

try {
const { errors = [] } = await importer.import({
readStream,
overwrite: true,
createNewCopies: false,
});
if (errors.length > 0) {
const errMsg = `sample_data install errors while loading saved objects. Errors: ${JSON.stringify(
errors.map(({ type, id, error }) => ({ type, id, error })) // discard other fields
)}`;
logger.warn(errMsg);
return res.customError({ body: errMsg, statusCode: 500 });
}
} catch (err) {
const errMsg = `import failed, error: ${err.message}`;
throw new Error(errMsg);
}
usageTracker.addInstall(params.id);

// FINALLY
return res.ok({
body: {
elasticsearchIndicesCreated: counts,
kibanaSavedObjectsLoaded: sampleDataset.savedObjects.length,
},
});
}
);
}
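
Both routes now delegate index creation/deletion and saved-object import to an installer obtained from getSampleDataInstaller, and translate any SampleDataInstallError it throws into an HTTP response. The installer class itself is outside this excerpt; the contract implied by the route code looks roughly like the sketch below, with the field types assumed from the previous inline implementation:

```ts
// Contract implied by the new install/uninstall routes; the concrete
// SampleDataInstaller class is defined elsewhere in this PR.
interface SampleDataInstallResult {
  // index name -> number of documents bulk-inserted (assumed from the old `counts` object)
  createdDocsPerIndex: Record<string, number>;
  // number of Kibana saved objects imported (assumed from the old response shape)
  createdSavedObjects: number;
}

interface SampleDataInstaller {
  // `now` is the reference date used to re-anchor the sample timestamps.
  install(datasetId: string, now: Date): Promise<SampleDataInstallResult>;
  uninstall(datasetId: string): Promise<void>;
}
```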
76 changes: 22 additions & 54 deletions src/plugins/home/server/services/sample_data/routes/uninstall.ts
@@ -6,15 +6,12 @@
* Side Public License, v 1.
*/

import { isBoom } from '@hapi/boom';
import { schema } from '@kbn/config-schema';
import type { IRouter, Logger } from 'src/core/server';
import { SampleDatasetSchema } from '../lib/sample_dataset_registry_types';
import { createIndexName } from '../lib/create_index_name';
import { SampleDataUsageTracker } from '../usage/usage';
import { findSampleObjects } from '../lib/find_sample_objects';
import { getUniqueObjectTypes } from '../lib/utils';
import { getSavedObjectsClient } from './utils';
import { getSampleDataInstaller } from './utils';
import { SampleDataInstallError } from '../errors';

export function createUninstallRoute(
router: IRouter,
@@ -31,62 +28,33 @@
},
async (context, request, response) => {
const sampleDataset = sampleDatasets.find(({ id }) => id === request.params.id);

if (!sampleDataset) {
return response.notFound();
}

for (let i = 0; i < sampleDataset.dataIndices.length; i++) {
const dataIndexConfig = sampleDataset.dataIndices[i];
const index = createIndexName(sampleDataset.id, dataIndexConfig.id);

try {
// TODO: don't delete the index if sample data exists in other spaces (#116677)
await context.core.elasticsearch.client.asCurrentUser.indices.delete({ index });
} catch (err) {
// if the index doesn't exist, ignore the error and proceed
if (err.body.status !== 404) {
return response.customError({
statusCode: err.body.status,
body: {
message: `Unable to delete sample data index "${index}", error: ${err.body.error.type}`,
},
});
}
}
}

const objects = sampleDataset.savedObjects.map(({ type, id }) => ({ type, id }));
const objectTypes = getUniqueObjectTypes(objects);
const client = getSavedObjectsClient(context, objectTypes);
const findSampleObjectsResult = await findSampleObjects({ client, logger, objects });

const objectsToDelete = findSampleObjectsResult.filter(({ foundObjectId }) => foundObjectId);
const deletePromises = objectsToDelete.map(({ type, foundObjectId }) =>
client.delete(type, foundObjectId!).catch((err) => {
// if the object doesn't exist, ignore the error and proceed
if (isBoom(err) && err.output.statusCode === 404) {
return;
}
throw err;
})
);
const sampleDataInstaller = getSampleDataInstaller({
datasetId: sampleDataset.id,
sampleDatasets,
logger,
context,
});

try {
await Promise.all(deletePromises);
} catch (err) {
return response.customError({
statusCode: err.body.status,
body: {
message: `Unable to delete sample dataset saved objects, error: ${err.body.error.type}`,
},
});
await sampleDataInstaller.uninstall(request.params.id);
// track the usage operation in a non-blocking way
usageTracker.addUninstall(request.params.id);
return response.noContent();
} catch (e) {
if (e instanceof SampleDataInstallError) {
return response.customError({
body: {
message: e.message,
},
statusCode: e.httpCode,
});
}
throw e;
}

// track the usage operation in a non-blocking way
usageTracker.addUninstall(request.params.id);

return response.noContent();
}
);
}
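
The alias handling that gives the PR its title lives inside the installer rather than in these routes: the old route code called indices.delete on the index name directly, which breaks when the sample data "index" is actually an alias. A rough sketch of the idea, with a hypothetical helper name — the concrete implementation in the PR may differ:

```ts
import type { IScopedClusterClient } from 'kibana/server';

// Hypothetical helper: delete a sample data index, following the alias if the
// name turns out to point at an alias instead of a concrete index.
async function deleteIndexOrAlias(esClient: IScopedClusterClient, index: string) {
  const { body: isAlias } = await esClient.asCurrentUser.indices.existsAlias({ name: index });
  if (isAlias) {
    // Resolve the concrete indices behind the alias and delete those instead.
    const { body: aliasInfo } = await esClient.asCurrentUser.indices.getAlias({ name: index });
    await esClient.asCurrentUser.indices.delete({ index: Object.keys(aliasInfo).join(',') });
  } else {
    await esClient.asCurrentUser.indices.delete({ index });
  }
}
```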