Skip to content

Commit

Permalink
Merge pull request #3 from xiangyu5632/main
Browse files Browse the repository at this point in the history
fix: dataMigrate Performance optimization

1. create a link once.
2. empty batchpoints after the data is writen successfully

Signed-off-by: opengemini_admin <[email protected]>
  • Loading branch information
xiangyu5632 authored Apr 12, 2023
2 parents 7d1c034 + 5b9ef12 commit f4bdc3d
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions src/dataMigrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,9 @@ func (cmd * DataMigrateCommand) formatespace (buf string) string {
return b.String()
}

func (cmd *DataMigrateCommand) writeValues(seriesKey []byte, field string, values []tsm1.Value) error {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://"+cmd.out,
})
if err != nil {
fmt.Println("Error creating openGemini Client: ", err.Error())
}
defer c.Close()

func (cmd *DataMigrateCommand) writeValues(c client.Client, seriesKey []byte, field string, values []tsm1.Value) error {
sk := string(seriesKey)
//fmt.Println(sk,field)
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: cmd.database,
Precision: "ns",
})

tags := map[string]string{}
vs := strings.Split(sk,",")
Expand All @@ -225,27 +213,47 @@ func (cmd *DataMigrateCommand) writeValues(seriesKey []byte, field string, value
fields := map[string]interface{}{}
count := 0;
all := len(values)
var bp client.BatchPoints
flag := true
for i, value := range values {
if flag {
bp, _ = client.NewBatchPoints(client.BatchPointsConfig{
Database: cmd.database,
Precision: "ns",
})
flag = false
}
ts := value.UnixNano()
if (ts < cmd.startTime) || (ts > cmd.endTime) {
if (ts < cmd.startTime) {
continue
}
if ts > cmd.endTime {
if count != 0 {
err := c.Write(bp)
if err != nil {
fmt.Fprintf(cmd.Stdout,"insert error: %v",err)
return err
}
}
break
}
fields[field]=value.Value()
pt, err := client.NewPoint(measurement, tags, fields, time.Unix(0,ts))
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
count = count + 1
if( count == BATCHSIZE ){
if count == BATCHSIZE {
err := c.Write(bp)
if err != nil {
fmt.Fprintf(cmd.Stdout,"insert error: %v",err)
return err
}
flag = true
count = 0
}
if i == all-1 {
if i == all-1 && count != 0 {
err := c.Write(bp)
if err != nil {
fmt.Fprintf(cmd.Stdout,"insert error: %v",err)
Expand Down Expand Up @@ -279,6 +287,14 @@ func (cmd *DataMigrateCommand) ReadTSMFile(tsmFilePath string) error {
return nil
}

c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://"+cmd.out,
})
if err != nil {
fmt.Println("Error creating openGemini Client: ", err.Error())
}
defer c.Close()

for i := 0; i < r.KeyCount(); i++ {
key, _ := r.KeyAt(i)
values, err := r.ReadAll(key)
Expand All @@ -289,7 +305,7 @@ func (cmd *DataMigrateCommand) ReadTSMFile(tsmFilePath string) error {
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
field = escape.Bytes(field)

if err := cmd.writeValues( measurement, string(field), values); err != nil {
if err := cmd.writeValues(c, measurement, string(field), values); err != nil {
// An error from writeValues indicates an IO error, which should be returned.
return err
}
Expand Down

0 comments on commit f4bdc3d

Please sign in to comment.