Skip to content

Commit

Permalink
client: Parse the url only once per Client
Browse files Browse the repository at this point in the history
This stood out on flame graphs due ultimately to the
http.NewRequestFromContext call, which ends up calling url.Parse.

I did an optimization in #447 which similarly, and this is a natural
extension of this.

We can just parse the url string once during NewClient, validate at that
point, then tag along the parsed *url.URL everywhere else and never use
it as a string again. This value never mutates through the lifetime of a
Client and isn't publicly available on any structs.
  • Loading branch information
mattrobenolt committed Feb 18, 2023
1 parent 651016d commit cde703b
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 51 deletions.
31 changes: 28 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package connect
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
)

// Client is a reusable, concurrency-safe client for a single procedure.
Expand Down Expand Up @@ -55,7 +58,7 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
Protobuf: config.protobuf(),
CompressMinBytes: config.CompressMinBytes,
HTTPClient: httpClient,
URL: url,
URL: config.URL,
BufferPool: config.BufferPool,
ReadMaxBytes: config.ReadMaxBytes,
SendMaxBytes: config.SendMaxBytes,
Expand Down Expand Up @@ -171,6 +174,7 @@ func (c *Client[Req, Res]) newConn(ctx context.Context, streamType StreamType) S
}

type clientConfig struct {
URL *url.URL
Protocol protocol
Procedure string
CompressMinBytes int
Expand All @@ -184,9 +188,14 @@ type clientConfig struct {
SendMaxBytes int
}

func newClientConfig(url string, options []ClientOption) (*clientConfig, *Error) {
protoPath := extractProtoPath(url)
func newClientConfig(rawURL string, options []ClientOption) (*clientConfig, *Error) {
url, err := parseRequestURL(rawURL)
if err != nil {
return nil, err
}
protoPath := extractProtoPath(url.Path)
config := clientConfig{
URL: url,
Protocol: &protocolConnect{},
Procedure: protoPath,
CompressionPools: make(map[string]*compressionPool),
Expand Down Expand Up @@ -229,3 +238,19 @@ func (c *clientConfig) newSpec(t StreamType) Spec {
IsClient: true,
}
}

func parseRequestURL(rawURL string) (*url.URL, *Error) {
url, err := url.ParseRequestURI(rawURL)
if err == nil {
return url, nil
}
if !strings.Contains(rawURL, "://") {
// URL doesn't have a scheme, so the user is likely accustomed to
// grpc-go's APIs.
err = fmt.Errorf(
"URL %q missing scheme: use http:// or https:// (unlike grpc-go)",
rawURL,
)
}
return nil, NewError(CodeUnavailable, err)
}
49 changes: 31 additions & 18 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"sync"
)

Expand Down Expand Up @@ -52,19 +53,26 @@ type duplexHTTPCall struct {
func newDuplexHTTPCall(
ctx context.Context,
httpClient HTTPClient,
url string,
url *url.URL,
spec Spec,
header http.Header,
) *duplexHTTPCall {
// ensure we make a copy of the url before we pass along to the
// Request. This ensures if a transport out of our control wants
// to mutate the req.URL, we don't feel the effects of it.
url = cloneURL(url)
pipeReader, pipeWriter := io.Pipe()
request, err := http.NewRequestWithContext(
ctx,
http.MethodPost,
url,
pipeReader,
)
request.Header = header
client := &duplexHTTPCall{
request := (&http.Request{
Method: http.MethodPost,
URL: url,
Header: header,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: pipeReader,
Host: url.Host,
}).WithContext(ctx)
return &duplexHTTPCall{
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
Expand All @@ -73,15 +81,6 @@ func newDuplexHTTPCall(
request: request,
responseReady: make(chan struct{}),
}
if err != nil {
// We can't construct a request, so we definitely can't send it over the
// network. Exhaust the sync.Once immediately and short-circuit Read and
// Write by setting an error.
client.sendRequestOnce.Do(func() {})
connectErr := errorf(CodeUnavailable, "construct *http.Request: %w", err)
client.SetError(connectErr)
}
return client
}

// Write to the request body. Returns an error wrapping io.EOF after SetError
Expand Down Expand Up @@ -276,3 +275,17 @@ func (d *duplexHTTPCall) getError() error {
defer d.errMu.Unlock()
return d.err
}

// See: https://cs.opensource.google/go/go/+/refs/tags/go1.20.1:src/net/http/clone.go;l=22-33
func cloneURL(u *url.URL) *url.URL {
if u == nil {
return nil
}
u2 := new(url.URL)
*u2 = *u
if u.User != nil {
u2.User = new(url.Userinfo)
*u2.User = *u.User
}
return u2
}
4 changes: 2 additions & 2 deletions protobuf_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
// corresponding to the Protobuf package, service, and method. It always starts
// with a slash. Within connect, we use this as (1) Spec.Procedure and (2) the
// path when mounting handlers on muxes.
func extractProtoPath(url string) string {
segments := strings.Split(url, "/")
func extractProtoPath(path string) string {
segments := strings.Split(path, "/")
var pkg, method string
if len(segments) > 0 {
pkg = segments[0]
Expand Down
18 changes: 1 addition & 17 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type protocolClientParams struct {
Codec Codec
CompressMinBytes int
HTTPClient HTTPClient
URL string
URL *url.URL
BufferPool *bufferPool
ReadMaxBytes int
SendMaxBytes int
Expand Down Expand Up @@ -238,22 +238,6 @@ func discard(reader io.Reader) error {
return err
}

func validateRequestURL(rawURL string) (*url.URL, *Error) {
url, err := url.ParseRequestURI(rawURL)
if err == nil {
return url, nil
}
if !strings.Contains(rawURL, "://") {
// URL doesn't have a scheme, so the user is likely accustomed to
// grpc-go's APIs.
err = fmt.Errorf(
"URL %q missing scheme: use http:// or https:// (unlike grpc-go)",
rawURL,
)
}
return nil, NewError(CodeUnavailable, err)
}

// negotiateCompression determines and validates the request compression and
// response compression using the available compressors and protocol-specific
// Content-Encoding and Accept-Encoding headers.
Expand Down
6 changes: 1 addition & 5 deletions protocol_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,9 @@ func (*protocolConnect) NewHandler(params *protocolHandlerParams) protocolHandle

// NewClient implements protocol, so it must return an interface.
func (*protocolConnect) NewClient(params *protocolClientParams) (protocolClient, error) {
url, err := validateRequestURL(params.URL)
if err != nil {
return nil, err
}
return &connectClient{
protocolClientParams: *params,
peer: newPeerFromURL(url, ProtocolConnect),
peer: newPeerFromURL(params.URL, ProtocolConnect),
}, nil
}

Expand Down
8 changes: 2 additions & 6 deletions protocol_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,9 @@ func (g *protocolGRPC) NewHandler(params *protocolHandlerParams) protocolHandler

// NewClient implements protocol, so it must return an interface.
func (g *protocolGRPC) NewClient(params *protocolClientParams) (protocolClient, error) {
url, err := validateRequestURL(params.URL)
if err != nil {
return nil, err
}
peer := newPeerFromURL(url, ProtocolGRPC)
peer := newPeerFromURL(params.URL, ProtocolGRPC)
if g.web {
peer = newPeerFromURL(url, ProtocolGRPCWeb)
peer = newPeerFromURL(params.URL, ProtocolGRPCWeb)
}
return &grpcClient{
protocolClientParams: *params,
Expand Down

0 comments on commit cde703b

Please sign in to comment.