Skip to content

Commit

Permalink
feat: metadata backup returns bucket manifests (#21601)
Browse files Browse the repository at this point in the history
* fix: properly omit empty time.Time that are optional

* fix: optimize slice creation in manifest fns
  • Loading branch information
williamhbaker authored Jun 4, 2021
1 parent ce53603 commit 34a4a8e
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 17 deletions.
48 changes: 48 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type SqlBackupRestoreService interface {
UnlockSqlStore()
}

type BucketManifestWriter interface {
WriteManifest(ctx context.Context, w io.Writer) error
}

// RestoreService represents the data restore functions of InfluxDB.
type RestoreService interface {
// RestoreKVStore restores & replaces metadata database.
Expand All @@ -54,6 +58,50 @@ type RestoreService interface {
RestoreShard(ctx context.Context, shardID uint64, r io.Reader) error
}

// BucketMetadataManifest contains the information about a bucket for backup purposes.
// It is composed of various nested structs below.
type BucketMetadataManifest struct {
OrganizationID platform.ID `json:"organizationID"`
OrganizationName string `json:"organizationName"`
BucketID platform.ID `json:"bucketID"`
BucketName string `json:"bucketName"`
DefaultRetentionPolicy string `json:"defaultRetentionPolicy"`
RetentionPolicies []RetentionPolicyManifest `json:"retentionPolicies"`
}

type RetentionPolicyManifest struct {
Name string `json:"name"`
ReplicaN int `json:"replicaN"`
Duration time.Duration `json:"duration"`
ShardGroupDuration time.Duration `json:"shardGroupDuration"`
ShardGroups []ShardGroupManifest `json:"shardGroups"`
Subscriptions []SubscriptionManifest `json:"subscriptions"`
}

type ShardGroupManifest struct {
ID uint64 `json:"id"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
DeletedAt *time.Time `json:"deletedAt,omitempty"` // use pointer to time.Time so that omitempty works
TruncatedAt *time.Time `json:"truncatedAt,omitempty"` // use pointer to time.Time so that omitempty works
Shards []ShardManifest `json:"shards"`
}

type ShardManifest struct {
ID uint64 `json:"id"`
ShardOwners []ShardOwner `json:"shardOwners"`
}

type ShardOwner struct {
NodeID uint64 `json:"nodeID"`
}

type SubscriptionManifest struct {
Name string `json:"name"`
Mode string `json:"mode"`
Destinations []string `json:"destinations"`
}

// Manifest lists the KV and shard file information contained in the backup.
type Manifest struct {
KV ManifestKVEntry `json:"kv"`
Expand Down
127 changes: 127 additions & 0 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -20,6 +21,132 @@ import (
"go.uber.org/zap"
)

type BucketManifestWriter struct {
ts *tenant.Service
mc *meta.Client
}

func NewBucketManifestWriter(ts *tenant.Service, mc *meta.Client) BucketManifestWriter {
return BucketManifestWriter{
ts: ts,
mc: mc,
}
}

// WriteManifest writes a bucket manifest describing all of the buckets that exist in the database.
// It is intended to be used to write to an HTTP response after appropriate measures have been taken
// to ensure that the request is authorized.
func (b BucketManifestWriter) WriteManifest(ctx context.Context, w io.Writer) error {
bkts, _, err := b.ts.FindBuckets(ctx, influxdb.BucketFilter{})
if err != nil {
return err
}

l := make([]influxdb.BucketMetadataManifest, 0, len(bkts))

for _, bkt := range bkts {
org, err := b.ts.OrganizationService.FindOrganizationByID(ctx, bkt.OrgID)
if err != nil {
return err
}

dbInfo := b.mc.Database(bkt.ID.String())

l = append(l, influxdb.BucketMetadataManifest{
OrganizationID: bkt.OrgID,
OrganizationName: org.Name,
BucketID: bkt.ID,
BucketName: bkt.Name,
DefaultRetentionPolicy: dbInfo.DefaultRetentionPolicy,
RetentionPolicies: retentionPolicyToManifest(dbInfo.RetentionPolicies),
})
}

return json.NewEncoder(w).Encode(&l)
}

// retentionPolicyToManifest and the various similar functions that follow are for converting
// from the structs in the meta package to the manifest structs
func retentionPolicyToManifest(meta []meta.RetentionPolicyInfo) []influxdb.RetentionPolicyManifest {
r := make([]influxdb.RetentionPolicyManifest, 0, len(meta))

for _, m := range meta {
r = append(r, influxdb.RetentionPolicyManifest{
Name: m.Name,
ReplicaN: m.ReplicaN,
Duration: m.Duration,
ShardGroupDuration: m.ShardGroupDuration,
ShardGroups: shardGroupToManifest(m.ShardGroups),
Subscriptions: subscriptionInfosToManifest(m.Subscriptions),
})
}

return r
}

func subscriptionInfosToManifest(subInfos []meta.SubscriptionInfo) []influxdb.SubscriptionManifest {
r := make([]influxdb.SubscriptionManifest, 0, len(subInfos))

for _, s := range subInfos {
r = append(r, influxdb.SubscriptionManifest(s))
}

return r
}

func shardGroupToManifest(shardGroups []meta.ShardGroupInfo) []influxdb.ShardGroupManifest {
r := make([]influxdb.ShardGroupManifest, 0, len(shardGroups))

for _, s := range shardGroups {
deletedAt := &s.DeletedAt
truncatedAt := &s.TruncatedAt

// set deletedAt and truncatedAt to nil rather than their zero values so that the fields
// can be properly omitted from the JSON response if they are empty
if deletedAt.IsZero() {
deletedAt = nil
}

if truncatedAt.IsZero() {
truncatedAt = nil
}

r = append(r, influxdb.ShardGroupManifest{
ID: s.ID,
StartTime: s.StartTime,
EndTime: s.EndTime,
DeletedAt: deletedAt,
TruncatedAt: truncatedAt,
Shards: shardInfosToManifest(s.Shards),
})
}

return r
}

func shardInfosToManifest(shards []meta.ShardInfo) []influxdb.ShardManifest {
r := make([]influxdb.ShardManifest, 0, len(shards))

for _, s := range shards {
r = append(r, influxdb.ShardManifest{
ID: s.ID,
ShardOwners: shardOwnersToManifest(s.Owners),
})
}

return r
}

func shardOwnersToManifest(shardOwners []meta.ShardOwner) []influxdb.ShardOwner {
r := make([]influxdb.ShardOwner, 0, len(shardOwners))

for _, s := range shardOwners {
r = append(r, influxdb.ShardOwner(s))
}

return r
}

type Request struct {
// Organization to backup.
// If not set, all orgs will be included.
Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/authorization"
"github.com/influxdata/influxdb/v2/authorizer"
"github.com/influxdata/influxdb/v2/backup"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/checks"
"github.com/influxdata/influxdb/v2/chronograf/server"
Expand Down Expand Up @@ -704,6 +705,8 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
ts.BucketService = storage.NewBucketService(m.log, ts.BucketService, m.engine)
ts.BucketService = dbrp.NewBucketService(m.log, ts.BucketService, dbrpSvc)

bucketManifestWriter := backup.NewBucketManifestWriter(ts, metaClient)

onboardingLogger := m.log.With(zap.String("handler", "onboard"))
onboardOpts := []tenant.OnboardServiceOptionFn{tenant.WithOnboardingLogger(onboardingLogger)}
if opts.TestingAlwaysAllowSetup {
Expand Down Expand Up @@ -774,6 +777,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
DeleteService: deleteService,
BackupService: backupService,
SqlBackupRestoreService: m.sqlStore,
BucketManifestWriter: bucketManifestWriter,
RestoreService: restoreService,
AuthorizationService: authSvc,
AuthorizationV1Service: authSvcV1,
Expand Down
1 change: 1 addition & 0 deletions http/api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type APIBackend struct {
DeleteService influxdb.DeleteService
BackupService influxdb.BackupService
SqlBackupRestoreService influxdb.SqlBackupRestoreService
BucketManifestWriter influxdb.BucketManifestWriter
RestoreService influxdb.RestoreService
AuthorizationService influxdb.AuthorizationService
AuthorizationV1Service influxdb.AuthorizationService
Expand Down
33 changes: 19 additions & 14 deletions http/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb/v2/authorizer"
Expand All @@ -27,6 +26,7 @@ type BackupBackend struct {

BackupService influxdb.BackupService
SqlBackupRestoreService influxdb.SqlBackupRestoreService
BucketManifestWriter influxdb.BucketManifestWriter
}

// NewBackupBackend returns a new instance of BackupBackend.
Expand All @@ -37,6 +37,7 @@ func NewBackupBackend(b *APIBackend) *BackupBackend {
HTTPErrorHandler: b.HTTPErrorHandler,
BackupService: b.BackupService,
SqlBackupRestoreService: b.SqlBackupRestoreService,
BucketManifestWriter: b.BucketManifestWriter,
}
}

Expand All @@ -48,6 +49,7 @@ type BackupHandler struct {

BackupService influxdb.BackupService
SqlBackupRestoreService influxdb.SqlBackupRestoreService
BucketManifestWriter influxdb.BucketManifestWriter
}

const (
Expand All @@ -67,6 +69,7 @@ func NewBackupHandler(b *BackupBackend) *BackupHandler {
Logger: b.Logger,
BackupService: b.BackupService,
SqlBackupRestoreService: b.SqlBackupRestoreService,
BucketManifestWriter: b.BucketManifestWriter,
}

h.HandlerFunc(http.MethodGet, backupKVStorePath, h.handleBackupKVStore)
Expand Down Expand Up @@ -151,42 +154,44 @@ func (h *BackupHandler) handleBackupMetadata(w http.ResponseWriter, r *http.Requ
w.Header().Set("Content-Type", "multipart/mixed; boundary="+dataWriter.Boundary())

parts := []struct {
fieldname string
filename string
writeFn func(io.Writer) error
contentType string
contentDisposition string
writeFn func(io.Writer) error
}{
{
"kv",
fmt.Sprintf("%s.bolt", baseName),
"application/octet-stream",
fmt.Sprintf("attachment; name=%q; filename=%q", "kv", fmt.Sprintf("%s.bolt", baseName)),
func(fw io.Writer) error {
return h.BackupService.BackupKVStore(ctx, fw)
},
},
{
"sql",
fmt.Sprintf("%s.sqlite", baseName),
"application/octet-stream",
fmt.Sprintf("attachment; name=%q; filename=%q", "sql", fmt.Sprintf("%s.sqlite", baseName)),
func(fw io.Writer) error {
return h.SqlBackupRestoreService.BackupSqlStore(ctx, fw)
},
},
{
"buckets",
fmt.Sprintf("%s.json", baseName),
"application/json; charset=utf-8",
fmt.Sprintf("attachment; name=%q; filename=%q", "buckets", fmt.Sprintf("%s.json", baseName)),
func(fw io.Writer) error {
_, err := io.Copy(fw, strings.NewReader("buckets json - to be implemented"))
return err
return h.BucketManifestWriter.WriteManifest(ctx, fw)
},
},
}

for _, p := range parts {
fw, err := dataWriter.CreateFormFile(p.fieldname, p.filename)
pw, err := dataWriter.CreatePart(map[string][]string{
"Content-Type": {p.contentType},
"Content-Disposition": {p.contentDisposition},
})
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}

if err := p.writeFn(fw); err != nil {
if err := p.writeFn(pw); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
Expand Down
16 changes: 13 additions & 3 deletions http/backup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ func TestBackupMetaService(t *testing.T) {
ctrlr := gomock.NewController(t)
backupSvc := mock.NewMockBackupService(ctrlr)
sqlBackupSvc := mock.NewMockSqlBackupRestoreService(ctrlr)
bucketManifestWriter := mock.NewMockBucketManifestWriter(ctrlr)

b := &BackupBackend{
BackupService: backupSvc,
SqlBackupRestoreService: sqlBackupSvc,
BucketManifestWriter: bucketManifestWriter,
}
h := NewBackupHandler(b)

Expand Down Expand Up @@ -53,6 +55,10 @@ func TestBackupMetaService(t *testing.T) {
sqlBackupSvc.EXPECT().
UnlockSqlStore()

bucketManifestWriter.EXPECT().
WriteManifest(gomock.Any(), gomock.Any()).
Return(nil)

h.handleBackupMetadata(rr, r)
rs := rr.Result()
require.Equal(t, rs.StatusCode, http.StatusOK)
Expand All @@ -67,14 +73,18 @@ func TestBackupMetaService(t *testing.T) {
// The file from the part could be read using something like ioutil.ReadAll, but for testing
// the contents of the part is not meaningful.
got := make(map[string]string)
for {
wantContentTypes := []string{"application/octet-stream", "application/octet-stream", "application/json; charset=utf-8"}
for i := 0; ; i++ {
p, err := mr.NextPart()
if err == io.EOF {
break
}
require.NoError(t, err)
require.Equal(t, "application/octet-stream", p.Header.Get("Content-Type"))
got[p.FormName()] = p.FileName()
require.Equal(t, wantContentTypes[i], p.Header.Get("Content-Type"))

_, params, err := mime.ParseMediaType(p.Header.Get("Content-Disposition"))
require.NoError(t, err)
got[params["name"]] = p.FileName()
}

// wants is a map of form names with the expected extension of the file for that part
Expand Down
Loading

0 comments on commit 34a4a8e

Please sign in to comment.