Skip to content

Commit

Permalink
feat: replication apis durable queue management (#22719)
Browse files Browse the repository at this point in the history
* feat: added durable queue management to replications service

* refactor: improved mapping of replication streams to durable queues

* refactor: modified replication stream durable queues to use user-specified engine path

* chore: generated test mocks for replications DurableQueueManager

* chore: add test coverage for replications durable queue manager

* refactor: made changes based on code review, added mutex to durableQueueManager, improved error logging

* chore: ran make fmt

* refactor: further improvements to error logging
  • Loading branch information
mcfarlm3 authored Oct 26, 2021
1 parent fba7fac commit 8825cd5
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)

replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")))
replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")), opts.EnginePath)
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)

Expand Down
5 changes: 5 additions & 0 deletions pkg/durablequeue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ func (l *Queue) TotalBytes() int64 {
return n
}

// Dir returns the directory associated with the queue.
func (l *Queue) Dir() string {
return l.dir
}

// diskUsage returns the total size on disk used by the Queue.
func (l *Queue) diskUsage() int64 {
var size int64
Expand Down
128 changes: 128 additions & 0 deletions replications/internal/queue_management.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package internal

import (
"fmt"
"os"
"path/filepath"
"sync"

"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
"go.uber.org/zap"
)

type durableQueueManager struct {
replicationQueues map[platform.ID]*durablequeue.Queue
logger *zap.Logger
enginePath string
mutex sync.RWMutex
}

// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
//replication streams.
func NewDurableQueueManager(log *zap.Logger, enginePath string) *durableQueueManager {
replicationQueues := make(map[platform.ID]*durablequeue.Queue)

return &durableQueueManager{
replicationQueues: replicationQueues,
logger: log,
enginePath: enginePath,
}
}

// InitializeQueue creates a new durable queue which is associated with a replication stream.
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error {
qm.mutex.Lock()
defer qm.mutex.Unlock()

// Check for duplicate replication ID
if _, exists := qm.replicationQueues[replicationID]; exists {
return fmt.Errorf("durable queue already exists for replication ID %q", replicationID)
}

// Set up path for new queue on disk
dir := filepath.Join(
qm.enginePath,
"replicationq",
replicationID.String(),
)
if err := os.MkdirAll(dir, 0777); err != nil {
return err
}

// Create a new durable queue
newQueue, err := durablequeue.NewQueue(
dir,
maxQueueSizeBytes,
durablequeue.DefaultSegmentSize,
&durablequeue.SharedCount{},
durablequeue.MaxWritesPending,
func(bytes []byte) error {
return nil
},
)

if err != nil {
return err
}

// Map new durable queue to its corresponding replication stream via replication ID
qm.replicationQueues[replicationID] = newQueue

// Open the new queue
if err := newQueue.Open(); err != nil {
return err
}

qm.logger.Debug("Created new durable queue for replication stream",
zap.String("id", replicationID.String()), zap.String("path", dir))

return nil
}

// DeleteQueue deletes a durable queue and its associated data on disk.
func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
qm.mutex.Lock()
defer qm.mutex.Unlock()

if qm.replicationQueues[replicationID] == nil {
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
}

// Close the queue
if err := qm.replicationQueues[replicationID].Close(); err != nil {
return err
}

qm.logger.Debug("Closed replication stream durable queue",
zap.String("id", replicationID.String()), zap.String("path", qm.replicationQueues[replicationID].Dir()))

// Delete any enqueued, un-flushed data on disk for this queue
if err := qm.replicationQueues[replicationID].Remove(); err != nil {
return err
}

qm.logger.Debug("Deleted data associated with replication stream durable queue",
zap.String("id", replicationID.String()), zap.String("path", qm.replicationQueues[replicationID].Dir()))

// Remove entry from replicationQueues map
delete(qm.replicationQueues, replicationID)

return nil
}

// UpdateMaxQueueSize updates the maximum size of the durable queue.
func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error {
qm.mutex.RLock()
defer qm.mutex.RUnlock()

if qm.replicationQueues[replicationID] == nil {
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
}

if err := qm.replicationQueues[replicationID].SetMaxSize(maxQueueSizeBytes); err != nil {
return err
}

return nil
}
100 changes: 100 additions & 0 deletions replications/internal/queue_management_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package internal

import (
"os"
"path/filepath"
"testing"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

var (
replicationID = platform.ID(1)
maxQueueSizeBytes = 3 * influxdb.DefaultReplicationMaxQueueSizeBytes
)

func TestCreateNewQueueDirExists(t *testing.T) {
t.Parallel()

tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)

logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)

require.NoError(t, err)
require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
}

func TestCreateNewQueueDuplicateID(t *testing.T) {
t.Parallel()

tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)

// Create a valid new queue
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.NoError(t, err)

// Try to initialize another queue with the same replication ID
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"")
}

func TestDeleteQueueDirRemoved(t *testing.T) {
t.Parallel()

tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)

// Create a valid new queue
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)
err = qm.InitializeQueue(replicationID, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))

// Delete queue and make sure its queue has been deleted from disk
err = qm.DeleteQueue(replicationID)
require.NoError(t, err)
require.NoDirExists(t, filepath.Join(tempEnginePath, "replicationq", replicationID.String()))
}

func TestDeleteQueueNonexistentID(t *testing.T) {
t.Parallel()

tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)

logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)

// Delete nonexistent queue
err = qm.DeleteQueue(replicationID)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}

func TestUpdateMaxQueueSizeNonexistentID(t *testing.T) {
t.Parallel()

tempEnginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
defer os.RemoveAll(tempEnginePath)

logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, tempEnginePath)

// Update nonexistent queue
err = qm.UpdateMaxQueueSize(replicationID, influxdb.DefaultReplicationMaxQueueSizeBytes)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}
77 changes: 77 additions & 0 deletions replications/mock/queue_management.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8825cd5

Please sign in to comment.