add aof_writer cmd_writer json_writer #914

Merged
merged 5 commits into from
Jan 14, 2025
8 changes: 8 additions & 0 deletions cmd/redis-shake/main.go
@@ -139,6 +139,14 @@ func main() {
	// create writer
	var theWriter writer.Writer
	switch {
	case v.IsSet("file_writer"):
		opts := new(writer.FileWriterOptions)
		defaults.SetDefaults(opts)
		err := v.UnmarshalKey("file_writer", opts)
		if err != nil {
			log.Panicf("failed to read the FileWriter config entry. err: %v", err)
		}
		theWriter = writer.NewFileWriter(ctx, opts)
	case v.IsSet("redis_writer"):
		opts := new(writer.RedisWriterOptions)
		defaults.SetDefaults(opts)
58 changes: 58 additions & 0 deletions docs/src/en/writer/file_writer.md
@@ -0,0 +1,58 @@
# file_writer

## Introduction

Use `file_writer` to write data to a file in CMD, JSON, or AOF format.
It is commonly used to extract, migrate, or repair data via files.

## Configuration

```toml
[file_writer]
filepath = "/tmp/cmd.txt"
type = "cmd" #cmd,aof,json (default cmd)
```

* An absolute filepath should be passed in.

## Application scenarios

- Share data between two systems: one system writes an AOF file to disk/S3/OSS, and the other system reads the file from there.
- Partially migrate data by business prefix: extract the data with prefix "XXX:" from system A as an AOF file, then import it into system B with `redis-cli --pipe < XXX.aof`.
- Fix data with a cmd file: export cmd data from one system, correct the wrong entries, then import the file with `redis-cli < cmd.txt`.
- Analyze data with JSON: export a JSON file, then import it into MongoDB or a BI tool for analysis.

## Example output
### cmd_writer output:
```
SELECT 0
set key1 1
set key2 2
set key3 3
sadd key4 1 2 3 4
lpush key5 1 2 3 4 5
zadd key6 1 2 3 4 5 6
```
### json_writer output:
```
{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23}
{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30}
{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30}
{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30}
{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52}
{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60}
{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66}
```
### aof_writer output:
```
*2
$6
SELECT
$1
0
*3
$3
set
$4
key1
$1
1
```
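
The AOF sample above is the Redis protocol (RESP) encoding of each command: an array header `*N`, then one `$len` bulk string per argument, each line terminated by `\r\n`. The sketch below reproduces that encoding for illustration; `encodeRESP` is a hypothetical helper, not the `Serialize` method that RedisShake uses.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encodeRESP encodes a command as a RESP array of bulk strings.
// Lengths are byte lengths, as the protocol requires.
func encodeRESP(argv []string) string {
	var b strings.Builder
	b.WriteString("*" + strconv.Itoa(len(argv)) + "\r\n")
	for _, a := range argv {
		b.WriteString("$" + strconv.Itoa(len(a)) + "\r\n" + a + "\r\n")
	}
	return b.String()
}

func main() {
	// prints "*3\r\n$3\r\nset\r\n$4\r\nkey1\r\n$1\r\n1\r\n"
	fmt.Printf("%q\n", encodeRESP([]string{"set", "key1", "1"}))
}
```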
57 changes: 57 additions & 0 deletions docs/src/zh/writer/file_writer.md
@@ -0,0 +1,57 @@
# file_writer

## Introduction

Use `file_writer` to write data to a file in CMD, JSON, or AOF format. It is commonly used to extract, migrate, or correct data through files.

## Configuration

```toml
[file_writer]
filepath = "/tmp/cmd.txt"
type = "cmd" #cmd,aof,json (default cmd)
```

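* An absolute `filepath` is required.

## Application scenarios
- Share data between two systems: one system writes the file to disk/S3/OSS, and the other system reads it from there.
- Partially migrate data with a given prefix across systems: export the data with prefix "XXX:" from system A, then import it into system B with `redis-cli --pipe < XXX.aof`.
- Correct data through a command file: export the data from one system in cmd format, correct it, then import it with `redis-cli < cmd.txt`.
- Analyze data in JSON format: export a JSON file, then import it into MongoDB or a BI tool for analysis.

## Example output
### cmd_writer output: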
```
SELECT 0
set key1 1
set key2 2
set key3 3
sadd key4 1 2 3 4
lpush key5 1 2 3 4 5
zadd key6 1 2 3 4 5 6
```
### json_writer output:
```
{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23}
{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30}
{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30}
{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30}
{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52}
{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60}
{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66}
```
### aof_writer output:
```
*2
$6
SELECT
$1
0
*3
$3
set
$4
key1
$1
1
```
129 changes: 129 additions & 0 deletions internal/writer/file_writer.go
@@ -0,0 +1,129 @@
package writer

import (
	"RedisShake/internal/entry"
	"RedisShake/internal/log"
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

type FileType string

const (
	AOF  FileType = "aof"
	CMD  FileType = "cmd"
	JSON FileType = "json"
)

var FileTypes = []FileType{CMD, AOF, JSON}

type FileWriterOptions struct {
	Filepath string `mapstructure:"filepath" default:""`
	FileType string `mapstructure:"type" default:"cmd"`
}

type fileWriter struct {
	fileType FileType
	path     string
	DbId     int
	ch       chan *entry.Entry
	chWg     sync.WaitGroup
	stat     struct {
		EntryCount int `json:"entry_count"`
	}
}

func (w *fileWriter) Write(e *entry.Entry) {
	w.ch <- e
}

func (w *fileWriter) Close() {
	close(w.ch)
	w.chWg.Wait()
}

func (w *fileWriter) Status() interface{} {
	return w.stat
}

func (w *fileWriter) StatusString() string {
	return fmt.Sprintf("exported entry count=%d", w.stat.EntryCount)
}

func (w *fileWriter) StatusConsistent() bool {
	return true
}

func NewFileWriter(ctx context.Context, opts *FileWriterOptions) Writer {
	absolutePath, err := filepath.Abs(opts.Filepath)
	if err != nil {
		log.Panicf("NewFileWriter path=[%s]: filepath.Abs error: %s", opts.Filepath, err.Error())
	}
	log.Infof("NewFileWriter absolute path=[%s],type=[%s]", absolutePath, opts.FileType)
	w := &fileWriter{
		fileType: FileType(opts.FileType),
		DbId:     0,
		path:     absolutePath,
		ch:       make(chan *entry.Entry),
	}
	w.stat.EntryCount = 0
	return w
}

func (w *fileWriter) StartWrite(ctx context.Context) (ch chan *entry.Entry) {
	w.chWg = sync.WaitGroup{}
	w.chWg.Add(1)
	go w.processWrite(ctx)
	return w.ch
}

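func (w *fileWriter) processWrite(ctx context.Context) {
	// Deferred first so it runs last: Close() returns only after the final
	// flush and file.Close() have completed.
	defer w.chWg.Done()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	file, err := os.Create(w.path)
	if err != nil {
		log.Panicf("create file failed: %v", err)
		return
	}
	defer file.Close()
	writer := bufio.NewWriter(file)
	done := ctx.Done()
	for {
		select {
		case <-done:
			// A cancelled context's channel is always ready and would make
			// this loop spin, so stop selecting on it and keep draining
			// until w.ch is closed.
			done = nil
		case <-ticker.C:
			writer.Flush()
		case e, ok := <-w.ch:
			if !ok {
				writer.Flush()
				return
			}
			w.stat.EntryCount++
			w.writeEntry(writer, e)
		}
	}
}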

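func (w *fileWriter) writeEntry(writer *bufio.Writer, e *entry.Entry) {
	switch w.fileType {
	case CMD:
		writer.WriteString(strings.Join(e.Argv, " ") + "\n")
	case AOF:
		writer.Write(e.Serialize())
	case JSON:
		// compute SerializedSize for the json result
		e.Serialize()
		// use a name that does not shadow the json package, and surface
		// marshal errors instead of silently writing an empty line
		data, err := json.Marshal(e)
		if err != nil {
			log.Panicf("marshal entry to json failed: %v", err)
		}
		writer.Write(data)
		writer.WriteString("\n")
	}
}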
4 changes: 4 additions & 0 deletions shake.toml
@@ -35,6 +35,10 @@ password = "" # keep empty if no authentication is required
tls = false
off_reply = false # turn off the server reply

# [file_writer]
# filepath = "/tmp/cmd.txt"
# type = "cmd" #cmd,aof,json (default cmd)

[filter]
# Allow keys with specific prefixes or suffixes
# Examples: