Skip to content

Commit

Permalink
[Files] Add support for using ES Data Streams for files (metadata + b…
Browse files Browse the repository at this point in the history
…lob storage) (#160437)

## Summary

Adds support for Elasticsearch Data Streams to the Files plugin. Changes include:

- use `op_type` of `create` on all document creations
- When the `indexIsAlias` option is used:
  - the check for index existence is skipped, so the index will NOT be
    automatically created if it does not yet exist
  - a top-level `@timestamp` document property is written for the
    metadata document as well as for each file chunk
- Fixes `ElasticsearchBlobStorageClient.createIndexIfNotExists()` to
ensure it is executed no more than once per index name
- added `.catch(wrapErrorAndReThrow)` to several `esClient` calls in
order to get better stack traces for failures


### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
paul-tavares and kibanamachine authored Jun 30, 2023
1 parent 4486383 commit a458b99
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import { ContentStream, ContentStreamEncoding, ContentStreamParameters } from '.
import type { GetResponse } from '@elastic/elasticsearch/lib/api/types';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { FileDocument } from '../../../../file_client/file_metadata_client/adapters/es_index';
import * as cborx from 'cbor-x';
import { IndexRequest } from '@elastic/elasticsearch/lib/api/types';

describe('ContentStream', () => {
let client: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
Expand Down Expand Up @@ -282,12 +284,14 @@ describe('ContentStream', () => {
});

it('should emit an error event', async () => {
client.index.mockRejectedValueOnce('some error');
client.index.mockRejectedValueOnce(new Error('some error'));

stream.end('data');
const error = await new Promise((resolve) => stream.once('error', resolve));

expect(error).toBe('some error');
expect((error as Error).toString()).toEqual(
'FilesPluginError: ContentStream.indexChunk(): some error'
);
});

it('should remove all previous chunks before writing', async () => {
Expand Down Expand Up @@ -405,5 +409,15 @@ describe('ContentStream', () => {

expect(deleteRequest).toHaveProperty('query.bool.must.match.bid', 'something');
});

it('should write @timestamp if `indexIsAlias` is true', async () => {
  // NOTE(review): the trailing `true` is presumably the `indexIsAlias` constructor flag —
  // the ContentStream constructor signature is not visible from this test; confirm.
  stream = new ContentStream(client, undefined, 'somewhere', logger, undefined, true);
  stream.end('some data');
  await new Promise((onFinish) => stream.once('finish', onFinish));

  // The chunk is sent CBOR-encoded, so decode the first indexed document before inspecting it.
  const firstRequest = client.index.mock.calls[0][0] as IndexRequest;
  const encodedChunk = firstRequest.document as Buffer;
  const decodedChunk = cborx.decode(encodedChunk);

  expect(decodedChunk).toHaveProperty('@timestamp');
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Duplex, Writable, Readable } from 'stream';

import { GetResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { inspect } from 'util';
import { wrapErrorAndReThrow } from '../../../../file_client/utils';
import type { FileChunkDocument } from '../mappings';

type Callback = (error?: Error) => void;
Expand Down Expand Up @@ -238,27 +239,29 @@ export class ContentStream extends Duplex {
}

private async indexChunk({ bid, data, id, index }: IndexRequestParams, last?: true) {
await this.client.index(
{
id,
index,
document: cborx.encode(
last
? {
data,
bid,
last,
}
: { data, bid }
),
},
{
headers: {
'content-type': 'application/cbor',
accept: 'application/json',
await this.client
.index(
{
id,
index,
op_type: 'create',
document: cborx.encode({
data,
bid,
// Mark it as last?
...(last ? { last } : {}),
// Add `@timestamp` for Index Alias/DS?
...(this.indexIsAlias ? { '@timestamp': new Date().toISOString() } : {}),
}),
},
}
);
{
headers: {
'content-type': 'application/cbor',
accept: 'application/json',
},
}
)
.catch(wrapErrorAndReThrow.withMessagePrefix('ContentStream.indexChunk(): '));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,51 @@

import { Readable } from 'stream';
import { promisify } from 'util';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { Semaphore } from '@kbn/std';

import { ElasticsearchBlobStorageClient } from './es';
import { errors } from '@elastic/elasticsearch';

const setImmediate = promisify(global.setImmediate);

describe('ElasticsearchBlobStorageClient', () => {
let esClient: ElasticsearchClient;
let blobStoreClient: ElasticsearchBlobStorageClient;
let esClient: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
let semaphore: Semaphore;
let logger: ReturnType<typeof loggingSystemMock.createLogger>;

beforeEach(() => {
semaphore = new Semaphore(1);
esClient = elasticsearchServiceMock.createElasticsearchClient();
blobStoreClient = new ElasticsearchBlobStorageClient(
/**
 * Test-only subclass of `ElasticsearchBlobStorageClient` that exposes `clearCache()`, which
 * resets the cache of the memoized static `createIndexIfNotExists()` method.
 * `createBlobStoreClient()` calls it before building each client.
 */
class ElasticsearchBlobStorageClientWithCacheClear extends ElasticsearchBlobStorageClient {
/** Drops every memoized `createIndexIfNotExists()` result. */
static clearCache() {
// NOTE(review): the lodash typings presumably declare `cache.clear` as optional, which would
// explain the suppression below — confirm before removing it.
// @ts-expect-error TS2722: Cannot invoke an object which is possibly 'undefined' (??)
this.createIndexIfNotExists.cache.clear();
}
}

const createBlobStoreClient = (index?: string, indexIsAlias: boolean = false) => {
ElasticsearchBlobStorageClientWithCacheClear.clearCache();

return new ElasticsearchBlobStorageClientWithCacheClear(
esClient,
index,
undefined,
undefined,
loggingSystemMock.createLogger(),
semaphore
logger,
semaphore,
indexIsAlias
);
};

beforeEach(() => {
  // Rebuild every collaborator per test so no mock call history or semaphore state is shared.
  esClient = elasticsearchServiceMock.createElasticsearchClient();
  logger = loggingSystemMock.createLogger();
  semaphore = new Semaphore(1);
});

test('limits max concurrent uploads', async () => {
const blobStoreClient = createBlobStoreClient();
const acquireSpy = jest.spyOn(semaphore, 'acquire');
(esClient.index as jest.Mock).mockImplementation(() => {
esClient.index.mockImplementation(() => {
return new Promise((res, rej) => setTimeout(() => rej('failed'), 100));
});
const [p1, p2, ...rest] = [
Expand All @@ -54,4 +70,59 @@ describe('ElasticsearchBlobStorageClient', () => {
await Promise.all(rest);
expect(esClient.index).toHaveBeenCalledTimes(4);
});

describe('.createIndexIfNotExists()', () => {
  /** Builds a fresh single-chunk stream; a `Readable` can only be consumed once. */
  const createData = (): Readable => Readable.from(['test']);

  let data: Readable;

  beforeEach(() => {
    data = createData();
  });

  it('should create index if it does not exist', async () => {
    esClient.indices.exists.mockResolvedValue(false);
    // `createBlobStoreClient()` is a synchronous factory, so there is nothing to await.
    const blobStoreClient = createBlobStoreClient('foo1');

    await blobStoreClient.upload(data);
    expect(logger.info).toHaveBeenCalledWith(
      'Creating [foo1] index for Elasticsearch blob store.'
    );

    // Calling a second time should do nothing. A new stream is used because the first
    // one has already been consumed by the upload above.
    logger.info.mockClear();
    await blobStoreClient.upload(createData());

    expect(logger.info).not.toHaveBeenCalledWith(
      'Creating [foo1] index for Elasticsearch blob store.'
    );
  });

  it('should not create index if it already exists', async () => {
    esClient.indices.exists.mockResolvedValue(true);
    await createBlobStoreClient('foo1').upload(data);

    expect(logger.debug).toHaveBeenCalledWith('[foo1] already exists. Nothing to do');
  });

  it('should not create index if `indexIsAlias` is `true`', async () => {
    await createBlobStoreClient('foo1', true).upload(data);

    expect(logger.debug).toHaveBeenCalledWith(
      'No need to create index [foo1] as it is an Alias or DS.'
    );
  });

  it('should not reject if it is unable to create the index (best effort)', async () => {
    esClient.indices.exists.mockResolvedValue(false);
    // Only `statusCode` is supplied; the cast stands in for a full ResponseError argument.
    esClient.indices.create.mockRejectedValue(
      new errors.ResponseError({
        statusCode: 400,
      } as ConstructorParameters<typeof errors.ResponseError>[0])
    );
    await createBlobStoreClient('foo1', false).upload(data);

    expect(logger.warn).toHaveBeenCalledWith(
      'Unable to create blob storage index [foo1], it may have been created already.'
    );
  });
});
});
39 changes: 25 additions & 14 deletions src/plugins/files/server/blob_storage_service/adapters/es/es.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/

import assert from 'assert';
import { once } from 'lodash';
import { errors } from '@elastic/elasticsearch';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { Semaphore } from '@kbn/std';
Expand All @@ -16,6 +15,7 @@ import { pipeline } from 'stream/promises';
import { promisify } from 'util';
import { lastValueFrom, defer } from 'rxjs';
import { PerformanceMetricEvent, reportPerformanceMetricEvent } from '@kbn/ebt-tools';
import { memoize } from 'lodash';
import { FilesPlugin } from '../../../plugin';
import { FILE_UPLOAD_PERFORMANCE_EVENT_NAME } from '../../../performance';
import type { BlobStorageClient } from '../../types';
Expand Down Expand Up @@ -65,23 +65,29 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
}

/**
* This function acts as a singleton i.t.o. execution: it can only be called once.
* Subsequent calls should not re-execute it.
*
* There is a known issue where calling this function simultaneously can result
* in a race condition where one of the calls will fail because the index is already
* being created. This is only an issue for the very first time the index is being
* created.
* This function acts as a singleton in terms of execution: its body runs at most once per index name.
* Subsequent calls for the same index reuse the memoized result and do not re-execute it.
*/
private static createIndexIfNotExists = once(
async (index: string, esClient: ElasticsearchClient, logger: Logger): Promise<void> => {
protected static createIndexIfNotExists = memoize(
async (
index: string,
esClient: ElasticsearchClient,
logger: Logger,
indexIsAlias: boolean
): Promise<void> => {
// We don't attempt to create the index if it is an Alias/DS
if (indexIsAlias) {
logger.debug(`No need to create index [${index}] as it is an Alias or DS.`);
return;
}

try {
if (await esClient.indices.exists({ index })) {
logger.debug(`${index} already exists.`);
logger.debug(`[${index}] already exists. Nothing to do`);
return;
}

logger.info(`Creating ${index} for Elasticsearch blob store.`);
logger.info(`Creating [${index}] index for Elasticsearch blob store.`);

await esClient.indices.create({
index,
Expand All @@ -96,7 +102,9 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
});
} catch (e) {
if (e instanceof errors.ResponseError && e.statusCode === 400) {
logger.warn('Unable to create blob storage index, it may have been created already.');
logger.warn(
`Unable to create blob storage index [${index}], it may have been created already.`
);
}
// best effort
}
Expand All @@ -109,7 +117,8 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
await ElasticsearchBlobStorageClient.createIndexIfNotExists(
this.index,
this.esClient,
this.logger
this.logger,
this.indexIsAlias
);

const processUpload = async () => {
Expand All @@ -123,6 +132,7 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
parameters: {
maxChunkSize: this.chunkSize,
},
indexIsAlias: this.indexIsAlias,
});

const start = performance.now();
Expand Down Expand Up @@ -183,6 +193,7 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
client: this.esClient,
index: this.index,
logger: this.logger.get('content-stream-delete'),
indexIsAlias: this.indexIsAlias,
});
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
await promisify(dest.end.bind(dest, '', 'utf8'))();
Expand Down
5 changes: 4 additions & 1 deletion src/plugins/files/server/file/file.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { FileMetadataClient } from '../file_client';
import { SavedObjectsFileMetadataClient } from '../file_client/file_metadata_client/adapters/saved_objects';
import { File as IFile } from '../../common';
import { createFileHashTransform } from '..';
import { FilesPluginError } from '../file_client/utils';

const setImmediate = promisify(global.setImmediate);

Expand Down Expand Up @@ -82,7 +83,9 @@ describe('File', () => {
const [{ returnValue: blobStore }] = createBlobSpy.getCalls();
const blobStoreSpy = sandbox.spy(blobStore, 'delete');
expect(blobStoreSpy.calledOnce).toBe(false);
await expect(file.uploadContent(Readable.from(['test']))).rejects.toThrow(new Error('test'));
await expect(file.uploadContent(Readable.from(['test']))).rejects.toThrow(
new FilesPluginError('ContentStream.indexChunk(): test')
);
await setImmediate();
expect(blobStoreSpy.calledOnce).toBe(true);
});
Expand Down
9 changes: 5 additions & 4 deletions src/plugins/files/server/file_client/create_es_file_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ export interface CreateEsFileClientArgs {
*/
elasticsearchClient: ElasticsearchClient;
/**
* Treat the indices provided as Aliases. If set to true, ES `search()` will be used to
* retrieve the file info and content instead of `get()`. This is needed to ensure the
* content can be retrieved in cases where an index may have rolled over (ES `get()`
* needs a "real" index)
* Treat the indices provided as Aliases/Data Streams.
* When set to `true`:
* - additional ES calls will be made to get the real backing indices
* - the indices will not be checked for existence and will not be created if they are missing
* - an additional `@timestamp` property will be written to all documents (at root of document)
*/
indexIsAlias?: boolean;
/**
Expand Down
Loading

0 comments on commit a458b99

Please sign in to comment.