Merge pull request #502 from metrico/fix/494_axios_error
fix: OOM in traceql requests
akvlad authored May 24, 2024
2 parents 40c4104 + aa2babc commit f96a7e6
Showing 8 changed files with 169 additions and 33 deletions.
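
In short, the OOM is attacked from three sides: the index group-by caps each trace's span list with groupArray(100), the final data query swaps a `left any` join for groupArrayIf aggregates gated by an IN tuple test, and the search driver first runs a cheap count() probe and, when the estimate exceeds ten million rows, splits the scan into cityHash64(trace_id)-based partitions.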
28 changes: 13 additions & 15 deletions traceql/clickhouse_transpiler/attr_condition.js
@@ -1,4 +1,4 @@
-const { getCompareFn, durationToNs, unquote } = require('./shared')
+const { getCompareFn, durationToNs, unquote, bitSet } = require('./shared')
 const Sql = require('@cloki/clickhouse-sql')
 module.exports = class Builder {
   constructor () {
@@ -71,6 +71,18 @@ module.exports = class Builder {
     const having = self.getCond(self.conds)
     self.aggregator(sel)
     sel.conditions = Sql.And(sel.conditions, Sql.Or(...self.where))
+    if (Array.isArray(ctx.randomFilter) && Array.isArray(ctx.cachedTraceIds)) {
+      sel.conditions = Sql.And(
+        sel.conditions,
+        Sql.Or(
+          Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])),
+          new Sql.In('trace_id', 'in', ctx.cachedTraceIds.map(traceId => new Sql.Raw(`unhex('${traceId}')`)))
+        ))
+    } else if (Array.isArray(ctx.randomFilter)) {
+      sel.conditions = Sql.And(
+        sel.conditions,
+        Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])))
+    }
     sel.having(having)
     return sel
   }
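
When ctx.randomFilter = [N, i] is set, the builder appends a deterministic sampling predicate, optionally OR-ed with trace ids collected on earlier passes. Because cityHash64(trace_id) is deterministic, the N buckets are disjoint and together cover every trace. A rough sketch of the condition for randomFilter = [2, 1] plus one cached id (the table name and hex literal here are illustrative, not actual transpiler output):

```sql
SELECT trace_id
FROM tempo_traces_attrs_gin
WHERE cityHash64(trace_id) % 2 = 1
   OR trace_id IN (unhex('0af7651916cd43dd8448eb211c80319c'))
```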
@@ -248,20 +260,6 @@ function groupBitOr (left, alias) {
   return res
 }
 
-/**
- *
- * @param terms
- * @returns {SQLObject}
- */
-function bitSet (terms) {
-  const res = new Sql.Raw('')
-  res.terms = terms
-  res.toString = () => {
-    return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+')
-  }
-  return res
-}
-
 /**
  *
  * @param attr {string}
19 changes: 19 additions & 0 deletions traceql/clickhouse_transpiler/attr_condition_eval.js
@@ -0,0 +1,19 @@
+const attrCondition = require('./attr_condition')
+const {bitSet} = require('./shared')
+const Sql = require('@cloki/clickhouse-sql')
+module.exports = class Builder extends attrCondition {
+  build () {
+    const self = this
+    const superBuild = super.build()
+    /** @type {BuiltProcessFn} */
+    const res = (ctx) => {
+      const sel = superBuild(ctx)
+      sel.having_conditions = []
+      sel.aggregations = [bitSet(self.sqlConditions)]
+      sel.select_list = [[new Sql.Raw('count()'), 'count']]
+      sel.order_expressions = []
+      return sel
+    }
+    return res
+  }
+}
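
The subclass reuses the WHERE clause built by attr_condition but strips having and ordering and selects a bare count(), so estimating a query's cost is one cheap scan of the attrs index. Roughly the shape it should produce (time bounds and the condition are made-up examples, and the key/val column names are assumptions based on the index table used elsewhere in this commit):

```sql
SELECT count() AS count
FROM tempo_traces_attrs_gin
WHERE timestamp_ns BETWEEN 1716508800000000000 AND 1716512400000000000
  AND (key = 'service.name' AND val = 'frontend')
```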
4 changes: 1 addition & 3 deletions traceql/clickhouse_transpiler/group_by.js
@@ -7,9 +7,7 @@ module.exports = standardBuilder((sel, ctx) => {
     .with(withMain)
     .select(
       ['trace_id', 'trace_id'],
-      [new Sql.Raw('groupArray(span_id)'), 'span_id'],
-      [new Sql.Raw('groupArray(duration)'), 'duration'],
-      [new Sql.Raw('groupArray(timestamp_ns)'), 'timestamp_ns']
+      [new Sql.Raw('groupArray(100)(span_id)'), 'span_id']
     ).from(new Sql.WithReference(withMain))
     .groupBy('trace_id')
     .orderBy([new Sql.Raw('max(index_search.timestamp_ns)'), 'desc'])
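
`groupArray(100)(span_id)` is the parameterized form of groupArray: ClickHouse keeps at most the first 100 elements per group, which bounds the per-trace span list that previously grew without limit. A self-contained demonstration of the cap, runnable against any ClickHouse server:

```sql
-- groupArray(N)(x) retains at most N elements per group.
SELECT groupArray(3)(number) AS capped
FROM numbers(10)
-- capped = [0, 1, 2]
```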
23 changes: 22 additions & 1 deletion traceql/clickhouse_transpiler/index.js
@@ -1,4 +1,5 @@
 const AttrConditionPlanner = require('./attr_condition')
+const AttrConditionEvalPlanner = require('./attr_condition_eval')
 const InitIndexPlanner = require('./init')
 const IndexGroupByPlanner = require('./group_by')
 const AggregatorPlanner = require('./aggregator')
@@ -8,10 +9,17 @@ const TracesDataPlanner = require('./traces_data')
 /**
  * @param script {Token}
  */
-module.exports = (script) => {
+module.exports.transpile = (script) => {
   return new Planner(script).plan()
 }
 
+/**
+ * @param script {Token}
+ */
+module.exports.evaluateCmpl = (script) => {
+  return new Planner(script).planEval()
+}
+
 class Planner {
   /**
    *
@@ -53,6 +61,19 @@
     return res
   }
 
+  planEval () {
+    this.check()
+    this.analyze()
+    const res = (new AttrConditionEvalPlanner())
+      .withTerms(this.termIdx)
+      .withConditions(this.cond)
+      .withAggregatedAttr(this.aggregatedAttr)
+      .withMain((new InitIndexPlanner()).build())
+      .build()
+
+    return res
+  }
+
   check () {
     if (this.script.Children('SYNTAX').length > 1) {
       throw new Error('more than one selector is not supported')
6 changes: 4 additions & 2 deletions traceql/clickhouse_transpiler/init.js
@@ -9,7 +9,9 @@ const { standardBuilder } = require('./shared')
  * limit: number,
  * isCluster: boolean,
  * tracesTable: string,
- * tracesDistTable: string
+ * tracesDistTable: string,
+ * randomFilter: number[]|undefined,
+ * cachedTraceIds: string[]|undefined,
  * }} Context
  */
 /**
@@ -21,7 +23,7 @@ const { standardBuilder } = require('./shared')
  */
 module.exports = standardBuilder((sel, ctx) => {
   return (new Sql.Select()).select(['trace_id', 'trace_id'],
-    [new Sql.Raw('lower(hex(span_id))'), 'span_id'],
+    [new Sql.Raw('span_id'), 'span_id'],
     [new Sql.Raw('any(duration)'), 'duration'],
     [new Sql.Raw('any(timestamp_ns)'), 'timestamp_ns'])
     .from([ctx.tracesAttrsTable, 'traces_idx'])
14 changes: 14 additions & 0 deletions traceql/clickhouse_transpiler/shared.js
@@ -82,3 +82,17 @@ module.exports.standardBuilder = (fn) => {
     }
   }
 }
+
+/**
+ *
+ * @param terms {SQLObject[]}
+ * @returns {SQLObject}
+ */
+module.exports.bitSet = (terms) => {
+  const res = new Sql.Raw('')
+  res.terms = terms
+  res.toString = () => {
+    return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+')
+  }
+  return res
+}
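
bitSet renders a term list as a sum of shifted booleans: term i contributes bit i of a UInt64, so a whole set of attribute conditions collapses into one expression the eval planner can aggregate in a single pass. For two sample terms the raw SQL it prints should look like this (the conditions and table are invented for illustration):

```sql
SELECT bitShiftLeft(toUInt64(key = 'service.name'), 0)
     + bitShiftLeft(toUInt64(val = 'frontend'), 1) AS mask
FROM tempo_traces_attrs_gin
```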
18 changes: 9 additions & 9 deletions traceql/clickhouse_transpiler/traces_data.js
@@ -9,23 +9,23 @@ const processFn = (sel, ctx) => {
   const withTraceIds = new Sql.With('trace_ids', (new Sql.Select())
     .select('trace_id')
     .from(new Sql.WithReference(withMain)))
+  const withTraceIdsSpanIds = new Sql.With('trace_span_ids', (new Sql.Select())
+    .select('trace_id', 'span_id')
+    .from(new Sql.WithReference(withMain))
+    .join('span_id', 'array'))
   return (new Sql.Select())
-    .with(withMain, withTraceIds)
+    .with(withMain, withTraceIds, withTraceIdsSpanIds)
     .select(
       [new Sql.Raw('lower(hex(traces.trace_id))'), 'trace_id'],
-      [new Sql.Raw('any(index_grouped.span_id)'), 'span_id'],
-      [new Sql.Raw('any(index_grouped.duration)'), 'duration'],
-      [new Sql.Raw('any(index_grouped.timestamp_ns)'), 'timestamp_ns'],
+      [new Sql.Raw(`arrayMap(x -> lower(hex(x)), groupArrayIf(traces.span_id, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)}))`), 'span_id'],
+      [new Sql.Raw(`groupArrayIf(traces.duration_ns, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'duration'],
+      [new Sql.Raw(`groupArrayIf(traces.timestamp_ns, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'timestamp_ns'],
       [new Sql.Raw('min(traces.timestamp_ns)'), 'start_time_unix_nano'],
       [new Sql.Raw(
         'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
       ), 'duration_ms'],
       [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
-    ).from([table, 'traces']).join(
-      new Sql.WithReference(withMain),
-      'left any',
-      Sql.Eq(new Sql.Raw('traces.trace_id'), new Sql.Raw('index_grouped.trace_id'))
-    ).where(Sql.And(
+    ).from([table, 'traces']).where(Sql.And(
       new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
     )).groupBy('traces.trace_id')
     .orderBy(['start_time_unix_nano', 'desc'])
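
Instead of a `left any` join against the grouped index, which forced ClickHouse to hold the whole right side in memory, span-level columns are now gathered with groupArrayIf gated by a `(trace_id, span_id) IN trace_span_ids` tuple test. A simplified sketch of the resulting query (CTE and table names abbreviated, `tempo_traces` assumed; the real statement carries more columns):

```sql
WITH trace_span_ids AS (
  SELECT trace_id, span_id
  FROM index_grouped
  ARRAY JOIN span_id
)
SELECT
  lower(hex(traces.trace_id)) AS trace_id,
  arrayMap(x -> lower(hex(x)),
    groupArrayIf(traces.span_id,
      (traces.trace_id, traces.span_id) IN trace_span_ids)) AS span_id
FROM tempo_traces AS traces
WHERE traces.trace_id IN (SELECT trace_id FROM trace_span_ids)
GROUP BY traces.trace_id
```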
90 changes: 87 additions & 3 deletions traceql/index.js
@@ -1,5 +1,5 @@
 const parser = require('./parser')
-const transpiler = require('./clickhouse_transpiler')
+const { transpile, evaluateCmpl } = require('./clickhouse_transpiler')
 const logger = require('../lib/logger')
 const { DATABASE_NAME } = require('../lib/utils')
 const { clusterName } = require('../common')
@@ -23,10 +23,94 @@ const search = async (query, limit, from, to) => {
     tracesAttrsTable: `${_dbname}.tempo_traces_attrs_gin`,
     from: from,
     to: to,
-    limit: limit
+    limit: limit,
+    randomFilter: null
   }
   const scrpit = parser.ParseScript(query)
-  const planner = transpiler(scrpit.rootToken)
+  const complexity = await evaluateComplexity(ctx, scrpit.rootToken)
+  let res = []
+  if (complexity > 10000000) {
+    res = await processComplexResult(ctx, scrpit.rootToken, complexity)
+  } else {
+    res = await processSmallResult(ctx, scrpit.rootToken)
+  }
+  res.forEach(t =>
+    t.spanSets.forEach(
+      ss => ss.spans.sort(
+        (a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
+    )
+  )
+  return res
+}
+
+/**
+ *
+ * @param ctx {Context}
+ * @param script {Token}
+ */
+const evaluateComplexity = async (ctx, script) => {
+  const evaluator = evaluateCmpl(script)
+  const sql = evaluator(ctx)
+  const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+  return response.data.data.reduce((acc, row) => Math.max(acc, row.count), 0)
+}
+
+/**
+ *
+ * @param ctx {Context}
+ * @param script {Token}
+ * @param complexity {number}
+ */
+async function processComplexResult (ctx, script, complexity) {
+  const planner = transpile(script)
+  const maxFilter = Math.floor(complexity / 10000000)
+  let traces = []
+  for (let i = 0; i < maxFilter; i++) {
+    ctx.randomFilter = [maxFilter, i]
+    const sql = planner(ctx)
+    const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+    if (response.data.data.length === parseInt(ctx.limit)) {
+      const minStart = response.data.data.reduce((acc, row) =>
+        acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
+      )
+      ctx.from = new Date(Math.floor(minStart / 1000000))
+      ctx.randomFilter = null
+      complexity = await evaluateComplexity(ctx, script)
+      if (complexity <= 10000000) {
+        return await processSmallResult(ctx, script)
+      }
+      ctx.randomFilter = [maxFilter, i]
+    }
+    ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
+    traces = response.data.data.map(row => ({
+      traceID: row.trace_id,
+      rootServiceName: row.root_service_name,
+      rootTraceName: row.root_trace_name,
+      startTimeUnixNano: row.start_time_unix_nano,
+      durationMs: row.duration_ms,
+      spanSets: [
+        {
+          spans: row.span_id.map((spanId, i) => ({
+            spanID: spanId,
+            startTimeUnixNano: row.timestamp_ns[i],
+            durationNanos: row.duration[i],
+            attributes: []
+          })),
+          matched: row.span_id.length
+        }
+      ]
+    }))
+  }
+  return traces
+}
+
+/**
+ *
+ * @param ctx {Context}
+ * @param script {Token}
+ */
+async function processSmallResult (ctx, script) {
+  const planner = transpile(script)
   const sql = planner(ctx)
   const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
   const traces = response.data.data.map(row => ({
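
To make the driver's arithmetic concrete: with the 10,000,000-row cutoff, a probe result of, say, 35,000,000 gives maxFilter = 3, so processComplexResult runs the planned query at most three times with cityHash64(trace_id) % 3 = 0, 1, 2, carrying each pass's trace ids forward through ctx.cachedTraceIds. Whenever a pass comes back full (result length equals the limit), it advances ctx.from to the oldest start time seen and re-probes; once the narrowed window's estimate is at or below the cutoff, it falls back to the single-pass processSmallResult.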
