Skip to content

Commit

Permalink
enable batching
Browse files Browse the repository at this point in the history
tests failing, on the bright side, it means that our test coverage is meaningful :)

at least some tests are failing because of a bug in current defer/stream implementation, see graphql/graphql-js#2975
  • Loading branch information
yaacovCR committed Apr 21, 2021
1 parent 4a0d10d commit cb4473c
Show file tree
Hide file tree
Showing 16 changed files with 332 additions and 110 deletions.
6 changes: 6 additions & 0 deletions packages/batch-execute/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
"is-promise": "4.0.0",
"tslib": "~2.2.0"
},
"devDependencies": {
"@graphql-tools/delegate": "^7.1.2",
"@graphql-tools/mock": "^8.1.1",
"@graphql-tools/schema": "^7.1.3",
"@graphql-tools/utils": "^7.7.3"
},
"publishConfig": {
"access": "public",
"directory": "dist"
Expand Down
10 changes: 5 additions & 5 deletions packages/batch-execute/src/createBatchingExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getOperationAST /*, GraphQLSchema */ } from 'graphql';
import { getOperationAST, GraphQLSchema } from 'graphql';

import DataLoader from 'dataloader';

Expand All @@ -9,12 +9,12 @@ import { splitResult } from './splitResult';

export function createBatchingExecutor(
executor: Executor,
// targetSchema: GraphQLSchema,
targetSchema: GraphQLSchema,
dataLoaderOptions?: DataLoader.Options<any, any, any>,
extensionsReducer?: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
): Executor {
const loader = new DataLoader(
createLoadFn(executor, /* targetSchema, */ extensionsReducer ?? defaultExtensionsReducer),
createLoadFn(executor, targetSchema, extensionsReducer ?? defaultExtensionsReducer),
dataLoaderOptions
);
return (executionParams: ExecutionParams) => loader.load(executionParams);
Expand All @@ -30,7 +30,7 @@ function createLoadFn(
| ExecutionResult
| AsyncIterableIterator<AsyncExecutionResult>
| Promise<ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>>,
// targetSchema: GraphQLSchema,
targetSchema: GraphQLSchema,
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
) {
return async (
Expand Down Expand Up @@ -64,7 +64,7 @@ function createLoadFn(
| Promise<ExecutionResult | AsyncIterableIterator<ExecutionResult>>
> = [];
batchedExecutionParamSets.forEach(batchedExecutionParamSet => {
const mergedExecutionParams = mergeExecutionParams(batchedExecutionParamSet, /* targetSchema, */ extensionsReducer);
const mergedExecutionParams = mergeExecutionParams(batchedExecutionParamSet, targetSchema, extensionsReducer);
const executionResult = executor(mergedExecutionParams);
results = results.concat(splitResult(executionResult, batchedExecutionParamSet.length));
});
Expand Down
10 changes: 5 additions & 5 deletions packages/batch-execute/src/getBatchingExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import DataLoader from 'dataloader';

import { ExecutionParams, Executor } from '@graphql-tools/utils';
import { createBatchingExecutor } from './createBatchingExecutor';
import { memoize2of4 } from './memoize';
// import { GraphQLSchema } from 'graphql';
import { memoize2of5 } from './memoize';
import { GraphQLSchema } from 'graphql';

export const getBatchingExecutor = memoize2of4(function (
export const getBatchingExecutor = memoize2of5(function (
_context: Record<string, any> = self ?? window ?? global,
executor: Executor,
// targetSchema: GraphQLSchema,
targetSchema: GraphQLSchema,
dataLoaderOptions?: DataLoader.Options<any, any, any>,
extensionsReducer?: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
): Executor {
return createBatchingExecutor(executor, /* targetSchema, */ dataLoaderOptions, extensionsReducer);
return createBatchingExecutor(executor, targetSchema, dataLoaderOptions, extensionsReducer);
});
41 changes: 0 additions & 41 deletions packages/batch-execute/src/memoize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,44 +39,3 @@ export function memoize2of5<

return memoized;
}

export function memoize2of4<
T1 extends Record<string, any>,
T2 extends Record<string, any>,
T3 extends any,
T4 extends any,
R extends any
>(fn: (A1: T1, A2: T2, A3: T3, A4: T4) => R): (A1: T1, A2: T2, A3: T3, A4: T4) => R {
let cache1: WeakMap<T1, WeakMap<T2, R>>;

function memoized(a1: T1, a2: T2, a3: T3, a4: T4) {
if (!cache1) {
cache1 = new WeakMap();
const cache2: WeakMap<T2, R> = new WeakMap();
cache1.set(a1, cache2);
const newValue = fn(a1, a2, a3, a4);
cache2.set(a2, newValue);
return newValue;
}

let cache2 = cache1.get(a1);
if (!cache2) {
cache2 = new WeakMap();
cache1.set(a1, cache2);
const newValue = fn(a1, a2, a3, a4);
cache2.set(a2, newValue);
return newValue;
}

const cachedValue = cache2.get(a2);
if (cachedValue === undefined) {
const newValue = fn(a1, a2, a3, a4);
cache2.set(a2, newValue);
return newValue;
}

return cachedValue;
}

return memoized;
}
21 changes: 6 additions & 15 deletions packages/batch-execute/src/mergeExecutionParams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
InlineFragmentNode,
FieldNode,
OperationTypeNode,
// GraphQLSchema,
GraphQLSchema,
} from 'graphql';

import { ExecutionParams } from '@graphql-tools/utils';
Expand Down Expand Up @@ -64,7 +64,7 @@ import { createPrefix } from './prefix';
*/
export function mergeExecutionParams(
executionParamSets: Array<ExecutionParams>,
// targetSchema: GraphQLSchema,
targetSchema: GraphQLSchema,
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
): ExecutionParams {
const mergedVariables: Record<string, any> = Object.create(null);
Expand All @@ -82,16 +82,8 @@ export function mergeExecutionParams(
prefixedExecutionParams.document.definitions.forEach(def => {
if (isOperationDefinition(def)) {
operation = def.operation;
const selections = def.selectionSet.selections;

// TODO:
// once splitting AsyncIterableIterator<AsyncExecutionResult> is implemented within splitResult,
// the below code can be uncommented to also utilize defer when batching.

/*
let selections: Array<SelectionNode>;
selections = targetSchema.getDirective('defer')
const selections = targetSchema.getDirective('defer')
? [
{
kind: Kind.INLINE_FRAGMENT,
Expand All @@ -113,12 +105,11 @@ export function mergeExecutionParams(
],
selectionSet: {
kind: Kind.SELECTION_SET,
selections,
selections: def.selectionSet.selections,
},
},
]
: selections;
*/
: def.selectionSet.selections;

mergedSelections.push(...selections);
mergedVariableDefinitions.push(...(def.variableDefinitions ?? []));
Expand Down Expand Up @@ -156,7 +147,7 @@ export function mergeExecutionParams(

function prefixExecutionParams(prefix: string, executionParams: ExecutionParams): ExecutionParams {
let document = aliasTopLevelFields(prefix, executionParams.document);
const variableNames = Object.keys(executionParams.variables);
const variableNames = executionParams.variables !== undefined ? Object.keys(executionParams.variables) : [];

if (variableNames.length === 0) {
return { ...executionParams, document };
Expand Down
57 changes: 33 additions & 24 deletions packages/batch-execute/src/split.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
// and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d
// and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039

import { Push, Repeater } from '@repeaterjs/repeater';
import { Push, Stop, Repeater } from '@repeaterjs/repeater';

type Splitter<T> = (item: T) => [number, T];
type Splitter<T> = (item: T) => [number | undefined, T];

export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<IteratorResult<T>>) {
export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<T>) {
const iterator = asyncIterable[Symbol.asyncIterator]();
const returner = iterator.return ?? undefined;
const returner = iterator.return?.bind(iterator) ?? undefined;

const buffers: Array<Array<IteratorResult<T>>> = Array(n).fill([]);
const buffers: Array<Array<IteratorResult<T>>> = Array(n);
for (let i = 0; i < n; i++) {
buffers[i] = [];
};

if (returner) {
const set: Set<number> = new Set();
Expand All @@ -25,22 +28,22 @@ export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, spl
}
});

await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);
await loop(push, stop, earlyReturn, buffer, buffers, iterator, splitter);

await earlyReturn;
});
});
}

return buffers.map(
(buffer, index) =>
buffer =>
new Repeater(async (push, stop) => {
let earlyReturn: any;
stop.then(() => {
earlyReturn = returner ? returner() : true;
});

await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);
await loop(push, stop, earlyReturn, buffer, buffers, iterator, splitter);

await earlyReturn;
})
Expand All @@ -49,16 +52,16 @@ export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, spl

async function loop<T>(
push: Push<T>,
stop: Stop,
earlyReturn: Promise<any> | any,
buffer: Array<IteratorResult<T>>,
index: number,
buffers: Array<Array<IteratorResult<T>>>,
iterator: AsyncIterator<T>,
splitter: Splitter<IteratorResult<T>>
splitter: Splitter<T>
): Promise<void> {
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await next(buffer, index, buffers, iterator, splitter);
const iteration = await next(buffer, buffers, iterator, splitter);

if (iteration === undefined) {
continue;
Expand All @@ -76,32 +79,38 @@ async function loop<T>(

async function next<T>(
buffer: Array<IteratorResult<T>>,
index: number,
buffers: Array<Array<IteratorResult<T>>>,
iterator: AsyncIterator<T>,
splitter: Splitter<IteratorResult<T>>
splitter: Splitter<T>
): Promise<IteratorResult<T> | undefined> {
let iteration: IteratorResult<T>;

if (0 in buffer) {
return buffer.shift();
}

const iterationCandidate = await iterator.next();

let tee = true;
const value = iterationCandidate.value;
if (value) {
if (value !== undefined) {
const [iterationIndex, newValue] = splitter(value);
if (index === iterationIndex) {
return newValue;
if (iterationIndex !== undefined) {
buffers[iterationIndex].push({
...iterationCandidate,
value: newValue,
});
tee = false;
}

buffers[iterationIndex].push(iteration);
return undefined;
}

for (const buffer of buffers) {
buffer.push(iteration);
if (tee) {
for (const b of buffers) {
b.push(iterationCandidate);
}
}
return iterationCandidate;

if (0 in buffer) {
return buffer.shift();
};

return undefined;
}
70 changes: 67 additions & 3 deletions packages/batch-execute/src/splitResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ExecutionResult, GraphQLError } from 'graphql';

import isPromise from 'is-promise';

import { AsyncExecutionResult, isAsyncIterable, relocatedError } from '@graphql-tools/utils';
import { AsyncExecutionResult, ExecutionPatchResult, isAsyncIterable, relocatedError } from '@graphql-tools/utils';

import { parseKey } from './prefix';
import { split } from './split';
Expand Down Expand Up @@ -39,8 +39,72 @@ export function splitExecutionResultOrAsyncIterableIterator(
): Array<ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>> {
if (isAsyncIterable(mergedResult)) {
return split(mergedResult, numResults, originalResult => {
// TODO: implement splitter instead of this placeholder
return [0, originalResult];
const path = (originalResult as ExecutionPatchResult).path;
if (path && path.length) {
const { index, originalKey } = parseKey(path[0] as string);
const newPath = ([originalKey] as Array<string | number>).concat(path.slice(1));

const newResult: ExecutionPatchResult = {
...(originalResult as ExecutionPatchResult),
path: newPath,
};

const errors = originalResult.errors;
if (errors) {
const newErrors: Array<GraphQLError> = [];
errors.forEach(error => {
if (error.path) {
const parsedKey = parseKey(error.path[0] as string);
if (parsedKey) {
const { originalKey } = parsedKey;
const newError = relocatedError(error, [originalKey, ...error.path.slice(1)]);
newErrors.push(newError);
return;
}
}

newErrors.push(error);
});
newResult.errors = newErrors;
}

return [index, newResult];
}

let resultIndex: number;
const newResult: ExecutionResult = { ...originalResult };
const data = originalResult.data;
if (data) {
const newData = {};
Object.keys(data).forEach(prefixedKey => {
const { index, originalKey } = parseKey(prefixedKey);
resultIndex = index;
newData[originalKey] = data[prefixedKey];
});
newResult.data = newData;
}

const errors = originalResult.errors;
if (errors) {
const newErrors: Array<GraphQLError> = [];
errors.forEach(error => {
if (error.path) {
const parsedKey = parseKey(error.path[0] as string);
if (parsedKey) {
const { index, originalKey } = parsedKey;
resultIndex = index;
const newError = relocatedError(error, [originalKey, ...error.path.slice(1)]);
newErrors.push(newError);
return;
}
}

newErrors.push(error);
});
newResult.errors = newErrors;
}

return [resultIndex, newResult]
});
}

Expand Down
Loading

0 comments on commit cb4473c

Please sign in to comment.