Skip to content

Commit

Permalink
Merge pull request #257 from Altinity/master
Browse files Browse the repository at this point in the history
Adding UPLOAD_CONCURRENCY and DOWNLOAD_CONCURRENCY, prepare 1.1.0
  • Loading branch information
Slach authored Sep 9, 2021
2 parents da3824d + d456868 commit 5532dce
Show file tree
Hide file tree
Showing 12 changed files with 476 additions and 156 deletions.
2 changes: 2 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ general:
backups_to_keep_remote: 0 # BACKUPS_TO_KEEP_REMOTE
log_level: info # LOG_LEVEL
allow_empty_backups: false # ALLOW_EMPTY_BACKUPS
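download_concurrency: 1 # DOWNLOAD_CONCURRENCY, max 255; built-in default is half of the CPU cores (at least 1, at most 128)
upload_concurrency: 1 # UPLOAD_CONCURRENCY, max 255; built-in default is half of the CPU cores (at least 1, at most 128)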
clickhouse:
username: default # CLICKHOUSE_USERNAME
password: "" # CLICKHOUSE_PASSWORD
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"crypto/tls"
"fmt"
"io/ioutil"
"math"
"os"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -35,6 +37,8 @@ type GeneralConfig struct {
BackupsToKeepRemote int `yaml:"backups_to_keep_remote" envconfig:"BACKUPS_TO_KEEP_REMOTE"`
LogLevel string `yaml:"log_level" envconfig:"LOG_LEVEL"`
AllowEmptyBackups bool `yaml:"allow_empty_backups" envconfig:"ALLOW_EMPTY_BACKUPS"`
DownloadConcurrency uint8 `yaml:"download_concurrency" envconfig:"DOWNLOAD_CONCURRENCY"`
UploadConcurrency uint8 `yaml:"upload_concurrency" envconfig:"UPLOAD_CONCURRENCY"`
}

// GCSConfig - GCS settings section
Expand Down Expand Up @@ -268,13 +272,20 @@ func PrintDefaultConfig() {
}

func DefaultConfig() *Config {
availableConcurrency := uint8(1)
if runtime.NumCPU() > 1 {
availableConcurrency = uint8(math.Min(float64(runtime.NumCPU()/2), 128))
}
return &Config{
General: GeneralConfig{
RemoteStorage: "none",
MaxFileSize: 100 * 1024 * 1024 * 1024, // 100GB
BackupsToKeepLocal: 0,
BackupsToKeepRemote: 0,
LogLevel: "info",
DisableProgressBar: true,
UploadConcurrency: availableConcurrency,
DownloadConcurrency: availableConcurrency,
},
ClickHouse: ClickHouseConfig{
Username: "default",
Expand Down
47 changes: 20 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,46 +1,39 @@
module github.com/AlexAkulov/clickhouse-backup

require (
cloud.google.com/go/storage v1.10.0
cloud.google.com/go/storage v1.16.0
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.10.1-0.20200807102407-24fe552e0870
github.com/ClickHouse/clickhouse-go v1.4.5
github.com/ClickHouse/clickhouse-go v1.4.7
github.com/apex/log v1.9.0
github.com/aws/aws-sdk-go v1.37.15
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/djherbis/buffer v1.1.0 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/frankban/quicktest v1.10.0 // indirect
github.com/go-logfmt/logfmt v0.4.0
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/aws/aws-sdk-go v1.40.31
github.com/djherbis/buffer v1.2.0
github.com/go-logfmt/logfmt v0.5.1
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4
github.com/jlaffaye/ftp v0.0.0-20200730135723-c2ee4fa2503b
github.com/jmoiron/sqlx v1.2.0
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.8.0
github.com/jlaffaye/ftp v0.0.0-20210307004419-5d4190119067
github.com/jmoiron/sqlx v1.3.4
github.com/kelseyhightower/envconfig v1.4.0
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-shellwords v1.0.12
github.com/mholt/archiver v1.1.3-0.20190812163345-2d1449806793
github.com/mholt/archiver/v3 v3.5.0
github.com/otiai10/copy v1.6.0
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.0
github.com/prometheus/client_golang v1.7.1
github.com/stretchr/testify v1.6.1
github.com/tencentyun/cos-go-sdk-v5 v0.0.0-20200120023323-87ff3bc489ac
github.com/urfave/cli v1.22.2
github.com/pkg/sftp v1.13.2
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.30
github.com/urfave/cli v1.22.5
github.com/yargevad/filepathx v1.0.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/mod v0.3.0
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
google.golang.org/api v0.28.0
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5
golang.org/x/mod v0.5.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/api v0.54.0
gopkg.in/cheggaaa/pb.v1 v1.0.28
gopkg.in/djherbis/buffer.v1 v1.1.0
gopkg.in/djherbis/nio.v2 v2.0.3
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v2 v2.4.0
)

go 1.16
350 changes: 272 additions & 78 deletions go.sum

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion pkg/backup/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package backup

import (
"fmt"
"github.com/AlexAkulov/clickhouse-backup/utils"
"os"
"path"
"time"

"github.com/AlexAkulov/clickhouse-backup/config"
"github.com/AlexAkulov/clickhouse-backup/pkg/clickhouse"
Expand Down Expand Up @@ -35,6 +37,7 @@ func RemoveOldBackupsLocal(cfg *config.Config, keepLastBackup bool) error {
}

func RemoveBackupLocal(cfg *config.Config, backupName string) error {
start := time.Now()
backupList, err := GetLocalBackups(cfg)
if err != nil {
return err
Expand Down Expand Up @@ -63,6 +66,7 @@ func RemoveBackupLocal(cfg *config.Config, backupName string) error {
apexLog.WithField("operation", "delete").
WithField("location", "local").
WithField("backup", backupName).
WithField("duration", utils.HumanizeDuration(time.Since(start))).
Info("done")
return nil
}
Expand All @@ -71,6 +75,7 @@ func RemoveBackupLocal(cfg *config.Config, backupName string) error {
}

func RemoveBackupRemote(cfg *config.Config, backupName string) error {
start := time.Now()
if cfg.General.RemoteStorage == "none" {
fmt.Println("RemoveBackupRemote aborted: RemoteStorage set to \"none\"")
return nil
Expand All @@ -90,7 +95,16 @@ func RemoveBackupRemote(cfg *config.Config, backupName string) error {
}
for _, backup := range backupList {
if backup.BackupName == backupName {
return bd.RemoveBackup(backup)
if err := bd.RemoveBackup(backup); err != nil {
return err
}
apexLog.WithFields(apexLog.Fields{
"backup": backupName,
"location": "remote",
"operation": "delete",
"duration": utils.HumanizeDuration(time.Since(start)),
}).Info("done")
return nil
}
}
return fmt.Errorf("'%s' is not found on remote storage", backupName)
Expand Down
88 changes: 72 additions & 16 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package backup

import (
"context"
"encoding/json"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"io/ioutil"
"os"
"path"
Expand Down Expand Up @@ -148,6 +151,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, schemaOnly b
metadataSize += int64(size)
log.
WithField("duration", utils.HumanizeDuration(time.Since(start))).
WithField("size", utils.FormatBytes(int64(size))).
Info("done")
}

Expand All @@ -159,20 +163,35 @@ func (b *Backuper) Download(backupName string, tablePattern string, schemaOnly b
}
}
}
for _, tableMetadata := range tableMetadataForDownload {
log.Debugf("prepare table concurrent semaphore with concurrency=%d len(tableMetadataForDownload)=%d", b.cfg.General.UploadConcurrency, len(tableMetadataForDownload))
s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
g, ctx := errgroup.WithContext(context.Background())

for i, tableMetadata := range tableMetadataForDownload {
if tableMetadata.MetadataOnly {
continue
}
dataSize += tableMetadata.TotalBytes
start := time.Now()
if err := b.downloadTableData(remoteBackup.BackupMetadata, tableMetadata); err != nil {
return err
if err := s.Acquire(ctx, 1); err != nil {
return fmt.Errorf("can't acquire semaphore during download: %v", err)
}
log.
WithField("table", fmt.Sprintf("%s.%s", tableMetadata.Database, tableMetadata.Table)).
WithField("duration", utils.HumanizeDuration(time.Since(start))).
WithField("size", utils.FormatBytes(metadataSize+tableMetadata.TotalBytes)).
Info("done")
dataSize += tableMetadata.TotalBytes
idx := i
g.Go(func() error {
defer s.Release(1)
start := time.Now()
if err := b.downloadTableData(remoteBackup.BackupMetadata, tableMetadataForDownload[idx]); err != nil {
return err
}
log.
WithField("table", fmt.Sprintf("%s.%s", tableMetadataForDownload[idx].Database, tableMetadataForDownload[idx].Table)).
WithField("duration", utils.HumanizeDuration(time.Since(start))).
WithField("size", utils.FormatBytes(tableMetadataForDownload[idx].TotalBytes)).
Info("done")
return nil
})
}
if err := g.Wait(); err != nil {
return fmt.Errorf("one of download go-routine return error: %v", err)
}
}
rbacSize, err := b.downloadRBACData(remoteBackup)
Expand Down Expand Up @@ -231,27 +250,64 @@ func (b *Backuper) downloadBackupRelatedDir(remoteBackup new_storage.Backup, pre

func (b *Backuper) downloadTableData(remoteBackup metadata.BackupMetadata, table metadata.TableMetadata) error {
uuid := path.Join(clickhouse.TablePathEncode(table.Database), clickhouse.TablePathEncode(table.Table))

s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
g, ctx := errgroup.WithContext(context.Background())

if remoteBackup.DataFormat != "directory" {
capacity := 0
for disk := range table.Files {
capacity += len(table.Files[disk])
}
apexLog.Debugf("start downloadTableData %s.%s with concurrency=%d len(table.Files[...])=%d", table.Database, table.Table, b.cfg.General.DownloadConcurrency, capacity)

for disk := range table.Files {
diskPath := b.DiskMap[disk]
tableLocalDir := path.Join(diskPath, "backup", remoteBackup.BackupName, "shadow", uuid, disk)
for _, archiveFile := range table.Files[disk] {
tableRemoteFile := path.Join(remoteBackup.BackupName, "shadow", clickhouse.TablePathEncode(table.Database), clickhouse.TablePathEncode(table.Table), archiveFile)
if err := b.dst.CompressedStreamDownload(tableRemoteFile, tableLocalDir); err != nil {
return err
if err := s.Acquire(ctx, 1); err != nil {
return fmt.Errorf("can't acquire semaphore during download: %v", err)
}
tableRemoteFile := path.Join(remoteBackup.BackupName, "shadow", clickhouse.TablePathEncode(table.Database), clickhouse.TablePathEncode(table.Table), archiveFile)
g.Go(func() error {
apexLog.Debugf("start download from %s", tableRemoteFile)
defer s.Release(1)
if err := b.dst.CompressedStreamDownload(tableRemoteFile, tableLocalDir); err != nil {
return err
}
apexLog.Debugf("finish download from %s", tableRemoteFile)
return nil
})
}
}
} else {
capacity := 0
for disk := range table.Parts {
capacity += len(table.Parts[disk])
}
apexLog.Debugf("start downloadTableData %s.%s with concurrency=%d len(table.Parts[...])=%d", table.Database, table.Table, b.cfg.General.DownloadConcurrency, capacity)
for disk := range table.Parts {
if err := s.Acquire(ctx, 1); err != nil {
return fmt.Errorf("can't acquire semaphore during download: %v", err)
}
tableRemotePath := path.Join(remoteBackup.BackupName, "shadow", uuid, disk)
diskPath := b.DiskMap[disk]
tableLocalDir := path.Join(diskPath, "backup", remoteBackup.BackupName, "shadow", uuid, disk)
if err := b.dst.DownloadPath(0, tableRemotePath, tableLocalDir); err != nil {
return err
}
g.Go(func() error {
apexLog.Debugf("start download from %s", tableRemotePath)
defer s.Release(1)
if err := b.dst.DownloadPath(0, tableRemotePath, tableLocalDir); err != nil {
return err
}
apexLog.Debugf("finish download from %s", tableRemotePath)
return nil
})
}
}
if err := g.Wait(); err != nil {
return fmt.Errorf("one of download go-routine return error: %v", err)
}

// Create symlink for exists parts
for disk, parts := range table.Parts {
for _, p := range parts {
Expand Down
Loading

0 comments on commit 5532dce

Please sign in to comment.