diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index cef94676..f8b1be36 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -42,6 +42,7 @@ func init() { Cmd.Flags().StringVar(&conf.DataDir, "data-dir", "./data/db", "Directory where to store data") Cmd.Flags().StringVar(&conf.WalDir, "wal-dir", "./data/wal", "Directory for write-ahead-logs") Cmd.Flags().DurationVar(&conf.WalRetentionTime, "wal-retention-time", 1*time.Hour, "Retention time for the entries in the write-ahead-log") + Cmd.Flags().BoolVar(&conf.WalSyncData, "wal-sync-data", true, "Whether to sync data in write-ahead-log") Cmd.Flags().Int64Var(&conf.DbBlockCacheMB, "db-cache-size-mb", kv.DefaultKVFactoryOptions.CacheSizeMB, "Max size of the shared DB cache") } diff --git a/cmd/standalone/cmd.go b/cmd/standalone/cmd.go index 7d48c9e9..8af4852d 100644 --- a/cmd/standalone/cmd.go +++ b/cmd/standalone/cmd.go @@ -42,6 +42,7 @@ func init() { Cmd.Flags().StringVar(&conf.DataDir, "data-dir", "./data/db", "Directory where to store data") Cmd.Flags().StringVar(&conf.WalDir, "wal-dir", "./data/wal", "Directory for write-ahead-logs") Cmd.Flags().DurationVar(&conf.WalRetentionTime, "wal-retention-time", 1*time.Hour, "Retention time for the entries in the write-ahead-log") + Cmd.Flags().BoolVar(&conf.WalSyncData, "wal-sync-data", true, "Whether to sync data in write-ahead-log") Cmd.Flags().DurationVar(&conf.NotificationsRetentionTime, "notifications-retention-time", 1*time.Hour, "Retention time for the db notifications to clients") Cmd.Flags().Int64Var(&conf.DbBlockCacheMB, "db-cache-size-mb", kv.DefaultKVFactoryOptions.CacheSizeMB, "Max size of the shared DB cache") diff --git a/server/server.go b/server/server.go index c699cb83..74b9cd9f 100644 --- a/server/server.go +++ b/server/server.go @@ -33,6 +33,7 @@ type Config struct { WalDir string WalRetentionTime time.Duration + WalSyncData bool NotificationsRetentionTime time.Duration DbBlockCacheMB int64 @@ -72,7 +73,10 @@ func NewWithGrpcProvider(config Config, provider container.GrpcProvider, replica s := &Server{ replicationRpcProvider: replicationRpcProvider, walFactory: wal.NewWalFactory(&wal.WalFactoryOptions{ - BaseWalDir: config.WalDir, + BaseWalDir: config.WalDir, + Retention: config.WalRetentionTime, + SegmentSize: wal.DefaultWalFactoryOptions.SegmentSize, + SyncData: true, }), kvFactory: kvFactory, healthServer: health.NewServer(), diff --git a/server/standalone.go b/server/standalone.go index ea9916d5..e45a0c8e 100644 --- a/server/standalone.go +++ b/server/standalone.go @@ -74,8 +74,10 @@ func NewStandalone(config StandaloneConfig) (*Standalone, error) { kvOptions := kv.KVFactoryOptions{DataDir: config.DataDir} s.walFactory = wal.NewWalFactory(&wal.WalFactoryOptions{ - BaseWalDir: config.WalDir, - Retention: config.WalRetentionTime, + BaseWalDir: config.WalDir, + Retention: config.WalRetentionTime, + SegmentSize: wal.DefaultWalFactoryOptions.SegmentSize, + SyncData: config.WalSyncData, }) var err error if s.kvFactory, err = kv.NewPebbleKVFactory(&kvOptions); err != nil { diff --git a/server/wal/wal.go b/server/wal/wal.go index 23747d7e..235cc4c9 100644 --- a/server/wal/wal.go +++ b/server/wal/wal.go @@ -36,12 +36,14 @@ type WalFactoryOptions struct { BaseWalDir string Retention time.Duration SegmentSize int32 + SyncData bool } var DefaultWalFactoryOptions = &WalFactoryOptions{ BaseWalDir: "data/wal", Retention: 1 * time.Hour, SegmentSize: 64 * 1024 * 1024, + SyncData: true, } type WalFactory interface { diff --git a/server/wal/wal_impl.go b/server/wal/wal_impl.go index 51e819fc..06593a5b 100644 --- a/server/wal/wal_impl.go +++ b/server/wal/wal_impl.go @@ -59,6 +59,7 @@ type wal struct { shard int64 firstOffset atomic.Int64 segmentSize uint32 + syncData bool currentSegment ReadWriteSegment readOnlySegments ReadOnlySegmentsGroup @@ -104,6 +105,7 @@ func newWal(namespace string, shard int64, options *WalFactoryOptions, commitOff namespace: namespace, shard: shard, segmentSize: uint32(options.SegmentSize), + syncData: options.SyncData, appendLatency: metrics.NewLatencyHistogram("oxia_server_wal_append_latency", "The time it takes to append entries to the WAL", labels), @@ -143,11 +145,13 @@ func newWal(namespace string, shard int64, options *WalFactoryOptions, commitOff w.trimmer = newTrimmer(namespace, shard, w, options.Retention, trimmerCheckInterval, clock, commitOffsetProvider) - go common.DoWithLabels(map[string]string{ - "oxia": "wal-sync", - "namespace": namespace, - "shard": fmt.Sprintf("%d", shard), - }, w.runSync) + if options.SyncData { + go common.DoWithLabels(map[string]string{ + "oxia": "wal-sync", + "namespace": namespace, + "shard": fmt.Sprintf("%d", shard), + }, w.runSync) + } return w, nil } @@ -342,6 +346,11 @@ func (t *wal) runSync() { } func (t *wal) Sync(ctx context.Context) error { + if !t.syncData { + t.lastSyncedOffset.Store(t.lastAppendedOffset.Load()) + return nil + } + t.Lock() defer t.Unlock()