-
Notifications
You must be signed in to change notification settings - Fork 479
/
Copy pathsyncing_file.go
188 lines (166 loc) · 5.62 KB
/
syncing_file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package vfs
import (
"sync/atomic"
"github.com/cockroachdb/errors"
)
// SyncingFileOptions holds the options for a syncingFile.
type SyncingFileOptions struct {
NoSyncOnClose bool
BytesPerSync int
PreallocateSize int
}
type syncingFile struct {
File
// fd can be InvalidFd if the underlying File does not support it.
fd uintptr
useSyncRange bool
closing bool
noSyncOnClose bool
bytesPerSync int64
preallocateSize int64
atomic struct {
// The offset at which dirty data has been written.
offset int64
// The offset at which data has been synced. Note that if SyncFileRange is
// being used, the periodic syncing of data during writing will only ever
// sync up to offset-1MB. This is done to avoid rewriting the tail of the
// file multiple times, but has the side effect of ensuring that Close will
// sync the file's metadata.
syncOffset int64
}
preallocatedBlocks int64
syncData func() error
syncTo func(offset int64) error
timeDiskOp func(op func())
}
// NewSyncingFile wraps a writable file and ensures that data is synced
// periodically as it is written. The syncing does not provide persistency
// guarantees for these periodic syncs, but is used to avoid latency spikes if
// the OS automatically decides to write out a large chunk of dirty filesystem
// buffers. The underlying file is fully synced upon close.
func NewSyncingFile(f File, opts SyncingFileOptions) File {
s := &syncingFile{
File: f,
fd: f.Fd(),
noSyncOnClose: bool(opts.NoSyncOnClose),
bytesPerSync: int64(opts.BytesPerSync),
preallocateSize: int64(opts.PreallocateSize),
}
// Ensure a file that is opened and then closed will be synced, even if no
// data has been written to it.
s.atomic.syncOffset = -1
type dhChecker interface {
timeDiskOp(op func())
}
if d, ok := f.(dhChecker); ok {
s.timeDiskOp = d.timeDiskOp
} else {
s.timeDiskOp = func(op func()) {
op()
}
}
s.init()
if s.syncData == nil {
s.syncData = s.File.Sync
}
return s
}
// NB: syncingFile.Write is unsafe for concurrent use!
func (f *syncingFile) Write(p []byte) (n int, err error) {
_ = f.preallocate(atomic.LoadInt64(&f.atomic.offset))
n, err = f.File.Write(p)
if err != nil {
return n, errors.WithStack(err)
}
// The offset is updated atomically so that it can be accessed safely from
// Sync.
atomic.AddInt64(&f.atomic.offset, int64(n))
if err := f.maybeSync(); err != nil {
return 0, err
}
return n, nil
}
func (f *syncingFile) preallocate(offset int64) error {
if f.fd == InvalidFd || f.preallocateSize == 0 {
return nil
}
newPreallocatedBlocks := (offset + f.preallocateSize - 1) / f.preallocateSize
if newPreallocatedBlocks <= f.preallocatedBlocks {
return nil
}
length := f.preallocateSize * (newPreallocatedBlocks - f.preallocatedBlocks)
offset = f.preallocateSize * f.preallocatedBlocks
f.preallocatedBlocks = newPreallocatedBlocks
return preallocExtend(f.fd, offset, length)
}
func (f *syncingFile) ratchetSyncOffset(offset int64) {
for {
syncOffset := atomic.LoadInt64(&f.atomic.syncOffset)
if syncOffset >= offset {
return
}
if atomic.CompareAndSwapInt64(&f.atomic.syncOffset, syncOffset, offset) {
return
}
}
}
func (f *syncingFile) Sync() error {
// We update syncOffset (atomically) in order to avoid spurious syncs in
// maybeSync. Note that even if syncOffset is larger than the current file
// offset, we still need to call the underlying file's sync for persistence
// guarantees (which are not provided by sync_file_range).
f.ratchetSyncOffset(atomic.LoadInt64(&f.atomic.offset))
return f.syncData()
}
func (f *syncingFile) maybeSync() error {
if f.bytesPerSync <= 0 {
return nil
}
// From the RocksDB source:
//
// We try to avoid sync to the last 1MB of data. For two reasons:
// (1) avoid rewrite the same page that is modified later.
// (2) for older version of OS, write can block while writing out
// the page.
// Xfs does neighbor page flushing outside of the specified ranges. We
// need to make sure sync range is far from the write offset.
const syncRangeBuffer = 1 << 20 // 1 MB
offset := atomic.LoadInt64(&f.atomic.offset)
if offset <= syncRangeBuffer {
return nil
}
const syncRangeAlignment = 4 << 10 // 4 KB
syncToOffset := offset - syncRangeBuffer
syncToOffset -= syncToOffset % syncRangeAlignment
syncOffset := atomic.LoadInt64(&f.atomic.syncOffset)
if syncToOffset < 0 || (syncToOffset-syncOffset) < f.bytesPerSync {
return nil
}
if f.fd == InvalidFd {
return errors.WithStack(f.Sync())
}
// Note that syncTo will always be called with an offset < atomic.offset. The
// syncTo implementation may choose to sync the entire file (i.e. on OSes
// which do not support syncing a portion of the file). The syncTo
// implementation must call ratchetSyncOffset with as much of the file as it
// has synced.
return errors.WithStack(f.syncTo(syncToOffset))
}
func (f *syncingFile) Close() error {
// Sync any data that has been written but not yet synced unless the file
// has noSyncOnClose option explicitly set.
// Note that if SyncFileRange was used, atomic.syncOffset will be less than
// atomic.offset. See syncingFile.syncToRange.
f.closing = true
if !f.noSyncOnClose || f.useSyncRange {
if atomic.LoadInt64(&f.atomic.offset) > atomic.LoadInt64(&f.atomic.syncOffset) {
if err := f.Sync(); err != nil {
return errors.WithStack(err)
}
}
}
return errors.WithStack(f.File.Close())
}