Use ReadAt() (pread syscall) instead of Read() in seek.go #1664

Merged: 6 commits, May 25, 2019
Changes from 4 commits
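Summary of the change: the seeker previously positioned the fd with Seek() and then issued Read() calls, which costs an extra syscall per read and makes the fd's kernel-managed offset shared mutable state. Switching to ReadAt(), which maps to the pread(2) syscall, passes the offset explicitly on every read. A minimal standalone sketch of the two patterns (file name and offsets are made up, not taken from this PR):

```go
package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	f, err := os.Open("example.dat") // hypothetical file, not from the PR
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	buf := make([]byte, 16)

	// Old pattern: lseek(2) + read(2), two syscalls, and the fd's shared
	// offset is mutated, so the fd cannot be used concurrently.
	if _, err := f.Seek(128, io.SeekStart); err != nil {
		log.Fatal(err)
	}
	if _, err := io.ReadFull(f, buf); err != nil {
		log.Fatal(err)
	}

	// New pattern: a single pread(2) with an explicit offset; the fd's
	// shared offset is untouched, so concurrent readers are safe.
	if _, err := f.ReadAt(buf, 128); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read: %x\n", buf)
}
```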
93 changes: 54 additions & 39 deletions src/dbnode/persist/fs/seek.go
@@ -71,8 +71,6 @@ type seeker struct {
indexFd *os.File
indexFileSize int64

shardDir string

unreadBuf []byte

// Bloom filter associated with the shard / block the seeker is responsible
@@ -153,17 +151,17 @@ func (s *seeker) Open(
return errClonesShouldNotBeOpened
}

s.shardDir = ShardDataDirPath(s.opts.filePathPrefix, namespace, shard)
shardDir := ShardDataDirPath(s.opts.filePathPrefix, namespace, shard)
var infoFd, digestFd, bloomFilterFd, summariesFd *os.File

// Open necessary files
if err := openFiles(os.Open, map[string]**os.File{
filesetPathFromTime(s.shardDir, blockStart, infoFileSuffix): &infoFd,
filesetPathFromTime(s.shardDir, blockStart, indexFileSuffix): &s.indexFd,
filesetPathFromTime(s.shardDir, blockStart, dataFileSuffix): &s.dataFd,
filesetPathFromTime(s.shardDir, blockStart, digestFileSuffix): &digestFd,
filesetPathFromTime(s.shardDir, blockStart, bloomFilterFileSuffix): &bloomFilterFd,
filesetPathFromTime(s.shardDir, blockStart, summariesFileSuffix): &summariesFd,
filesetPathFromTime(shardDir, blockStart, infoFileSuffix): &infoFd,
filesetPathFromTime(shardDir, blockStart, indexFileSuffix): &s.indexFd,
filesetPathFromTime(shardDir, blockStart, dataFileSuffix): &s.dataFd,
filesetPathFromTime(shardDir, blockStart, digestFileSuffix): &digestFd,
filesetPathFromTime(shardDir, blockStart, bloomFilterFileSuffix): &bloomFilterFd,
filesetPathFromTime(shardDir, blockStart, summariesFileSuffix): &summariesFd,
}); err != nil {
return err
}
@@ -221,7 +219,7 @@ func (s *seeker) Open(
s.Close()
return fmt.Errorf(
"index file digest for file: %s does not match the expected digest: %c",
filesetPathFromTime(s.shardDir, blockStart, indexFileSuffix), err,
filesetPathFromTime(shardDir, blockStart, indexFileSuffix), err,
)
}

@@ -317,13 +315,8 @@ func (s *seeker) SeekByIndexEntry(
entry IndexEntry,
resources ReusableSeekerResources,
) (checked.Bytes, error) {
newOffset, err := s.dataFd.Seek(entry.Offset, os.SEEK_SET)
if err != nil {
return nil, err
}
if newOffset != entry.Offset {
return nil, fmt.Errorf("tried to seek to: %d, but seeked to: %d", entry.Offset, newOffset)
}
resources.offsetFileReader.reset(s.dataFd)
resources.offsetFileReader.setOffset(entry.Offset)

// Obtain an appropriately sized buffer.
var buffer checked.Bytes
@@ -340,7 +333,7 @@

// Copy the actual data into the underlying buffer.
underlyingBuf := buffer.Bytes()
n, err := io.ReadFull(s.dataFd, underlyingBuf)
n, err := io.ReadFull(resources.offsetFileReader, underlyingBuf)
if err != nil {
return nil, err
}
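Note that io.ReadFull works unchanged over the new reader: it simply loops calling Read(), and the wrapper turns each Read() into one pread at its advancing offset. A rough sketch of that loop (simplified relative to the stdlib, which also distinguishes io.ErrUnexpectedEOF; this helper is illustrative, not part of the PR):

```go
package fs

import "io"

// readFullSketch roughly mirrors io.ReadFull when driven against an
// offsetFileReader: every Read() becomes one ReadAt() at the wrapper's
// current offset, and the offset advances by n, so successive calls walk
// forward through the record without touching the fd's seek position.
func readFullSketch(r io.Reader, buf []byte) (int, error) {
	read := 0
	for read < len(buf) {
		n, err := r.Read(buf[read:])
		read += n
		if err == io.EOF && read == len(buf) {
			return read, nil
		}
		if err != nil {
			return read, err
		}
	}
	return read, nil
}
```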
@@ -364,10 +357,11 @@
//
// 1. Go to the indexLookup and it will give us an offset that is a good starting
// point for scanning the index file.
// 2. Seek to the position that the indexLookup gave us.
// 3. Reset a decoder with fileDecoderStream (fd wrapped in a bufio.Reader).
// 2. Reset an offsetFileReader with the index fd and an offset (so that calls to Read() will
// begin at the offset provided by the offset lookup).
// 3. Reset a decoder with fileDecoderStream (offsetFileReader wrapped in a bufio.Reader).
// 4. Call DecodeIndexEntry in a tight loop (which will advance our position in the
// file internally) until we've either found the entry we're looking for or gone so
// offsetFileReader internally) until we've either found the entry we're looking for or gone so
// far we know it does not exist.
func (s *seeker) SeekIndexEntry(
id ident.ID,
@@ -380,16 +374,9 @@ func (s *seeker) SeekIndexEntry(
return IndexEntry{}, err
}

seekedOffset, err := s.indexFd.Seek(offset, os.SEEK_SET)
if err != nil {
return IndexEntry{}, err
}
if seekedOffset != offset {
return IndexEntry{}, instrument.InvariantErrorf(
"tried to seek to offset: %d, but seeked to: %d", seekedOffset, offset)
}

resources.fileDecoderStream.Reset(s.indexFd)
resources.offsetFileReader.reset(s.indexFd)
resources.offsetFileReader.setOffset(offset)
resources.fileDecoderStream.Reset(resources.offsetFileReader)
Collaborator:
tbh here's where I mean it'd be easier to avoid reusing the offsetFileReader and just alloc a new one. don't think it's going to cause any performance issues either way.

Collaborator:
There are users I see doing tens of thousands of reads per second, might be worth continuing to keep per operation allocation low/zero where possible.

resources.xmsgpackDecoder.Reset(resources.fileDecoderStream)

idBytes := id.Bytes()
@@ -466,6 +453,7 @@ func (s *seeker) Close() error {
if s.isClone {
return nil
}

multiErr := xerrors.NewMultiError()
if s.bloomFilter != nil {
multiErr = multiErr.Add(s.bloomFilter.Close())
@@ -501,15 +489,11 @@ func (s *seeker) ConcurrentClone() (ConcurrentDataFileSetSeeker, error) {
bloomFilter: s.bloomFilter,
indexLookup: indexLookupClone,
isClone: true,
}

// File descriptors are not concurrency safe since they have an internal
// seek position.
if err := openFiles(os.Open, map[string]**os.File{
filesetPathFromTime(s.shardDir, s.start.ToTime(), indexFileSuffix): &seeker.indexFd,
filesetPathFromTime(s.shardDir, s.start.ToTime(), dataFileSuffix): &seeker.dataFd,
}); err != nil {
return nil, err
// Index and data fd's are always accessed via the ReadAt() / pread APIs so
// they are concurrency safe and can be shared among clones.
indexFd: s.indexFd,
dataFd: s.dataFd,
}

return seeker, nil
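The reason the clones can now share fds: ReadAt() carries its own offset, so concurrent calls never race on the kernel-managed file position. A standalone sketch (file name is made up) of several goroutines reading disjoint regions of one *os.File with no locking:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"sync"
)

func main() {
	f, err := os.Open("example.dat") // hypothetical file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Each goroutine issues pread calls against a disjoint region; because
	// ReadAt never touches the fd's internal offset, no mutex is needed.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			buf := make([]byte, 8)
			if _, err := f.ReadAt(buf, int64(i)*8); err != nil {
				log.Printf("reader %d: %v", i, err)
				return
			}
			fmt.Printf("reader %d: %x\n", i, buf)
		}(i)
	}
	wg.Wait()
}
```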
@@ -542,6 +526,7 @@ type ReusableSeekerResources struct {
xmsgpackDecoder *xmsgpack.Decoder
fileDecoderStream *bufio.Reader
byteDecoderStream xmsgpack.ByteDecoderStream
offsetFileReader *offsetFileReader
Collaborator:
two questions:
(1) Does this need to be in this struct? allocating the 16 byte struct per read request isn't going to cost us anything (esp compared to the syscalls you've gotten rid of).
(2) What's left to make a regular Seeker concurrency safe? Would be nice to get rid of the Concurrent...Seeker if we could

Contributor (author):
re: putting it in the reusable seeker resources. Yeah, it doesn't cost much to allocate, but we already have this pattern set up so it's pretty easy to hook into it. Would rather avoid the alloc if we can.

// This pool should only be used for calling DecodeIndexEntry. We use a
// special pool here to avoid the overhead of channel synchronization, as
// well as ref counting that comes with the checked bytes pool. In addition,
@@ -558,6 +543,7 @@ func NewReusableSeekerResources(opts Options) ReusableSeekerResources {
xmsgpackDecoder: xmsgpack.NewDecoder(opts.DecodingOptions()),
fileDecoderStream: bufio.NewReaderSize(nil, seekReaderSize),
byteDecoderStream: xmsgpack.NewByteDecoderStream(nil),
offsetFileReader: newOffsetFileReader(),
decodeIndexEntryBytesPool: newSimpleBytesPool(),
}
}
@@ -610,3 +596,32 @@ func (s *simpleBytesPool) Put(b []byte) {

s.pool = append(s.pool, b[:])
}

// offsetFileReader implements io.Reader and allows an *os.File to be wrapped
// such that any calls to Read() are issued at the provided offset. This is used
// to issue reads to specific portions of the index and data files without having
// to first call Seek(). This reduces the number of syscalls that need to be made
// and also allows the fds to be shared among concurrent goroutines since the
// internal fd offset managed by the kernel is not being used.
type offsetFileReader struct {
fd *os.File
offset int64
}

func newOffsetFileReader() *offsetFileReader {
return &offsetFileReader{}
}

func (p *offsetFileReader) Read(b []byte) (n int, err error) {
n, err = p.fd.ReadAt(b, p.offset)
p.offset += int64(n)
return n, err
}

func (p *offsetFileReader) reset(fd *os.File) {
p.fd = fd
}

func (p *offsetFileReader) setOffset(offset int64) {
p.offset = offset
}
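A hypothetical end-to-end usage of the new type, mirroring how the seeker wires it into fileDecoderStream (assumes the unexported offsetFileReader above is in scope, i.e. inside package fs; the file name, offset, and buffer sizes are illustrative):

```go
package fs

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
)

func exampleOffsetRead() {
	fd, err := os.Open("index.db") // hypothetical fileset file
	if err != nil {
		log.Fatal(err)
	}
	defer fd.Close()

	r := newOffsetFileReader()
	r.reset(fd)
	r.setOffset(4096) // begin reading at byte 4096 with no Seek() syscall

	// Wrap in a bufio.Reader, as fileDecoderStream does, so the decoder's
	// many small reads coalesce into a few large pread calls.
	br := bufio.NewReaderSize(r, 64*1024)

	header := make([]byte, 16)
	if _, err := io.ReadFull(br, header); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("header: %x\n", header)
}
```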