Context lifecycle in subscriptions #894

Closed
kouak opened this issue Jun 1, 2017 · 11 comments
kouak commented Jun 1, 2017

After reading through the codebase, my understanding of the subscription implementation is as follows:

  • subscribe() is called once when a subscription operation hits the server
  • subscribe() returns an AsyncIterator
  • that AsyncIterator yields the results of subsequent calls to execute()
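A minimal sketch of that lifecycle, with hand-rolled stand-ins (`sourceEvents`, `executeLike`, and `subscribeLike` are hypothetical names, not graphql-js internals):

```javascript
// Sketch of the subscription lifecycle described above (hypothetical names).

// The "source stream": an async generator yielding raw events.
async function* sourceEvents() {
  yield { count: 1 };
  yield { count: 2 };
}

// A stand-in for execute(): resolves a payload against a fixed contextValue.
function executeLike(payload, contextValue) {
  return { data: { count: payload.count, user: contextValue.user } };
}

// subscribe() maps each source event through execute(), reusing the SAME
// contextValue for every event -- the behaviour this issue is about.
async function* subscribeLike(contextValue) {
  for await (const payload of sourceEvents()) {
    yield executeLike(payload, contextValue);
  }
}
```

Consuming it with `for await (const result of subscribeLike({ user: 'alice' }))` shows every event resolved against the single context captured at subscription time.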

Again, if my understanding is correct, contextValue is passed to subscribe on the initial subscription operation and then passed down to execute whenever an event triggers a reexecution.

In short, contextValue is set once a subscription operation happens and will remain the same until the client unsubscribes.

A common graphql-js implementation is to use DataLoader as a batching/caching library. For queries and mutations, a set of fresh dataloader instances is usually bound to contextValue on a per request basis. When the query/mutation is done, those DataLoader instances are left for garbage collection.
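That per-request wiring can be sketched as follows, with a self-contained `MiniLoader` standing in for DataLoader (`makeContext` and `handleRequest` are hypothetical names):

```javascript
// Per-request loader wiring (sketch; MiniLoader is a stand-in for DataLoader).
class MiniLoader {
  constructor(batchFn) {
    this.batchFn = batchFn;
    this.cache = new Map();
  }
  load(key) {
    // The real DataLoader also batches keys collected within one tick;
    // only the caching aspect matters for this sketch.
    if (!this.cache.has(key)) {
      this.cache.set(key, this.batchFn(key));
    }
    return this.cache.get(key);
  }
}

// Fresh loader instances are created per request and attached to contextValue.
function makeContext() {
  return { loaders: { userById: new MiniLoader((id) => ({ id })) } };
}

// Each query/mutation gets its own context; the loaders are garbage-collected
// with it once the request finishes.
function handleRequest(resolve) {
  const contextValue = makeContext();
  return resolve(contextValue);
}
```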

Now, subscription context has a much longer life span, making DataLoader caching behaviour unwarranted in most cases.

Ideally, providing a way to set contextValue per execute() call would allow behaviour matching what one would expect when setting contextValue on a per-request basis for queries and mutations.

Is there any way to do so in the current implementation? Is this a behaviour that could be implemented in graphql-js, or should I settle for one context per subscription operation and work my way around that?

@ArnaudRinquin

I had the same question in mind and peeked at the code. It seems we could add a makeExecutionContext(params.context) and use it to get an updated context before executing the query.

I think it could be hooked here and should be a simple change.

What do you think?

@ArnaudRinquin

@kouak Have you made any progress on this? Have you found a work-around?

@kouak
Author

kouak commented Jun 19, 2017

Sorry, no progress on this.

My use case involves DataLoader, my workaround is to disable response caching and only leverage DataLoader batching abilities in the subscription context.
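That workaround can be sketched like this. The real DataLoader constructor does accept a `{ cache: false }` option; the `MiniLoader` below is a self-contained stand-in that only models the caching switch:

```javascript
// Sketch of the workaround: keep batching, disable caching.
class MiniLoader {
  constructor(batchFn, { cache = true } = {}) {
    this.batchFn = batchFn;
    this.cache = cache ? new Map() : null;
    this.calls = 0; // track how often the backing fetch runs
  }
  load(key) {
    if (this.cache && this.cache.has(key)) return this.cache.get(key);
    this.calls += 1;
    const value = this.batchFn(key);
    if (this.cache) this.cache.set(key, value);
    return value;
  }
}

// In a long-lived subscription context, cache: false means each event
// re-fetches fresh data instead of serving a stale value cached at
// subscription time.
const subscriptionLoader = new MiniLoader((id) => ({ id }), { cache: false });
```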

As far as I understand, your proposed implementation would not work.

My understanding of the subscriptions-transport-ws implementation is as follows:

  • A subscription message arrives on the wire
  • this.subscribe is called here, returning an asyncIterable. This step sets a contextValue when a subscription is started and hands control to graphql-js
  • subscriptions-transport-ws then loops over the async values returned by graphql-js here

If my understanding is correct, a graphql-js consumer has no way of modifying contextValue once subscribe has been called.

@yutin1987

yutin1987 commented Dec 11, 2017

Maybe we can modify this: https://github.com/graphql/graphql-js/blob/master/src/subscription/subscribe.js#L149

import assign from 'lodash/assign';
import { execute, createSourceEventStream } from 'graphql';
// Note: mapAsyncIterator and reportGraphQLError used below are internal
// graphql-js helpers, not part of the public 'graphql' entry point.

function subscribe(
  schema,
  document,
  rootValue,
  contextValue,
  variableValues,
  operationName,
  fieldResolver,
  subscribeFieldResolver,
) {
  const sourcePromise = createSourceEventStream(
    schema,
    document,
    rootValue,
    contextValue,
    variableValues,
    operationName,
    subscribeFieldResolver,
  );

  const mapSourceToResponse = payload =>
    execute(
      schema,
      document,
      payload.data,
      assign({}, contextValue, payload.contextValue), // merge per-event context from the payload
      variableValues,
      operationName,
      fieldResolver,
    );

  return sourcePromise.then(
    sourceStream =>
      mapAsyncIterator(sourceStream, mapSourceToResponse, reportGraphQLError),
    reportGraphQLError,
  );
}

@mattkrick
Contributor

FWIW, here's how I solved this:

  • for each request, put a new dataloader in a bag of dataloaders
  • Pass in the bag of dataloaders to the context instead of a single dataloader
  • When your mutation publishes something, include the dataloader ID you're working with
  • When the subscription resolves the payload, use the dataLoader ID that was passed to it.

Extra details:

  • the dataloaders created by the subscription should have cache: false; that way, if it doesn't use the mutation's dataloader, it doesn't store stuff for the life of the subscription
  • if the authorization logic for the mutation payload is more strict than the subscription payload, flush it before sharing it, or just don't share it
  • dispose of the subscription dataloader when you call asyncIterator.return().
  • dispose of the mutation dataloader after a couple of seconds. Trying to clear it after the last subscriber gets their message is VERY difficult unless you want to write your own pubsub engine
  • this only works for subscribers on the same node, so be smart about stateful socket servers & stateless graphql servers, & consider moving to Redis when the time comes. Worst case scenario: subscribers on a different worker don't use the cached value.
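The steps above can be sketched roughly like this (all names are hypothetical; a real setup would store DataLoader instances and tune the disposal delay):

```javascript
// Sketch of the "bag of dataloaders" pattern described above.
const loaderBag = new Map(); // dataLoaderId -> loaders created for a mutation

let nextId = 0;
function registerMutationLoaders(loaders) {
  const dataLoaderId = ++nextId;
  loaderBag.set(dataLoaderId, loaders);
  // Dispose after a grace period rather than trying to detect when the
  // last subscriber has consumed the message.
  const timer = setTimeout(() => loaderBag.delete(dataLoaderId), 2000);
  timer.unref?.(); // don't keep the process alive for the cleanup timer
  return dataLoaderId;
}

// The published payload carries the loader ID, so subscribers on the same
// node can reuse the mutation's (warm) loaders; otherwise they fall back
// to their own (cache-disabled) loaders.
function resolveSubscriptionPayload(payload, fallbackLoaders) {
  return loaderBag.get(payload.dataLoaderId) ?? fallbackLoaders;
}
```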

@ravenscar

I have a function called buildDataLoaders() which returns an object keyed with the names of all the dataloaders I need, I call this the loaderBucket.

When I get a query or mutation over http then I call buildDataLoaders() to create a new loaderBucket and shove it in the context for the resolvers, the resolvers then use object destructuring to pull out the dataloaders they need.

Socket connections for subscriptions are more difficult.

In the end I call buildDataLoaders() when the subscription starts, and implemented a flush() function which lives in the loaderBucket. flush() calls buildDataLoaders() again to create a fresh set of dataloaders, then clobbers the loaderBucket with them.

In the resolve in the subscriptions I call flush() so it has a new set of dataLoaders for each event sent out. This way I can use the caching functionality of the dataloaders in subscriptions.

I know this isn't super efficient but it's pretty fast, and could be turned into a lazy implementation if I need to.
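The approach above could look roughly like this (`buildLoaders`, `makeLoaderBucket`, and the loader shapes are all hypothetical stand-ins):

```javascript
// Sketch of the loaderBucket + flush() approach described above.
function makeCachingLoader() {
  const cache = new Map();
  return (key) => {
    if (!cache.has(key)) cache.set(key, { key });
    return cache.get(key);
  };
}

function buildLoaders() {
  // One entry per named dataloader the resolvers need.
  return { userById: makeCachingLoader() };
}

function makeLoaderBucket() {
  const bucket = {
    ...buildLoaders(),
    flush() {
      // Recreate every loader, clobbering the cached instances, while
      // keeping the same bucket object so existing references stay valid.
      Object.assign(bucket, buildLoaders());
    },
  };
  return bucket;
}
```

Calling `bucket.flush()` at the top of each subscription resolve gives every outgoing event a fresh set of loaders while still allowing caching within one event.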

@tgriesser
Contributor

tgriesser commented Feb 10, 2020

I just went to implement subscriptions for the first time and ran into this issue as well. Currently I'm working around it with this patch, applied via patch-package.

What this allows us to do is specify a function, contextValueExecution, which derives a new context for each execution phase of the subscription.

const result = await graphqlSubscribe({
  schema: graphqlSchema,
  document,
  variableValues: variables,
  contextValue: makeDataContext('subscription'),
  contextValueExecution(originalCtx) {
    return makeDataContext('subscription-execution', { parent: originalCtx })
  },
})

I think, given the way folks currently use context as a sort of singleton "state container" during query/mutation execution, a similar concept is necessary for subscription executions to work well without too many work-arounds. In particular, the reason a subscription re-triggers is usually that state has changed, so you'd want to start from a fresh context.
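The idea behind contextValueExecution can be illustrated with a stand-alone sketch of the response-mapping step (hypothetical wiring, not the actual patch):

```javascript
// Sketch: the response-mapping step derives a fresh context per event when
// a contextValueExecution function is provided, otherwise it falls back to
// the original subscribe-time context.
async function* mapWithPerEventContext(sourceStream, contextValue, options) {
  for await (const payload of sourceStream) {
    const ctx = options.contextValueExecution
      ? options.contextValueExecution(contextValue)
      : contextValue;
    // A real implementation would call execute() here with ctx;
    // this sketch just surfaces which context each event would use.
    yield { payload, contextUsed: ctx };
  }
}
```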

@IvanGoncharov let me know what you think of this approach, if it's something that would be reasonable to PR, and if so if the API naming is alright or should be changed.

@mattkrick
Contributor

@tgriesser I really like this solution. The only problem I see (please correct me if I'm wrong) is that it seems predicated on the subscription server being able to read context from the stateless (non-subscription) server?

use case:

  • there exist 1+ stateful servers (state includes subscription iterator, websocket, etc.) and 1+ stateless graphql execution servers.
  • when the socket server receives a subscription request, it calls createSourceEventStream to create the Source Stream (a SubscriptionIterator) and uses that to create a Response Stream
  • when ResponseStream.next() is called, it calls the stateless graphql server (with an optional server ID & dataloader ID so it can reuse the same dataloader that the mutation used)

@tgriesser
Contributor

The only problem I see, and please correct me if I'm wrong, but it seems this might be predicated on the subscription server being able to read context from the stateless (non-subscription) server?

It shouldn't have anything to do with servers, this is purely an addition to allow the mapSourceToResponse code to optionally create a new context, generated via a function that is provided to the original subscribe. How you create that context, or implement it in relation to your servers is up to you.

Any "graphql server", is really just a wrapper around the core execute algorithm, which you can actually call independently in your own code without needing a server. So this is just building on that idea to optionally allow a way to separate the context for subscribe from that of execute.

@mattkrick
Contributor

Gotcha, I think we just use different words. What you call a graphql server I call an execution server. There can also exist a subscription server, which does not execute graphql queries/mutations. In that case, mapSourceToResponse isn't used, so the fix wouldn't help folks who separate the two:

> This may be useful when hosting the stateful subscription service in a
> different process or machine than the stateless GraphQL execution engine,
> or otherwise separating these two steps. For more on this, see the
> "Supporting Subscriptions at Scale" information in the GraphQL specification.

tgriesser added a commit to tgriesser/graphql-js that referenced this issue Mar 11, 2020
yaacovCR added a commit that referenced this issue Oct 1, 2024
to safely export buildExecutionContext() as validateExecutionArgs()

motivation: this will allow us to:

1. export `executeOperation()` and `executeSubscription()` for those who would like to use them directly.
2. add a `perEventExecutor` option to `ExecutionArgs` that allows one to pass a custom context for execution of each execution event, with the opportunity to clean up the context on return, a la #2485 and #3071, addressing #894, which would not require re-coercion of variables.

The signature of the `perEventExecutor` option would be:

```ts
type SubscriptionEventExecutor = (
  validatedExecutionArgs: ValidatedExecutionArgs,
) => PromiseOrValue<ExecutionResult>;
```

rather than:

```ts
type SubscriptionEventExecutor = (
  executionArgs: ExecutionArgs,
) => PromiseOrValue<ExecutionResult>;
```

This might be a first step to integrating `subscribe()` completely into `execute()` (see: #3644) but is also a reasonable stopping place.
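A rough illustration of what a perEventExecutor-style hook enables, using hypothetical stand-ins for the graphql-js types (`defaultExecutor` here merely plays the role of the default per-event execution):

```javascript
// Sketch: a per-event executor that derives a fresh context for each
// execution event and cleans it up on return (hypothetical stand-ins).
function defaultExecutor(validatedArgs) {
  // Stand-in for executing one subscription event with the given args.
  return { data: validatedArgs.payload, context: validatedArgs.contextValue };
}

function makePerEventExecutor(makeContext, cleanup) {
  return async (validatedArgs) => {
    const contextValue = makeContext(validatedArgs.contextValue);
    try {
      return await defaultExecutor({ ...validatedArgs, contextValue });
    } finally {
      cleanup(contextValue); // opportunity to dispose per-event resources
    }
  };
}
```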
yaacovCR added a commit that referenced this issue Oct 6, 2024
by exporting executeSubscriptionEvent()

and adding an option to provide a custom fn

addresses #894

cf. #2485 , #3071
@yaacovCR
Contributor

yaacovCR commented Oct 7, 2024

Closed by #4211

@yaacovCR yaacovCR closed this as completed Oct 7, 2024