Skip to content

Commit

Permalink
feat: add abundant log information (#9)
Browse files Browse the repository at this point in the history
Signed-off-by: Qizhi Huang <[email protected]>
  • Loading branch information
huangqz71 authored May 23, 2023
1 parent 38c29a6 commit 89a38f4
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 89 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.vscode
.vscode
logs
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Usage: dataMigrate [flags]
Optional: the retention policy to read (requires -database)
-start string
Optional: the start time to read (RFC3339 format)
-mode string
Optional: enable debug logging (set to "Debug" to enable it)
-to string
Destination host to write data to (default "127.0.0.1:8086", which is the openGemini service default address)
```
Expand Down
80 changes: 73 additions & 7 deletions src/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,31 @@ type Cursor struct {
}

// init prepares the cursor for its first read.
//
// With no seek locations the cursor is left empty (nil buffer, position 0)
// and nil is returned. Otherwise the locations are sorted using the
// ascLocations ordering, every location's readMax is raised to at least
// entry.MinTime-1, readTs is seeded from the first location, and the first
// block is loaded through readBlock. The error from readBlock (if any) is
// logged to the log file and returned to the caller.
func (c *Cursor) init() error {
	// Nothing to read: reset the cursor state and stop here.
	if len(c.seeks) == 0 {
		c.buf, c.pos = nil, 0
		return nil
	}

	sort.Sort(ascLocations(c.seeks))

	// Clamp readMax from below so it is never smaller than MinTime-1.
	for _, loc := range c.seeks {
		if floor := loc.entry.MinTime - 1; loc.readMax < floor {
			loc.readMax = floor
		}
	}
	c.readTs = c.seeks[0].readMax

	if logger.IsDebug() {
		// Check the validity of readMax: after sorting, the values are
		// expected to be non-decreasing; log every adjacent pair that is not.
		prev := c.seeks[0]
		for _, cur := range c.seeks[1:] {
			if cur.readMax < prev.readMax {
				logger.LogString("Cursor.init: found readMax not in right order", TOLOGFILE, LEVEL_DEBUG)
			}
			prev = cur
		}
	}

	// Load the first block; the position is reset whether or not the read
	// succeeded, and the read error is handed back unchanged.
	var readErr error
	c.buf, readErr = c.readBlock()
	if readErr != nil {
		logger.LogString("Read block failed: "+readErr.Error(), TOLOGFILE, LEVEL_ERROR)
	}
	c.pos = 0
	return readErr
}
Expand All @@ -70,6 +86,13 @@ func (c *Cursor) readBlock() (tsm1.Values, error) {
return nil, nil
}

// check the validity of readTs
if logger.IsDebug() {
if c.readTs > c.seeks[0].readMax {
logger.LogString("Cursor.readBlock: readTs > c.seeks[0].readMax", TOLOGFILE, LEVEL_DEBUG)
}
}

var locsToRead []*location
locsToRead = append(locsToRead, c.seeks[0])

Expand All @@ -91,21 +114,44 @@ func (c *Cursor) readBlock() (tsm1.Values, error) {
if len(c.seeks) > len(locsToRead) {
nextRoundStartTs := c.seeks[len(locsToRead)].readMax
if nextRoundStartTs <= upperBound {
upperBound = nextRoundStartTs - 1
upperBound = nextRoundStartTs
}
}

// this should not happen
if upperBound <= c.readTs {
logger.LogString("Cursor.readBlock: found upperBound <= readTs", TOLOGFILE, LEVEL_ERROR)
// resolve the problem
sort.Slice(c.seeks, func(i, j int) bool {
return c.seeks[i].readMax < c.seeks[j].readMax
})
for i := 0; i < len(c.seeks); i++ {
rm := c.seeks[i].readMax
if rm < c.readTs {
logger.LogString("Cursor.readBlock: found readMax < readTs", TOLOGFILE, LEVEL_DEBUG)
c.seeks[i].readMax = c.readTs
rm = c.readTs
}
if rm >= c.et || rm >= c.seeks[i].entry.MaxTime {
c.seeks = append(c.seeks[:i], c.seeks[i+1:]...)
i--
}
}
return c.readBlock()
}

var buf []tsm1.Value

for _, e := range locsToRead {
tombstones := e.r.TombstoneRange(c.key)
values, err := e.r.(*tsm1.TSMReader).ReadAt(&e.entry, nil)
if err != nil {
logger.LogString("Read block failed: "+err.Error(), TOLOGFILE, LEVEL_ERROR)
return nil, err
}
for _, v := range values {
ts := v.UnixNano()
if ts < c.readTs {
if ts <= c.readTs {
continue
}
if ts > upperBound {
Expand All @@ -131,9 +177,12 @@ func (c *Cursor) readBlock() (tsm1.Values, error) {
}

// mark the time range that has been read
tmpReadTs := c.readTs
c.readTs = upperBound

if len(buf) <= 0 {
logger.LogString(fmt.Sprintf("Cursor.readBlock: the buffer is empty with %d locations reading, readTs %d, upperbound %d",
len(locsToRead), tmpReadTs, upperBound), TOLOGFILE, LEVEL_DEBUG)
return c.readBlock()
}

Expand Down Expand Up @@ -176,6 +225,10 @@ func (c *Cursor) next() (tsm1.Value, error) {
// referenced from https://github.com/influxdata/influxdb/tree/v1.8.2/tsdb/engine/tsm1/encoding.gen.go
// function (Values).Deduplicate
func sortAndDeduplicateValues(buf *[]tsm1.Value) []tsm1.Value {
if len(*buf) == 0 {
logger.LogString("sortAndDeduplicateValues: empty buffer", TOLOGFILE, LEVEL_DEBUG)
return nil
}
sort.Slice(*buf, func(i, j int) bool {
return (*buf)[i].UnixNano() < (*buf)[j].UnixNano()
})
Expand All @@ -196,7 +249,7 @@ type Scanner struct {
fields map[string]*Cursor
}

func (s *Scanner) nextPoint() (*client.Point, error) {
func (s *Scanner) nextPoint(cmd *DataMigrateCommand) (*client.Point, error) {
// determine the current timestamp
var curTs int64 = math.MaxInt64
for _, cursor := range s.fields {
Expand Down Expand Up @@ -233,6 +286,16 @@ func (s *Scanner) nextPoint() (*client.Point, error) {
}
}

// statistics
for t := range s.tags {
cmd.stat.tagsRead[s.measurement+t] = struct{}{}
cmd.stat.tagsTotal[s.measurement+t] = struct{}{}
}
for f := range fields {
cmd.stat.fieldsRead[s.measurement+f] = struct{}{}
cmd.stat.fieldTotal[s.measurement+f] = struct{}{}
}

return client.NewPoint(s.measurement, s.tags, fields, time.Unix(0, curTs))
}

Expand All @@ -249,19 +312,21 @@ func (s *Scanner) writeBatches(c client.Client, cmd *DataMigrateCommand) error {
flag = false
}

pt, err := s.nextPoint()
pt, err := s.nextPoint(cmd)

if err != nil {
fmt.Fprintf(cmd.Stdout, "point read error: %v", err)
logger.LogString("point read error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
return err
}

if pt == nil {
rowsNum := len(bp.Points())
err := c.Write(bp)
if err != nil {
fmt.Fprintf(cmd.Stdout, "insert error: %v", err)
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
return err
}
cmd.stat.rowsRead += rowsNum
break
}

Expand All @@ -270,9 +335,10 @@ func (s *Scanner) writeBatches(c client.Client, cmd *DataMigrateCommand) error {
if count == BATCHSIZE {
err := c.Write(bp)
if err != nil {
fmt.Fprintf(cmd.Stdout, "insert error: %v", err)
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
return err
}
cmd.stat.rowsRead += BATCHSIZE
flag = true
count = 0
}
Expand Down
Loading

0 comments on commit 89a38f4

Please sign in to comment.