Skip to content

Commit

Permalink
Modify flow of the service map initialization to handle less permissive
Browse files Browse the repository at this point in the history
default security settings. The kibana user is responsible for creating
and index to the `apm-service-connections` data index, while the apm
user is resposible for kicking off the scheduled task and reading from
`apm-*` indices.
  • Loading branch information
ogupte committed Dec 21, 2019
1 parent d7734ff commit f0aec81
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 134 deletions.
8 changes: 0 additions & 8 deletions x-pack/legacy/plugins/apm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { APMPluginContract } from '../../../plugins/apm/server';
import { LegacyPluginInitializer } from '../../../../src/legacy/types';
import mappings from './mappings.json';
import { makeApmUsageCollector } from './server/lib/apm_telemetry';
import { initializeServiceMaps } from './server/lib/service_map/initialize_service_maps';

export const apm: LegacyPluginInitializer = kibana => {
return new kibana.Plugin({
Expand Down Expand Up @@ -78,15 +77,12 @@ export const apm: LegacyPluginInitializer = kibana => {
autocreateApmIndexPattern: Joi.boolean().default(true),

// service map

serviceMapEnabled: Joi.boolean().default(false)
}).default();
},

// TODO: get proper types
init(server: Server) {
const config = server.config();

server.plugins.xpack_main.registerFeature({
id: 'apm',
name: i18n.translate('xpack.apm.featureRegistry.apmFeatureName', {
Expand Down Expand Up @@ -123,10 +119,6 @@ export const apm: LegacyPluginInitializer = kibana => {
.apm as APMPluginContract;

apmPlugin.registerLegacyAPI({ server });

if (config.get('xpack.apm.serviceMapEnabled')) {
initializeServiceMaps(server);
}
}
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { useUrlParams } from '../../../hooks/useUrlParams';
import { Controls } from './Controls';
import { Cytoscape } from './Cytoscape';
import { PlatinumLicensePrompt } from './PlatinumLicensePrompt';
import { useApmPluginContext } from '../../../hooks/useApmPluginContext';
import { callApi } from '../../../services/rest/callApi';

interface ServiceMapProps {
serviceName?: string;
Expand Down Expand Up @@ -51,9 +53,21 @@ export function ServiceMap({ serviceName }: ServiceMapProps) {
[uiFilters]
);

const { http } = useApmPluginContext().core;
const { data: serviceMapStartResponse } = useFetcher(async () => {
const response = await callApi<{
taskStatus: 'initialized' | 'active';
}>(http, {
method: 'GET',
pathname: `/api/apm/service-map-start-task`
});
return response;
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [http]);

const { data } = useFetcher(
callApmApi => {
if (start && end) {
if (start && end && serviceMapStartResponse) {
return callApmApi({
pathname: '/api/apm/service-map',
params: {
Expand All @@ -68,23 +82,36 @@ export function ServiceMap({ serviceName }: ServiceMapProps) {
});
}
},
[start, end, uiFiltersOmitEnv, environment, serviceName]
[
start,
end,
uiFiltersOmitEnv,
environment,
serviceName,
serviceMapStartResponse
]
);

const elements = Array.isArray(data) ? data : [];
const license = useLicense();
const isValidPlatinumLicense =
license?.isActive && license?.type === 'platinum';
true ||
(license?.isActive &&
(license?.type === 'platinum' || license?.type === 'trial'));

return isValidPlatinumLicense ? (
<Cytoscape
elements={elements}
serviceName={serviceName}
style={cytoscapeDivStyle}
>
<Controls />
</Cytoscape>
) : (
<PlatinumLicensePrompt />
return (
<>
{isValidPlatinumLicense ? (
<Cytoscape
elements={elements}
serviceName={serviceName}
style={cytoscapeDivStyle}
>
<Controls />
</Cytoscape>
) : (
<PlatinumLicensePrompt />
)}
</>
);
}
7 changes: 6 additions & 1 deletion x-pack/legacy/plugins/apm/server/lib/helpers/es_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {
IndexDocumentParams,
IndicesDeleteParams,
IndicesCreateParams,
BulkIndexDocumentsParams
BulkIndexDocumentsParams,
IndicesExistsParams
} from 'elasticsearch';
import { merge } from 'lodash';
import { cloneDeep, isString } from 'lodash';
Expand Down Expand Up @@ -143,6 +144,10 @@ export function getESClient(
onRequest('indices.create', params);
return esClient('indices.create', params);
},
indexExists: (params: IndicesExistsParams) => {
onRequest('indices.exists', params);
return esClient('indices.exists', params);
},
bulk: (params: BulkIndexDocumentsParams) => {
onRequest('bulk', params);
return esClient('bulk', params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,30 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Server } from 'hapi';
import { APMPluginContract } from '../../../../../../plugins/apm/server';
import { getInternalSavedObjectsClient } from '../helpers/saved_objects_client';
import { CallCluster } from '../../../../../../../src/legacy/core_plugins/elasticsearch';
import { TIMESTAMP } from '../../../common/elasticsearch_fieldnames';
import { Setup } from '../helpers/setup_request';

export async function createServiceConnectionsIndex(server: Server) {
const callCluster = server.plugins.elasticsearch.getCluster('data')
.callWithInternalUser;
const apmPlugin = server.newPlatform.setup.plugins.apm as APMPluginContract;
export async function createServiceConnectionsIndex(setup: Setup) {
const { internalClient, indices } = setup;
const index = indices.apmServiceConnectionsIndex;

try {
const apmIndices = await apmPlugin.getApmIndices(
getInternalSavedObjectsClient(server)
);
const index = apmIndices.apmServiceConnectionsIndex;
const indexExists = await callCluster('indices.exists', { index });
if (!indexExists) {
const result = await createNewIndex(index, callCluster);
const indexExists = await internalClient.indexExists({ index });

if (!result.acknowledged) {
const resultError =
result && result.error && JSON.stringify(result.error);
throw new Error(
`Unable to create APM Service Connections index '${index}': ${resultError}`
);
}
if (!indexExists) {
const result = await createNewIndex(index, internalClient);

if (!result.acknowledged) {
const resultError =
result && result.error && JSON.stringify(result.error);
throw new Error(
`Unable to create APM Service Connections index '${index}': ${resultError}`
);
}
} catch (error) {
server.log(
['apm', 'error'],
`Could not create APM Service Connections: ${error.message}`
);
throw error;
}
}

function createNewIndex(index: string, callWithInternalUser: CallCluster) {
return callWithInternalUser('indices.create', {
function createNewIndex(index: string, client: Setup['client']) {
return client.indicesCreate({
index,
body: {
settings: { 'index.auto_expand_replicas': '0-1' },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,149 @@
*/

import { Server } from 'hapi';
// @ts-ignore
import { TaskManager, RunContext } from '../legacy/plugins/task_manager';
import { CoreSetup, Logger } from 'src/core/server';
import { Observable } from 'rxjs';
import { PluginSetupContract as TaskManagerPluginContract } from '../../../../task_manager/plugin';
import { RunContext, ConcreteTaskInstance } from '../../../../task_manager';
import { RunFunction } from '../../../../task_manager/task';
import { APMConfig } from '../../../../../../plugins/apm/server';
import { runServiceMapTask } from './run_service_map_task';
import {
SERVICE_MAP_TASK_TYPE,
SERVICE_MAP_TASK_ID
} from '../../../common/service_map_constants';
import { createServiceConnectionsIndex } from './create_service_connections_index';
import { setupRequest, Setup } from '../helpers/setup_request';

function isLessThan1Hour(unixTimestamp = 0) {
const hourMilliseconds = 60 * 60 * 1000;
return Date.now() - unixTimestamp < hourMilliseconds;
}

export async function initializeServiceMaps(server: Server) {
await createServiceConnectionsIndex(server);

const taskManager = server.plugins.task_manager;
if (taskManager) {
taskManager.registerTaskDefinitions({
[SERVICE_MAP_TASK_TYPE]: {
title: 'ApmServiceMapTask',
type: SERVICE_MAP_TASK_TYPE,
description: 'Extract connections in traces for APM service maps',
timeout: '5m',
createTaskRunner({ taskInstance }: RunContext) {
return {
async run() {
const { state } = taskInstance;
const { latestTransactionTime } = await runServiceMapTask(
server,
isLessThan1Hour(state.latestTransactionTime)
? state.latestTransactionTime
: 'now-1h'
);
return { state: { latestTransactionTime } };
let scopedRunFunction:
| ((taskInstance: ConcreteTaskInstance) => ReturnType<RunFunction>)
| undefined;

function isTaskActive() {
return scopedRunFunction !== undefined;
}

async function runTask(setup: Setup, state: Record<string, any> = {}) {
const nextState: typeof state = {
...state,
isActive: true
};
try {
const { latestTransactionTime } = await runServiceMapTask(
setup,
isLessThan1Hour(state.latestTransactionTime)
? state.latestTransactionTime
: 'now-1h'
);
nextState.latestTransactionTime = latestTransactionTime;
} catch (error) {
scopedRunFunction = undefined;
return { state: nextState, error };
}
return { state: nextState };
}

async function scheduleTask(
taskManager: TaskManagerPluginContract,
runFn: NonNullable<typeof scopedRunFunction>,
initialState: Record<string, any> = {}
) {
scopedRunFunction = runFn;
return await taskManager.ensureScheduled({
id: SERVICE_MAP_TASK_ID,
taskType: SERVICE_MAP_TASK_TYPE,
schedule: { interval: '1m' },
scope: ['apm'],
params: {},
state: initialState
});
}

export async function initializeServiceMaps(
core: CoreSetup,
{
config$,
logger,
__LEGACY
}: {
config$: Observable<APMConfig>;
logger: Logger;
__LEGACY: { server: Server };
}
) {
config$.subscribe(config => {
const server = __LEGACY.server;
const router = core.http.createRouter();

if (!config['xpack.apm.serviceMapEnabled']) {
return;
}

const taskManager = server.plugins.task_manager;
if (taskManager) {
taskManager.registerTaskDefinitions({
[SERVICE_MAP_TASK_TYPE]: {
title: 'ApmServiceMapTask',
type: SERVICE_MAP_TASK_TYPE,
description: 'Extract connections in traces for APM service maps',
timeout: '2m',
maxAttempts: 1,
createTaskRunner({ taskInstance }: RunContext) {
return {
run: async () => {
if (!scopedRunFunction) {
return;
}
return await scopedRunFunction(taskInstance);
}
};
}
}
});

router.get(
{
path: '/api/apm/service-map-start-task',
validate: false
},
async (context, request, response) => {
if (isTaskActive()) {
return response.ok({ body: { taskStatus: 'active' } });
}
try {
const setup = await setupRequest(
{
...context,
__LEGACY,
params: { query: { _debug: false } },
config,
logger
},
request
);
await createServiceConnectionsIndex(setup);
const { state: initialState } = await runTask(setup); // initial task run
await scheduleTask(
taskManager,
(taskInstance: ConcreteTaskInstance) =>
runTask(setup, taskInstance.state), // maintain scope in subsequent task runs
initialState
);
return response.ok({ body: { taskStatus: 'initialized' } });
} catch (error) {
logger.error(error);
if (error.statusCode === 403) {
return response.forbidden({ body: error });
}
};
return response.internalError({ body: error });
}
}
}
});

return await taskManager.ensureScheduled({
id: SERVICE_MAP_TASK_ID,
taskType: SERVICE_MAP_TASK_TYPE,
schedule: {
interval: '1m'
},
scope: ['apm'],
params: {},
state: {}
});
}
);
}
});
}
Loading

0 comments on commit f0aec81

Please sign in to comment.