diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 7dc1cd557b6..f22ef7be4ac 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -30,6 +30,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Deprecate test flags, `generate` and `update_expected`, in favor of `data`. {pull}15292[15292] - Python 3 is required now to run python tests and tools. {pull}14798[14798] - The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667] +- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693] - `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691] ==== Bugfixes diff --git a/libbeat/publisher/queue/spool/module.go b/libbeat/publisher/queue/spool/module.go index a98c063eb48..44e49b2a8cf 100644 --- a/libbeat/publisher/queue/spool/module.go +++ b/libbeat/publisher/queue/spool/module.go @@ -64,7 +64,7 @@ func create( log = defaultLogger() } - return NewSpool(log, path, Settings{ + return newDiskSpool(log, path, settings{ ACKListener: ackListener, Mode: config.File.Permissions, WriteBuffer: uint(config.Write.BufferSize), diff --git a/libbeat/publisher/queue/spool/spool.go b/libbeat/publisher/queue/spool/spool.go index bc1ee43b831..1a920e622b7 100644 --- a/libbeat/publisher/queue/spool/spool.go +++ b/libbeat/publisher/queue/spool/spool.go @@ -32,8 +32,8 @@ import ( "github.com/elastic/go-txfile/pq" ) -// Spool implements an on-disk queue.Queue. -type Spool struct { +// diskSpool implements an on-disk queue.Queue. +type diskSpool struct { // producer/input support inCtx *spoolCtx inBroker *inBroker @@ -53,8 +53,8 @@ type spoolCtx struct { done chan struct{} } -// Settings configure a new spool to be created. -type Settings struct { +// settings configure a new spool to be created. +type settings struct { Mode os.FileMode File txfile.Options @@ -76,8 +76,8 @@ type Settings struct { const minInFlushTimeout = 100 * time.Millisecond const minOutFlushTimeout = 0 * time.Millisecond -// NewSpool creates and initializes a new file based queue. -func NewSpool(logger logger, path string, settings Settings) (*Spool, error) { +// newDiskSpool creates and initializes a new file based queue. +func newDiskSpool(logger logger, path string, settings settings) (*diskSpool, error) { mode := settings.Mode if mode == 0 { mode = os.ModePerm @@ -115,7 +115,7 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) { return nil, err } - spool := &Spool{ + spool := &diskSpool{ inCtx: inCtx, outCtx: outCtx, } @@ -159,7 +159,7 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) { } // Close shuts down the queue and closes the used file. -func (s *Spool) Close() error { +func (s *diskSpool) Close() error { // stop all workers (waits for all workers to be finished) s.outCtx.Close() s.inCtx.Close() @@ -174,17 +174,17 @@ func (s *Spool) Close() error { } // BufferConfig returns the queue initial buffer settings. -func (s *Spool) BufferConfig() queue.BufferConfig { +func (s *diskSpool) BufferConfig() queue.BufferConfig { return queue.BufferConfig{Events: -1} } // Producer creates a new queue producer for publishing events. -func (s *Spool) Producer(cfg queue.ProducerConfig) queue.Producer { +func (s *diskSpool) Producer(cfg queue.ProducerConfig) queue.Producer { return s.inBroker.Producer(cfg) } // Consumer creates a new queue consumer for consuming and acking events. -func (s *Spool) Consumer() queue.Consumer { +func (s *diskSpool) Consumer() queue.Consumer { return s.outBroker.Consumer() } @@ -192,7 +192,7 @@ func (s *Spool) Consumer() queue.Consumer { // Flush events are forwarded to all workers. // The onFlush callback is directly called by the queue writer (same go-routine) // on Write or Flush operations. -func (s *Spool) onFlush(n uint) { +func (s *diskSpool) onFlush(n uint) { s.inBroker.onFlush(n) s.outBroker.onFlush(n) } @@ -200,7 +200,7 @@ func (s *Spool) onFlush(n uint) { // onACK is run whenever the queue signals events being acked and removed from // the queue. // ACK events are forwarded to all workers. -func (s *Spool) onACK(events, pages uint) { +func (s *diskSpool) onACK(events, pages uint) { s.inBroker.onACK(events, pages) } diff --git a/libbeat/publisher/queue/spool/spool_test.go b/libbeat/publisher/queue/spool/spool_test.go index 8a8b466c2a1..b5947152d9a 100644 --- a/libbeat/publisher/queue/spool/spool_test.go +++ b/libbeat/publisher/queue/spool/spool_test.go @@ -36,7 +36,7 @@ var seed int64 var debug bool type testQueue struct { - *Spool + *diskSpool teardown func() } @@ -104,7 +104,7 @@ func makeTestQueue( logger = new(silentLogger) } - spool, err := NewSpool(logger, path, Settings{ + spool, err := newDiskSpool(logger, path, settings{ WriteBuffer: writeBuffer, WriteFlushTimeout: flushTimeout, Codec: codecCBORL, @@ -119,13 +119,13 @@ func makeTestQueue( t.Fatal(err) } - tq := &testQueue{Spool: spool, teardown: cleanPath} + tq := &testQueue{diskSpool: spool, teardown: cleanPath} return tq } } func (t *testQueue) Close() error { - err := t.Spool.Close() + err := t.diskSpool.Close() t.teardown() return err }