Skip to content

Commit

Permalink
[Fleet] Migrate ES client (#92805)
Browse files Browse the repository at this point in the history
Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
Alejandro Fernández Gómez and kibanamachine authored Mar 8, 2021
1 parent 68ed17a commit 2aa0267
Show file tree
Hide file tree
Showing 28 changed files with 465 additions and 632 deletions.
2 changes: 1 addition & 1 deletion api_docs/fleet.json
Original file line number Diff line number Diff line change
Expand Up @@ -2453,7 +2453,7 @@
"description": [],
"source": {
"path": "x-pack/plugins/fleet/server/services/package_policy.ts",
"lineNumber": 594
"lineNumber": 592
},
"signature": [
"PackagePolicyService"
Expand Down
3 changes: 1 addition & 2 deletions x-pack/plugins/fleet/server/routes/agent_policy/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ export const createAgentPolicyHandler: RequestHandler<
> = async (context, request, response) => {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const user = (await appContextService.getSecurity()?.authc.getCurrentUser(request)) || undefined;
const withSysMonitoring = request.query.sys_monitoring ?? false;
try {
Expand All @@ -130,7 +129,7 @@ export const createAgentPolicyHandler: RequestHandler<
if (withSysMonitoring && newSysPackagePolicy !== undefined && agentPolicy !== undefined) {
newSysPackagePolicy.policy_id = agentPolicy.id;
newSysPackagePolicy.namespace = agentPolicy.namespace;
await packagePolicyService.create(soClient, esClient, callCluster, newSysPackagePolicy, {
await packagePolicyService.create(soClient, esClient, newSysPackagePolicy, {
user,
bumpRevision: false,
});
Expand Down
78 changes: 38 additions & 40 deletions x-pack/plugins/fleet/server/routes/data_streams/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,63 +16,59 @@ import { defaultIngestErrorHandler } from '../../errors';

const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*';

interface ESDataStreamInfoResponse {
data_streams: Array<{
interface ESDataStreamInfo {
name: string;
timestamp_field: {
name: string;
timestamp_field: {
};
indices: Array<{ index_name: string; index_uuid: string }>;
generation: number;
_meta?: {
package?: {
name: string;
};
indices: Array<{ index_name: string; index_uuid: string }>;
generation: number;
_meta?: {
package?: {
name: string;
};
managed_by?: string;
managed?: boolean;
[key: string]: any;
};
status: string;
template: string;
ilm_policy: string;
hidden: boolean;
}>;
managed_by?: string;
managed?: boolean;
[key: string]: any;
};
status: string;
template: string;
ilm_policy: string;
hidden: boolean;
}

interface ESDataStreamStatsResponse {
data_streams: Array<{
data_stream: string;
backing_indices: number;
store_size_bytes: number;
maximum_timestamp: number;
}>;
interface ESDataStreamStats {
data_stream: string;
backing_indices: number;
store_size_bytes: number;
maximum_timestamp: number;
}

export const getListHandler: RequestHandler = async (context, request, response) => {
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;

const body: GetDataStreamsResponse = {
data_streams: [],
};

try {
// Get matching data streams, their stats, and package SOs
const [
{ data_streams: dataStreamsInfo },
{ data_streams: dataStreamStats },
{
body: { data_streams: dataStreamsInfo },
},
{
body: { data_streams: dataStreamStats },
},
packageSavedObjects,
] = await Promise.all([
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
}) as Promise<ESDataStreamInfoResponse>,
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
}) as Promise<ESDataStreamStatsResponse>,
esClient.indices.getDataStream({ name: DATA_STREAM_INDEX_PATTERN }),
esClient.indices.dataStreamsStats({ name: DATA_STREAM_INDEX_PATTERN }),
getPackageSavedObjects(context.core.savedObjects.client),
]);
const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name');
const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream');

const dataStreamsInfoByName = keyBy<ESDataStreamInfo>(dataStreamsInfo, 'name');
const dataStreamsStatsByName = keyBy<ESDataStreamStats>(dataStreamStats, 'data_stream');

// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
Expand All @@ -99,8 +95,10 @@ export const getListHandler: RequestHandler = async (context, request, response)

// Query backing indices to extract data stream dataset, namespace, and type values
const {
aggregations: { dataset, namespace, type },
} = await callCluster('search', {
body: {
aggregations: { dataset, namespace, type },
},
} = await esClient.search({
index: dataStream.indices.map((index) => index.index_name),
body: {
size: 0,
Expand Down
18 changes: 9 additions & 9 deletions x-pack/plugins/fleet/server/routes/epm/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ export const installPackageFromRegistryHandler: RequestHandler<
TypeOf<typeof InstallPackageFromRegistryRequestSchema.body>
> = async (context, request, response) => {
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const { pkgkey } = request.params;
const { pkgName, pkgVersion } = splitPkgKey(pkgkey);
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName });
Expand All @@ -235,7 +235,7 @@ export const installPackageFromRegistryHandler: RequestHandler<
installSource: 'registry',
savedObjectsClient,
pkgkey,
callCluster,
esClient,
force: request.body?.force,
});
const body: InstallPackageResponse = {
Expand All @@ -250,7 +250,7 @@ export const installPackageFromRegistryHandler: RequestHandler<
pkgName,
pkgVersion,
installedPkg,
callCluster,
esClient,
});

return defaultResult;
Expand Down Expand Up @@ -278,10 +278,10 @@ export const bulkInstallPackagesFromRegistryHandler: RequestHandler<
TypeOf<typeof BulkUpgradePackagesFromRegistryRequestSchema.body>
> = async (context, request, response) => {
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const bulkInstalledResponses = await bulkInstallPackages({
savedObjectsClient,
callCluster,
esClient,
packagesToUpgrade: request.body.packages,
});
const payload = bulkInstalledResponses.map(bulkInstallServiceResponseToHttpEntry);
Expand All @@ -303,14 +303,14 @@ export const installPackageByUploadHandler: RequestHandler<
});
}
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const contentType = request.headers['content-type'] as string; // from types it could also be string[] or undefined but this is checked later
const archiveBuffer = Buffer.from(request.body);
try {
const res = await installPackage({
installSource: 'upload',
savedObjectsClient,
callCluster,
esClient,
archiveBuffer,
contentType,
});
Expand All @@ -329,8 +329,8 @@ export const deletePackageHandler: RequestHandler<
try {
const { pkgkey } = request.params;
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const res = await removeInstallation({ savedObjectsClient, pkgkey, callCluster });
const esClient = context.core.elasticsearch.client.asCurrentUser;
const res = await removeInstallation({ savedObjectsClient, pkgkey, esClient });
const body: DeletePackageResponse = {
response: res,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jest.mock('../../services/package_policy', (): {
compilePackagePolicyInputs: jest.fn((packageInfo, dataInputs) => Promise.resolve(dataInputs)),
buildPackagePolicyFromPackage: jest.fn(),
bulkCreate: jest.fn(),
create: jest.fn((soClient, esClient, callCluster, newData) =>
create: jest.fn((soClient, esClient, newData) =>
Promise.resolve({
...newData,
inputs: newData.inputs.map((input) => ({
Expand Down Expand Up @@ -204,7 +204,8 @@ describe('When calling package policy', () => {
);
await routeHandler(context, request, response);
expect(response.ok).toHaveBeenCalled();
expect(packagePolicyServiceMock.create.mock.calls[0][3]).toEqual({

expect(packagePolicyServiceMock.create.mock.calls[0][2]).toEqual({
policy_id: 'a5ca00c0-b30c-11ea-9732-1bb05811278c',
description: '',
enabled: true,
Expand Down
13 changes: 3 additions & 10 deletions x-pack/plugins/fleet/server/routes/package_policy/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ export const createPackagePolicyHandler: RequestHandler<
> = async (context, request, response) => {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const user = (await appContextService.getSecurity()?.authc.getCurrentUser(request)) || undefined;
try {
const newData = await packagePolicyService.runExternalCallbacks(
Expand All @@ -90,15 +89,9 @@ export const createPackagePolicyHandler: RequestHandler<
);

// Create package policy
const packagePolicy = await packagePolicyService.create(
soClient,
esClient,
callCluster,
newData,
{
user,
}
);
const packagePolicy = await packagePolicyService.create(soClient, esClient, newData, {
user,
});
const body: CreatePackagePolicyResponse = { item: packagePolicy };
return response.ok({
body,
Expand Down
8 changes: 3 additions & 5 deletions x-pack/plugins/fleet/server/routes/setup/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ export const createFleetSetupHandler: RequestHandler<
try {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
await setupIngestManager(soClient, esClient, callCluster);
await setupFleet(soClient, esClient, callCluster, {
await setupIngestManager(soClient, esClient);
await setupFleet(soClient, esClient, {
forceRecreate: request.body?.forceRecreate ?? false,
});

Expand All @@ -80,11 +79,10 @@ export const createFleetSetupHandler: RequestHandler<
export const FleetSetupHandler: RequestHandler = async (context, request, response) => {
const soClient = context.core.savedObjects.client;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;

try {
const body: PostIngestSetupResponse = { isInitialized: true };
await setupIngestManager(soClient, esClient, callCluster);
await setupIngestManager(soClient, esClient);
return response.ok({
body,
});
Expand Down
11 changes: 0 additions & 11 deletions x-pack/plugins/fleet/server/services/api_keys/security.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type { Request } from '@hapi/hapi';
import { KibanaRequest } from '../../../../../../src/core/server';
import type { SavedObjectsClientContract } from '../../../../../../src/core/server';
import { FleetAdminUserInvalidError, isESClientError } from '../../errors';
import type { CallESAsCurrentUser } from '../../types';
import { appContextService } from '../app_context';
import { outputService } from '../output';

Expand Down Expand Up @@ -56,16 +55,6 @@ export async function createAPIKey(
throw err;
}
}
export async function authenticate(callCluster: CallESAsCurrentUser) {
try {
await callCluster('transport.request', {
path: '/_security/_authenticate',
method: 'GET',
});
} catch (e) {
throw new Error('ApiKey is not valid: impossible to authenticate user');
}
}

export async function invalidateAPIKeys(soClient: SavedObjectsClientContract, ids: string[]) {
const adminUser = await outputService.getAdminUser(soClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
* 2.0.
*/

import type { SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';

import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type {
EsAssetReference,
InstallablePackage,
RegistryDataStream,
} from '../../../../../common/types/models';
import type { CallESAsCurrentUser } from '../../../../types';
import { getInstallation } from '../../packages';
import { saveInstalledEsRefs } from '../../packages/install';
import { getAsset } from '../transform/common';
Expand All @@ -33,7 +32,7 @@ interface IlmPathDataset {
export const installIlmForDataStream = async (
registryPackage: InstallablePackage,
paths: string[],
callCluster: CallESAsCurrentUser,
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
) => {
const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name });
Expand All @@ -46,7 +45,7 @@ export const installIlmForDataStream = async (

// delete all previous ilm
await deleteIlms(
callCluster,
esClient,
previousInstalledIlmEsAssets.map((asset) => asset.id)
);
// install the latest dataset
Expand Down Expand Up @@ -86,7 +85,7 @@ export const installIlmForDataStream = async (
);

const installationPromises = ilmInstallations.map(async (ilmInstallation) => {
return handleIlmInstall({ callCluster, ilmInstallation });
return handleIlmInstall({ esClient, ilmInstallation });
});

installedIlms = await Promise.all(installationPromises).then((results) => results.flat());
Expand All @@ -111,13 +110,13 @@ export const installIlmForDataStream = async (
};

async function handleIlmInstall({
callCluster,
esClient,
ilmInstallation,
}: {
callCluster: CallESAsCurrentUser;
esClient: ElasticsearchClient;
ilmInstallation: IlmInstallation;
}): Promise<EsAssetReference> {
await callCluster('transport.request', {
await esClient.transport.request({
method: 'PUT',
path: `/_ilm/policy/${ilmInstallation.installationName}`,
body: ilmInstallation.content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@
* 2.0.
*/

import type { SavedObjectsClientContract } from 'kibana/server';
import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';

import { ElasticsearchAssetType } from '../../../../types';
import type { CallESAsCurrentUser, EsAssetReference } from '../../../../types';
import type { EsAssetReference } from '../../../../types';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common/constants';

export const deleteIlms = async (callCluster: CallESAsCurrentUser, ilmPolicyIds: string[]) => {
export const deleteIlms = async (esClient: ElasticsearchClient, ilmPolicyIds: string[]) => {
await Promise.all(
ilmPolicyIds.map(async (ilmPolicyId) => {
await callCluster('transport.request', {
method: 'DELETE',
path: `_ilm/policy/${ilmPolicyId}`,
ignore: [404, 400],
});
await esClient.transport.request(
{
method: 'DELETE',
path: `_ilm/policy/${ilmPolicyId}`,
},
{
ignore: [404, 400],
}
);
})
);
};
Expand Down
Loading

0 comments on commit 2aa0267

Please sign in to comment.