Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport/server: fix race between writing status and header #2074

Merged
merged 2 commits into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 37 additions & 36 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,20 +683,38 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
})
}

func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
for k, vv := range md {
if isReservedHeader(k) {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
return headerFields
}

// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.headerOk || s.getState() == streamDone {
if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
s.headerOk = true
s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
} else {
s.header = md
}
}
md = s.header
t.writeHeaderLocked(s)
s.hdrMu.Unlock()
return nil
}

func (t *http2Server) writeHeaderLocked(s *Stream) {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
Expand All @@ -705,15 +723,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.sendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
for k, vv := range md {
if isReservedHeader(k) {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
Expand All @@ -729,29 +739,27 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
outHeader := &stats.OutHeader{}
t.stats.HandleRPC(s.Context(), outHeader)
}
return nil
}

// WriteStatus sends stream status to the client and terminates the stream.
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
if !s.headerOk && s.header.Len() > 0 {
if err := t.WriteHeader(s, nil); err != nil {
return err
}
} else {
if s.getState() == streamDone {
return nil
}
if s.getState() == streamDone {
return nil
}
s.hdrMu.Lock()
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !s.headerOk {
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
if !s.updateHeaderSent() { // No headers have been sent.
if len(s.header) > 0 { // Send a separate header frame.
t.writeHeaderLocked(s)
} else { // Send a trailer only response.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
}
}
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
Expand All @@ -767,24 +775,17 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
}

// Attach the trailer metadata.
for k, vv := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
if isReservedHeader(k) {
continue
}
for _, v := range vv {
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
}
}
trailer := &headerFrame{
headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
trailingHeader := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: true,
onWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
},
}
t.closeStream(s, false, 0, trailer, true)
s.hdrMu.Unlock()
t.closeStream(s, false, 0, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
Expand All @@ -794,7 +795,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.headerOk { // Headers haven't been written yet.
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return streamErrorf(codes.Internal, "transport: %v", err)
Expand Down
37 changes: 31 additions & 6 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,20 @@ type Stream struct {

headerChan chan struct{} // closed to indicate the end of header metadata.
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
header metadata.MD // the received header metadata.
trailer metadata.MD // the key-value map of trailer metadata.

headerOk bool // becomes true from the first header is about to send
state streamState
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
header metadata.MD // the received header metadata.
trailer metadata.MD // the key-value map of trailer metadata.

status *status.Status // the status error received from the server
// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
headerSent uint32

state streamState

// On client-side it is the status error received from the server.
// On server-side it is unused.
status *status.Status

bytesReceived uint32 // indicates whether any bytes have been received on this stream
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
Expand All @@ -201,6 +208,17 @@ type Stream struct {
contentSubtype string
}

// isHeaderSent is only valid on the server-side.
func (s *Stream) isHeaderSent() bool {
return atomic.LoadUint32(&s.headerSent) == 1
}

// updateHeaderSent updates headerSent and returns true
// if it was alreay set. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
return atomic.SwapUint32(&s.headerSent, 1) == 1
}

func (s *Stream) swapState(st streamState) streamState {
return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
}
Expand Down Expand Up @@ -313,10 +331,12 @@ func (s *Stream) SetHeader(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) {
if s.isHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
s.hdrMu.Lock()
s.header = metadata.Join(s.header, md)
s.hdrMu.Unlock()
return nil
}

Expand All @@ -335,7 +355,12 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
if s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
s.hdrMu.Lock()
s.trailer = metadata.Join(s.trailer, md)
s.hdrMu.Unlock()
return nil
}

Expand Down