
rfc: limit handling in distsql #16287

Merged 1 commit into cockroachdb:master on Jun 13, 2017

Conversation

RaduBerinde
Member

Initial draft.

@cockroach-teamcity
Member

This change is Reviewable

@knz
Contributor

knz commented Jun 2, 2017

Thanks for writing this.

One aspect missing from this discussion is the opportunities opened up by non-deterministic ordering when ORDER BY is not present. A clause like `select * from foo where x = 10 limit 10` (assuming no index on x) can also be satisfied by launching a distributed read/filter and aborting the distributed computation as soon as 10 results have been received by the gateway. I think there are use cases where this would be beneficial.


Review status: 0 of 1 files reviewed at latest revision, 5 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 37 at r1 (raw file):

     `scanNode`.

     *Note: The DistSQL sorter is aware of partial orders but the classic SQL

s/classic/local


docs/RFCS/distsql_limit.md, line 64 at r1 (raw file):

   instances corresponding to a planNode) output rows to streams, in parallel
   with the consumers of those streams. There is no sequencing step where a
   processor can stop and wait and see if the consumer needs more rows. In

Are you saying there is no flow control? I think this is a good occasion to explain how flow control is organized, and discuss whether we need more flow control.


docs/RFCS/distsql_limit.md, line 85 at r1 (raw file):

The goal for this RFC is to find ways to work around the hurdles above to
achieve performance comparable with Classic SQL in all limit cases.

s/classic/local


docs/RFCS/distsql_limit.md, line 108 at r1 (raw file):

### Short-term proposal ###

The simple proposal is to not distribute TableReaders for limited scans.

Probably this should only be done when the LIMIT value is small, e.g. less than 10000 rows. Beyond that, distribution might still be helpful.


docs/RFCS/distsql_limit.md, line 115 at r1 (raw file):

We believe that this solution is good for many common cases. It is at least as
good as classic SQL for queries that only read rows from a single range. It also

s/classic/local


Comments from Reviewable

@RaduBerinde
Copy link
Member Author

Thanks!

In many of these cases, we still don't want to distribute. If there's a good chance of a single node returning 10 rows, then we would be wasting a lot of work if we went to more nodes. It might help the latency of a single query, but it will be bad for overall throughput. It's tricky to decide.

There are though some other optimizations we can do in this case:

  • first read through all the ranges held by this node, then fall back to reading other ranges
  • choose the node which holds the most ranges

I will think about it some more and write down something.


Review status: 0 of 1 files reviewed at latest revision, 5 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 64 at r1 (raw file):

Previously, knz (kena) wrote…

Are you saying there is no flow control? I think this is a good occasion to explain how flow control is organized, and discuss whether we need more flow control.

There is flow control provided by buffered channels (local streams) and gRPC (remote streams). But the gRPC window sizes are large (to achieve throughput on high latency links).


docs/RFCS/distsql_limit.md, line 108 at r1 (raw file):

Previously, knz (kena) wrote…

Probably this should only be done when the LIMIT value is small, e.g. less than 10000 rows. Beyond that, distribution might still be helpful.

Yes. I remember writing a cutoff like that for index selection.


Comments from Reviewable

@petermattis
Collaborator

Review status: 0 of 1 files reviewed at latest revision, 6 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 102 at r2 (raw file):

## Planning ##

We have two proposals here: on is simple and should be implemented in the

s/on is/one is/g


Comments from Reviewable

@RaduBerinde RaduBerinde force-pushed the limit-rfc branch 5 times, most recently from 476d1c6 to ccf0445 on June 5, 2017 13:58
@RaduBerinde
Member Author

Updated.

@rjnn
Contributor

rjnn commented Jun 5, 2017

I think this RFC should be a little more explicit that the goal is to reduce wasteful computation. That tradeoff should be highlighted explicitly, and perhaps there should be knobs to shut this behavior off for users who don't mind throwing CPU at the problem for maximum performance.


Review status: 0 of 1 files reviewed at latest revision, 7 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 157 at r3 (raw file):

This solution comes with a planning question: on which node do we put this
planning synchronizer? A reasonable first implementation is to always use the

I think planning synchronizers need to be generalized a little. Eventually, with materialized views, we will have every processor preceded with a planning synchronizer on each of its inputs, so that all processors can reconstruct their entire input subtrees upon a dead input processor. This is complicated by scenarios where we have a full bisection of N^2 streams (e.g. a hashjoin spread across 5 nodes, followed by a filter stage). If one of the hashjoiners (w.l.o.g. say on node 3) dies, who is responsible for bringing it back up? We have 5 downstream processors (likely only 4, since if node 3 died, then the downstream filternode on node3 also died).

I think fault tolerance is more complicated. It probably deserves an RFC of its own.


Comments from Reviewable

@bdarnell
Contributor

bdarnell commented Jun 5, 2017

LGTM


Review status: 0 of 1 files reviewed at latest revision, 10 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 65 at r3 (raw file):

   with the consumers of those streams. We have flow control on streams -
   provided by buffered channels on local streams, and by gRPC on remote
   streams; the gRPC window sizes are quite large, necessary for high

Should we do some sort of slow-start for these window sizes? gRPC doesn't currently support varying these window sizes over time but we might be able to build that in if it would make things better at other levels. (I'm thinking we probably don't want to rely too much on network/rpc-level flow control here, but it's a possibility if it would be helpful).


docs/RFCS/distsql_limit.md, line 121 at r3 (raw file):

has the advantage that we only need to resolve one range during planning. The
main problem is that in bad cases the table data will be "routed" through that
first node; this can be very bad if that first node happens to be in a far away

