Skip to content

Commit

Permalink
x-pack/filebeat/input/websocket: do minor clean-up in main loop (#40145)
Browse files Browse the repository at this point in the history
* remove unneeded goroutine
* fix logging: The body was previously not being logged since an io.ReadCloser
  is not a JSON-serialisable type.
  • Loading branch information
efd6 authored Jul 18, 2024
1 parent 942f8c7 commit 463bbb4
Showing 1 changed file with 30 additions and 34 deletions.
64 changes: 30 additions & 34 deletions x-pack/filebeat/input/websocket/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
package websocket

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"reflect"
"time"

"github.com/google/cel-go/cel"
"github.com/gorilla/websocket"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -109,7 +112,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
headers := formHeader(cfg)
c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers)
if resp != nil && resp.Body != nil {
log.Debugw("websocket connection response", "body", resp.Body)
var buf bytes.Buffer
if log.Core().Enabled(zapcore.DebugLevel) {
const limit = 1e4
io.CopyN(&buf, resp.Body, limit)
}
if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 {
buf.WriteString("... truncated")
}
log.Debugw("websocket connection response", "body", &buf)
resp.Body.Close()
}
if err != nil {
Expand All @@ -119,41 +130,26 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
}
defer c.Close()

done := make(chan error)

go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
metrics.errorsTotal.Inc()
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Errorw("websocket connection closed", "error", err)
} else {
log.Errorw("failed to read websocket data", "error", err)
}
done <- err
return
}
metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log)
if err != nil {
metrics.errorsTotal.Inc()
log.Errorw("failed to process and publish data", "error", err)
done <- err
return
for {
_, message, err := c.ReadMessage()
if err != nil {
metrics.errorsTotal.Inc()
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Errorw("websocket connection closed", "error", err)
} else {
log.Errorw("failed to read websocket data", "error", err)
}
return err
}
metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log)
if err != nil {
metrics.errorsTotal.Inc()
log.Errorw("failed to process and publish data", "error", err)
return err
}
}()

// blocks until done is closed, context is cancelled or an error is received
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down

0 comments on commit 463bbb4

Please sign in to comment.