Skip to content

Commit

Permalink
[Monitoring] Revert direct shipping code (#72505)
Browse files Browse the repository at this point in the history
* Backout these changes

* Fix test
  • Loading branch information
chrisronline authored Jul 22, 2020
1 parent 82dd173 commit 670520a
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 202 deletions.
35 changes: 0 additions & 35 deletions x-pack/plugins/monitoring/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,6 @@ describe('config schema', () => {
},
"enabled": true,
},
"elasticsearch": Object {
"apiVersion": "master",
"customHeaders": Object {},
"healthCheck": Object {
"delay": "PT2.5S",
},
"ignoreVersionMismatch": false,
"logFetchCount": 10,
"logQueries": false,
"pingTimeout": "PT30S",
"preserveHost": true,
"requestHeadersWhitelist": Array [
"authorization",
],
"requestTimeout": "PT30S",
"shardTimeout": "PT30S",
"sniffInterval": false,
"sniffOnConnectionFault": false,
"sniffOnStart": false,
"ssl": Object {
"alwaysPresentCertificate": false,
"keystore": Object {},
"truststore": Object {},
"verificationMode": "full",
},
"startupTimeout": "PT5S",
},
"enabled": true,
"kibana": Object {
"collection": Object {
Expand Down Expand Up @@ -125,17 +98,13 @@ describe('createConfig()', () => {
it('should wrap in Elasticsearch config', async () => {
const config = createConfig(
configSchema.validate({
elasticsearch: {
hosts: 'http://localhost:9200',
},
ui: {
elasticsearch: {
hosts: 'http://localhost:9200',
},
},
})
);
expect(config.elasticsearch.hosts).toEqual(['http://localhost:9200']);
expect(config.ui.elasticsearch.hosts).toEqual(['http://localhost:9200']);
});

Expand All @@ -147,9 +116,6 @@ describe('createConfig()', () => {
};
const config = createConfig(
configSchema.validate({
elasticsearch: {
ssl,
},
ui: {
elasticsearch: {
ssl,
Expand All @@ -162,7 +128,6 @@ describe('createConfig()', () => {
key: 'contents-of-packages/kbn-dev-utils/certs/elasticsearch.key',
certificateAuthorities: ['contents-of-packages/kbn-dev-utils/certs/ca.crt'],
});
expect(config.elasticsearch.ssl).toEqual(expected);
expect(config.ui.elasticsearch.ssl).toEqual(expected);
});
});
2 changes: 0 additions & 2 deletions x-pack/plugins/monitoring/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export const monitoringElasticsearchConfigSchema = elasticsearchConfigSchema.ext

export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
elasticsearch: monitoringElasticsearchConfigSchema,
ui: schema.object({
enabled: schema.boolean({ defaultValue: true }),
ccs: schema.object({
Expand Down Expand Up @@ -86,7 +85,6 @@ export type MonitoringConfig = ReturnType<typeof createConfig>;
export function createConfig(config: TypeOf<typeof configSchema>) {
return {
...config,
elasticsearch: new ElasticsearchConfig(config.elasticsearch as ElasticsearchConfigType),
ui: {
...config.ui,
elasticsearch: new MonitoringElasticsearchConfig(config.ui.elasticsearch),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@

import { noop } from 'lodash';
import sinon from 'sinon';
import moment from 'moment';
import expect from '@kbn/expect';
import { BulkUploader } from '../bulk_uploader';
import { MONITORING_SYSTEM_API_VERSION } from '../../../common/constants';

const FETCH_INTERVAL = 300;
const CHECK_DELAY = 500;
Expand Down Expand Up @@ -314,92 +312,5 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});

it('uses a direct connection to the monitoring cluster, when configured', (done) => {
const dateInIndex = '2020.02.10';
const oldNow = moment.now;
moment.now = () => 1581310800000;
const prodClusterUuid = '1sdfd5';
const prodCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('monitoring.bulk')
.callsFake((arg) => {
let resolution = null;
if (arg === 'info') {
resolution = { cluster_uuid: prodClusterUuid };
}
return new Promise((resolve) => resolve(resolution));
}),
};
const monitoringCluster = {
callWithInternalUser: sinon
.stub()
.withArgs('bulk')
.callsFake(() => {
return new Promise((resolve) => setTimeout(resolve, CHECK_DELAY + 1));
}),
};

const collectorFetch = sinon.stub().returns({
type: 'kibana_stats',
result: { type: 'kibana_stats', payload: { testData: 12345 } },
});

const collectors = new MockCollectorSet(server, [
{
fetch: collectorFetch,
isReady: () => true,
formatForBulkUpload: (result) => result,
isUsageCollector: false,
},
]);
const customServer = {
...server,
elasticsearchPlugin: {
createCluster: () => monitoringCluster,
getCluster: (name) => {
if (name === 'admin' || name === 'data') {
return prodCluster;
}
return monitoringCluster;
},
},
config: {
get: (key) => {
if (key === 'monitoring.elasticsearch') {
return {
hosts: ['http://localhost:9200'],
username: 'tester',
password: 'testing',
ssl: {},
};
}
return null;
},
},
};
const kbnServerStatus = { toJSON: () => ({ overall: { state: 'green' } }) };
const kbnServerVersion = 'master';
const uploader = new BulkUploader({
...customServer,
interval: FETCH_INTERVAL,
kbnServerStatus,
kbnServerVersion,
});
uploader.start(collectors);
setTimeout(() => {
uploader.stop();
const firstCallArgs = monitoringCluster.callWithInternalUser.firstCall.args;
expect(firstCallArgs[0]).to.be('bulk');
expect(firstCallArgs[1].body[0].index._index).to.be(
`.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateInIndex}`
);
expect(firstCallArgs[1].body[1].type).to.be('kibana_stats');
expect(firstCallArgs[1].body[1].cluster_uuid).to.be(prodClusterUuid);
moment.now = oldNow;
done();
}, CHECK_DELAY);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { defaultsDeep, uniq, compact, get } from 'lodash';
import { defaultsDeep, uniq, compact } from 'lodash';

import {
TELEMETRY_COLLECTION_INTERVAL,
KIBANA_STATS_TYPE_MONITORING,
} from '../../common/constants';

import { sendBulkPayload, monitoringBulk } from './lib';
import { hasMonitoringCluster } from '../es_client/instantiate_client';

/*
* Handles internal Kibana stats collection and uploading data to Monitoring
Expand All @@ -31,13 +30,11 @@ import { hasMonitoringCluster } from '../es_client/instantiate_client';
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
constructor({ config, log, interval, elasticsearch, kibanaStats }) {
constructor({ log, interval, elasticsearch, kibanaStats }) {
if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}

this._hasDirectConnectionToMonitoringCluster = false;
this._productionClusterUuid = null;
this._timer = null;
// Hold sending and fetching usage until monitoring.bulk is successful. This means that we
// send usage data on the second tick. But would save a lot of bandwidth fetching usage on
Expand All @@ -54,15 +51,6 @@ export class BulkUploader {
plugins: [monitoringBulk],
});

if (hasMonitoringCluster(config.elasticsearch)) {
this._log.info(`Detected direct connection to monitoring cluster`);
this._hasDirectConnectionToMonitoringCluster = true;
this._cluster = elasticsearch.legacy.createClient('monitoring-direct', config.elasticsearch);
elasticsearch.legacy.client.callAsInternalUser('info').then((data) => {
this._productionClusterUuid = get(data, 'cluster_uuid');
});
}

this.kibanaStats = kibanaStats;
this.kibanaStatusGetter = null;
}
Expand Down Expand Up @@ -181,14 +169,7 @@ export class BulkUploader {
}

async _onPayload(payload) {
return await sendBulkPayload(
this._cluster,
this._interval,
payload,
this._log,
this._hasDirectConnectionToMonitoringCluster,
this._productionClusterUuid
);
return await sendBulkPayload(this._cluster, this._interval, payload, this._log);
}

getKibanaStats(type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,12 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { chunk, get } from 'lodash';
import {
MONITORING_SYSTEM_API_VERSION,
KIBANA_SYSTEM_ID,
KIBANA_STATS_TYPE_MONITORING,
KIBANA_SETTINGS_TYPE,
} from '../../../common/constants';

const SUPPORTED_TYPES = [KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE];
export function formatForNormalBulkEndpoint(payload, productionClusterUuid) {
const dateSuffix = moment.utc().format('YYYY.MM.DD');
return chunk(payload, 2).reduce((accum, chunk) => {
const type = get(chunk[0], 'index._type');
if (!type || !SUPPORTED_TYPES.includes(type)) {
return accum;
}

const { timestamp } = chunk[1];

accum.push({
index: {
_index: `.monitoring-kibana-${MONITORING_SYSTEM_API_VERSION}-${dateSuffix}`,
},
});
accum.push({
[type]: chunk[1],
type,
timestamp,
cluster_uuid: productionClusterUuid,
});
return accum;
}, []);
}
import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';

/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export async function sendBulkPayload(
cluster,
interval,
payload,
log,
hasDirectConnectionToMonitoringCluster = false,
productionClusterUuid = null
) {
if (hasDirectConnectionToMonitoringCluster) {
if (productionClusterUuid === null) {
log.warn(
`Unable to determine production cluster uuid to use for shipping monitoring data. Kibana monitoring data will appear in a standalone cluster in the Stack Monitoring UI.`
);
}
const formattedPayload = formatForNormalBulkEndpoint(payload, productionClusterUuid);
return await cluster.callAsInternalUser('bulk', {
body: formattedPayload,
});
}

export async function sendBulkPayload(cluster, interval, payload) {
return cluster.callAsInternalUser('monitoring.bulk', {
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
Expand Down

0 comments on commit 670520a

Please sign in to comment.