Skip to content

Commit

Permalink
events: fix slow client connection to empty event stream (#10637)
Browse files Browse the repository at this point in the history
* events: fix slow client connection to empty event stream

* doc: fix changelog of event stream connection init
  • Loading branch information
h3yduck authored May 21, 2021
1 parent 771aad2 commit 6b083c2
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 1.1.1 (Unreleased)

BUG FIXES:
* api: Fixed event stream connection initialization when there are no events to send [[GH-10637](https://github.com/hashicorp/nomad/issues/10637)]
* cli: Fixed a bug where `quota status` and `namespace status` commands may panic if the CLI targets a pre-1.1.0 cluster

## 1.1.0 (May 18, 2021)
Expand Down
4 changes: 3 additions & 1 deletion nomad/stream/ndjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ type JsonStream struct {

// NewJsonStream creates a new json stream that will output Json structs
// to the passed output channel. The constructor starts a goroutine
// to begin heartbeating on its set interval.
// to begin heartbeating on its set interval and also sends an initial heartbeat
// to notify the client about the successful connection initialization.
func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream {
s := &JsonStream{
ctx: ctx,
outCh: make(chan *structs.EventJson, 10),
heartbeatTick: time.NewTicker(heartbeat),
}

s.outCh <- JsonHeartbeat
go s.heartbeat()

return s
Expand Down
17 changes: 6 additions & 11 deletions nomad/stream/ndjson_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stream

import (
"bytes"
"context"
"testing"
"time"
Expand All @@ -24,12 +23,12 @@ func TestJsonStream(t *testing.T) {

require.NoError(t, s.Send(testObj{Name: "test"}))

out1 := <-out
initialHeartbeat := <-out
require.Equal(t, []byte(`{}`), initialHeartbeat.Data)

var expected bytes.Buffer
expected.Write([]byte(`{"name":"test"}`))
testMessage1 := <-out
require.Equal(t, []byte(`{"name":"test"}`), testMessage1.Data)

require.Equal(t, expected.Bytes(), out1.Data)
select {
case msg := <-out:
require.Failf(t, "Did not expect another message", "%#v", msg)
Expand All @@ -38,12 +37,8 @@ func TestJsonStream(t *testing.T) {

require.NoError(t, s.Send(testObj{Name: "test2"}))

out2 := <-out
expected.Reset()

expected.Write([]byte(`{"name":"test2"}`))
require.Equal(t, expected.Bytes(), out2.Data)

testMessage2 := <-out
require.Equal(t, []byte(`{"name":"test2"}`), testMessage2.Data)
}

func TestJson_Send_After_Stop(t *testing.T) {
Expand Down

0 comments on commit 6b083c2

Please sign in to comment.