Skip to content

Commit

Permalink
fix: resolved the problem of cursor reading an empty block causing pa…
Browse files Browse the repository at this point in the history
…nic (#8)

Signed-off-by: Qizhi Huang <[email protected]>
  • Loading branch information
huangqz71 authored May 19, 2023
1 parent 74cca5b commit 38c29a6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ Usage: dataMigrate [flags]
-to string
Destination host to write data to (default "127.0.0.1:8086",which is the openGemini service default address)
```

**Notice**: When using this tool, please do not migrate data without shutting down InfluxDB if possible; otherwise, some unknown problems may occur. To ensure that data is as complete as possible after migration, keep the empty write load running before shutting down InfluxDB and wait for data in the cache to complete disk dumping (10 minutes by default).

**Welcome to add more features.**
7 changes: 6 additions & 1 deletion src/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Cursor) init() error {

func (c *Cursor) readBlock() (tsm1.Values, error) {
// No matching blocks to decode
if len(c.seeks) == 0 || c.readTs == c.et {
if len(c.seeks) == 0 || c.readTs >= c.et {
return nil, nil
}

Expand Down Expand Up @@ -132,6 +132,11 @@ func (c *Cursor) readBlock() (tsm1.Values, error) {

// mark the time range that have been read
c.readTs = upperBound

if len(buf) <= 0 {
return c.readBlock()
}

return sortAndDeduplicateValues(&buf), nil
}

Expand Down
8 changes: 6 additions & 2 deletions src/dataMigrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (cmd *DataMigrateCommand) writeCurrentFiles() error {
et: cmd.endTime,
readTs: cmd.startTime,
key: key,
seeks: cmd.locations(key, cmd.startTime),
seeks: cmd.locations(key, cmd.startTime, cmd.endTime),
}
if err := newCursor.init(); err != nil {
return err
Expand All @@ -287,7 +287,7 @@ func (cmd *DataMigrateCommand) writeCurrentFiles() error {
}

// Referenced from the implementation of InfluxDB
func (cmd *DataMigrateCommand) locations(key []byte, st int64) []*location {
func (cmd *DataMigrateCommand) locations(key []byte, st int64, et int64) []*location {
var cache []tsm1.IndexEntry
var locations []*location
for _, fd := range cmd.files {
Expand All @@ -313,6 +313,10 @@ func (cmd *DataMigrateCommand) locations(key []byte, st int64) []*location {
continue
}

if ie.MinTime > et {
continue
}

location := &location{
r: fd,
entry: ie,
Expand Down

0 comments on commit 38c29a6

Please sign in to comment.