diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go index a62c031d2ee9..6adde0b25ec5 100644 --- a/libbeat/statestore/backend/memlog/diskstore.go +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -201,7 +201,7 @@ func (s *diskstore) tryOpenLog() error { ok = true s.logNeedsTruncate = false s.logFile = f - s.logBuf = bufio.NewWriterSize(&ensureWriter{s.logFile}, s.bufferSize) + s.logBuf = bufio.NewWriterSize(s.logFile, s.bufferSize) return nil } @@ -346,7 +346,7 @@ func (s *diskstore) checkpointTmpFile(tempfile string, states map[string]entry) f.Close() }) - writer := bufio.NewWriterSize(&ensureWriter{f}, s.bufferSize) + writer := bufio.NewWriterSize(f, s.bufferSize) enc := newJSONEncoder(writer) if _, err = writer.Write([]byte{'['}); err != nil { return "", err @@ -650,7 +650,7 @@ func writeMetaFile(home string, mode os.FileMode) error { f.Close() }) - enc := newJSONEncoder(&ensureWriter{f}) + enc := newJSONEncoder(f) err = enc.Encode(storeMeta{ Version: storeVersion, }) diff --git a/libbeat/statestore/backend/memlog/util.go b/libbeat/statestore/backend/memlog/util.go index 685aec6a7d65..484005b615eb 100644 --- a/libbeat/statestore/backend/memlog/util.go +++ b/libbeat/statestore/backend/memlog/util.go @@ -24,20 +24,6 @@ import ( "syscall" ) -// ensureWriter writes the buffer to the underlying writer -// for as long as w returns a retryable error (e.g. EAGAIN) -// or the input buffer has been exhausted. -// -// XXX: this code was written and tested with go1.13 and go1.14, which does not -// handled EINTR. Some users report EINTR getting triggered more often in -// go1.14 due to changes in the signal handling for implementing -// preemption. -// In future versions EINTR will be handled by go for us. -// See: https://github.com/golang/go/issues/38033 -type ensureWriter struct { - w io.Writer -} - // countWriter keeps track of the amount of bytes written over time. type countWriter struct { n uint64 @@ -50,18 +36,6 @@ func (c *countWriter) Write(p []byte) (int, error) { return n, err } -func (e *ensureWriter) Write(p []byte) (int, error) { - var N int - for len(p) > 0 { - n, err := e.w.Write(p) - N, p = N+n, p[n:] - if err != nil && !isRetryErr(err) { - return N, err - } - } - return N, nil -} - func isRetryErr(err error) bool { return err == syscall.EINTR || err == syscall.EAGAIN } diff --git a/libbeat/statestore/backend/memlog/util_test.go b/libbeat/statestore/backend/memlog/util_test.go deleted file mode 100644 index fca2a2bbaf65..000000000000 --- a/libbeat/statestore/backend/memlog/util_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package memlog - -import ( - "syscall" - "testing" -) - -// A mock Writer implementation that always returns a configurable -// error on the first write call, to test error handling in ensureWriter. -type mockErrorWriter struct { - errorType error - reportedError bool -} - -func (mew *mockErrorWriter) Write(data []byte) (n int, err error) { - if !mew.reportedError { - mew.reportedError = true - return 0, mew.errorType - } - return len(data), nil -} - -func TestEnsureWriter_RetriableError(t *testing.T) { - // EAGAIN is retriable, ensureWriter.Write should succeed. - errorWriter := &mockErrorWriter{errorType: syscall.EAGAIN} - bytes := []byte{1, 2, 3} - writer := &ensureWriter{errorWriter} - written, err := writer.Write(bytes) - if err != nil { - t.Fatalf("ensureWriter shouldn't propagate retriable errors") - } - if written != len(bytes) { - t.Fatalf("Expected %d bytes written, got %d", len(bytes), written) - } -} - -func TestEnsureWriter_NonRetriableError(t *testing.T) { - // EINVAL is not retriable, ensureWriter.Write should return an error. - errorWriter := &mockErrorWriter{errorType: syscall.EINVAL} - bytes := []byte{1, 2, 3} - writer := &ensureWriter{errorWriter} - written, err := writer.Write(bytes) - if err != syscall.EINVAL { - t.Fatalf("ensureWriter should propagate nonretriable errors") - } - if written != 0 { - t.Fatalf("Expected 0 bytes written, got %d", written) - } -} - -func TestEnsureWriter_NoError(t *testing.T) { - // This tests the case where the underlying writer returns with no error, - // but without writing the full buffer. - var bytes []byte = []byte{1, 2, 3} - errorWriter := &mockErrorWriter{errorType: nil} - writer := &ensureWriter{errorWriter} - written, err := writer.Write(bytes) - if err != nil { - t.Fatalf("ensureWriter should only error if the underlying writer does") - } - if written != len(bytes) { - t.Fatalf("Expected %d bytes written, got %d", len(bytes), written) - } -}