It could also be bad if that range becomes a hotspot. Local execution would spread the load across all the gateway nodes in the system, while this plan might concentrate load on certain ranges (for example, the first range of a table). Some of the work will inevitably fall on that range since that's where some of the data lives, but it might be better not to concentrate it further.


docs/RFCS/distsql_limit.md, line 132 at r3 (raw file):

In cases like `SELECT * FROM t LIMIT 10`, we can return any 10 rows; so we are
free to read ranges in any order. An optimization to the proposed solution is to
place the single TableReader on the node that has most ranges for the table, and

s/ranges/range leases/

This seems to concentrate the load unnecessarily. I think here we'd ideally want to target the least-loaded nearby node that has the lease for at least one range in the table. But in any case the exact choices made here don't matter too much because we have a lot of flexibility to change in the future.


Comments from Reviewable

@andreimatei
Contributor

:lgtm:

Thanks for writing this


Review status: 0 of 1 files reviewed at latest revision, 13 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 74 at r3 (raw file):

   if we don't need to.
 - there are multiple instances of TableReaders for a given table, each working
   on the ranges are held by that node. Without a special way to plan limits, we

s/are held/held.

Also there's a stray new line at the end of the prev paragraph.


docs/RFCS/distsql_limit.md, line 84 at r3 (raw file):

useful classes of statements that have limits but where we can't put an (even
optimistic) limit on scanNodes; for example
`SELECT * FROM t ORDER BY v LIMIT 10`, where we have to read the entire table in

nit: you might want to clarify that these examples still have a limit, but at a higher level


docs/RFCS/distsql_limit.md, line 159 at r3 (raw file):

planning synchronizer? A reasonable first implementation is to always use the
gateway node. More advanced decisions are possible here though (e.g. if the
table is confined to a datacenter, the synchronizer should be in that

s/datacenter/zone (as that's the crdb word that captures locality)


Comments from Reviewable

@asubiotto
Contributor

:lgtm:


Review status: 0 of 1 files reviewed at latest revision, 13 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 65 at r3 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Should we do some sort of slow-start for these window sizes? gRPC doesn't currently support varying these window sizes over time but we might be able to build that in if it would make things better at other levels. (I'm thinking we probably don't want to rely too much on network/rpc-level flow control here, but it's a possibility if it would be helpful).

In #14948, @petermattis mentioned that dynamic window sizing is on grpc-go's roadmap.


Comments from Reviewable

- a *soft limit* is one where we have only an optimistic estimation for how
many rows we need. Notable cases which lead to soft limits:

* limits with filters. `SELCT * FROM t WHERE color = 'red' LIMIT 10`. Here
Contributor

s/SELCT/SELECT

@RaduBerinde
Member Author

Review status: 0 of 1 files reviewed at latest revision, 14 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 121 at r3 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

It could also be bad if that range becomes a hotspot. Local execution would spread the load across all the gateway nodes in the system, while this plan might concentrate load on certain ranges (for example, the first range of a table). Some of the work will inevitably fall on that range since that's where some of the data lives, but it might be better not to concentrate it further.

Thanks, added a note. Local execution would spread part of the load, but the first range is still a hotspot (reading the first range from KV is roughly equivalent to a tablereader on that node). Ideally we would want our planner to be load-aware and not put other processors on that node.


docs/RFCS/distsql_limit.md, line 132 at r3 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

s/ranges/range leases/

This seems to concentrate the load unnecessarily. I think here we'd ideally want to target the least-loaded nearby node that has the lease for at least one range in the table. But in any case the exact choices made here don't matter too much because we have a lot of flexibility to change in the future.

Done.


docs/RFCS/distsql_limit.md, line 157 at r3 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I think planning synchronizers need to be generalized a little. Eventually, with materialized views, we will have every processor preceded with a planning synchronizer on each of its inputs, so that all processors can reconstruct their entire input subtrees upon a dead input processor. This is complicated by scenarios where we have a full bisection of N^2 streams (e.g. a hashjoin spread across 5 nodes, followed by a filter stage). If one of the hashjoiners (w.l.o.g. say on node 3) dies, who is responsible for bringing it back up? We have 5 downstream processors (likely only 4, since if node 3 died, then the downstream filternode on node3 also died).

I think fault tolerance is more complicated. It probably deserves an RFC of its own.

Yes, this is something we will want to extend. The scope needed for this RFC is fairly limited (only supports planning table readers) so the details are more straightforward. There are a lot of complications for a more general primitive (as you point out); I agree this would need a separate RFC.


docs/RFCS/distsql_limit.md, line 159 at r3 (raw file):

Previously, andreimatei (Andrei Matei) wrote…

s/datacenter/zone (as that's the crdb word that captures locality)

Done.


Comments from Reviewable

@rjnn
Contributor

rjnn commented Jun 8, 2017

:LGTM: Would you like to move this to the final comment period and make the forum post?


Reviewed 1 of 1 files at r4.
Review status: all files reviewed at latest revision, 14 unresolved discussions, all commit checks successful.


docs/RFCS/distsql_limit.md, line 157 at r3 (raw file):

Previously, RaduBerinde wrote…

Yes, this is something we will want to extend. The scope needed for this RFC is fairly limited (only supports planning table readers) so the details are more straightforward. There are a lot of complications for a more general primitive (as you point out); I agree this would need a separate RFC.

Sounds good to me.


Comments from Reviewable

@RaduBerinde
Member Author

Done, entering final comment period.

@RaduBerinde RaduBerinde merged commit 9056474 into cockroachdb:master Jun 13, 2017
@RaduBerinde RaduBerinde deleted the limit-rfc branch June 13, 2017 13:50