Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tools] Allow reading all the shards in the directory in read_data_files #3857

Merged
merged 2 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/cmd/tools/read_data_files/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,28 @@
$ git clone [email protected]:m3db/m3.git
$ make read_data_files
$ ./bin/read_data_files
Usage: read_data_files [-b value] [-n value] [-p value] [-s value] [parameters ...]
Usage: read_data_files [-B value] [-b value] [-f value] [-n value] [-p value] [-s value] [-t value] [-v value] [parameters ...]
-B, --benchmark=value
benchmark mode (optional), [series|datapoints]
-b, --block-start=value
Block Start Time [in nsec]
Block Start Time [in nsec]
-f, --id-filter=value
ID Contains Filter [e.g. xyz]
ID Contains Filter (optional)
-n, --namespace=value
Namespace [e.g. metrics]
Namespace [e.g. metrics]
-p, --path-prefix=value
Path prefix [e.g. /var/lib/m3db]
-s, --shard=value
Shard [expected format uint32]
Path prefix [e.g. /var/lib/m3db]
-s, --shard=value Shard [expected format uint32], or -1 for all shards in the
directory
-t, --fileset-type=value
flush|snapshot
-v, --volume=value
Volume number

# example usage
# read_data_files -b1480960800000000000 -n metrics -p /var/lib/m3db -s 451 -f 'metric-name' > /tmp/sample-data.out
```

# TBH
- The tool outputs the identifiers to `stdout`, remember to redirect as desired.
- The code currently assumes the data layout under the hood is `<path-prefix>/data/<namespace>/<shard>/...<block-start>-[index|...].db`. If this is not the file structure under the hood, replicate it to use this tool. Remember to copy checkpoint files along with each index file.
- The code currently assumes the data layout under the hood is `<path-prefix>/data/<namespace>/<shard>/...<block-start>-[index|...].db`. If this is not the file structure under the hood, replicate it to use this tool. Remember to copy checkpoint files along with each index file.
190 changes: 118 additions & 72 deletions src/cmd/tools/read_data_files/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"time"

Expand All @@ -34,6 +36,7 @@ import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/x/xio"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"
Expand All @@ -45,6 +48,8 @@ import (
const (
snapshotType = "snapshot"
flushType = "flush"

allShards = -1
)

type benchmarkMode uint8
Expand All @@ -62,9 +67,10 @@ const (

func main() {
var (
optPathPrefix = getopt.StringLong("path-prefix", 'p', "", "Path prefix [e.g. /var/lib/m3db]")
optNamespace = getopt.StringLong("namespace", 'n', "default", "Namespace [e.g. metrics]")
optShard = getopt.Uint32Long("shard", 's', 0, "Shard [expected format uint32]")
optPathPrefix = getopt.StringLong("path-prefix", 'p', "", "Path prefix [e.g. /var/lib/m3db]")
optNamespace = getopt.StringLong("namespace", 'n', "default", "Namespace [e.g. metrics]")
optShard = getopt.IntLong("shard", 's', allShards,
fmt.Sprintf("Shard [expected format uint32], or %v for all shards in the directory", allShards))
optBlockstart = getopt.Int64Long("block-start", 'b', 0, "Block Start Time [in nsec]")
volume = getopt.Int64Long("volume", 'v', 0, "Volume number")
fileSetTypeArg = getopt.StringLong("fileset-type", 't', flushType, fmt.Sprintf("%s|%s", flushType, snapshotType))
Expand All @@ -82,7 +88,7 @@ func main() {

if *optPathPrefix == "" ||
*optNamespace == "" ||
*optShard < 0 ||
*optShard < allShards ||
*optBlockstart <= 0 ||
*volume < 0 ||
(*fileSetTypeArg != snapshotType && *fileSetTypeArg != flushType) {
Expand Down Expand Up @@ -117,101 +123,141 @@ func main() {

fsOpts := fs.NewOptions().SetFilePathPrefix(*optPathPrefix)

var (
seriesCount = 0
datapointCount = 0
annotationSizeTotal uint64
start = time.Now()
)
shards := []uint32{uint32(*optShard)}
if *optShard == allShards {
shards, err = getShards(*optPathPrefix, fileSetType, *optNamespace)
if err != nil {
log.Fatalf("failed to resolve shards: %v", err)
}
}

reader, err := fs.NewReader(bytesPool, fsOpts)
if err != nil {
log.Fatalf("could not create new reader: %v", err)
}

openOpts := fs.DataReaderOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Namespace: ident.StringID(*optNamespace),
Shard: *optShard,
BlockStart: xtime.UnixNano(*optBlockstart),
VolumeIndex: int(*volume),
},
FileSetType: fileSetType,
StreamingEnabled: true,
}

err = reader.Open(openOpts)
if err != nil {
log.Fatalf("unable to open reader: %v", err)
}
for _, shard := range shards {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var (
seriesCount = 0
datapointCount = 0
annotationSizeTotal uint64
start = time.Now()
)

for {
entry, err := reader.StreamingRead()
if err == io.EOF {
break
openOpts := fs.DataReaderOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Namespace: ident.StringID(*optNamespace),
Shard: shard,
BlockStart: xtime.UnixNano(*optBlockstart),
VolumeIndex: int(*volume),
},
FileSetType: fileSetType,
StreamingEnabled: true,
}

err = reader.Open(openOpts)
if err != nil {
log.Fatalf("err reading metadata: %v", err)
log.Fatalf("unable to open reader for shard %v: %v", shard, err)
}

var (
id = entry.ID
data = entry.Data
)
for {
entry, err := reader.StreamingRead()
if xerrors.Is(err, io.EOF) {
break
}
if err != nil {
log.Fatalf("err reading metadata: %v", err)
}

if *idFilter != "" && !strings.Contains(id.String(), *idFilter) {
continue
}
var (
id = entry.ID
data = entry.Data
)

if *idFilter != "" && !strings.Contains(id.String(), *idFilter) {
continue
}

if benchMode != benchmarkSeries {
iter := m3tsz.NewReaderIterator(xio.NewBytesReader64(data), true, encodingOpts)
for iter.Next() {
dp, _, annotation := iter.Current()
if benchMode == benchmarkNone {
// Use fmt package so it goes to stdout instead of stderr
fmt.Printf("{id: %s, dp: %+v", id.String(), dp) // nolint: forbidigo
if len(annotation) > 0 {
fmt.Printf(", annotation: %s", // nolint: forbidigo
base64.StdEncoding.EncodeToString(annotation))
if benchMode != benchmarkSeries {
iter := m3tsz.NewReaderIterator(xio.NewBytesReader64(data), true, encodingOpts)
for iter.Next() {
dp, _, annotation := iter.Current()
if benchMode == benchmarkNone {
// Use fmt package so it goes to stdout instead of stderr
fmt.Printf("{id: %s, dp: %+v", id.String(), dp) // nolint: forbidigo
if len(annotation) > 0 {
fmt.Printf(", annotation: %s", // nolint: forbidigo
base64.StdEncoding.EncodeToString(annotation))
}
fmt.Println("}") // nolint: forbidigo
}
fmt.Println("}") // nolint: forbidigo
annotationSizeTotal += uint64(len(annotation))
datapointCount++
}
annotationSizeTotal += uint64(len(annotation))
datapointCount++
}
if err := iter.Err(); err != nil {
log.Fatalf("unable to iterate original data: %v", err)
if err := iter.Err(); err != nil {
log.Fatalf("unable to iterate original data: %v", err)
}
iter.Close()
}
iter.Close()
}

seriesCount++
}

if seriesCount != reader.Entries() {
log.Fatalf("actual time series count (%d) did not match info file data (%d)",
seriesCount, reader.Entries())
}
seriesCount++
}

if benchMode != benchmarkNone {
runTime := time.Since(start)
fmt.Printf("Running time: %s\n", runTime) // nolint: forbidigo
fmt.Printf("\n%d series read\n", seriesCount) // nolint: forbidigo
if runTime > 0 {
fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds()) // nolint: forbidigo
if seriesCount != reader.Entries() {
log.Warnf("actual time series count (%d) did not match info file data (%d)",
seriesCount, reader.Entries())
}

if benchMode == benchmarkDatapoints {
fmt.Printf("\n%d datapoints decoded\n", datapointCount) // nolint: forbidigo
if benchMode != benchmarkNone {
runTime := time.Since(start)
fmt.Printf("Running time: %s\n", runTime) // nolint: forbidigo
fmt.Printf("\n%d series read\n", seriesCount) // nolint: forbidigo
if runTime > 0 {
fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds()) // nolint: forbidigo
fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds()) // nolint: forbidigo
}

fmt.Printf("\nTotal annotation size: %d bytes\n", annotationSizeTotal) // nolint: forbidigo
if benchMode == benchmarkDatapoints {
fmt.Printf("\n%d datapoints decoded\n", datapointCount) // nolint: forbidigo
if runTime > 0 {
fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds()) // nolint: forbidigo
}

fmt.Printf("\nTotal annotation size: %d bytes\n", annotationSizeTotal) // nolint: forbidigo
}
}
}

if err := reader.Close(); err != nil {
log.Fatalf("unable to close reader: %v", err)
}
}

// getShards discovers every shard present on disk for the given namespace by
// listing the numeric subdirectories of its data (or, for snapshot filesets,
// snapshot) directory under pathPrefix. It returns an error if the directory
// cannot be read, if a subdirectory name is not an integer, or if a parsed
// shard number is negative.
func getShards(pathPrefix string, fileSetType persist.FileSetType, namespace string) ([]uint32, error) {
	nsID := ident.StringID(namespace)

	// Snapshot filesets live under a separate directory tree from flush data.
	dir := fs.NamespaceDataDirPath(pathPrefix, nsID)
	if fileSetType == persist.FileSetSnapshotType {
		dir = fs.NamespaceSnapshotsDirPath(pathPrefix, nsID)
	}

	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("failed reading namespace directory: %w", err)
	}

	shards := make([]uint32, 0)
	for _, entry := range entries {
		// Plain files at this level are not shards; only directories count.
		if !entry.IsDir() {
			continue
		}
		shard, err := strconv.Atoi(entry.Name())
		if err != nil {
			return nil, fmt.Errorf("failed extracting shard number: %w", err)
		}
		// strconv.Atoi accepts a leading '-', so guard before the uint32 cast.
		if shard < 0 {
			return nil, fmt.Errorf("negative shard number %v", shard)
		}
		shards = append(shards, uint32(shard))
	}

	return shards, nil
}