From 88681a9974b4b2eb486200371aa47add0052c6a9 Mon Sep 17 00:00:00 2001 From: Beorn Facchini Date: Sat, 11 Jan 2020 00:16:07 +1100 Subject: [PATCH] Read websocket close in tail handler (#1383) * Read websocket close in tail handler * Send close message to websocket on quit signals --- pkg/logcli/query/tail.go | 14 ++++++++++++++ pkg/querier/http.go | 19 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/pkg/logcli/query/tail.go b/pkg/logcli/query/tail.go index 905c6fea66f2a..63f71439ad3c4 100644 --- a/pkg/logcli/query/tail.go +++ b/pkg/logcli/query/tail.go @@ -3,9 +3,13 @@ package query import ( "fmt" "log" + "os" + "os/signal" "strings" + "syscall" "github.com/fatih/color" + "github.com/gorilla/websocket" "github.com/grafana/loki/pkg/logcli/client" "github.com/grafana/loki/pkg/logcli/output" "github.com/grafana/loki/pkg/loghttp" @@ -18,6 +22,16 @@ func (q *Query) TailQuery(delayFor int, c *client.Client, out output.LogOutput) log.Fatalf("Tailing logs failed: %+v", err) } + go func() { + stopChan := make(chan os.Signal, 1) + signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM) + <-stopChan + if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil { + log.Println("Error closing websocket:", err) + } + os.Exit(0) + }() + tailReponse := new(loghttp.TailResponse) if len(q.IgnoreLabelsKey) > 0 { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 77925bed41c8e..20d6e8afc098e 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -182,6 +182,23 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { responseChan := tailer.getResponseChan() closeErrChan := tailer.getCloseErrorChan() + doneChan := make(chan struct{}) + go func() { + for { + _, _, err := conn.ReadMessage() + if err != nil { + if closeErr, ok := err.(*websocket.CloseError); ok { + if closeErr.Code == websocket.CloseNormalClosure { + break + } + level.Error(util.Logger).Log("msg", "Error from client", "err", err) + break + } + } + } + doneChan <- struct{}{} + }() + for { select { case response = <-responseChan: @@ -214,6 +231,8 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { } return } + case <-doneChan: + return } } }