Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update client version to resolve query bottleneck #14157

Merged
merged 1 commit into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## v2.0.0-alpha.13 [unreleased]

### Features

### Bug Fixes

### UI Improvements

1. [14157](https://github.com/influxdata/influxdb/pull/14157): Remove rendering bottleneck when streaming Flux responses
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!


## v2.0.0-alpha.13 [2019-06-13]

### Features
Expand Down
5 changes: 3 additions & 2 deletions ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
},
"dependencies": {
"@influxdata/clockface": "0.0.13",
"@influxdata/influx": "github:influxdata/influxdb2-js#dev",
"@influxdata/influx": "0.3.5",
"@influxdata/influxdb-templates": "influxdata/influxdb-templates",
"@influxdata/react-custom-scrollbars": "4.3.8",
"@influxdata/giraffe": "0.12.1",
Expand Down
7 changes: 4 additions & 3 deletions ui/src/dataLoaders/components/verifyStep/DataListening.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,19 @@ class DataListening extends PureComponent<OwnProps & WithRouterProps, State> {
const script = `from(bucket: "${bucket}")
|> range(start: -1m)`

let rowCount: number
let responseLength: number
let timePassed: number

try {
const response = await runQuery(orgID, script).promise
rowCount = response.rowCount
responseLength = response.length
timePassed = Number(new Date()) - this.startTime
} catch (err) {
this.setState({loading: LoadingState.Error})
return
}

if (rowCount > 1) {
if (responseLength > 1) {
this.setState({loading: LoadingState.Done})
return
}
Expand All @@ -139,6 +139,7 @@ class DataListening extends PureComponent<OwnProps & WithRouterProps, State> {
this.setState({loading: LoadingState.NotFound})
return
}

this.intervalID = setTimeout(this.checkForData, FETCH_WAIT)
}

Expand Down
66 changes: 17 additions & 49 deletions ui/src/shared/apis/query.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,36 @@
import Deferred from 'src/utils/Deferred'
import {getWindowVars} from 'src/variables/utils/getWindowVars'
import {buildVarsOption} from 'src/variables/utils/buildVarsOption'
import {client} from 'src/utils/api'

import {File} from '@influxdata/influx'
import {
File,
CancellationError as ClientCancellationError,
} from '@influxdata/influx'

// Types
import {WrappedCancelablePromise, CancellationError} from 'src/types/promises'
import {VariableAssignment} from 'src/types/ast'

const MAX_ROWS = 50000

export interface ExecuteFluxQueryResult {
csv: string
didTruncate: boolean
rowCount: number
}
const MAX_RESPONSE_CHARS = 50000 * 160

export const runQuery = (
orgID: string,
query: string,
extern?: File
): WrappedCancelablePromise<ExecuteFluxQueryResult> => {
const deferred = new Deferred()

const conn = client.queries.execute(orgID, query, extern)

let didTruncate = false
let rowCount = 0
let csv = ''

conn.stream.on('data', d => {
rowCount++
csv += d

if (rowCount < MAX_ROWS) {
return
}

didTruncate = true
conn.cancel()
): WrappedCancelablePromise<string> => {
const {promise, cancel} = client.queries.execute(orgID, query, {
extern,
limitChars: MAX_RESPONSE_CHARS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the clean up here.

})

conn.stream.on('end', () => {
const result: ExecuteFluxQueryResult = {
csv,
didTruncate,
rowCount,
}

deferred.resolve(result)
})
// Convert the client `CancellationError` to a UI `CancellationError`
const wrappedPromise = promise.catch(error =>
error instanceof ClientCancellationError
? Promise.reject(CancellationError)
: Promise.reject(error)
)

conn.stream.on('error', err => {
deferred.reject(err)
})

return {
promise: deferred.promise,
cancel: () => {
conn.cancel()
deferred.reject(new CancellationError())
},
}
return {promise: wrappedPromise, cancel}
}

/*
Expand All @@ -86,7 +54,7 @@ export const executeQueryWithVars = (
orgID: string,
query: string,
variables?: VariableAssignment[]
): WrappedCancelablePromise<ExecuteFluxQueryResult> => {
): WrappedCancelablePromise<string> => {
let isCancelled = false
let cancelExecution

Expand Down
13 changes: 3 additions & 10 deletions ui/src/shared/components/TimeSeries.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ import {withRouter, WithRouterProps} from 'react-router'
import {fromFlux, FromFluxResult} from '@influxdata/giraffe'

// API
import {
executeQueryWithVars,
ExecuteFluxQueryResult,
} from 'src/shared/apis/query'
import {executeQueryWithVars} from 'src/shared/apis/query'

// Utils
import {checkQueryResult} from 'src/shared/utils/checkQueryResult'
Expand Down Expand Up @@ -81,9 +78,7 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {

public state: State = defaultState()

private pendingResults: Array<
WrappedCancelablePromise<ExecuteFluxQueryResult>
> = []
private pendingResults: Array<WrappedCancelablePromise<string>> = []

public async componentDidMount() {
this.reload()
Expand Down Expand Up @@ -144,10 +139,8 @@ class TimeSeries extends Component<Props & WithRouterProps, State> {
)

// Wait for new queries to complete
const results = await Promise.all(this.pendingResults.map(r => r.promise))

const files = await Promise.all(this.pendingResults.map(r => r.promise))
const duration = Date.now() - startTime
const files = results.map(r => r.csv)
const giraffeResult = fromFlux(files.join('\n\n'))

files.forEach(checkQueryResult)
Expand Down
10 changes: 3 additions & 7 deletions ui/src/timeMachine/actions/queries.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import {get} from 'lodash'

// API
import {
executeQueryWithVars,
ExecuteFluxQueryResult,
} from 'src/shared/apis/query'
import {executeQueryWithVars} from 'src/shared/apis/query'

// Actions
import {refreshVariableValues, selectValue} from 'src/variables/actions'
Expand Down Expand Up @@ -85,7 +82,7 @@ export const refreshTimeMachineVariableValues = () => async (
await dispatch(refreshVariableValues(contextID, variablesToRefresh))
}

let pendingResults: Array<WrappedCancelablePromise<ExecuteFluxQueryResult>> = []
let pendingResults: Array<WrappedCancelablePromise<string>> = []

export const executeQueries = () => async (dispatch, getState: GetState) => {
const {view, timeRange} = getActiveTimeMachine(getState())
Expand Down Expand Up @@ -115,10 +112,9 @@ export const executeQueries = () => async (dispatch, getState: GetState) => {
executeQueryWithVars(orgID, text, variableAssignments)
)

const results = await Promise.all(pendingResults.map(r => r.promise))
const files = await Promise.all(pendingResults.map(r => r.promise))

const duration = Date.now() - startTime
const files = results.map(r => r.csv)

files.forEach(checkQueryResult)

Expand Down
9 changes: 3 additions & 6 deletions ui/src/timeMachine/apis/queryBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {get} from 'lodash'

// APIs
import {runQuery, ExecuteFluxQueryResult} from 'src/shared/apis/query'
import {runQuery} from 'src/shared/apis/query'
import {parseResponse} from 'src/shared/parsing/flux/response'

// Utils
Expand Down Expand Up @@ -122,11 +122,8 @@ export function findValues({
}
}

export function extractCol(
resp: ExecuteFluxQueryResult,
colName: string
): string[] {
const tables = parseResponse(resp.csv)
export function extractCol(resp: string, colName: string): string[] {
const tables = parseResponse(resp)
const data = get(tables, '0.data', [])

if (!data.length) {
Expand Down
2 changes: 1 addition & 1 deletion ui/src/variables/utils/ValueFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class DefaultValueFetcher implements ValueFetcher {

const request = executeQueryWithVars(orgID, query, variables)

const promise = request.promise.then(({csv}) => {
const promise = request.promise.then(csv => {
const values = extractValues(csv, prevSelection, defaultSelection)

this.cache[key] = values
Expand Down