Merge branch 'main' into de-8-13/enable-additional-connectors
vitaliidm authored May 9, 2024
2 parents 3ecdc99 + f82d640 commit beee520
Showing 68 changed files with 585 additions and 431 deletions.
3 changes: 2 additions & 1 deletion test/functional/apps/dashboard/group2/dashboard_filtering.ts
@@ -28,7 +28,8 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) {
const dashboardPanelActions = getService('dashboardPanelActions');
const PageObjects = getPageObjects(['common', 'dashboard', 'header', 'visualize', 'timePicker']);

describe('dashboard filtering', function () {
// Failing: See https://github.com/elastic/kibana/issues/160062
describe.skip('dashboard filtering', function () {
const populateDashboard = async () => {
await PageObjects.dashboard.clickNewDashboard();
await PageObjects.timePicker.setDefaultDataRange();
@@ -70,8 +70,9 @@ export const getIntegrationsResponseRt = rt.exact(

export const degradedDocsRt = rt.type({
dataset: rt.string,
percentage: rt.number,
count: rt.number,
totalDocs: rt.number,
percentage: rt.number,
});

export type DegradedDocs = rt.TypeOf<typeof degradedDocsRt>;
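Note: the codec above now returns the raw degraded-doc count and the new totalDocs field alongside the derived percentage. A minimal decode sketch against the updated codec (assuming io-ts and fp-ts, both already used in Kibana; the sample values are hypothetical):

import * as rt from 'io-ts';
import { isRight } from 'fp-ts/Either';

// Updated codec, as in the diff above.
const degradedDocsRt = rt.type({
  dataset: rt.string,
  count: rt.number, // docs with the _ignored field
  totalDocs: rt.number, // all docs in the dataset
  percentage: rt.number, // count / totalDocs * 100
});

// A payload with 12 degraded docs out of 4800 total decodes successfully.
const result = degradedDocsRt.decode({
  dataset: 'logs-nginx.access-default',
  count: 12,
  totalDocs: 4800,
  percentage: 0.25,
});
console.log(isRight(result)); // true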
@@ -18,17 +18,22 @@ import {
} from '../../../common/es_fields';
import { createDatasetQualityESClient, wildcardQuery } from '../../utils';

interface ResultBucket {
dataset: string;
count: number;
}

export async function getDegradedDocsPaginated(options: {
esClient: ElasticsearchClient;
type?: DataStreamType;
start?: number;
end?: number;
start: number;
end: number;
datasetQuery?: string;
after?: {
dataset: string;
namespace: string;
degradedDocs?: { dataset: string; namespace: string };
totalDocs?: { dataset: string; namespace: string };
};
prevResults?: DegradedDocs[];
prevResults?: { degradedDocs: ResultBucket[]; totalDocs: ResultBucket[] };
}): Promise<DegradedDocs[]> {
const {
esClient,
@@ -37,74 +42,122 @@ export async function getDegradedDocsPaginated(options: {
start,
end,
after,
prevResults = [],
prevResults = { degradedDocs: [], totalDocs: [] },
} = options;

const datasetQualityESClient = createDatasetQualityESClient(esClient);

const response = await datasetQualityESClient.search({
index: '*',
size: 0,
query: {
bool: {
...(datasetQuery
? {
should: [
...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
],
minimum_should_match: 1,
}
: {}),
filter: [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)],
const datasetFilter = {
...(datasetQuery
? {
should: [
...wildcardQuery(DATA_STREAM_DATASET, datasetQuery),
...wildcardQuery(DATA_STREAM_NAMESPACE, datasetQuery),
],
minimum_should_match: 1,
}
: {}),
};

const otherFilters = [...rangeQuery(start, end), ...termQuery(DATA_STREAM_TYPE, type)];

const aggs = (afterKey?: { dataset: string; namespace: string }) => ({
datasets: {
composite: {
...(afterKey ? { after: afterKey } : {}),
size: 10000,
sources: [
{ dataset: { terms: { field: 'data_stream.dataset' } } },
{ namespace: { terms: { field: 'data_stream.namespace' } } },
],
},
},
aggs: {
datasets: {
composite: {
...(after ? { after } : {}),
size: 10000,
sources: [
{ dataset: { terms: { field: DATA_STREAM_DATASET } } },
{ namespace: { terms: { field: DATA_STREAM_NAMESPACE } } },
],
});

const response = await datasetQualityESClient.msearch({ index: `${type}-*` }, [
// degraded docs per dataset
{
size: 0,
query: {
bool: {
...datasetFilter,
filter: otherFilters,
must: { exists: { field: _IGNORED } },
},
aggs: {
degraded: {
filter: {
exists: {
field: _IGNORED,
},
},
},
},
aggs: aggs(after?.degradedDocs),
},
// total docs per dataset
{
size: 0,
query: {
bool: {
...datasetFilter,
filter: otherFilters,
},
},
aggs: aggs(after?.totalDocs),
},
});
]);

const currDegradedDocs =
response.aggregations?.datasets.buckets.map((bucket) => ({
response.responses[0].aggregations?.datasets.buckets.map((bucket) => ({
dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
percentage: (bucket.degraded.doc_count * 100) / bucket.doc_count,
count: bucket.degraded.doc_count,
count: bucket.doc_count,
})) ?? [];

const degradedDocs = [...prevResults, ...currDegradedDocs];
const degradedDocs = [...prevResults.degradedDocs, ...currDegradedDocs];

const currTotalDocs =
response.responses[1].aggregations?.datasets.buckets.map((bucket) => ({
dataset: `${type}-${bucket.key.dataset}-${bucket.key.namespace}`,
count: bucket.doc_count,
})) ?? [];

if (response.aggregations?.datasets.after_key) {
const totalDocs = [...prevResults.totalDocs, ...currTotalDocs];

if (
response.responses[0].aggregations?.datasets.after_key ||
response.responses[1].aggregations?.datasets.after_key
) {
return getDegradedDocsPaginated({
esClient,
type,
start,
end,
datasetQuery,
after: {
dataset: response.aggregations?.datasets.after_key.dataset as string,
namespace: response.aggregations?.datasets.after_key.namespace as string,
degradedDocs:
(response.responses[0].aggregations?.datasets.after_key as {
dataset: string;
namespace: string;
}) || after?.degradedDocs,
totalDocs:
(response.responses[1].aggregations?.datasets.after_key as {
dataset: string;
namespace: string;
}) || after?.totalDocs,
},
prevResults: degradedDocs,
prevResults: { degradedDocs, totalDocs },
});
}

return degradedDocs;
const degradedDocsMap = degradedDocs.reduce(
(acc, curr) => ({
...acc,
[curr.dataset]: curr.count,
}),
{}
);

return totalDocs.map((curr) => {
const degradedDocsCount = degradedDocsMap[curr.dataset as keyof typeof degradedDocsMap] || 0;

return {
...curr,
totalDocs: curr.count,
count: degradedDocsCount,
percentage: (degradedDocsCount / curr.count) * 100,
};
});
}
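Note: the refactor above replaces a single search (one composite aggregation with a degraded sub-filter) with an msearch of two queries — the first restricted to documents where _ignored exists, the second unfiltered — each paginated by its own composite after key, recursing while either response still returns an after_key. Once both sides are exhausted, degraded counts are joined onto totals per dataset. A behavior-equivalent sketch of that closing merge, using a Map in place of the reduce in the committed code:

interface ResultBucket {
  dataset: string; // "<type>-<dataset>-<namespace>", as built in the diff
  count: number;
}

function mergeDegradedIntoTotals(degradedDocs: ResultBucket[], totalDocs: ResultBucket[]) {
  // Index degraded counts by dataset key.
  const degradedByDataset = new Map(degradedDocs.map((d) => [d.dataset, d.count]));

  return totalDocs.map((total) => {
    // Datasets with no _ignored docs simply have no degraded bucket.
    const degradedCount = degradedByDataset.get(total.dataset) ?? 0;
    return {
      dataset: total.dataset,
      totalDocs: total.count,
      count: degradedCount,
      percentage: (degradedCount / total.count) * 100,
    };
  });
}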
@@ -66,7 +66,7 @@ const degradedDocsRoute = createDatasetQualityServerRoute({
endpoint: 'GET /internal/dataset_quality/data_streams/degraded_docs',
params: t.type({
query: t.intersection([
t.partial(rangeRt.props),
rangeRt,
typeRt,
t.partial({
datasetQuery: t.string,
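Note: swapping t.partial(rangeRt.props) for rangeRt makes start and end required on this endpoint, matching the start: number / end: number signature change above. A sketch of the effect (the real rangeRt definition is not part of this diff — epoch-millisecond numbers are assumed — and typeRt is omitted for brevity):

import * as t from 'io-ts';

const rangeRt = t.type({ start: t.number, end: t.number });

// Before: t.partial(rangeRt.props) made both bounds optional.
// After: intersecting with rangeRt itself rejects requests missing either bound.
const queryRt = t.intersection([rangeRt, t.partial({ datasetQuery: t.string })]);

console.log(queryRt.is({ start: 1715212800000, end: 1715299200000 })); // true
console.log(queryRt.is({ datasetQuery: 'nginx' })); // false — range is now required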
@@ -7,6 +7,7 @@

import { ESSearchRequest, InferSearchResponseOf } from '@kbn/es-types';
import { ElasticsearchClient } from '@kbn/core/server';
import { Indices } from '@elastic/elasticsearch/lib/api/types';

type DatasetQualityESSearchParams = ESSearchRequest & {
size: number;
@@ -21,5 +22,15 @@ export function createDatasetQualityESClient(esClient: ElasticsearchClient) {
): Promise<InferSearchResponseOf<TDocument, TParams>> {
return esClient.search<TDocument>(searchParams) as Promise<any>;
},
async msearch<TDocument, TParams extends DatasetQualityESSearchParams>(
index = {} as { index?: Indices },
searches: TParams[]
): Promise<{
responses: Array<InferSearchResponseOf<TDocument, TParams>>;
}> {
return esClient.msearch({
searches: searches.map((search) => [index, search]).flat(),
}) as Promise<any>;
},
};
}
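Note: the new msearch helper pairs one shared header with every search body and flattens the [header, body] pairs into the searches array the Elasticsearch client expects, so responses line up with the request order. A hypothetical call (illustrative index pattern and queries; run inside an async function with an esClient in scope):

const client = createDatasetQualityESClient(esClient);

const { responses } = await client.msearch({ index: 'logs-*' }, [
  { size: 0, query: { match_all: {} } },
  { size: 0, query: { exists: { field: '_ignored' } } },
]);

// responses[0] answers the first body, responses[1] the second.
console.log(responses[0].hits.total, responses[1].hits.total);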
@@ -25,6 +25,9 @@ describe('correctCommonEsqlMistakes', () => {

it('replaces aliasing via the AS keyword with the = operator', () => {
expectQuery(`FROM logs-* | STATS COUNT() AS count`, 'FROM logs-*\n| STATS count = COUNT()');

expectQuery(`FROM logs-* | STATS COUNT() as count`, 'FROM logs-*\n| STATS count = COUNT()');

expectQuery(
`FROM logs-* | STATS AVG(transaction.duration.histogram) AS avg_request_latency, PERCENTILE(transaction.duration.histogram, 95) AS p95`,
`FROM logs-*
@@ -42,11 +45,33 @@ describe('correctCommonEsqlMistakes', () => {
});

it(`replaces " or ' escaping in FROM statements with backticks`, () => {
expectQuery(`FROM "logs-*" | LIMIT 10`, 'FROM `logs-*`\n| LIMIT 10');
expectQuery(`FROM 'logs-*' | LIMIT 10`, 'FROM `logs-*`\n| LIMIT 10');
expectQuery(`FROM "logs-*" | LIMIT 10`, 'FROM logs-*\n| LIMIT 10');
expectQuery(`FROM 'logs-*' | LIMIT 10`, 'FROM logs-*\n| LIMIT 10');
expectQuery(`FROM logs-* | LIMIT 10`, 'FROM logs-*\n| LIMIT 10');
});

it('replaces = as equal operator with ==', () => {
expectQuery(
`FROM logs-*\n| WHERE service.name = "foo"`,
`FROM logs-*\n| WHERE service.name == "foo"`
);

expectQuery(
`FROM logs-*\n| WHERE service.name = "foo" AND service.environment = "bar"`,
`FROM logs-*\n| WHERE service.name == "foo" AND service.environment == "bar"`
);

expectQuery(
`FROM logs-*\n| WHERE (service.name = "foo" AND service.environment = "bar") OR agent.name = "baz"`,
`FROM logs-*\n| WHERE (service.name == "foo" AND service.environment == "bar") OR agent.name == "baz"`
);

expectQuery(
`FROM logs-*\n| WHERE \`what=ever\` = "foo=bar"`,
`FROM logs-*\n| WHERE \`what=ever\` == "foo=bar"`
);
});

it('replaces single-quote escaped strings with double-quote escaped strings', () => {
expectQuery(
`FROM nyc_taxis
@@ -102,7 +127,7 @@ describe('correctCommonEsqlMistakes', () => {
| EVAL "@timestamp" = TO_DATETIME(timestamp)
| WHERE statement LIKE 'SELECT%'
| STATS avg_duration = AVG(duration)`,
`FROM \`postgres-logs*\`
`FROM postgres-logs*
| GROK message "%{TIMESTAMP_ISO8601:timestamp} %{TZ} \[%{NUMBER:process_id}\]: \[%{NUMBER:log_line}\] user=%{USER:user},db=%{USER:database},app=\[%{DATA:application}\],client=%{IP:client_ip} LOG: duration: %{NUMBER:duration:float} ms statement: %{GREEDYDATA:statement}"
| EVAL @timestamp = TO_DATETIME(timestamp)
| WHERE statement LIKE "SELECT%"
@@ -62,7 +62,7 @@ function split(value: string, splitToken: string) {
return statements;
}

function splitIntoCommands(query: string) {
export function splitIntoCommands(query: string) {
const commands: string[] = split(query, '|');

return commands.map((command) => {
@@ -93,8 +93,8 @@ function removeColumnQuotesAndEscape(column: string) {
function replaceAsKeywordWithAssignments(command: string) {
return command.replaceAll(/^STATS\s*(.*)/g, (__, statsOperations: string) => {
return `STATS ${statsOperations.replaceAll(
/(,\s*)?(.*?)\sAS\s([`a-zA-Z0-9.\-_]+)/g,
'$1$3 = $2'
/(,\s*)?(.*?)\s(AS|as)\s([`a-zA-Z0-9.\-_]+)/g,
'$1$4 = $2'
)}`;
});
}
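Note: the rewritten regex also matches a lowercase as (the alias now sits in the fourth capture group), which the new test case above exercises. The patched helper, reproduced verbatim for a runnable check:

function replaceAsKeywordWithAssignments(command: string) {
  return command.replaceAll(/^STATS\s*(.*)/g, (__, statsOperations: string) => {
    return `STATS ${statsOperations.replaceAll(
      /(,\s*)?(.*?)\s(AS|as)\s([`a-zA-Z0-9.\-_]+)/g,
      '$1$4 = $2'
    )}`;
  });
}

console.log(
  replaceAsKeywordWithAssignments('STATS AVG(duration) as avg_duration, COUNT() AS count')
);
// STATS avg_duration = AVG(duration), count = COUNT()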
@@ -196,6 +196,30 @@ function escapeExpressionsInSort(sortCommand: string) {
return `SORT ${columnsInSort.join(', ')}`;
}

function ensureEqualityOperators(whereCommand: string) {
const body = whereCommand.split(/^WHERE /)[1];

const byChar = body.split('');

let next = '';
let isColumnName = false;
byChar.forEach((char, index) => {
next += char;

if (!isColumnName && char === '=' && byChar[index - 1] === ' ' && byChar[index + 1] === ' ') {
next += '=';
}

if (!isColumnName && (char === '`' || char.match(/[a-z@]/i))) {
isColumnName = true;
} else if (isColumnName && (char === '`' || !char.match(/[a-z@0-9]/i))) {
isColumnName = false;
}
});

return `WHERE ${next}`;
}
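Note: a quick check of the character scan above (assuming ensureEqualityOperators is in scope): a lone = with a space on each side becomes ==, while an = inside a backtick-quoted column name or embedded in a token is left untouched:

console.log(
  ensureEqualityOperators('WHERE service.name = "foo" AND `what=ever` = "foo=bar"')
);
// WHERE service.name == "foo" AND `what=ever` == "foo=bar"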

export function correctCommonEsqlMistakes(content: string, log: Logger) {
return content.replaceAll(/```esql\n(.*?)\n```/gms, (_, query: string) => {
const commands = splitIntoCommands(query.trim());
@@ -206,12 +230,14 @@ export function correctCommonEsqlMistakes(content: string, log: Logger) {
switch (name) {
case 'FROM':
formattedCommand = formattedCommand
.replaceAll(/FROM "(.*)"/g, 'FROM `$1`')
.replaceAll(/FROM '(.*)'/g, 'FROM `$1`');
.replaceAll(/FROM "(.*)"/g, 'FROM $1')
.replaceAll(/FROM '(.*)'/g, 'FROM $1')
.replaceAll(/FROM `(.*)`/g, 'FROM $1');
break;

case 'WHERE':
formattedCommand = replaceSingleQuotesWithDoubleQuotes(formattedCommand);
formattedCommand = ensureEqualityOperators(formattedCommand);
break;

case 'EVAL':
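Note: the FROM branch previously normalized quoted sources to backticks; it now strips double quotes, single quotes, and backticks entirely, which is what the updated test expectations above assert. The three replacements in isolation:

let command = 'FROM "postgres-logs*"';
command = command
  .replaceAll(/FROM "(.*)"/g, 'FROM $1')
  .replaceAll(/FROM '(.*)'/g, 'FROM $1')
  .replaceAll(/FROM `(.*)`/g, 'FROM $1');
console.log(command); // FROM postgres-logs*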