diff --git a/scripts/docker-integration-tests/cold_writes_simple/test.sh b/scripts/docker-integration-tests/cold_writes_simple/test.sh index 93fa4ebae7..bfd58373df 100755 --- a/scripts/docker-integration-tests/cold_writes_simple/test.sh +++ b/scripts/docker-integration-tests/cold_writes_simple/test.sh @@ -35,7 +35,6 @@ function write_data { } }') - if [[ $respCode -eq "200" ]]; then return 0 else @@ -63,17 +62,17 @@ function read_all { } echo "Write data for 'now - 2 * bufferPast' (testing cold writes from memory)" -write_data "coldWritesNoIndex" "foo" "$(($(date +"%s") - 60 * 10 * 2))" 12.3456789 +write_data "coldWritesRepairAndNoIndex" "foo" "$(($(date +"%s") - 60 * 10 * 2))" 12.3456789 echo "Expect to read 1 datapoint" -read_all "coldWritesNoIndex" "foo" 1 +read_all "coldWritesRepairAndNoIndex" "foo" 1 echo "Write data for 'now - 2 * blockSize' (testing compaction to disk)" -write_data "coldWritesNoIndex" "foo" "$(($(date +"%s") - 60 * 60 * 2))" 98.7654321 +write_data "coldWritesRepairAndNoIndex" "foo" "$(($(date +"%s") - 60 * 60 * 2))" 98.7654321 echo "Wait until cold writes are flushed" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ -n "$(docker-compose -f ${COMPOSE_FILE} exec dbnode01 find /var/lib/m3db/data/coldWritesNoIndex -name "*1-checkpoint.db")" ]' + '[ -n "$(docker-compose -f ${COMPOSE_FILE} exec dbnode01 find /var/lib/m3db/data/coldWritesRepairAndNoIndex -name "*1-checkpoint.db")" ]' echo "Restart DB (test bootstrapping cold writes)" docker-compose -f ${COMPOSE_FILE} restart dbnode01 @@ -83,4 +82,4 @@ ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' echo "Expect to read 2 datapoints" -read_all "coldWritesNoIndex" "foo" 2 +read_all "coldWritesRepairAndNoIndex" "foo" 2 diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index d37e13e45a..6e14aedf29 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -47,6 +47,70 @@ function setup_single_m3db_node { wait_for_db_init } +function setup_three_m3db_nodes { + local dbnode_host_1=${DBNODE_HOST:-dbnode01} + local dbnode_host_2=${DBNODE_HOST:-dbnode02} + local dbnode_host_3=${DBNODE_HOST:-dbnode03} + local dbnode_port=${DBNODE_PORT:-9000} + local dbnode_host_1_health_port=${DBNODE_HEALTH_PORT:-9012} + local dbnode_host_2_health_port=${DBNODE_HEALTH_PORT:-9022} + local dbnode_host_3_health_port=${DBNODE_HEALTH_PORT:-9032} + local coordinator_port=${COORDINATOR_PORT:-7201} + + echo "Wait for API to be available" + ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq ".namespaces | length")" == "0" ]' + + echo "Adding placement and agg namespace" + curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/database/create -d '{ + "type": "cluster", + "namespaceName": "agg", + "retentionTime": "6h", + "num_shards": 3, + "replicationFactor": 3, + "hosts": [ + { + "id": "m3db_local_1", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "address": "'"${dbnode_host_1}"'", + "port": '"${dbnode_port}"' + }, + { + "id": "m3db_local_2", + "isolation_group": "rack-b", + "zone": "embedded", + "weight": 1024, + "address": "'"${dbnode_host_2}"'", + "port": '"${dbnode_port}"' + }, + { + "id": "m3db_local_3", + "isolation_group": "rack-c", + "zone": "embedded", + "weight": 1024, + "address": "'"${dbnode_host_3}"'", + "port": '"${dbnode_port}"' + } + ] + }' + + echo "Wait until 
placement is init'd" + ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local_1.id)" == \"m3db_local_1\" ]' + + wait_for_namespaces + + echo "Wait until bootstrapped" + ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_1_health_port}"'/health | jq .bootstrapped)" == true ]' + ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_2_health_port}"'/health | jq .bootstrapped)" == true ]' + ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_host_3_health_port}"'/health | jq .bootstrapped)" == true ]' +} + function wait_for_db_init { local dbnode_host=${DBNODE_HOST:-dbnode01} local dbnode_port=${DBNODE_PORT:-9000} @@ -80,6 +144,16 @@ function wait_for_db_init { ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + wait_for_namespaces + + echo "Wait until bootstrapped" + ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:'"${dbnode_health_port}"'/health | jq .bootstrapped)" == true ]' +} + +function wait_for_namespaces { + local coordinator_port=${COORDINATOR_PORT:-7201} + echo "Wait until agg namespace is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' @@ -94,19 +168,19 @@ function wait_for_db_init { ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' - echo "Adding coldWritesNoIndex namespace" + echo "Adding coldWritesRepairAndNoIndex namespace" curl -vvvsSf -X POST 0.0.0.0:${coordinator_port}/api/v1/services/m3db/namespace -d '{ - "name": "coldWritesNoIndex", + "name": "coldWritesRepairAndNoIndex", "options": { "bootstrapEnabled": true, "flushEnabled": true, "writesToCommitLog": true, "cleanupEnabled": true, "snapshotEnabled": true, - "repairEnabled": false, + "repairEnabled": true, "coldWritesEnabled": true, "retentionOptions": { - "retentionPeriodDuration": "8h", + "retentionPeriodDuration": "4h", "blockSizeDuration": "1h", "bufferFutureDuration": "10m", "bufferPastDuration": "10m", @@ -116,12 +190,8 @@ function wait_for_db_init { } }' - echo "Wait until coldWritesNoIndex namespace is init'd" + echo "Wait until coldWritesRepairAndNoIndex namespace is init'd" ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq .registry.namespaces.coldWritesNoIndex.coldWritesEnabled)" == true ]' - - echo "Wait until bootstrapped" - ATTEMPTS=100 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:'"${dbnode_health_port}"'/health | jq .bootstrapped)" == true ]' + '[ "$(curl -sSf 0.0.0.0:'"${coordinator_port}"'/api/v1/namespace | jq .registry.namespaces.coldWritesRepairAndNoIndex.coldWritesEnabled)" == true ]' } diff --git a/scripts/docker-integration-tests/repair/docker-compose.yml b/scripts/docker-integration-tests/repair/docker-compose.yml new file mode 100644 index 0000000000..2bb9780e20 --- /dev/null +++ b/scripts/docker-integration-tests/repair/docker-compose.yml @@ -0,0 +1,60 @@ +version: "3.5" +services: + dbnode01: + expose: + - 
"9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9012:9002" + - "0.0.0.0:9013:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=m3db_local_1 + volumes: + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" + dbnode02: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9022:9002" + - "0.0.0.0:9023:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=m3db_local_2 + volumes: + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" + dbnode03: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9032:9002" + - "0.0.0.0:9033:9003" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + environment: + - M3DB_HOST_ID=m3db_local_3 + volumes: + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" + coordinator01: + expose: + - "7201" + - "7203" + - "7204" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + - "0.0.0.0:7204:7204" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./:/etc/m3coordinator/" +networks: + backend: diff --git a/scripts/docker-integration-tests/repair/m3coordinator.yml b/scripts/docker-integration-tests/repair/m3coordinator.yml new file mode 100644 index 0000000000..466079145a --- /dev/null +++ b/scripts/docker-integration-tests/repair/m3coordinator.yml @@ -0,0 +1,46 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +logging: + level: info + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +limits: + perQuery: + maxFetchedSeries: 100 + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10h + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + +tagOptions: + idScheme: quoted diff --git a/scripts/docker-integration-tests/repair/m3dbnode.yml b/scripts/docker-integration-tests/repair/m3dbnode.yml new file mode 100644 index 0000000000..6bdae716a7 --- /dev/null +++ b/scripts/docker-integration-tests/repair/m3dbnode.yml @@ -0,0 +1,86 @@ +db: + logging: + level: info + + tracing: + backend: jaeger + jaeger: + reporter: + localAgentHostPort: jaeger:6831 + sampler: + type: const + param: 1 + + metrics: + prometheus: + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: detailed + + listenAddress: 0.0.0.0:9000 + clusterListenAddress: 0.0.0.0:9001 + httpNodeListenAddress: 0.0.0.0:9002 + httpClusterListenAddress: 0.0.0.0:9003 + debugListenAddress: 0.0.0.0:9004 + + hostID: + resolver: environment + envVarName: M3DB_HOST_ID + + client: + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + + gcPercentage: 100 + + writeNewSeriesAsync: true + writeNewSeriesLimitPerSecond: 1048576 + writeNewSeriesBackoffDuration: 2ms + + bootstrap: + # Intentionally disable peers bootstrapper to ensure it doesn't interfere with test. 
+ bootstrappers: + - filesystem + - commitlog + - uninitialized_topology + commitlog: + returnUnfulfilledForCorruptCommitLogFiles: false + + cache: + series: + policy: lru + postingsList: + size: 262144 + + commitlog: + flushMaxBytes: 524288 + flushEvery: 1s + queue: + calculationType: fixed + size: 2097152 + + fs: + filePathPrefix: /var/lib/m3db + + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - dbnode01:2379 + seedNodes: + initialCluster: + - hostID: m3db_local_1 + endpoint: http://dbnode01:2380 + + # Enable repairs. + repair: + enabled: true + throttle: 1ms + checkInterval: 1ms + diff --git a/scripts/docker-integration-tests/repair/test.sh b/scripts/docker-integration-tests/repair/test.sh new file mode 100755 index 0000000000..a86ac6c7ad --- /dev/null +++ b/scripts/docker-integration-tests/repair/test.sh @@ -0,0 +1,86 @@ +#!/usr/bin/env bash + +set -xe + +source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh +REVISION=$(git rev-parse HEAD) +SCRIPT_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/repair +COMPOSE_FILE=$SCRIPT_PATH/docker-compose.yml +export REVISION + +echo "Run m3dbnode and m3coordinator containers" +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode01 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode02 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes dbnode03 +docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes coordinator01 + +# Think of this as a defer func() in golang +function defer { + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes +} +trap defer EXIT + +setup_three_m3db_nodes + +function write_data { + namespace=$1 + id=$2 + timestamp=$3 + value=$4 + port=$5 + + respCode=$(curl -s -o /dev/null -X POST -w "%{http_code}" 0.0.0.0:"$port"/write -d '{ + "namespace": "'"$namespace"'", + "id": "'"$id"'", + "datapoint": { + "timestamp":'"$timestamp"', + "value": '"$value"' + } + }') + + + if [[ $respCode -eq "200" ]]; then + return 0 + else + return 1 + fi +} + +function read_all { + namespace=$1 + id=$2 + expected_datapoints=$3 + port=$4 + + received_datapoints=$(curl -sSf -X POST 0.0.0.0:"$port"/fetch -d '{ + "namespace": "'"$namespace"'", + "id": "'"$id"'", + "rangeStart": 0, + "rangeEnd":'"$(date +"%s")"' + }' | jq '.datapoints | length') + + if [[ $expected_datapoints -eq $received_datapoints ]]; then + return 0 + else + return 1 + fi +} + +# Write 2 block sizes into the past to ensure it's a repairable block since the current mutable +# block will not be repaired. Use the node-specific port to ensure the write only goes to dbnode01 +# and not the other two nodes. +echo "Write data for 'now - 2 * blockSize' to dbnode01" +write_data "coldWritesRepairAndNoIndex" "foo" "$(($(date +"%s") - 60 * 60 * 2))" 12.3456789 9012 + +# This should pass immediately since it was written to this node. +echo "Expect to read the data back from dbnode01" +read_all "coldWritesRepairAndNoIndex" "foo" 1 9012 + +# These two should eventually succeed once a repair detects the mismatch. 
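+# (Per docker-compose.yml, dbnode01/dbnode02/dbnode03 expose their node HTTP port 9002 on host ports 9012/9022/9032 respectively, so reading via 9022 and 9032 verifies the data was repaired onto the other replicas.)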
+echo "Wait for the data to become available (via repairs) from dbnode02" +ATTEMPTS=30 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9022 + +echo "Wait for the data to become available (via repairs) from dbnode03" +ATTEMPTS=10 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff \ + read_all "coldWritesRepairAndNoIndex" "foo" 1 9032 diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index 678e2b7644..7337b32b2b 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -35,7 +35,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/peers" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/uninitialized" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" - "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/topology" ) @@ -109,6 +108,7 @@ type BootstrapConfigurationValidator interface { // New creates a bootstrap process based on the bootstrap configuration. func (bsc BootstrapConfiguration) New( validator BootstrapConfigurationValidator, + rsOpts result.Options, opts storage.Options, topoMapProvider topology.MapProvider, origin topology.Host, @@ -119,19 +119,10 @@ func (bsc BootstrapConfiguration) New( } var ( - mutableSegmentAlloc = index.NewBootstrapResultMutableSegmentAllocator( - opts.IndexOptions()) - bs bootstrap.BootstrapperProvider - err error + bs bootstrap.BootstrapperProvider + err error + fsOpts = opts.CommitLogOptions().FilesystemOptions() ) - rsOpts := result.NewOptions(). - SetInstrumentOptions(opts.InstrumentOptions()). - SetDatabaseBlockOptions(opts.DatabaseBlockOptions()). - SetSeriesCachePolicy(opts.SeriesCachePolicy()). - SetIndexMutableSegmentAllocator(mutableSegmentAlloc) - - fsOpts := opts.CommitLogOptions().FilesystemOptions() - // Start from the end of the list because the bootstrappers are ordered by precedence in descending order. for i := len(bsc.Bootstrappers) - 1; i >= 0; i-- { switch bsc.Bootstrappers[i] { diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 9a8f583915..700575c0cf 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -305,15 +305,6 @@ type RepairPolicy struct { // Enabled or disabled. Enabled bool `yaml:"enabled"` - // The repair interval. - Interval time.Duration `yaml:"interval" validate:"nonzero"` - - // The repair time offset. - Offset time.Duration `yaml:"offset" validate:"nonzero"` - - // The repair time jitter. - Jitter time.Duration `yaml:"jitter" validate:"nonzero"` - - // The repair throttle. 
Throttle time.Duration `yaml:"throttle" validate:"nonzero"` diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 711a712ead..3b661eda97 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/environment" "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/instrument" @@ -127,9 +128,6 @@ db: repair: enabled: false - interval: 2h - offset: 30m - jitter: 1h throttle: 2m checkInterval: 1m @@ -175,7 +173,7 @@ db: lowWatermark: 0.01 highWatermark: 0.02 capacity: 4096 - hostBlockMetadataSlicePool: + replicaMetadataSlicePool: size: 131072 capacity: 3 lowWatermark: 0.01 @@ -440,9 +438,6 @@ func TestConfiguration(t *testing.T) { blockSize: null repair: enabled: false - interval: 2h0m0s - offset: 30m0s - jitter: 1h0m0s throttle: 2m0s checkInterval: 1m0s debugShadowComparisonsEnabled: false @@ -527,7 +522,7 @@ func TestConfiguration(t *testing.T) { lowWatermark: 0.01 highWatermark: 0.02 capacity: 4096 - hostBlockMetadataSlicePool: + replicaMetadataSlicePool: size: 131072 lowWatermark: 0.01 highWatermark: 0.02 @@ -989,6 +984,6 @@ db: adminClient := client.NewMockAdminClient(ctrl) _, err = cfg.DB.Bootstrap.New(validator, - storage.DefaultTestOptions(), mapProvider, origin, adminClient) + result.NewOptions(), storage.DefaultTestOptions(), mapProvider, origin, adminClient) require.NoError(t, err) } diff --git a/src/cmd/services/m3dbnode/config/pooling.go b/src/cmd/services/m3dbnode/config/pooling.go index e4ad0d81b6..fabcafe34a 100644 --- a/src/cmd/services/m3dbnode/config/pooling.go +++ b/src/cmd/services/m3dbnode/config/pooling.go @@ -149,7 +149,7 @@ var ( refillHighWaterMark: defaultRefillHighWaterMark, capacity: 4096, }, - "hostBlockMetadataSlice": poolPolicyDefault{ + "replicaMetadataSlice": poolPolicyDefault{ size: 131072, refillLowWaterMark: defaultRefillLowWaterMark, refillHighWaterMark: defaultRefillHighWaterMark, @@ -288,8 +288,8 @@ type PoolingPolicy struct { // The policy for the FetchBlocksMetadataResults pool. FetchBlocksMetadataResultsPool CapacityPoolPolicy `yaml:"fetchBlocksMetadataResultsPool"` - // The policy for the HostBlockMetadataSlice pool. - HostBlockMetadataSlicePool CapacityPoolPolicy `yaml:"hostBlockMetadataSlicePool"` + // The policy for the ReplicaMetadataSlicePool pool. + ReplicaMetadataSlicePool CapacityPoolPolicy `yaml:"replicaMetadataSlicePool"` // The policy for the BlockMetadat pool. 
BlockMetadataPool PoolPolicy `yaml:"blockMetadataPool"` @@ -363,7 +363,7 @@ func (p *PoolingPolicy) InitDefaultsAndValidate() error { if err := p.FetchBlocksMetadataResultsPool.initDefaultsAndValidate("fetchBlocksMetadataResults"); err != nil { return err } - if err := p.HostBlockMetadataSlicePool.initDefaultsAndValidate("hostBlockMetadataSlice"); err != nil { + if err := p.ReplicaMetadataSlicePool.initDefaultsAndValidate("replicaMetadataSlice"); err != nil { return err } if err := p.BlockMetadataPool.initDefaultsAndValidate("blockMetadata"); err != nil { diff --git a/src/cmd/services/m3dbnode/main/main_index_test.go b/src/cmd/services/m3dbnode/main/main_index_test.go index 4c95566e71..49f28904cb 100644 --- a/src/cmd/services/m3dbnode/main/main_index_test.go +++ b/src/cmd/services/m3dbnode/main/main_index_test.go @@ -24,13 +24,13 @@ package main_test import ( "fmt" + "net/http" "strconv" "strings" "sync" "testing" "text/template" "time" - "net/http" "github.com/m3db/m3/src/cluster/integration/etcd" "github.com/m3db/m3/src/cluster/placement" @@ -178,7 +178,7 @@ func TestIndexEnabledServer(t *testing.T) { }) serverWg.Done() }() - defer func(){ + defer func() { // Resetting DefaultServeMux to prevent multiple assignments // to /debug/dump in Server.Run() http.DefaultServeMux = http.NewServeMux() @@ -373,9 +373,6 @@ db: repair: enabled: false - interval: 2h - offset: 30m - jitter: 1h throttle: 2m checkInterval: 1m @@ -420,7 +417,7 @@ db: capacity: 128 lowWatermark: 0.01 highWatermark: 0.02 - hostBlockMetadataSlicePool: + replicaMetadataSlicePool: size: 128 capacity: 3 lowWatermark: 0.01 diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index b6bb158275..eddc38f71f 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -24,12 +24,12 @@ package main_test import ( "fmt" + "net/http" "strconv" "sync" "testing" "text/template" "time" - "net/http" "github.com/m3db/m3/src/cluster/integration/etcd" "github.com/m3db/m3/src/cluster/placement" @@ -170,7 +170,7 @@ func TestConfig(t *testing.T) { }) serverWg.Done() }() - defer func(){ + defer func() { // Resetting DefaultServeMux to prevent multiple assignments // to /debug/dump in Server.Run() http.DefaultServeMux = http.NewServeMux() @@ -318,7 +318,7 @@ func TestEmbeddedConfig(t *testing.T) { }) serverWg.Done() }() - defer func(){ + defer func() { // Resetting DefaultServeMux to prevent multiple assignments // to /debug/dump in Server.Run() http.DefaultServeMux = http.NewServeMux() @@ -527,9 +527,6 @@ db: repair: enabled: false - interval: 2h - offset: 30m - jitter: 1h throttle: 2m checkInterval: 1m @@ -574,7 +571,7 @@ db: capacity: 128 lowWatermark: 0.01 highWatermark: 0.02 - hostBlockMetadataSlicePool: + replicaMetadataSlicePool: size: 128 capacity: 3 lowWatermark: 0.01 diff --git a/src/dbnode/config/m3dbnode-all-config.yml b/src/dbnode/config/m3dbnode-all-config.yml index c4108e3bb1..fa38d12e4a 100644 --- a/src/dbnode/config/m3dbnode-all-config.yml +++ b/src/dbnode/config/m3dbnode-all-config.yml @@ -154,9 +154,6 @@ db: # This feature is currently not working, do not enable. 
repair: enabled: false - interval: 2h - offset: 30m - jitter: 1h throttle: 2m checkInterval: 1m @@ -202,7 +199,7 @@ db: capacity: 4096 lowWatermark: 0.7 highWatermark: 1.0 - hostBlockMetadataSlicePool: + replicaMetadataSlicePool: size: 131072 capacity: 3 lowWatermark: 0.7 diff --git a/src/dbnode/encoding/m3tsz/encoder.go b/src/dbnode/encoding/m3tsz/encoder.go index 3a927a2f4d..ee8b158abc 100644 --- a/src/dbnode/encoding/m3tsz/encoder.go +++ b/src/dbnode/encoding/m3tsz/encoder.go @@ -26,11 +26,11 @@ import ( "time" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" xtime "github.com/m3db/m3/src/x/time" - "github.com/m3db/m3/src/dbnode/namespace" ) var ( @@ -310,9 +310,26 @@ func (enc *encoder) LastEncoded() (ts.Datapoint, error) { return result, nil } -// Len returns the length of the data stream. +// Len returns the length of the final data stream that would be generated +// by a call to Stream(). func (enc *encoder) Len() int { - return enc.os.Len() + raw, pos := enc.os.Rawbytes() + if len(raw) == 0 { + return 0 + } + + // Calculate how long the stream would be once it was "capped" with a tail. + var ( + lastIdx = len(raw) - 1 + lastByte = raw[lastIdx] + scheme = enc.opts.MarkerEncodingScheme() + tail = scheme.Tail(lastByte, pos) + ) + tail.IncRef() + tailLen := tail.Len() + tail.DecRef() + + return len(raw[:lastIdx]) + tailLen } // Close closes the encoder. diff --git a/src/dbnode/encoding/m3tsz/encoder_test.go b/src/dbnode/encoding/m3tsz/encoder_test.go index 4b5e65cae3..8a287a7ea9 100644 --- a/src/dbnode/encoding/m3tsz/encoder_test.go +++ b/src/dbnode/encoding/m3tsz/encoder_test.go @@ -409,6 +409,23 @@ func TestEncoderLastEncoded(t *testing.T) { }) } +func TestEncoderLenReturnsFinalStreamLength(t *testing.T) { + testMultiplePasses(t, multiplePassesTest{ + postEncodeAll: func(enc *encoder, numDatapointsEncoded int) { + encLen := enc.Len() + stream, ok := enc.Stream(encoding.StreamOptions{}) + + var streamLen int + if ok { + segment, err := stream.Segment() + require.NoError(t, err) + streamLen = segment.Len() + } + require.Equal(t, streamLen, encLen) + }, + }) +} + type multiplePassesTest struct { preEncodeAll func(enc *encoder, numDatapointsToEncode int) preEncodeDatapoint func(enc *encoder, datapoint ts.Datapoint) diff --git a/src/dbnode/encoding/proto/round_trip_prop_test.go b/src/dbnode/encoding/proto/round_trip_prop_test.go index 1bc8c3ea08..2dbaf06818 100644 --- a/src/dbnode/encoding/proto/round_trip_prop_test.go +++ b/src/dbnode/encoding/proto/round_trip_prop_test.go @@ -163,10 +163,16 @@ func TestRoundTripProp(t *testing.T) { enc.SetSchema(setSchemaDescr) } + // Ensure that the Len() method always returns the length of the final stream that would + // be returned by a call to Stream(). + encLen := enc.Len() stream, ok := enc.Stream(encoding.StreamOptions{}) if !ok { return true, nil } + segment, err := stream.Segment() + require.NoError(t, err) + require.Equal(t, segment.Len(), encLen) iter.Reset(stream, schemaDescr) diff --git a/src/dbnode/encoding/types.go b/src/dbnode/encoding/types.go index 882c79f355..d8f1e9ccd3 100644 --- a/src/dbnode/encoding/types.go +++ b/src/dbnode/encoding/types.go @@ -70,7 +70,7 @@ type Encoder interface { // an error is returned. LastEncoded() (ts.Datapoint, error) - // Len returns the length of the encoded bytes in the encoder. 
+ // Len returns the length of the encoded stream as returned by a call to Stream(). Len() int // Reset resets the start time of the encoder and the internal state. diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index d4f1b8e55d..7d3c87d400 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -25,6 +25,7 @@ package integration import ( "bytes" "errors" + "fmt" "testing" "time" @@ -199,6 +200,22 @@ func verifyForTime( filesetType persist.FileSetType, expected generate.SeriesBlock, ) { + err := checkForTime( + storageOpts, reader, shardSet, iteratorPool, timestamp, + nsCtx, filesetType, expected) + require.NoError(t, err) +} + +func checkForTime( + storageOpts storage.Options, + reader fs.DataFileSetReader, + shardSet sharding.ShardSet, + iteratorPool encoding.ReaderIteratorPool, + timestamp time.Time, + nsCtx ns.Context, + filesetType persist.FileSetType, + expected generate.SeriesBlock, +) error { shards := make(map[uint32]struct{}) for _, series := range expected { shard := shardSet.Lookup(series.ID) @@ -220,24 +237,39 @@ func verifyForTime( // Identify the latest volume for this block start. case persist.FileSetSnapshotType: snapshotFiles, err := fs.SnapshotFiles(filePathPrefix, nsCtx.ID, shard) - require.NoError(t, err) + if err != nil { + return err + } latest, ok := snapshotFiles.LatestVolumeForBlock(timestamp) - require.True(t, ok) + if !ok { + return fmt.Errorf("no latest snapshot volume for block: %v", timestamp) + } rOpts.Identifier.VolumeIndex = latest.ID.VolumeIndex case persist.FileSetFlushType: dataFiles, err := fs.DataFiles(filePathPrefix, nsCtx.ID, shard) - require.NoError(t, err) + if err != nil { + return err + } latest, ok := dataFiles.LatestVolumeForBlock(timestamp) - require.True(t, ok) + if !ok { + return fmt.Errorf("no latest data volume for block: %v", timestamp) + } rOpts.Identifier.VolumeIndex = latest.ID.VolumeIndex } - require.NoError(t, reader.Open(rOpts)) + if err := reader.Open(rOpts); err != nil { + return err + } + for i := 0; i < reader.Entries(); i++ { id, tagsIter, data, _, err := reader.Read() - require.NoError(t, err) + if err != nil { + return err + } tags, err := testutil.NewTagsFromTagIterator(tagsIter) - require.NoError(t, err) + if err != nil { + return err + } data.IncRef() @@ -248,7 +280,9 @@ func verifyForTime( dp, _, ann := it.Current() datapoints = append(datapoints, generate.TestValue{Datapoint: dp, Annotation: ann}) } - require.NoError(t, it.Err()) + if err := it.Err(); err != nil { + return err + } it.Close() actual = append(actual, generate.Series{ @@ -260,10 +294,12 @@ func verifyForTime( data.DecRef() data.Finalize() } - require.NoError(t, reader.Close()) + if err := reader.Close(); err != nil { + return err + } } - compareSeriesList(t, expected, actual) + return compareSeriesList(expected, actual) } func verifyFlushedDataFiles( @@ -273,16 +309,33 @@ func verifyFlushedDataFiles( nsID ident.ID, seriesMaps map[xtime.UnixNano]generate.SeriesBlock, ) { + err := checkFlushedDataFiles(shardSet, storageOpts, nsID, seriesMaps) + require.NoError(t, err) +} + +func checkFlushedDataFiles( + shardSet sharding.ShardSet, + storageOpts storage.Options, + nsID ident.ID, + seriesMaps map[xtime.UnixNano]generate.SeriesBlock, +) error { fsOpts := storageOpts.CommitLogOptions().FilesystemOptions() reader, err := fs.NewReader(storageOpts.BytesPool(), fsOpts) - require.NoError(t, err) + if err != nil { + return err + } iteratorPool := 
storageOpts.ReaderIteratorPool() nsCtx := ns.NewContextFor(nsID, storageOpts.SchemaRegistry()) for timestamp, seriesList := range seriesMaps { - verifyForTime( - t, storageOpts, reader, shardSet, iteratorPool, timestamp.ToTime(), + err := checkForTime( + storageOpts, reader, shardSet, iteratorPool, timestamp.ToTime(), nsCtx, persist.FileSetFlushType, seriesList) + if err != nil { + return err + } } + + return nil } func verifySnapshottedDataFiles( diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 91f4eb5958..f8a79a22c1 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -114,6 +114,7 @@ type bootstrappableTestSetupOptions struct { testStatsReporter xmetrics.TestStatsReporter disablePeersBootstrapper bool useTChannelClientForWriting bool + enableRepairs bool } type closeFn func() @@ -159,6 +160,7 @@ func newDefaultBootstrappableTestSetups( bootstrapConsistencyLevel = setupOpts[i].bootstrapConsistencyLevel topologyInitializer = setupOpts[i].topologyInitializer testStatsReporter = setupOpts[i].testStatsReporter + enableRepairs = setupOpts[i].enableRepairs origin topology.Host instanceOpts = newMultiAddrTestOptions(opts, instance) ) @@ -292,6 +294,19 @@ func newDefaultBootstrappableTestSetups( setup.storageOpts = setup.storageOpts.SetBootstrapProcessProvider(provider) + if enableRepairs { + setup.storageOpts = setup.storageOpts. + SetRepairEnabled(true). + SetRepairOptions( + setup.storageOpts.RepairOptions(). + SetRepairThrottle(time.Millisecond). + SetRepairCheckInterval(time.Millisecond). + SetAdminClient(adminClient). + SetDebugShadowComparisonsPercentage(1.0). + // Avoid log spam. + SetDebugShadowComparisonsEnabled(false)) + } + setups = append(setups, setup) appendCleanupFn(func() { setup.close() diff --git a/src/dbnode/integration/integration_data_verify.go b/src/dbnode/integration/integration_data_verify.go index 6082753ad9..bfc94a66c6 100644 --- a/src/dbnode/integration/integration_data_verify.go +++ b/src/dbnode/integration/integration_data_verify.go @@ -21,9 +21,11 @@ package integration import ( + "bytes" "encoding/json" "fmt" "os" + "reflect" "sort" "testing" "time" @@ -38,7 +40,6 @@ import ( xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -212,6 +213,17 @@ func verifySeriesMapForRange( return true } +func containsSeries(ts *testSetup, namespace, seriesID ident.ID, start, end time.Time) (bool, error) { + req := rpc.NewFetchRequest() + req.NameSpace = namespace.String() + req.ID = seriesID.String() + req.RangeStart = xtime.ToNormalizedTime(start, time.Second) + req.RangeEnd = xtime.ToNormalizedTime(end, time.Second) + req.ResultTimeType = rpc.TimeType_UNIX_SECONDS + fetched, err := ts.fetch(req) + return len(fetched) != 0, err +} + func writeVerifyDebugOutput( t *testing.T, filePath string, start, end time.Time, series generate.SeriesBlock) bool { w, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, os.ModeAppend) @@ -307,17 +319,35 @@ func createFileIfPrefixSet(t *testing.T, prefix, suffix string) (string, bool) { } func compareSeriesList( - t *testing.T, expected generate.SeriesBlock, actual generate.SeriesBlock, -) { +) error { sort.Sort(expected) sort.Sort(actual) - require.Equal(t, len(expected), len(actual)) + if len(expected) != len(actual) { + return fmt.Errorf( + "number of expected series: %d did not match actual: %d", + len(expected), len(actual)) + } for i := 
range expected { - require.Equal(t, expected[i].ID.Bytes(), actual[i].ID.Bytes()) - require.Equal(t, expected[i].Data, expected[i].Data) + if !bytes.Equal(expected[i].ID.Bytes(), actual[i].ID.Bytes()) { + return fmt.Errorf( + "series ID did not match, expected: %s, actual: %s", + expected[i].ID.String(), actual[i].ID.String()) + } + if len(expected[i].Data) != len(actual[i].Data) { + return fmt.Errorf( + "data for series: %s did not match, expected: %d data points, actual: %d", + expected[i].ID.String(), len(expected[i].Data), len(actual[i].Data)) + } + if !reflect.DeepEqual(expected[i].Data, actual[i].Data) { + return fmt.Errorf( + "data for series: %s did not match, expected: %v, actual: %v", + expected[i].ID.String(), expected[i].Data, actual[i].Data) + } } + + return nil } diff --git a/src/dbnode/integration/repair_test.go b/src/dbnode/integration/repair_test.go new file mode 100644 index 0000000000..b487646e05 --- /dev/null +++ b/src/dbnode/integration/repair_test.go @@ -0,0 +1,270 @@ +// +build integration + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package integration + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/x/ident" + xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/stretchr/testify/require" +) + +func TestRepairDisjointSeries(t *testing.T) { + genRepairData := func(now time.Time, blockSize time.Duration) ( + node0Data generate.SeriesBlocksByStart, + node1Data generate.SeriesBlocksByStart, + node2Data generate.SeriesBlocksByStart, + allData generate.SeriesBlocksByStart, + ) { + currBlockStart := now.Truncate(blockSize) + node0Data = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"foo"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + }) + node1Data = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"bar"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + }) + + allData = make(map[xtime.UnixNano]generate.SeriesBlock) + for start, data := range node0Data { + for _, series := range data { + allData[start] = append(allData[start], series) + } + } + for start, data := range node1Data { + for _, series := range data { + allData[start] = append(allData[start], series) + } + } + for start, data := range node2Data { + for _, series := range data { + allData[start] = append(allData[start], series) + } + } + + return node0Data, node1Data, node2Data, allData + } + + testRepair(t, genRepairData, testRepairOptions{}) +} + +func TestRepairMergeSeries(t *testing.T) { + genRepairData := func(now time.Time, blockSize time.Duration) ( + node0Data generate.SeriesBlocksByStart, + node1Data generate.SeriesBlocksByStart, + node2Data generate.SeriesBlocksByStart, + allData generate.SeriesBlocksByStart, + ) { + currBlockStart := now.Truncate(blockSize) + allData = generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-4 * blockSize)}, + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-3 * blockSize)}, + {IDs: []string{"foo", "baz"}, NumPoints: 90, Start: currBlockStart.Add(-2 * blockSize)}}) + node0Data = make(map[xtime.UnixNano]generate.SeriesBlock) + node1Data = make(map[xtime.UnixNano]generate.SeriesBlock) + + remainder := 0 + appendSeries := func(target map[xtime.UnixNano]generate.SeriesBlock, start time.Time, s generate.Series) { + var dataWithMissing []generate.TestValue + for i := range s.Data { + if i%2 != remainder { + continue + } + dataWithMissing = append(dataWithMissing, s.Data[i]) + } + target[xtime.ToUnixNano(start)] = append( + target[xtime.ToUnixNano(start)], + generate.Series{ID: s.ID, Data: dataWithMissing}, + ) + remainder = 1 - remainder + } + for start, data := range allData { + for _, series := range data { + appendSeries(node0Data, start.ToTime(), series) + appendSeries(node1Data, start.ToTime(), series) + } + } + + return node0Data, node1Data, node2Data, allData + } + + testRepair(t, genRepairData, testRepairOptions{}) +} + +func TestRepairDoesNotRepairCurrentBlock(t *testing.T) { + genRepairData := func(now time.Time, blockSize time.Duration) ( + node0Data generate.SeriesBlocksByStart, + node1Data generate.SeriesBlocksByStart, + node2Data generate.SeriesBlocksByStart, + allData generate.SeriesBlocksByStart, + ) { + currBlockStart := now.Truncate(blockSize) + node0Data = generate.BlocksByStart([]generate.BlockConfig{ + // Write 
in previous block should be repaired. + {IDs: []string{"prevBlock1", "prevBlock2"}, NumPoints: 1, Start: currBlockStart.Add(-blockSize)}, + // Write in current block, should not be repaired. + {IDs: []string{"currBlock1", "currBlock2"}, NumPoints: 1, Start: currBlockStart}, + }) + + allData = make(map[xtime.UnixNano]generate.SeriesBlock) + for start, data := range node0Data { + if !start.ToTime().Equal(currBlockStart) { + allData[start] = data + } + } + require.Equal(t, 1, len(allData)) + + return node0Data, node1Data, node2Data, allData + } + + currBlockSeries := []ident.ID{ident.StringID("currBlock1"), ident.StringID("currBlock2")} + testRepairOpts := testRepairOptions{ + node1ShouldNotContainSeries: currBlockSeries, + node2ShouldNotContainSeries: currBlockSeries} + testRepair(t, genRepairData, testRepairOpts) +} + +type genRepairDatafn func( + now time.Time, + blockSize time.Duration, +) ( + node0Data generate.SeriesBlocksByStart, + node1Data generate.SeriesBlocksByStart, + node2Data generate.SeriesBlocksByStart, + allData generate.SeriesBlocksByStart) + +type testRepairOptions struct { + node0ShouldNotContainSeries []ident.ID + node1ShouldNotContainSeries []ident.ID + node2ShouldNotContainSeries []ident.ID +} + +func testRepair( + t *testing.T, + genRepairData genRepairDatafn, + testRepairOpts testRepairOptions, +) { + if testing.Short() { + t.SkipNow() + } + + // Test setups. + log := xtest.NewLogger(t) + retentionOpts := retention.NewOptions(). + SetRetentionPeriod(20 * time.Hour). + SetBlockSize(2 * time.Hour). + SetBufferPast(10 * time.Minute). + SetBufferFuture(2 * time.Minute) + nsOpts := namespace.NewOptions(). + SetRepairEnabled(true). + // Explicitly ensure that the repair feature works even if cold writes is disabled + // at the namespace level. + SetColdWritesEnabled(false). + SetRetentionOptions(retentionOpts) + namesp, err := namespace.NewMetadata(testNamespaces[0], nsOpts) + require.NoError(t, err) + opts := newTestOptions(t). + SetNamespaces([]namespace.Metadata{namesp}) + + setupOpts := []bootstrappableTestSetupOptions{ + {disablePeersBootstrapper: true, enableRepairs: true}, + {disablePeersBootstrapper: true, enableRepairs: true}, + {disablePeersBootstrapper: true, enableRepairs: true}, + } + setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts) + defer closeFn() + + // Ensure that the current time is set such that the previous block is flushable. + blockSize := retentionOpts.BlockSize() + now := setups[0].getNowFn().Truncate(blockSize).Add(retentionOpts.BufferPast()).Add(time.Second) + for _, setup := range setups { + setup.setNowFn(now) + } + + node0Data, node1Data, node2Data, allData := genRepairData(now, blockSize) + if node0Data != nil { + require.NoError(t, writeTestDataToDisk(namesp, setups[0], node0Data, 0)) + } + if node1Data != nil { + require.NoError(t, writeTestDataToDisk(namesp, setups[1], node1Data, 0)) + } + if node2Data != nil { + require.NoError(t, writeTestDataToDisk(namesp, setups[2], node2Data, 0)) + } + + // Start the servers with filesystem bootstrappers. + setups.parallel(func(s *testSetup) { + if err := s.startServer(); err != nil { + panic(err) + } + }) + log.Debug("servers are now up") + + // Stop the servers. 
+ defer func() { + setups.parallel(func(s *testSetup) { + require.NoError(t, s.stopServer()) + }) + log.Debug("servers are now down") + }() + + require.True(t, waitUntil(func() bool { + for _, setup := range setups { + if err := checkFlushedDataFiles(setup.shardSet, setup.storageOpts, namesp.ID(), allData); err != nil { + // Increment the time each time it fails to make sure background processes are able to proceed. + for _, s := range setups { + s.setNowFn(s.getNowFn().Add(time.Millisecond)) + } + return false + } + } + return true + }, 60*time.Second)) + + // Verify in-memory data matches what we expect. + verifySeriesMaps(t, setups[0], namesp.ID(), allData) + verifySeriesMaps(t, setups[1], namesp.ID(), allData) + verifySeriesMaps(t, setups[2], namesp.ID(), allData) + + for _, seriesID := range testRepairOpts.node0ShouldNotContainSeries { + contains, err := containsSeries(setups[0], namesp.ID(), seriesID, now.Add(-retentionOpts.RetentionPeriod()), now) + require.NoError(t, err) + require.False(t, contains) + } + for _, seriesID := range testRepairOpts.node1ShouldNotContainSeries { + contains, err := containsSeries(setups[1], namesp.ID(), seriesID, now.Add(-retentionOpts.RetentionPeriod()), now) + require.NoError(t, err) + require.False(t, contains) + } + for _, seriesID := range testRepairOpts.node2ShouldNotContainSeries { + contains, err := containsSeries(setups[2], namesp.ID(), seriesID, now.Add(-retentionOpts.RetentionPeriod()), now) + require.NoError(t, err) + require.False(t, contains) + } +} diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index b44b0aee0d..964dba0bc8 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -537,12 +537,20 @@ func (s *service) readDatapoints( return nil, err } + // Resolve all futures (block reads can be backed by async implementations) and filter out any empty segments. + filteredBlockReaderSliceOfSlices, err := xio.FilterEmptyBlockReadersSliceOfSlicesInPlace(encoded) + if err != nil { + return nil, err + } + // Make datapoints an initialized empty array for JSON serialization as empty array than null datapoints := make([]*rpc.Datapoint, 0) multiIt := db.Options().MultiReaderIteratorPool().Get() nsCtx := namespace.NewContextFor(nsID, db.Options().SchemaRegistry()) - multiIt.ResetSliceOfSlices(xio.NewReaderSliceOfSlicesFromBlockReadersIterator(encoded), nsCtx.Schema) + multiIt.ResetSliceOfSlices( + xio.NewReaderSliceOfSlicesFromBlockReadersIterator( + filteredBlockReaderSliceOfSlices), nsCtx.Schema) defer multiIt.Close() for multiIt.Next() { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index cb0e5eb682..edbbb132c9 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -58,6 +58,7 @@ import ( m3dbruntime "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/cluster" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/series" @@ -664,15 +665,21 @@ func Run(runOpts RunOptions) { kvWatchClientConsistencyLevels(envCfg.KVStore, logger, clientAdminOpts, runtimeOptsMgr) + mutableSegmentAlloc := index.NewBootstrapResultMutableSegmentAllocator( + opts.IndexOptions()) + rsOpts := result.NewOptions(). 
+ SetInstrumentOptions(opts.InstrumentOptions()). + SetDatabaseBlockOptions(opts.DatabaseBlockOptions()). + SetSeriesCachePolicy(opts.SeriesCachePolicy()). + SetIndexMutableSegmentAllocator(mutableSegmentAlloc) + opts = opts.SetRepairEnabled(false) if cfg.Repair != nil { repairOpts := opts.RepairOptions(). - SetRepairInterval(cfg.Repair.Interval). - SetRepairTimeOffset(cfg.Repair.Offset). - SetRepairTimeJitter(cfg.Repair.Jitter). SetRepairThrottle(cfg.Repair.Throttle). SetRepairCheckInterval(cfg.Repair.CheckInterval). SetAdminClient(m3dbClient). + SetResultOptions(rsOpts). SetDebugShadowComparisonsEnabled(cfg.Repair.DebugShadowComparisonsEnabled) if cfg.Repair.DebugShadowComparisonsPercentage > 0 { @@ -695,7 +702,7 @@ func Run(runOpts RunOptions) { // See GitHub issue #1013 for more details. topoMapProvider := newTopoMapProvider(topo) bs, err := cfg.Bootstrap.New(config.NewBootstrapConfigurationValidator(), - opts, topoMapProvider, origin, m3dbClient) + rsOpts, opts, topoMapProvider, origin, m3dbClient) if err != nil { logger.Fatal("could not create bootstrap process", zap.Error(err)) } @@ -725,7 +732,7 @@ func Run(runOpts RunOptions) { cfg.Bootstrap.Bootstrappers = bootstrappers updated, err := cfg.Bootstrap.New(config.NewBootstrapConfigurationValidator(), - opts, topoMapProvider, origin, m3dbClient) + rsOpts, opts, topoMapProvider, origin, m3dbClient) if err != nil { logger.Error("updated bootstrapper list failed", zap.Error(err)) return diff --git a/src/dbnode/storage/block/result.go b/src/dbnode/storage/block/result.go index c9908f4fdf..3c9943e6f1 100644 --- a/src/dbnode/storage/block/result.go +++ b/src/dbnode/storage/block/result.go @@ -234,6 +234,8 @@ func (it *filteredBlocksMetadataIter) Next() bool { return false } tagsIter.Close() + // Set to nil so it doesn't get closed again later and trigger a double-put pooling bug. + it.res[it.resIdx].Tags = nil } it.metadata = NewMetadata(it.id, tags, block.Start, block.Size, block.Checksum, block.LastRead) diff --git a/src/dbnode/storage/block/result_test.go b/src/dbnode/storage/block/result_test.go index b5c910e728..9b5b9e00a1 100644 --- a/src/dbnode/storage/block/result_test.go +++ b/src/dbnode/storage/block/result_test.go @@ -111,4 +111,10 @@ func TestFilteredBlocksMetadataIter(t *testing.T) { assert.Equal(t, expected[i].Checksum, actual[i].Checksum) assert.Equal(t, expected[i].LastRead, actual[i].LastRead) } + + for _, fetchMetadataResult := range res.Results() { + // Ensure that the consumed (and closed) tags are marked as nil so subsequent code paths + // can't trigger a double close. + require.Nil(t, fetchMetadataResult.Tags) + } } diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 10a0219657..69d6c3d684 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -128,9 +128,6 @@ func testNamespaceMap(t *testing.T) namespace.Map { func testRepairOptions(ctrl *gomock.Controller) repair.Options { return repair.NewOptions(). SetAdminClient(client.NewMockAdminClient(ctrl)). - SetRepairInterval(time.Second). - SetRepairTimeOffset(500 * time.Millisecond). - SetRepairTimeJitter(300 * time.Millisecond). 
SetRepairCheckInterval(100 * time.Millisecond) } diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 41b915a0f2..5d55ba03fc 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -22,12 +22,11 @@ package index import ( "bytes" - "fmt" "testing" "github.com/m3db/m3/src/m3ninx/doc" - xtest "github.com/m3db/m3/src/x/test" "github.com/m3db/m3/src/x/ident" + xtest "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -401,7 +400,6 @@ func TestAggResultsReset(t *testing.T) { aggResults, ok = res.(*aggregatedResults) require.True(t, ok) require.Equal(t, 100, aggResults.aggregateOpts.SizeLimit) - fmt.Println(aggResults.nsID.String()) require.Equal(t, newID.Bytes(), aggResults.nsID.Bytes()) // Ensure new NS is cloned diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 71d7af2b4a..ea69aa3d41 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1053,7 +1053,9 @@ func (n *dbNamespace) ColdFlush( nsCtx := namespace.Context{Schema: n.schemaDescr} n.RUnlock() - if !n.nopts.ColdWritesEnabled() { + // If repair is enabled we still need cold flush regardless of whether cold writes is + // enabled since repairs are dependent on the cold flushing logic. + if !n.nopts.ColdWritesEnabled() && !n.nopts.RepairEnabled() { n.metrics.flushColdData.ReportSuccess(n.nowFn().Sub(callStart)) return nil } @@ -1245,6 +1247,7 @@ func (n *dbNamespace) Repair( n.RLock() nsCtx := n.nsContextWithRLock() + nsMeta := n.metadata n.RUnlock() for _, shard := range shards { @@ -1257,7 +1260,7 @@ func (n *dbNamespace) Repair( ctx := n.opts.ContextPool().Get() defer ctx.Close() - metadataRes, err := shard.Repair(ctx, nsCtx, tr, repairer) + metadataRes, err := shard.Repair(ctx, nsCtx, nsMeta, tr, repairer) mutex.Lock() if err != nil { diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 1de5e156bc..2d83f4b17f 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -635,7 +635,7 @@ func TestNamespaceRepair(t *testing.T) { } } shard.EXPECT(). - Repair(gomock.Any(), gomock.Any(), repairTimeRange, repairer). + Repair(gomock.Any(), gomock.Any(), gomock.Any(), repairTimeRange, repairer). 
Return(res, errs[i]) ns.shards[testShardIDs[i].ID()] = shard } diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index 6598e611ae..a8d483239a 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -25,7 +25,6 @@ import ( "errors" "fmt" "math" - "math/rand" "strconv" "sync" "sync/atomic" @@ -34,6 +33,7 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/repair" @@ -42,6 +42,7 @@ import ( "github.com/m3db/m3/src/x/dice" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" "github.com/jhump/protoreflect/dynamic" @@ -56,6 +57,7 @@ var ( type recordFn func(namespace ident.ID, shard databaseShard, diffRes repair.MetadataComparisonResult) +// TODO(rartoul): See if we can find a way to guard against too much metadata. type shardRepairer struct { opts Options rpopts repair.Options @@ -90,6 +92,7 @@ func (r shardRepairer) Options() repair.Options { func (r shardRepairer) Repair( ctx context.Context, nsCtx namespace.Context, + nsMeta namespace.Metadata, tr xtime.Range, shard databaseShard, ) (repair.MetadataComparisonResult, error) { @@ -99,13 +102,12 @@ func (r shardRepairer) Repair( } var ( - start = tr.Start - end = tr.End - origin = session.Origin() - replicas = session.Replicas() + start = tr.Start + end = tr.End + origin = session.Origin() ) - metadata := repair.NewReplicaMetadataComparer(replicas, r.rpopts) + metadata := repair.NewReplicaMetadataComparer(origin, r.rpopts) ctx.RegisterFinalizer(metadata) // Add local metadata @@ -117,6 +119,8 @@ func (r shardRepairer) Repair( accumLocalMetadata = block.NewFetchBlocksMetadataResults() pageToken PageToken ) + // Safe to register since by the time this function completes we won't be using the metadata + // for anything anymore. ctx.RegisterCloser(accumLocalMetadata) for { @@ -150,20 +154,20 @@ func (r shardRepairer) Repair( r.logger.Error( "Shadow compare failed", zap.Error(err)) - return repair.MetadataComparisonResult{}, err } } localIter := block.NewFilteredBlocksMetadataIter(accumLocalMetadata) - err = metadata.AddLocalMetadata(origin, localIter) + err = metadata.AddLocalMetadata(localIter) if err != nil { return repair.MetadataComparisonResult{}, err } - // Add peer metadata + rsOpts := r.opts.RepairOptions().ResultOptions() + // Add peer metadata. level := r.rpopts.RepairConsistencyLevel() peerIter, err := session.FetchBlocksMetadataFromPeers(nsCtx.ID, shard.ID(), start, end, - level, result.NewOptions()) + level, rsOpts) if err != nil { return repair.MetadataComparisonResult{}, err } @@ -171,7 +175,62 @@ func (r shardRepairer) Repair( return repair.MetadataComparisonResult{}, err } - metadataRes := metadata.Compare() + var ( + // TODO(rartoul): Pool this slice. 
+ metadatasToFetchBlocksFor = []block.ReplicaMetadata{} + metadataRes = metadata.Compare() + seriesWithChecksumMismatches = metadataRes.ChecksumDifferences.Series() + ) + + for _, e := range seriesWithChecksumMismatches.Iter() { + for blockStart, replicaMetadataBlocks := range e.Value().Metadata.Blocks() { + blStartTime := blockStart.ToTime() + blStartRange := xtime.Range{Start: blStartTime, End: blStartTime} + if !tr.Contains(blStartRange) { + instrument.EmitAndLogInvariantViolation(r.opts.InstrumentOptions(), func(l *zap.Logger) { + l.With( + zap.Time("blockStart", blockStart.ToTime()), + zap.String("namespace", nsMeta.ID().String()), + zap.Uint32("shard", shard.ID()), + ).Error("repair received replica metadata for unrequested blockStart") + }) + continue + } + + for _, replicaMetadata := range replicaMetadataBlocks.Metadata() { + if replicaMetadata.Host.ID() == session.Origin().ID() { + // Don't request blocks for self metadata. + continue + } + metadatasToFetchBlocksFor = append(metadatasToFetchBlocksFor, replicaMetadata) + } + } + } + + perSeriesReplicaIter, err := session.FetchBlocksFromPeers(nsMeta, shard.ID(), level, metadatasToFetchBlocksFor, rsOpts) + if err != nil { + return repair.MetadataComparisonResult{}, err + } + + // TODO(rartoul): Copying the IDs for the purposes of the map key is wasteful. Considering using + // SetUnsafe or marking as NoFinalize() and making the map check IsNoFinalize(). + numMismatchSeries := seriesWithChecksumMismatches.Len() + results := result.NewShardResult(numMismatchSeries, rsOpts) + for perSeriesReplicaIter.Next() { + _, id, block := perSeriesReplicaIter.Current() + // TODO(rartoul): Handle tags in both branches: https://github.com/m3db/m3/issues/1848 + if existing, ok := results.BlockAt(id, block.StartTime()); ok { + if err := existing.Merge(block); err != nil { + return repair.MetadataComparisonResult{}, err + } + } else { + results.AddBlock(id, ident.Tags{}, block) + } + } + + if err := shard.Load(results.AllSeries()); err != nil { + return repair.MetadataComparisonResult{}, err + } r.recordFn(nsCtx.ID, shard, metadataRes) @@ -219,8 +278,8 @@ const ( ) type repairState struct { + LastAttempt time.Time Status repairStatus - NumFailures int } type namespaceRepairStateByTime map[xtime.UnixNano]repairState @@ -270,6 +329,7 @@ func (r repairStatesByNs) setRepairState( // it with a mutex. 
type dbRepairer struct { database database + opts Options ropts repair.Options shardRepairer databaseShardRepairer repairStatesByNs repairStatesByNs @@ -278,11 +338,8 @@ type dbRepairer struct { sleepFn sleepFn nowFn clock.NowFn logger *zap.Logger - repairInterval time.Duration - repairTimeOffset time.Duration - repairTimeJitter time.Duration repairCheckInterval time.Duration - repairMaxRetries int + scope tally.Scope status tally.Gauge closedLock sync.Mutex @@ -291,9 +348,11 @@ type dbRepairer struct { } func newDatabaseRepairer(database database, opts Options) (databaseRepairer, error) { - nowFn := opts.ClockOptions().NowFn() - scope := opts.InstrumentOptions().MetricsScope() - ropts := opts.RepairOptions() + var ( + nowFn = opts.ClockOptions().NowFn() + scope = opts.InstrumentOptions().MetricsScope().SubScope("repair") + ropts = opts.RepairOptions() + ) if ropts == nil { return nil, errNoRepairOptions } @@ -303,25 +362,17 @@ func newDatabaseRepairer(database database, opts Options) (databaseRepairer, err shardRepairer := newShardRepairer(opts, ropts) - var jitter time.Duration - if repairJitter := ropts.RepairTimeJitter(); repairJitter > 0 { - src := rand.NewSource(nowFn().UnixNano()) - jitter = time.Duration(float64(repairJitter) * (float64(src.Int63()) / float64(math.MaxInt64))) - } - r := &dbRepairer{ database: database, + opts: opts, ropts: ropts, shardRepairer: shardRepairer, repairStatesByNs: newRepairStates(), sleepFn: time.Sleep, nowFn: nowFn, logger: opts.InstrumentOptions().Logger(), - repairInterval: ropts.RepairInterval(), - repairTimeOffset: ropts.RepairTimeOffset(), - repairTimeJitter: jitter, repairCheckInterval: ropts.RepairCheckInterval(), - repairMaxRetries: ropts.RepairMaxRetries(), + scope: scope, status: scope.Gauge("repair"), } r.repairFn = r.Repair @@ -330,8 +381,6 @@ func newDatabaseRepairer(database database, opts Options) (databaseRepairer, err } func (r *dbRepairer) run() { - var curIntervalStart time.Time - for { r.closedLock.Lock() closed := r.closed @@ -343,61 +392,23 @@ func (r *dbRepairer) run() { r.sleepFn(r.repairCheckInterval) - now := r.nowFn() - intervalStart := now.Truncate(r.repairInterval) - - // If we haven't reached the offset yet, skip - target := intervalStart.Add(r.repairTimeOffset + r.repairTimeJitter) - if now.Before(target) { - continue - } - - // If we are in the same interval, we must have already repaired, skip - if intervalStart.Equal(curIntervalStart) { - continue - } - - curIntervalStart = intervalStart if err := r.repairFn(); err != nil { r.logger.Error("error repairing database", zap.Error(err)) } } } -func (r *dbRepairer) namespaceRepairTimeRanges(ns databaseNamespace) xtime.Ranges { +func (r *dbRepairer) namespaceRepairTimeRange(ns databaseNamespace) xtime.Range { var ( - now = r.nowFn() - rtopts = ns.Options().RetentionOptions() - blockSize = rtopts.BlockSize() - start = now.Add(-rtopts.RetentionPeriod()).Truncate(blockSize) - end = now.Add(-rtopts.BufferPast()).Truncate(blockSize) + now = r.nowFn() + rtopts = ns.Options().RetentionOptions() ) - - targetRanges := xtime.NewRanges(xtime.Range{Start: start, End: end}) - for tNano := range r.repairStatesByNs[ns.ID().String()] { - t := tNano.ToTime() - if !r.needsRepair(ns.ID(), t) { - targetRanges = targetRanges.RemoveRange(xtime.Range{Start: t, End: t.Add(blockSize)}) - } - } - - return targetRanges -} - -func (r *dbRepairer) needsRepair(ns ident.ID, t time.Time) bool { - repairState, exists := r.repairStatesByNs.repairStates(ns, t) - if !exists { - return true - } - return 
repairState.Status == repairNotStarted || - (repairState.Status == repairFailed && repairState.NumFailures < r.repairMaxRetries) + return xtime.Range{ + Start: retention.FlushTimeStart(rtopts, now), + End: retention.FlushTimeEnd(rtopts, now)} } func (r *dbRepairer) Start() { - if r.repairInterval <= 0 { - return - } - go r.run() } @@ -407,6 +418,19 @@ func (r *dbRepairer) Stop() { r.closedLock.Unlock() } +// Repair will analyze the current repair state for each namespace/blockStart combination and pick one blockStart +// per namespace to repair. It will prioritize blocks that have never been repaired over those that have been +// repaired before, and it will prioritize more recent blocks over older ones. If all blocks have been repaired +// before then it will prioritize the least recently repaired block. +// +// The Repair function only attempts to repair one block at a time because this allows the background repair process +// to run its prioritization logic more frequently. For example, if we attempted to repair all blocks in one pass, +// even with appropriate backpressure, this could lead to situations where recent blocks are not repaired for a +// substantial amount of time whereas with the current approach the longest delay between running the prioritization +// logic is the amount of time it takes to repair one block for all shards. +// +// Long term we will want to move to a model that actually tracks state for individual shard/blockStart combinations, +// not just blockStarts. func (r *dbRepairer) Repair() error { // Don't attempt a repair if the database is not bootstrapped yet if !r.database.IsBootstrapped() { @@ -426,12 +450,77 @@ func (r *dbRepairer) Repair() error { if err != nil { return err } + for _, n := range namespaces { - iter := r.namespaceRepairTimeRanges(n).Iter() - for iter.Next() { - multiErr = multiErr.Add(r.repairNamespaceWithTimeRange(n, iter.Value())) + repairRange := r.namespaceRepairTimeRange(n) + blockSize := n.Options().RetentionOptions().BlockSize() + + // Iterating backwards will be exclusive on the start, but we want to be inclusive on the + // start so subtract a blocksize. + repairRange.Start = repairRange.Start.Add(-blockSize) + + var ( + numUnrepairedBlocks = 0 + hasRepairedABlockStart = false + leastRecentlyRepairedBlockStart time.Time + leastRecentlyRepairedBlockStartLastRepairTime time.Time + ) + repairRange.IterateBackward(blockSize, func(blockStart time.Time) bool { + repairState, ok := r.repairStatesByNs.repairStates(n.ID(), blockStart) + if ok && (leastRecentlyRepairedBlockStart.IsZero() || + repairState.LastAttempt.Before(leastRecentlyRepairedBlockStartLastRepairTime)) { + leastRecentlyRepairedBlockStart = blockStart + leastRecentlyRepairedBlockStartLastRepairTime = repairState.LastAttempt + } + + if ok && repairState.Status == repairSuccess { + return true + } + + // Failed or unrepair block from this point onwards. + numUnrepairedBlocks++ + if hasRepairedABlockStart { + // Only want to repair one namespace/blockStart per call to Repair() + // so once we've repaired a single blockStart we don't perform any + // more actual repairs although we do keep iterating so that we can + // emit an accurate value for the "num-unrepaired-blocks" gauge. + return true + } + + if err := r.repairNamespaceBlockstart(n, blockStart); err != nil { + multiErr = multiErr.Add(err) + } + hasRepairedABlockStart = true + + return true + }) + + // Update metrics with statistics about repair status. 
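Illustrative aside (not part of the patch): the Repair() comment above describes the per-namespace prioritization this hunk implements: walk the flushable blockStarts from newest to oldest, repair the first block that has never been repaired successfully (or whose last attempt failed), and if every block has already been repaired fall back to the least recently repaired one, repairing at most one block per namespace per pass. Below is a minimal stand-alone sketch of that selection logic with simplified state in place of repairStatesByNs; it is an approximation for illustration, not the dbRepairer implementation.

```go
package main

import (
	"fmt"
	"time"
)

type repairStatus int

const (
	repairNotStarted repairStatus = iota
	repairSuccess
	repairFailed
)

// blockRepairState is a simplified stand-in for the patch's repairState:
// a status plus the time of the last repair attempt.
type blockRepairState struct {
	status      repairStatus
	lastAttempt time.Time
}

// pickBlockToRepair iterates blockStarts from newest to oldest and returns
// the first block that is not known to have been repaired successfully;
// if every block has been repaired, it returns the least recently
// repaired one instead.
func pickBlockToRepair(
	states map[time.Time]blockRepairState,
	rangeStart, rangeEnd time.Time,
	blockSize time.Duration,
) (time.Time, bool) {
	var (
		leastRecent     time.Time
		leastRecentTime time.Time
		found           bool
	)
	for bs := rangeEnd; !bs.Before(rangeStart); bs = bs.Add(-blockSize) {
		state, ok := states[bs]
		if ok && (!found || state.lastAttempt.Before(leastRecentTime)) {
			leastRecent, leastRecentTime, found = bs, state.lastAttempt, true
		}
		if !ok || state.status != repairSuccess {
			// Never repaired (or last attempt failed): repair this one first.
			return bs, true
		}
	}
	// Everything has been repaired at least once; fall back to the block
	// whose last attempt is oldest.
	return leastRecent, found
}

func main() {
	var (
		blockSize = 2 * time.Hour
		end       = time.Date(2019, 8, 1, 12, 0, 0, 0, time.UTC)
		start     = end.Add(-2 * blockSize)
		states    = map[time.Time]blockRepairState{
			end: {status: repairSuccess, lastAttempt: end.Add(time.Minute)},
		}
	)
	next, ok := pickBlockToRepair(states, start, end, blockSize)
	fmt.Println(next, ok) // the most recent block that has not yet been repaired
}
```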
+ r.scope.Tagged(map[string]string{ + "namespace": n.ID().String(), + }).Gauge("num-unrepaired-blocks").Update(float64(numUnrepairedBlocks)) + + secondsSinceLastRepair := r.nowFn().Sub(leastRecentlyRepairedBlockStartLastRepairTime).Seconds() + r.scope.Tagged(map[string]string{ + "namespace": n.ID().String(), + }).Gauge("max-seconds-since-last-block-repair").Update(secondsSinceLastRepair) + + if hasRepairedABlockStart { + // Previous loop performed a repair which means we've hit our limit of repairing + // one block per namespace per call to Repair() so we can skip the logic below. + continue + } + + // If we've made it this far that means that there were no unrepaired blocks which means we should + // repair the least recently repaired block instead. + if leastRecentlyRepairedBlockStart.IsZero() { + continue + } + if err := r.repairNamespaceBlockstart(n, leastRecentlyRepairedBlockStart); err != nil { + multiErr = multiErr.Add(err) } } + return multiErr.FinalError() } @@ -443,31 +532,38 @@ func (r *dbRepairer) Report() { } } -func (r *dbRepairer) repairNamespaceWithTimeRange(n databaseNamespace, tr xtime.Range) error { +func (r *dbRepairer) repairNamespaceBlockstart(n databaseNamespace, blockStart time.Time) error { var ( - rtopts = n.Options().RetentionOptions() - blockSize = rtopts.BlockSize() - err error + blockSize = n.Options().RetentionOptions().BlockSize() + repairRange = xtime.Range{Start: blockStart, End: blockStart.Add(blockSize)} + repairTime = r.nowFn() ) - - // repair the namespace - if err = n.Repair(r.shardRepairer, tr); err != nil { - err = fmt.Errorf("namespace %s failed to repair time range %v: %v", n.ID().String(), tr, err) + if err := r.repairNamespaceWithTimeRange(n, repairRange); err != nil { + r.markRepairAttempt(n.ID(), blockStart, repairTime, repairFailed) + return err } - // update repairer state - for t := tr.Start; t.Before(tr.End); t = t.Add(blockSize) { - repairState, _ := r.repairStatesByNs.repairStates(n.ID(), t) - if err == nil { - repairState.Status = repairSuccess - } else { - repairState.Status = repairFailed - repairState.NumFailures++ - } - r.repairStatesByNs.setRepairState(n.ID(), t, repairState) + r.markRepairAttempt(n.ID(), blockStart, repairTime, repairSuccess) + return nil +} + +func (r *dbRepairer) repairNamespaceWithTimeRange(n databaseNamespace, tr xtime.Range) error { + if err := n.Repair(r.shardRepairer, tr); err != nil { + return fmt.Errorf("namespace %s failed to repair time range %v: %v", n.ID().String(), tr, err) } - return err + return nil +} + +func (r *dbRepairer) markRepairAttempt( + namespace ident.ID, + blockStart time.Time, + repairTime time.Time, + repairStatus repairStatus) { + repairState, _ := r.repairStatesByNs.repairStates(namespace, blockStart) + repairState.Status = repairStatus + repairState.LastAttempt = repairTime + r.repairStatesByNs.setRepairState(namespace, blockStart, repairState) } var noOpRepairer databaseRepairer = repairerNoOp{} @@ -512,10 +608,16 @@ func (r shardRepairer) shadowCompare( tmpCtx.Reset() defer tmpCtx.BlockingClose() - localSeriesDataBlocks, err := shard.ReadEncoded(tmpCtx, seriesID, start, end, nsCtx) + + unfilteredLocalSeriesDataBlocks, err := shard.ReadEncoded(tmpCtx, seriesID, start, end, nsCtx) if err != nil { return err } + localSeriesDataBlocks, err := xio.FilterEmptyBlockReadersSliceOfSlicesInPlace(unfilteredLocalSeriesDataBlocks) + if err != nil { + return err + } + localSeriesSliceOfSlices := xio.NewReaderSliceOfSlicesFromBlockReadersIterator(localSeriesDataBlocks) localSeriesIter := 
r.opts.MultiReaderIteratorPool().Get() localSeriesIter.ResetSliceOfSlices(localSeriesSliceOfSlices, nsCtx.Schema) diff --git a/src/dbnode/storage/repair/metadata.go b/src/dbnode/storage/repair/metadata.go index deca8f478e..3eebc373de 100644 --- a/src/dbnode/storage/repair/metadata.go +++ b/src/dbnode/storage/repair/metadata.go @@ -35,36 +35,36 @@ const ( defaultReplicaSeriesMetadataCapacity = 4096 ) -type hostBlockMetadataSlice struct { - metadata []HostBlockMetadata - pool HostBlockMetadataSlicePool +type replicaMetadataSlice struct { + metadata []block.ReplicaMetadata + pool ReplicaMetadataSlicePool } -func newHostBlockMetadataSlice() HostBlockMetadataSlice { - return &hostBlockMetadataSlice{} +func newReplicaMetadataSlice() ReplicaMetadataSlice { + return &replicaMetadataSlice{} } -func newPooledHostBlockMetadataSlice(metadata []HostBlockMetadata, pool HostBlockMetadataSlicePool) HostBlockMetadataSlice { - return &hostBlockMetadataSlice{metadata: metadata, pool: pool} +func newPooledReplicaMetadataSlice(metadata []block.ReplicaMetadata, pool ReplicaMetadataSlicePool) ReplicaMetadataSlice { + return &replicaMetadataSlice{metadata: metadata, pool: pool} } -func (s *hostBlockMetadataSlice) Add(metadata HostBlockMetadata) { +func (s *replicaMetadataSlice) Add(metadata block.ReplicaMetadata) { s.metadata = append(s.metadata, metadata) } -func (s *hostBlockMetadataSlice) Metadata() []HostBlockMetadata { +func (s *replicaMetadataSlice) Metadata() []block.ReplicaMetadata { return s.metadata } -func (s *hostBlockMetadataSlice) Reset() { - var zeroed HostBlockMetadata +func (s *replicaMetadataSlice) Reset() { + var zeroed block.ReplicaMetadata for i := range s.metadata { s.metadata[i] = zeroed } s.metadata = s.metadata[:0] } -func (s *hostBlockMetadataSlice) Close() { +func (s *replicaMetadataSlice) Close() { if s.pool != nil { s.pool.Put(s) } @@ -72,18 +72,18 @@ func (s *hostBlockMetadataSlice) Close() { type replicaBlockMetadata struct { start time.Time - metadata HostBlockMetadataSlice + metadata ReplicaMetadataSlice } // NewReplicaBlockMetadata creates a new replica block metadata -func NewReplicaBlockMetadata(start time.Time, p HostBlockMetadataSlice) ReplicaBlockMetadata { +func NewReplicaBlockMetadata(start time.Time, p ReplicaMetadataSlice) ReplicaBlockMetadata { return replicaBlockMetadata{start: start, metadata: p} } -func (m replicaBlockMetadata) Start() time.Time { return m.start } -func (m replicaBlockMetadata) Metadata() []HostBlockMetadata { return m.metadata.Metadata() } -func (m replicaBlockMetadata) Add(metadata HostBlockMetadata) { m.metadata.Add(metadata) } -func (m replicaBlockMetadata) Close() { m.metadata.Close() } +func (m replicaBlockMetadata) Start() time.Time { return m.start } +func (m replicaBlockMetadata) Metadata() []block.ReplicaMetadata { return m.metadata.Metadata() } +func (m replicaBlockMetadata) Add(metadata block.ReplicaMetadata) { m.metadata.Add(metadata) } +func (m replicaBlockMetadata) Close() { m.metadata.Close() } type replicaBlocksMetadata map[xtime.UnixNano]ReplicaBlockMetadata @@ -98,7 +98,7 @@ func (m replicaBlocksMetadata) Add(block ReplicaBlockMetadata) { m[xtime.ToUnixNano(block.Start())] = block } -func (m replicaBlocksMetadata) GetOrAdd(start time.Time, p HostBlockMetadataSlicePool) ReplicaBlockMetadata { +func (m replicaBlocksMetadata) GetOrAdd(start time.Time, p ReplicaMetadataSlicePool) ReplicaBlockMetadata { startNano := xtime.ToUnixNano(start) block, exists := m[startNano] if exists { @@ -160,28 +160,27 @@ func (m replicaSeriesMetadata) 
Close() { } type replicaMetadataComparer struct { - replicas int - metadata ReplicaSeriesMetadata - hostBlockMetadataSlicePool HostBlockMetadataSlicePool + origin topology.Host + metadata ReplicaSeriesMetadata + replicaMetadataSlicePool ReplicaMetadataSlicePool } // NewReplicaMetadataComparer creates a new replica metadata comparer -func NewReplicaMetadataComparer(replicas int, opts Options) ReplicaMetadataComparer { +func NewReplicaMetadataComparer(origin topology.Host, opts Options) ReplicaMetadataComparer { return replicaMetadataComparer{ - replicas: replicas, - metadata: NewReplicaSeriesMetadata(), - hostBlockMetadataSlicePool: opts.HostBlockMetadataSlicePool(), + origin: origin, + metadata: NewReplicaSeriesMetadata(), + replicaMetadataSlicePool: opts.ReplicaMetadataSlicePool(), } } -func (m replicaMetadataComparer) AddLocalMetadata(origin topology.Host, localIter block.FilteredBlocksMetadataIter) error { +func (m replicaMetadataComparer) AddLocalMetadata(localIter block.FilteredBlocksMetadataIter) error { for localIter.Next() { - id, block := localIter.Current() + id, localBlock := localIter.Current() blocks := m.metadata.GetOrAdd(id) - blocks.GetOrAdd(block.Start, m.hostBlockMetadataSlicePool).Add(HostBlockMetadata{ - Host: origin, - Size: block.Size, - Checksum: block.Checksum, + blocks.GetOrAdd(localBlock.Start, m.replicaMetadataSlicePool).Add(block.ReplicaMetadata{ + Host: m.origin, + Metadata: localBlock, }) } @@ -192,10 +191,9 @@ func (m replicaMetadataComparer) AddPeerMetadata(peerIter client.PeerBlockMetada for peerIter.Next() { peer, peerBlock := peerIter.Current() blocks := m.metadata.GetOrAdd(peerBlock.ID) - blocks.GetOrAdd(peerBlock.Start, m.hostBlockMetadataSlicePool).Add(HostBlockMetadata{ + blocks.GetOrAdd(peerBlock.Start, m.replicaMetadataSlicePool).Add(block.ReplicaMetadata{ Host: peer, - Size: peerBlock.Size, - Checksum: peerBlock.Checksum, + Metadata: peerBlock, }) } @@ -214,35 +212,43 @@ func (m replicaMetadataComparer) Compare() MetadataComparisonResult { bm := b.Metadata() var ( - numHostsWithSize int - sizeVal int64 - sameSize = true - firstSize = true - numHostsWithChecksum int - checksumVal uint32 - sameChecksum = true - firstChecksum = true + originContainsBlock = false + sizeVal int64 + sameSize = true + firstSize = true + checksumVal uint32 + sameChecksum = true + firstChecksum = true ) for _, hm := range bm { - // Check size - if hm.Size != 0 { - numHostsWithSize++ - if firstSize { - sizeVal = hm.Size - firstSize = false - } else if hm.Size != sizeVal { - sameSize = false + if !originContainsBlock { + if hm.Host.String() == m.origin.String() { + originContainsBlock = true } } - // Check checksum - if hm.Checksum != nil { - numHostsWithChecksum++ + // Check size. + if firstSize { + sizeVal = hm.Metadata.Size + firstSize = false + } else if hm.Metadata.Size != sizeVal { + sameSize = false + } + + // If a previous metadata had a checksum and this one does not + // then assume the checksums mismatch. + if hm.Metadata.Checksum == nil && !firstChecksum { + sameChecksum = false + continue + } + + // Check checksum. 
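Illustrative aside (not part of the patch): with the replica-count check gone, Compare() above flags a block when the origin does not hold it at all, when replica sizes disagree, or when replica checksums disagree; a replica that reports no checksum after one that did report a checksum is treated as a checksum mismatch. The stand-alone sketch below reproduces that comparison over a simplified replica slice; replicaBlock and the host strings are stand-ins for the real types.

```go
package main

import "fmt"

// replicaBlock is a simplified replica view of one block: which host it
// came from, its size, and an optional checksum (nil when unknown).
type replicaBlock struct {
	host     string
	size     int64
	checksum *uint32
}

// compareReplicas reports whether the origin holds the block at all,
// whether all replicas agree on size, and whether all replicas agree on
// checksum. A replica without a checksum appearing after one that had a
// checksum counts as a checksum mismatch.
func compareReplicas(origin string, replicas []replicaBlock) (originHasBlock, sameSize, sameChecksum bool) {
	var (
		sizeVal       int64
		checksumVal   uint32
		firstSize     = true
		firstChecksum = true
	)
	sameSize, sameChecksum = true, true
	for _, r := range replicas {
		if r.host == origin {
			originHasBlock = true
		}

		if firstSize {
			sizeVal, firstSize = r.size, false
		} else if r.size != sizeVal {
			sameSize = false
		}

		if r.checksum == nil {
			if !firstChecksum {
				sameChecksum = false
			}
			continue
		}
		if firstChecksum {
			checksumVal, firstChecksum = *r.checksum, false
		} else if *r.checksum != checksumVal {
			sameChecksum = false
		}
	}
	return originHasBlock, sameSize, sameChecksum
}

func main() {
	ten, twenty := uint32(10), uint32(20)
	replicas := []replicaBlock{
		{host: "origin", size: 2, checksum: &ten},
		{host: "peer-1", size: 2, checksum: &twenty},
	}
	origin, size, checksum := compareReplicas("origin", replicas)
	// Sizes match but checksums differ, so this block would be queued for repair.
	fmt.Println(origin, size, checksum) // true true false
}
```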
+ if hm.Metadata.Checksum != nil { if firstChecksum { - checksumVal = *hm.Checksum + checksumVal = *hm.Metadata.Checksum firstChecksum = false - } else if *hm.Checksum != checksumVal { + } else if *hm.Metadata.Checksum != checksumVal { sameChecksum = false } } @@ -250,13 +256,13 @@ func (m replicaMetadataComparer) Compare() MetadataComparisonResult { // If only a subset of hosts in the replica set have sizes, or the sizes differ, // we record this block - if !(numHostsWithSize == m.replicas && sameSize) { + if !originContainsBlock || !sameSize { sizeDiff.GetOrAdd(series.ID).Add(b) } // If only a subset of hosts in the replica set have checksums, or the checksums // differ, we record this block - if !(numHostsWithChecksum == m.replicas && sameChecksum) { + if !originContainsBlock || !sameChecksum { checkSumDiff.GetOrAdd(series.ID).Add(b) } } diff --git a/src/dbnode/storage/repair/metadata_pool.go b/src/dbnode/storage/repair/metadata_pool.go index a113390bde..b51be85b0a 100644 --- a/src/dbnode/storage/repair/metadata_pool.go +++ b/src/dbnode/storage/repair/metadata_pool.go @@ -20,28 +20,31 @@ package repair -import "github.com/m3db/m3/src/x/pool" +import ( + "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/x/pool" +) -type hostBlockMetadataSlicePool struct { +type replicaMetadataSlicePool struct { pool pool.ObjectPool capacity int } -// NewHostBlockMetadataSlicePool creates a new hostBlockMetadataSlice pool -func NewHostBlockMetadataSlicePool(opts pool.ObjectPoolOptions, capacity int) HostBlockMetadataSlicePool { - p := &hostBlockMetadataSlicePool{pool: pool.NewObjectPool(opts), capacity: capacity} +// NewReplicaMetadataSlicePool creates a new replicaMetadataSlicePool pool +func NewReplicaMetadataSlicePool(opts pool.ObjectPoolOptions, capacity int) ReplicaMetadataSlicePool { + p := &replicaMetadataSlicePool{pool: pool.NewObjectPool(opts), capacity: capacity} p.pool.Init(func() interface{} { - metadata := make([]HostBlockMetadata, 0, capacity) - return newPooledHostBlockMetadataSlice(metadata, p) + metadata := make([]block.ReplicaMetadata, 0, capacity) + return newPooledReplicaMetadataSlice(metadata, p) }) return p } -func (p *hostBlockMetadataSlicePool) Get() HostBlockMetadataSlice { - return p.pool.Get().(HostBlockMetadataSlice) +func (p *replicaMetadataSlicePool) Get() ReplicaMetadataSlice { + return p.pool.Get().(ReplicaMetadataSlice) } -func (p *hostBlockMetadataSlicePool) Put(res HostBlockMetadataSlice) { +func (p *replicaMetadataSlicePool) Put(res ReplicaMetadataSlice) { res.Reset() p.pool.Put(res) } diff --git a/src/dbnode/storage/repair/metadata_pool_test.go b/src/dbnode/storage/repair/metadata_pool_test.go index b6c9721bd6..e194edaa39 100644 --- a/src/dbnode/storage/repair/metadata_pool_test.go +++ b/src/dbnode/storage/repair/metadata_pool_test.go @@ -23,18 +23,19 @@ package repair import ( "testing" + "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/x/pool" "github.com/stretchr/testify/require" ) -func TestHostBlockMetadataSlicePoolResetOnPut(t *testing.T) { +func TestReplicaMetadataSlicePoolResetOnPut(t *testing.T) { opts := pool.NewObjectPoolOptions().SetSize(1) - p := NewHostBlockMetadataSlicePool(opts, 64) + p := NewReplicaMetadataSlicePool(opts, 64) res := p.Get() // Make res non-empty - res.Add(HostBlockMetadata{}) + res.Add(block.ReplicaMetadata{}) require.Equal(t, 1, len(res.Metadata())) // Return res to pool diff --git a/src/dbnode/storage/repair/metadata_test.go 
b/src/dbnode/storage/repair/metadata_test.go index ba59e20f67..8085d5a024 100644 --- a/src/dbnode/storage/repair/metadata_test.go +++ b/src/dbnode/storage/repair/metadata_test.go @@ -35,8 +35,8 @@ import ( "github.com/stretchr/testify/require" ) -func testHostBlockMetadataSlicePool() HostBlockMetadataSlicePool { - return NewHostBlockMetadataSlicePool(nil, 0) +func testReplicaMetadataSlicePool() ReplicaMetadataSlicePool { + return NewReplicaMetadataSlicePool(nil, 0) } func testRepairOptions() Options { @@ -44,11 +44,16 @@ func testRepairOptions() Options { } func TestReplicaBlockMetadataAdd(t *testing.T) { + meta1 := block.NewMetadata( + ident.StringID("some-id"), ident.Tags{}, time.Time{}, 1, nil, time.Time{}) + meta2 := block.NewMetadata( + ident.StringID("some-id"), ident.Tags{}, time.Time{}, 2, new(uint32), time.Time{}) + now := time.Now() - m := NewReplicaBlockMetadata(now, newHostBlockMetadataSlice()) - inputs := []HostBlockMetadata{ - {topology.NewHost("foo", "addrFoo"), 1, nil}, - {topology.NewHost("bar", "addrBar"), 2, new(uint32)}, + m := NewReplicaBlockMetadata(now, newReplicaMetadataSlice()) + inputs := []block.ReplicaMetadata{ + {Host: topology.NewHost("foo", "addrFoo"), Metadata: meta1}, + {Host: topology.NewHost("bar", "addrBar"), Metadata: meta2}, } for _, input := range inputs { m.Add(input) @@ -59,7 +64,7 @@ func TestReplicaBlockMetadataAdd(t *testing.T) { func TestReplicaBlocksMetadataAdd(t *testing.T) { now := time.Now() - block := NewReplicaBlockMetadata(now, newHostBlockMetadataSlice()) + block := NewReplicaBlockMetadata(now, newReplicaMetadataSlice()) m := NewReplicaBlocksMetadata() m.Add(block) @@ -77,7 +82,7 @@ func TestReplicaBlocksMetadataGetOrAdd(t *testing.T) { require.Equal(t, 0, len(m.Blocks())) // Add a block - b := m.GetOrAdd(now, testHostBlockMetadataSlicePool()) + b := m.GetOrAdd(now, testReplicaMetadataSlicePool()) require.Equal(t, now, b.Start()) blocks := m.Blocks() require.Equal(t, 1, len(blocks)) @@ -86,7 +91,7 @@ func TestReplicaBlocksMetadataGetOrAdd(t *testing.T) { require.Equal(t, now, block.Start()) // Add the same block and check we don't add new blocks - m.GetOrAdd(now, testHostBlockMetadataSlicePool()) + m.GetOrAdd(now, testReplicaMetadataSlicePool()) require.Equal(t, 1, len(m.Blocks())) } @@ -108,7 +113,7 @@ func TestReplicaSeriesMetadataGetOrAdd(t *testing.T) { type testBlock struct { id ident.ID ts time.Time - blocks []HostBlockMetadata + blocks []block.ReplicaMetadata } func assertEqual(t *testing.T, expected []testBlock, actual ReplicaSeriesMetadata) { @@ -146,14 +151,14 @@ func TestReplicaMetadataComparerAddLocalMetadata(t *testing.T) { localIter.EXPECT().Err().Return(nil), ) - m := NewReplicaMetadataComparer(3, testRepairOptions()).(replicaMetadataComparer) - err := m.AddLocalMetadata(origin, localIter) + m := NewReplicaMetadataComparer(origin, testRepairOptions()).(replicaMetadataComparer) + err := m.AddLocalMetadata(localIter) require.NoError(t, err) expected := []testBlock{ - {inputBlocks[0].ID, inputBlocks[0].Start, []HostBlockMetadata{{origin, inputBlocks[0].Size, inputBlocks[0].Checksum}}}, - {inputBlocks[1].ID, inputBlocks[1].Start, []HostBlockMetadata{{origin, inputBlocks[1].Size, inputBlocks[1].Checksum}}}, - {inputBlocks[2].ID, inputBlocks[2].Start, []HostBlockMetadata{{origin, inputBlocks[2].Size, inputBlocks[2].Checksum}}}, + {inputBlocks[0].ID, inputBlocks[0].Start, []block.ReplicaMetadata{{Host: origin, Metadata: inputBlocks[0]}}}, + {inputBlocks[1].ID, inputBlocks[1].Start, []block.ReplicaMetadata{{Host: origin, 
Metadata: inputBlocks[1]}}}, + {inputBlocks[2].ID, inputBlocks[2].Start, []block.ReplicaMetadata{{Host: origin, Metadata: inputBlocks[2]}}}, } assertEqual(t, expected, m.metadata) } @@ -164,28 +169,25 @@ func TestReplicaMetadataComparerAddPeerMetadata(t *testing.T) { now := time.Now() peerIter := client.NewMockPeerBlockMetadataIter(ctrl) - inputBlocks := []struct { - host topology.Host - meta block.Metadata - }{ + inputBlocks := []block.ReplicaMetadata{ { - host: topology.NewHost("1", "addr1"), - meta: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, + Host: topology.NewHost("1", "addr1"), + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now, int64(0), new(uint32), time.Time{}), }, { - host: topology.NewHost("1", "addr1"), - meta: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, + Host: topology.NewHost("1", "addr1"), + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(time.Second), int64(1), new(uint32), time.Time{}), }, { - host: topology.NewHost("2", "addr2"), - meta: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, + Host: topology.NewHost("2", "addr2"), + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now, int64(2), nil, time.Time{}), }, { - host: topology.NewHost("2", "addr2"), - meta: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, + Host: topology.NewHost("2", "addr2"), + Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(time.Second), int64(3), nil, time.Time{}), }, } @@ -193,30 +195,30 @@ func TestReplicaMetadataComparerAddPeerMetadata(t *testing.T) { gomock.InOrder( peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[0].host, inputBlocks[0].meta), + peerIter.EXPECT().Current().Return(inputBlocks[0].Host, inputBlocks[0].Metadata), peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[1].host, inputBlocks[1].meta), + peerIter.EXPECT().Current().Return(inputBlocks[1].Host, inputBlocks[1].Metadata), peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[2].host, inputBlocks[2].meta), + peerIter.EXPECT().Current().Return(inputBlocks[2].Host, inputBlocks[2].Metadata), peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[3].host, inputBlocks[3].meta), + peerIter.EXPECT().Current().Return(inputBlocks[3].Host, inputBlocks[3].Metadata), peerIter.EXPECT().Next().Return(false), peerIter.EXPECT().Err().Return(expectedErr), ) - m := NewReplicaMetadataComparer(3, testRepairOptions()).(replicaMetadataComparer) + m := NewReplicaMetadataComparer(inputBlocks[0].Host, testRepairOptions()).(replicaMetadataComparer) require.Equal(t, expectedErr, m.AddPeerMetadata(peerIter)) expected := []testBlock{ - {ident.StringID("foo"), inputBlocks[0].meta.Start, []HostBlockMetadata{ - {inputBlocks[0].host, inputBlocks[0].meta.Size, inputBlocks[0].meta.Checksum}, - {inputBlocks[2].host, inputBlocks[2].meta.Size, inputBlocks[2].meta.Checksum}, + {ident.StringID("foo"), inputBlocks[0].Metadata.Start, []block.ReplicaMetadata{ + inputBlocks[0], + inputBlocks[2], }}, - {ident.StringID("foo"), inputBlocks[1].meta.Start, []HostBlockMetadata{ - {inputBlocks[1].host, inputBlocks[1].meta.Size, inputBlocks[1].meta.Checksum}, + {ident.StringID("foo"), inputBlocks[1].Metadata.Start, []block.ReplicaMetadata{ + inputBlocks[1], }}, - {ident.StringID("bar"), inputBlocks[3].meta.Start, []HostBlockMetadata{ - {inputBlocks[3].host, inputBlocks[3].meta.Size, inputBlocks[3].meta.Checksum}, + 
{ident.StringID("bar"), inputBlocks[3].Metadata.Start, []block.ReplicaMetadata{ + inputBlocks[3], }}, } assertEqual(t, expected, m.metadata) @@ -231,61 +233,75 @@ func TestReplicaMetadataComparerCompare(t *testing.T) { metadata := NewReplicaSeriesMetadata() defer metadata.Close() - inputs := []struct { - host topology.Host - id string - ts time.Time - size int64 - checksum uint32 - hasChecksum bool - }{ - {hosts[0], "foo", now, int64(1), uint32(10), true}, - {hosts[1], "foo", now, int64(1), uint32(10), true}, - {hosts[0], "bar", now.Add(time.Second), int64(0), uint32(10), true}, - {hosts[1], "bar", now.Add(time.Second), int64(1), uint32(10), true}, - {hosts[0], "baz", now.Add(2 * time.Second), int64(2), uint32(20), true}, - {hosts[1], "baz", now.Add(2 * time.Second), int64(2), uint32(0), false}, - {hosts[0], "gah", now.Add(3 * time.Second), int64(1), uint32(10), true}, + ten := uint32(10) + twenty := uint32(20) + inputs := []block.ReplicaMetadata{ + block.ReplicaMetadata{ + Host: hosts[0], + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now, int64(1), &ten, time.Time{}), + }, + block.ReplicaMetadata{ + Host: hosts[1], + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now, int64(1), &ten, time.Time{}), + }, + block.ReplicaMetadata{ + Host: hosts[0], + Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(time.Second), int64(0), &ten, time.Time{}), + }, + block.ReplicaMetadata{ + Host: hosts[1], + Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(time.Second), int64(1), &ten, time.Time{}), + }, + block.ReplicaMetadata{ + Host: hosts[0], + Metadata: block.NewMetadata(ident.StringID("baz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &twenty, time.Time{}), + }, + block.ReplicaMetadata{ + Host: hosts[1], + Metadata: block.NewMetadata(ident.StringID("baz"), ident.Tags{}, now.Add(2*time.Second), int64(2), nil, time.Time{}), + }, + // Block only exists for host[1] but host[0] is the origin so should be consider a size/checksum mismatch. + block.ReplicaMetadata{ + Host: hosts[1], + Metadata: block.NewMetadata(ident.StringID("gah"), ident.Tags{}, now.Add(3*time.Second), int64(1), &ten, time.Time{}), + }, + // Block only exists for host[0] but host[0] is also the origin so should not be considered a size/checksum mismatch + // since the peer not the origin is missing data. 
+ block.ReplicaMetadata{ + Host: hosts[0], + Metadata: block.NewMetadata(ident.StringID("grr"), ident.Tags{}, now.Add(3*time.Second), int64(1), &ten, time.Time{}), + }, } for _, input := range inputs { - var checkSum *uint32 - if input.hasChecksum { - ckSum := input.checksum - checkSum = &ckSum - } - metadata.GetOrAdd(ident.StringID(input.id)).GetOrAdd(input.ts, testHostBlockMetadataSlicePool()).Add(HostBlockMetadata{ - Host: input.host, - Size: input.size, - Checksum: checkSum, - }) + metadata.GetOrAdd(input.Metadata.ID).GetOrAdd(input.Metadata.Start, testReplicaMetadataSlicePool()).Add(input) } sizeExpected := []testBlock{ - {ident.StringID("bar"), now.Add(time.Second), []HostBlockMetadata{ - {hosts[0], int64(0), &inputs[2].checksum}, - {hosts[1], int64(1), &inputs[3].checksum}, + {ident.StringID("bar"), now.Add(time.Second), []block.ReplicaMetadata{ + inputs[2], + inputs[3], }}, - {ident.StringID("gah"), now.Add(3 * time.Second), []HostBlockMetadata{ - {hosts[0], int64(1), &inputs[6].checksum}, + {ident.StringID("gah"), now.Add(3 * time.Second), []block.ReplicaMetadata{ + inputs[6], }}, } checksumExpected := []testBlock{ - {ident.StringID("baz"), now.Add(2 * time.Second), []HostBlockMetadata{ - {hosts[0], int64(2), &inputs[4].checksum}, - {hosts[1], int64(2), nil}, + {ident.StringID("baz"), now.Add(2 * time.Second), []block.ReplicaMetadata{ + inputs[4], + inputs[5], }}, - {ident.StringID("gah"), now.Add(3 * time.Second), []HostBlockMetadata{ - {hosts[0], int64(1), &inputs[6].checksum}, + {ident.StringID("gah"), now.Add(3 * time.Second), []block.ReplicaMetadata{ + inputs[6], }}, } - m := NewReplicaMetadataComparer(2, testRepairOptions()).(replicaMetadataComparer) + m := NewReplicaMetadataComparer(hosts[0], testRepairOptions()).(replicaMetadataComparer) m.metadata = metadata res := m.Compare() - require.Equal(t, int64(4), res.NumSeries) - require.Equal(t, int64(4), res.NumBlocks) + require.Equal(t, int64(5), res.NumSeries) + require.Equal(t, int64(5), res.NumBlocks) assertEqual(t, sizeExpected, res.SizeDifferences) assertEqual(t, checksumExpected, res.ChecksumDifferences) } diff --git a/src/dbnode/storage/repair/options.go b/src/dbnode/storage/repair/options.go index b56294469c..57616b439b 100644 --- a/src/dbnode/storage/repair/options.go +++ b/src/dbnode/storage/repair/options.go @@ -25,17 +25,14 @@ import ( "time" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" ) const ( defaultRepairConsistencyLevel = topology.ReadConsistencyLevelMajority - defaultRepairInterval = 2 * time.Hour - defaultRepairTimeOffset = 30 * time.Minute - defaultRepairTimeJitter = time.Hour defaultRepairCheckInterval = time.Minute defaultRepairThrottle = 90 * time.Second - defaultRepairMaxRetries = 3 defaultRepairShardConcurrency = 1 defaultDebugShadowComparisonsEnabled = false defaultDebugShadowComparisonsPercentage = 1.0 @@ -43,15 +40,10 @@ const ( var ( errNoAdminClient = errors.New("no admin client in repair options") - errInvalidRepairInterval = errors.New("invalid repair interval in repair options") - errInvalidRepairTimeOffset = errors.New("invalid repair time offset in repair options") - errInvalidRepairTimeJitter = errors.New("invalid repair time jitter in repair options") - errTimeOffsetOrJitterTooBig = errors.New("repair time offset plus jitter should be no more than repair interval") errInvalidRepairCheckInterval = errors.New("invalid repair check interval in repair options") - 
errRepairCheckIntervalTooBig = errors.New("repair check interval too big in repair options") errInvalidRepairThrottle = errors.New("invalid repair throttle in repair options") - errInvalidRepairMaxRetries = errors.New("invalid repair max retries in repair options") - errNoHostBlockMetadataSlicePool = errors.New("no host block metadata pool in repair options") + errNoReplicaMetadataSlicePool = errors.New("no replica metadata pool in repair options") + errNoResultOptions = errors.New("no result options in repair options") errInvalidDebugShadowComparisonsPercentage = errors.New("debug shadow comparisons percentage must be between 0 and 1") ) @@ -59,13 +51,10 @@ type options struct { adminClient client.AdminClient repairConsistencyLevel topology.ReadConsistencyLevel repairShardConcurrency int - repairInterval time.Duration - repairTimeOffset time.Duration - repairTimeJitter time.Duration repairCheckInterval time.Duration repairThrottle time.Duration - repairMaxRetries int - hostBlockMetadataSlicePool HostBlockMetadataSlicePool + replicaMetadataSlicePool ReplicaMetadataSlicePool + resultOptions result.Options debugShadowComparisonsEnabled bool debugShadowComparisonsPercentage float64 } @@ -75,13 +64,10 @@ func NewOptions() Options { return &options{ repairConsistencyLevel: defaultRepairConsistencyLevel, repairShardConcurrency: defaultRepairShardConcurrency, - repairInterval: defaultRepairInterval, - repairTimeOffset: defaultRepairTimeOffset, - repairTimeJitter: defaultRepairTimeJitter, repairCheckInterval: defaultRepairCheckInterval, repairThrottle: defaultRepairThrottle, - repairMaxRetries: defaultRepairMaxRetries, - hostBlockMetadataSlicePool: NewHostBlockMetadataSlicePool(nil, 0), + replicaMetadataSlicePool: NewReplicaMetadataSlicePool(nil, 0), + resultOptions: result.NewOptions(), debugShadowComparisonsEnabled: defaultDebugShadowComparisonsEnabled, debugShadowComparisonsPercentage: defaultDebugShadowComparisonsPercentage, } @@ -117,36 +103,6 @@ func (o *options) RepairShardConcurrency() int { return o.repairShardConcurrency } -func (o *options) SetRepairInterval(value time.Duration) Options { - opts := *o - opts.repairInterval = value - return &opts -} - -func (o *options) RepairInterval() time.Duration { - return o.repairInterval -} - -func (o *options) SetRepairTimeOffset(value time.Duration) Options { - opts := *o - opts.repairTimeOffset = value - return &opts -} - -func (o *options) RepairTimeOffset() time.Duration { - return o.repairTimeOffset -} - -func (o *options) SetRepairTimeJitter(value time.Duration) Options { - opts := *o - opts.repairTimeJitter = value - return &opts -} - -func (o *options) RepairTimeJitter() time.Duration { - return o.repairTimeJitter -} - func (o *options) SetRepairCheckInterval(value time.Duration) Options { opts := *o opts.repairCheckInterval = value @@ -167,24 +123,24 @@ func (o *options) RepairThrottle() time.Duration { return o.repairThrottle } -func (o *options) SetRepairMaxRetries(value int) Options { +func (o *options) SetReplicaMetadataSlicePool(value ReplicaMetadataSlicePool) Options { opts := *o - opts.repairMaxRetries = value + opts.replicaMetadataSlicePool = value return &opts } -func (o *options) RepairMaxRetries() int { - return o.repairMaxRetries +func (o *options) ReplicaMetadataSlicePool() ReplicaMetadataSlicePool { + return o.replicaMetadataSlicePool } -func (o *options) SetHostBlockMetadataSlicePool(value HostBlockMetadataSlicePool) Options { +func (o *options) SetResultOptions(value result.Options) Options { opts := *o - 
opts.hostBlockMetadataSlicePool = value + opts.resultOptions = value return &opts } -func (o *options) HostBlockMetadataSlicePool() HostBlockMetadataSlicePool { - return o.hostBlockMetadataSlicePool +func (o *options) ResultOptions() result.Options { + return o.resultOptions } func (o *options) SetDebugShadowComparisonsEnabled(value bool) Options { @@ -211,32 +167,17 @@ func (o *options) Validate() error { if o.adminClient == nil { return errNoAdminClient } - if o.repairInterval < 0 { - return errInvalidRepairInterval - } - if o.repairTimeOffset < 0 { - return errInvalidRepairTimeOffset - } - if o.repairTimeJitter < 0 { - return errInvalidRepairTimeJitter - } - if o.repairTimeOffset+o.repairTimeJitter > o.repairInterval { - return errTimeOffsetOrJitterTooBig - } if o.repairCheckInterval < 0 { return errInvalidRepairCheckInterval } - if o.repairCheckInterval > o.repairInterval { - return errRepairCheckIntervalTooBig - } if o.repairThrottle < 0 { return errInvalidRepairThrottle } - if o.repairMaxRetries < 0 { - return errInvalidRepairMaxRetries + if o.replicaMetadataSlicePool == nil { + return errNoReplicaMetadataSlicePool } - if o.hostBlockMetadataSlicePool == nil { - return errNoHostBlockMetadataSlicePool + if o.resultOptions == nil { + return errNoResultOptions } if o.debugShadowComparisonsPercentage > 1.0 || o.debugShadowComparisonsPercentage < 0 { diff --git a/src/dbnode/storage/repair/types.go b/src/dbnode/storage/repair/types.go index 8c504c16eb..73b098a057 100644 --- a/src/dbnode/storage/repair/types.go +++ b/src/dbnode/storage/repair/types.go @@ -25,25 +25,19 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" ) -// HostBlockMetadata contains a host along with block metadata from that host -type HostBlockMetadata struct { - Host topology.Host - Size int64 - Checksum *uint32 -} - -// HostBlockMetadataSlice captures a slice of hostBlockMetadata -type HostBlockMetadataSlice interface { +// ReplicaMetadataSlice captures a slice of block.ReplicaMetadata +type ReplicaMetadataSlice interface { // Add adds the metadata to the slice - Add(metadata HostBlockMetadata) + Add(metadata block.ReplicaMetadata) // Metadata returns the metadata slice - Metadata() []HostBlockMetadata + Metadata() []block.ReplicaMetadata // Reset resets the metadata slice Reset() @@ -52,13 +46,13 @@ type HostBlockMetadataSlice interface { Close() } -// HostBlockMetadataSlicePool provides a pool for hostBlockMetadata slices -type HostBlockMetadataSlicePool interface { - // Get returns a hostBlockMetadata slice - Get() HostBlockMetadataSlice +// ReplicaMetadataSlicePool provides a pool for block.ReplicaMetadata slices +type ReplicaMetadataSlicePool interface { + // Get returns a ReplicaMetadata slice + Get() ReplicaMetadataSlice - // Put puts a hostBlockMetadata slice back to pool - Put(m HostBlockMetadataSlice) + // Put puts a ReplicaMetadata slice back to pool + Put(m ReplicaMetadataSlice) } // ReplicaBlockMetadata captures the block metadata from hosts in a shard replica set @@ -67,10 +61,10 @@ type ReplicaBlockMetadata interface { Start() time.Time // Metadata returns the metadata from all hosts - Metadata() []HostBlockMetadata + Metadata() []block.ReplicaMetadata // Add adds a metadata from a host - Add(metadata HostBlockMetadata) + Add(metadata 
block.ReplicaMetadata) // Close performs cleanup Close() @@ -88,7 +82,7 @@ type ReplicaBlocksMetadata interface { Add(block ReplicaBlockMetadata) // GetOrAdd returns the blocks metadata for a start time, creating one if it doesn't exist - GetOrAdd(start time.Time, p HostBlockMetadataSlicePool) ReplicaBlockMetadata + GetOrAdd(start time.Time, p ReplicaMetadataSlicePool) ReplicaBlockMetadata // Close performs cleanup Close() @@ -121,7 +115,7 @@ type ReplicaSeriesBlocksMetadata struct { // ReplicaMetadataComparer compares metadata from hosts in a replica set type ReplicaMetadataComparer interface { // AddLocalMetadata adds metadata from local host - AddLocalMetadata(origin topology.Host, localIter block.FilteredBlocksMetadataIter) error + AddLocalMetadata(localIter block.FilteredBlocksMetadataIter) error // AddPeerMetadata adds metadata from peers AddPeerMetadata(peerIter client.PeerBlockMetadataIter) error @@ -170,24 +164,6 @@ type Options interface { // RepairShardConcurrency returns the concurrency in which to repair shards with. RepairShardConcurrency() int - // SetRepairInterval sets the repair interval. - SetRepairInterval(value time.Duration) Options - - // RepairInterval returns the repair interval. - RepairInterval() time.Duration - - // SetRepairTimeOffset sets the repair time offset. - SetRepairTimeOffset(value time.Duration) Options - - // RepairTimeOffset returns the repair time offset. - RepairTimeOffset() time.Duration - - // SetRepairJitter sets the repair time jitter. - SetRepairTimeJitter(value time.Duration) Options - - // RepairTimeJitter returns the repair time jitter. - RepairTimeJitter() time.Duration - // SetRepairCheckInterval sets the repair check interval. SetRepairCheckInterval(value time.Duration) Options @@ -200,17 +176,17 @@ type Options interface { // RepairThrottle returns the repair throttle. RepairThrottle() time.Duration - // SetRepairMaxRetries sets the max number of retries for a block start. - SetRepairMaxRetries(value int) Options + // SetReplicaMetadataSlicePool sets the replicaMetadataSlice pool. + SetReplicaMetadataSlicePool(value ReplicaMetadataSlicePool) Options - // MaxRepairRetries returns the max number of retries for a block start. - RepairMaxRetries() int + // ReplicaMetadataSlicePool returns the replicaMetadataSlice pool. + ReplicaMetadataSlicePool() ReplicaMetadataSlicePool - // SetHostBlockMetadataSlicePool sets the hostBlockMetadataSlice pool. - SetHostBlockMetadataSlicePool(value HostBlockMetadataSlicePool) Options + // SetResultOptions sets the result options. + SetResultOptions(value result.Options) Options - // HostBlockMetadataSlicePool returns the hostBlockMetadataSlice pool. - HostBlockMetadataSlicePool() HostBlockMetadataSlicePool + // ResultOptions returns the result options. + ResultOptions() result.Options // SetDebugShadowComparisonsEnabled sets whether debug shadow comparisons are enabled. SetDebugShadowComparisonsEnabled(value bool) Options diff --git a/src/dbnode/storage/repair_test.go b/src/dbnode/storage/repair_test.go index 10dd222c8b..526ef10fa1 100644 --- a/src/dbnode/storage/repair_test.go +++ b/src/dbnode/storage/repair_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -21,7 +21,6 @@ package storage import ( - "errors" "sync" "testing" "time" @@ -91,108 +90,6 @@ func TestDatabaseRepairerStartStop(t *testing.T) { } } -func TestDatabaseRepairerHaveNotReachedOffset(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - repairInterval = 2 * time.Hour - repairTimeOffset = time.Hour - now = time.Now().Truncate(repairInterval).Add(30 * time.Minute) - repaired = false - numIter = 0 - ) - - nowFn := func() time.Time { return now } - opts := DefaultTestOptions() - clockOpts := opts.ClockOptions().SetNowFn(nowFn) - repairOpts := testRepairOptions(ctrl). - SetRepairInterval(repairInterval). - SetRepairTimeOffset(repairTimeOffset) - opts = opts. - SetClockOptions(clockOpts.SetNowFn(nowFn)). - SetRepairOptions(repairOpts) - mockDatabase := NewMockdatabase(ctrl) - mockDatabase.EXPECT().Options().Return(opts).AnyTimes() - - databaseRepairer, err := newDatabaseRepairer(mockDatabase, opts) - require.NoError(t, err) - repairer := databaseRepairer.(*dbRepairer) - - repairer.repairFn = func() error { - repaired = true - return nil - } - - repairer.sleepFn = func(_ time.Duration) { - if numIter == 0 { - repairer.closed = true - } - numIter++ - } - - repairer.run() - require.Equal(t, 1, numIter) - require.False(t, repaired) -} - -func TestDatabaseRepairerOnlyOncePerInterval(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - repairInterval = 2 * time.Hour - repairTimeOffset = time.Hour - now = time.Now().Truncate(repairInterval).Add(90 * time.Minute) - numRepairs = 0 - numIter = 0 - ) - - nowFn := func() time.Time { - switch numIter { - case 0: - return now - case 1: - return now.Add(time.Minute) - case 2: - return now.Add(time.Hour) - default: - return now.Add(2 * time.Hour) - } - } - - opts := DefaultTestOptions() - clockOpts := opts.ClockOptions().SetNowFn(nowFn) - repairOpts := testRepairOptions(ctrl). - SetRepairInterval(repairInterval). - SetRepairTimeOffset(repairTimeOffset) - opts = opts. - SetClockOptions(clockOpts.SetNowFn(nowFn)). 
- SetRepairOptions(repairOpts) - mockDatabase := NewMockdatabase(ctrl) - mockDatabase.EXPECT().Options().Return(opts).AnyTimes() - - databaseRepairer, err := newDatabaseRepairer(mockDatabase, opts) - require.NoError(t, err) - repairer := databaseRepairer.(*dbRepairer) - - repairer.repairFn = func() error { - numRepairs++ - return nil - } - - repairer.sleepFn = func(_ time.Duration) { - if numIter == 3 { - repairer.closed = true - } - numIter++ - } - - repairer.run() - require.Equal(t, 4, numIter) - require.Equal(t, 2, numRepairs) -} - func TestDatabaseRepairerRepairNotBootstrapped(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -213,8 +110,7 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { defer ctrl.Finish() session := client.NewMockAdminSession(ctrl) - session.EXPECT().Origin().Return(topology.NewHost("0", "addr0")) - session.EXPECT().Replicas().Return(2) + session.EXPECT().Origin().Return(topology.NewHost("0", "addr0")).AnyTimes() mockClient := client.NewMockAdminClient(ctrl) mockClient.EXPECT().DefaultAdminSession().Return(session, nil) @@ -244,8 +140,8 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { IncludeLastRead: false, } - sizes = []int64{1, 2, 3} - checksums = []uint32{4, 5, 6} + sizes = []int64{1, 2, 3, 4} + checksums = []uint32{4, 5, 6, 7} lastRead = now.Add(-time.Minute) shardID = uint32(0) shard = NewMockdatabaseShard(ctrl) @@ -276,27 +172,32 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { FetchBlocksMetadataV2(any, start, end, any, nonNilPageToken, fetchOpts). Return(expectedResults, nil, nil) shard.EXPECT().ID().Return(shardID).AnyTimes() + shard.EXPECT().Load(gomock.Any()) peerIter := client.NewMockPeerBlockMetadataIter(ctrl) - inputBlocks := []struct { - host topology.Host - meta block.Metadata - }{ - {topology.NewHost("1", "addr1"), block.NewMetadata(ident.StringID("foo"), ident.Tags{}, - now.Add(30*time.Minute), sizes[0], &checksums[0], lastRead)}, - {topology.NewHost("1", "addr1"), block.NewMetadata(ident.StringID("foo"), ident.Tags{}, - now.Add(time.Hour), sizes[0], &checksums[1], lastRead)}, - {topology.NewHost("1", "addr1"), block.NewMetadata(ident.StringID("bar"), ident.Tags{}, - now.Add(30*time.Minute), sizes[2], &checksums[2], lastRead)}, + inputBlocks := []block.ReplicaMetadata{ + { + Host: topology.NewHost("1", "addr1"), + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(30*time.Minute), sizes[0], &checksums[0], lastRead), + }, + { + Host: topology.NewHost("1", "addr1"), + Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(time.Hour), sizes[0], &checksums[1], lastRead), + }, + { + Host: topology.NewHost("1", "addr1"), + // Mismatch checksum so should trigger repair of this series. 
+ Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(30*time.Minute), sizes[2], &checksums[3], lastRead), + }, } gomock.InOrder( peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[0].host, inputBlocks[0].meta), + peerIter.EXPECT().Current().Return(inputBlocks[0].Host, inputBlocks[0].Metadata), peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[1].host, inputBlocks[1].meta), + peerIter.EXPECT().Current().Return(inputBlocks[1].Host, inputBlocks[1].Metadata), peerIter.EXPECT().Next().Return(true), - peerIter.EXPECT().Current().Return(inputBlocks[2].host, inputBlocks[2].meta), + peerIter.EXPECT().Current().Return(inputBlocks[2].Host, inputBlocks[2].Metadata), peerIter.EXPECT().Next().Return(false), peerIter.EXPECT().Err().Return(nil), ) @@ -305,6 +206,26 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { rpOpts.RepairConsistencyLevel(), gomock.Any()). Return(peerIter, nil) + peerBlocksIter := client.NewMockPeerBlocksIter(ctrl) + dbBlock1 := block.NewMockDatabaseBlock(ctrl) + dbBlock1.EXPECT().StartTime().Return(inputBlocks[2].Metadata.Start).AnyTimes() + dbBlock2 := block.NewMockDatabaseBlock(ctrl) + dbBlock2.EXPECT().StartTime().Return(inputBlocks[2].Metadata.Start).AnyTimes() + // Ensure merging logic works. + dbBlock1.EXPECT().Merge(dbBlock2) + gomock.InOrder( + peerBlocksIter.EXPECT().Next().Return(true), + peerBlocksIter.EXPECT().Current().Return(inputBlocks[2].Host, inputBlocks[2].Metadata.ID, dbBlock1), + peerBlocksIter.EXPECT().Next().Return(true), + peerBlocksIter.EXPECT().Current().Return(inputBlocks[2].Host, inputBlocks[2].Metadata.ID, dbBlock2), + peerBlocksIter.EXPECT().Next().Return(false), + ) + nsMeta, err := namespace.NewMetadata(namespaceID, namespace.NewOptions()) + require.NoError(t, err) + session.EXPECT(). + FetchBlocksFromPeers(nsMeta, shardID, rpOpts.RepairConsistencyLevel(), inputBlocks[2:], gomock.Any()). 
+ Return(peerBlocksIter, nil) + var ( resNamespace ident.ID resShard databaseShard @@ -319,166 +240,172 @@ func TestDatabaseShardRepairerRepair(t *testing.T) { resDiff = diffRes } - ctx := context.NewContext() - nsCtx := namespace.Context{ID: namespaceID} - repairer.Repair(ctx, nsCtx, repairTimeRange, shard) + var ( + ctx = context.NewContext() + nsCtx = namespace.Context{ID: namespaceID} + ) + require.NoError(t, err) + repairer.Repair(ctx, nsCtx, nsMeta, repairTimeRange, shard) + require.Equal(t, namespaceID, resNamespace) require.Equal(t, resShard, shard) require.Equal(t, int64(2), resDiff.NumSeries) require.Equal(t, int64(3), resDiff.NumBlocks) - require.Equal(t, 0, resDiff.ChecksumDifferences.Series().Len()) - sizeDiffSeries := resDiff.SizeDifferences.Series() - require.Equal(t, 1, sizeDiffSeries.Len()) - series, exists := sizeDiffSeries.Get(ident.StringID("foo")) + + checksumDiffSeries := resDiff.ChecksumDifferences.Series() + require.Equal(t, 1, checksumDiffSeries.Len()) + series, exists := checksumDiffSeries.Get(ident.StringID("bar")) require.True(t, exists) blocks := series.Metadata.Blocks() require.Equal(t, 1, len(blocks)) - block, exists := blocks[xtime.ToUnixNano(now.Add(time.Hour))] + currBlock, exists := blocks[xtime.ToUnixNano(now.Add(30*time.Minute))] require.True(t, exists) - require.Equal(t, now.Add(time.Hour), block.Start()) - expected := []repair.HostBlockMetadata{ - {Host: topology.NewHost("0", "addr0"), Size: sizes[1], Checksum: &checksums[1]}, - {Host: topology.NewHost("1", "addr1"), Size: sizes[0], Checksum: &checksums[1]}, + require.Equal(t, now.Add(30*time.Minute), currBlock.Start()) + expected := []block.ReplicaMetadata{ + // Checksum difference for series "bar". + {Host: topology.NewHost("0", "addr0"), Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(30*time.Minute), sizes[2], &checksums[2], lastRead)}, + {Host: topology.NewHost("1", "addr1"), Metadata: inputBlocks[2].Metadata}, } - require.Equal(t, expected, block.Metadata()) -} - -func TestRepairerRepairTimes(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() + require.Equal(t, expected, currBlock.Metadata()) - now := time.Unix(188000, 0) - opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl)) - clockOpts := opts.ClockOptions() - opts = opts.SetClockOptions(clockOpts.SetNowFn(func() time.Time { return now })) - database := NewMockdatabase(ctrl) - database.EXPECT().Options().Return(opts).AnyTimes() - - inputTimes := []struct { - bs time.Time - rs repairState - }{ - {time.Unix(14400, 0), repairState{repairFailed, 2}}, - {time.Unix(28800, 0), repairState{repairFailed, 3}}, - {time.Unix(36000, 0), repairState{repairNotStarted, 0}}, - {time.Unix(43200, 0), repairState{repairSuccess, 1}}, - } - repairer, err := newDatabaseRepairer(database, opts) - require.NoError(t, err) - r := repairer.(*dbRepairer) - for _, input := range inputTimes { - r.repairStatesByNs.setRepairState(defaultTestNs1ID, input.bs, input.rs) + sizeDiffSeries := resDiff.SizeDifferences.Series() + require.Equal(t, 1, sizeDiffSeries.Len()) + series, exists = sizeDiffSeries.Get(ident.StringID("foo")) + require.True(t, exists) + blocks = series.Metadata.Blocks() + require.Equal(t, 1, len(blocks)) + currBlock, exists = blocks[xtime.ToUnixNano(now.Add(time.Hour))] + require.True(t, exists) + require.Equal(t, now.Add(time.Hour), currBlock.Start()) + expected = []block.ReplicaMetadata{ + // Size difference for series "foo". 
+ {Host: topology.NewHost("0", "addr0"), Metadata: block.NewMetadata(ident.StringID("foo"), ident.Tags{}, now.Add(time.Hour), sizes[1], &checksums[1], lastRead)}, + {Host: topology.NewHost("1", "addr1"), Metadata: inputBlocks[1].Metadata}, } + require.Equal(t, expected, currBlock.Metadata()) +} - testNs, closer := newTestNamespace(t) - defer closer() - res := r.namespaceRepairTimeRanges(testNs) - expectedRanges := xtime.Ranges{}. - AddRange(xtime.Range{Start: time.Unix(14400, 0), End: time.Unix(28800, 0)}). - AddRange(xtime.Range{Start: time.Unix(36000, 0), End: time.Unix(43200, 0)}). - AddRange(xtime.Range{Start: time.Unix(50400, 0), End: time.Unix(187200, 0)}) - require.Equal(t, expectedRanges, res) +type expectedRepair struct { + repairRange xtime.Range } -func TestRepairerRepairWithTime(t *testing.T) { +func TestDatabaseRepairPrioritizationLogic(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - repairTimeRange := xtime.Range{Start: time.Unix(7200, 0), End: time.Unix(14400, 0)} - opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl)) - database := NewMockdatabase(ctrl) - database.EXPECT().Options().Return(opts).AnyTimes() + var ( + rOpts = retention.NewOptions(). + SetRetentionPeriod(retention.NewOptions().BlockSize() * 2) + nsOpts = namespace.NewOptions(). + SetRetentionOptions(rOpts) + blockSize = rOpts.BlockSize() - repairer, err := newDatabaseRepairer(database, opts) - require.NoError(t, err) - r := repairer.(*dbRepairer) + // Set current time such that the previous block is flushable. + now = time.Now().Truncate(blockSize).Add(rOpts.BufferPast()).Add(time.Second) - inputs := []struct { - name string - err error - }{ - {"foo", errors.New("some error")}, - {"bar", errors.New("some other error")}, - {"baz", nil}, - } - for _, input := range inputs { - ns := NewMockdatabaseNamespace(ctrl) - id := ident.StringID(input.name) - ropts := retention.NewOptions() - nsOpts := namespace.NewMockOptions(ctrl) - nsOpts.EXPECT().RetentionOptions().Return(ropts) - ns.EXPECT().Options().Return(nsOpts) - ns.EXPECT().Repair(gomock.Not(nil), repairTimeRange).Return(input.err) - ns.EXPECT().ID().Return(id).AnyTimes() - err := r.repairNamespaceWithTimeRange(ns, repairTimeRange) - rs, ok := r.repairStatesByNs.repairStates(id, time.Unix(7200, 0)) - require.True(t, ok) - if input.err == nil { - require.NoError(t, err) - require.Equal(t, repairState{Status: repairSuccess}, rs) - } else { - require.Error(t, err) - require.Equal(t, repairState{Status: repairFailed, NumFailures: 1}, rs) - } - } -} + flushTimeStart = retention.FlushTimeStart(rOpts, now) + flushTimeEnd = retention.FlushTimeEnd(rOpts, now) -func TestRepairerTimesMultipleNamespaces(t *testing.T) { - // tf2(i) returns the start time of the i_th 2 hour block since epoch - tf2 := func(i int) time.Time { - return time.Unix(int64(i*7200), 0) - } - // tf4(i) returns the start time of the i_th 4 hour block since epoch - tf4 := func(i int) time.Time { - return tf2(2 * i) + flushTimeStartNano = xtime.ToUnixNano(flushTimeStart) + flushTimeEndNano = xtime.ToUnixNano(flushTimeEnd) + ) + require.NoError(t, nsOpts.Validate()) + // Ensure only two flushable blocks in retention to make test logic simpler. 
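Illustrative aside (not part of the patch): for concreteness, assume a 2h block size, a 10m bufferPast and a retention period of two block sizes, as this test sets up. The sketch below works through the flushable-window arithmetic with stand-in implementations of FlushTimeStart/FlushTimeEnd (an approximation for illustration, not the retention package itself), showing why the window spans exactly two flushable blocks and why flushTimeEnd minus flushTimeStart equals one blockSize.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for retention.FlushTimeStart / FlushTimeEnd: the
// earliest and latest block starts that are eligible for flushing (and
// therefore for repair). These formulas are an approximation used for
// illustration only.
func flushTimeStart(retentionPeriod, blockSize time.Duration, now time.Time) time.Time {
	return now.Add(-retentionPeriod).Truncate(blockSize)
}

func flushTimeEnd(bufferPast, blockSize time.Duration, now time.Time) time.Time {
	return now.Add(-bufferPast).Add(-blockSize).Truncate(blockSize)
}

func main() {
	const (
		blockSize  = 2 * time.Hour
		bufferPast = 10 * time.Minute
		retention  = 2 * blockSize
	)
	// Same shape as the test setup: just past the point where the
	// previous block has become flushable.
	now := time.Date(2019, 8, 1, 12, 0, 0, 0, time.UTC).Add(bufferPast).Add(time.Second)

	start := flushTimeStart(retention, blockSize, now)
	end := flushTimeEnd(bufferPast, blockSize, now)
	fmt.Println(start, end, end.Sub(start) == blockSize)
	// start = 08:00, end = 10:00: exactly two flushable blocks,
	// [08:00, 10:00) and [10:00, 12:00), one blockSize apart.
}
```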
+ require.Equal(t, blockSize, flushTimeEnd.Sub(flushTimeStart)) + + testCases := []struct { + title string + repairState repairStatesByNs + expectedNS1Repair expectedRepair + expectedNS2Repair expectedRepair + }{ + { + title: "repairs most recent block if no repair state", + expectedNS1Repair: expectedRepair{xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)}}, + expectedNS2Repair: expectedRepair{xtime.Range{Start: flushTimeEnd, End: flushTimeEnd.Add(blockSize)}}, + }, + { + title: "repairs next unrepaired block in reverse order if some (but not all) blocks have been repaired", + repairState: repairStatesByNs{ + "ns1": namespaceRepairStateByTime{ + flushTimeEndNano: repairState{ + Status: repairSuccess, + LastAttempt: time.Time{}, + }, + }, + "ns2": namespaceRepairStateByTime{ + flushTimeEndNano: repairState{ + Status: repairSuccess, + LastAttempt: time.Time{}, + }, + }, + }, + expectedNS1Repair: expectedRepair{xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}}, + expectedNS2Repair: expectedRepair{xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}}, + }, + { + title: "repairs least recently repaired block if all blocks have been repaired", + repairState: repairStatesByNs{ + "ns1": namespaceRepairStateByTime{ + flushTimeStartNano: repairState{ + Status: repairSuccess, + LastAttempt: time.Time{}, + }, + flushTimeEndNano: repairState{ + Status: repairSuccess, + LastAttempt: time.Time{}.Add(time.Second), + }, + }, + "ns2": namespaceRepairStateByTime{ + flushTimeStartNano: repairState{ + Status: repairSuccess, + LastAttempt: time.Time{}, + }, + flushTimeEndNano: repairState{ + Status: repairSuccess, + LastAttempt: time.Time{}.Add(time.Second), + }, + }, + }, + expectedNS1Repair: expectedRepair{xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}}, + expectedNS2Repair: expectedRepair{xtime.Range{Start: flushTimeStart, End: flushTimeStart.Add(blockSize)}}, + }, } - ctrl := gomock.NewController(t) - defer ctrl.Finish() + for _, tc := range testCases { + t.Run(tc.title, func(t *testing.T) { + opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl)) + mockDatabase := NewMockdatabase(ctrl) - now := time.Unix(188000, 0) - opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl)) - clockOpts := opts.ClockOptions() - opts = opts.SetClockOptions(clockOpts.SetNowFn(func() time.Time { return now })) - database := NewMockdatabase(ctrl) - database.EXPECT().Options().Return(opts).AnyTimes() - - inputTimes := []struct { - ns ident.ID - bs time.Time - rs repairState - }{ - {defaultTestNs1ID, tf2(2), repairState{repairFailed, 2}}, - {defaultTestNs1ID, tf2(4), repairState{repairFailed, 3}}, - {defaultTestNs1ID, tf2(5), repairState{repairNotStarted, 0}}, - {defaultTestNs1ID, tf2(6), repairState{repairSuccess, 1}}, - {defaultTestNs2ID, tf4(1), repairState{repairFailed, 1}}, - {defaultTestNs2ID, tf4(2), repairState{repairFailed, 3}}, - {defaultTestNs2ID, tf4(4), repairState{repairNotStarted, 0}}, - {defaultTestNs2ID, tf4(6), repairState{repairSuccess, 1}}, - } - repairer, err := newDatabaseRepairer(database, opts) - require.NoError(t, err) - r := repairer.(*dbRepairer) - for _, input := range inputTimes { - r.repairStatesByNs.setRepairState(input.ns, input.bs, input.rs) + databaseRepairer, err := newDatabaseRepairer(mockDatabase, opts) + require.NoError(t, err) + repairer := databaseRepairer.(*dbRepairer) + repairer.nowFn = func() time.Time { + return now + } + if tc.repairState == nil { + tc.repairState = 
repairStatesByNs{} + } + repairer.repairStatesByNs = tc.repairState + + mockDatabase.EXPECT().IsBootstrapped().Return(true) + + var ( + ns1 = NewMockdatabaseNamespace(ctrl) + ns2 = NewMockdatabaseNamespace(ctrl) + namespaces = []databaseNamespace{ns1, ns2} + ) + ns1.EXPECT().Options().Return(nsOpts).AnyTimes() + ns2.EXPECT().Options().Return(nsOpts).AnyTimes() + + ns1.EXPECT().ID().Return(ident.StringID("ns1")).AnyTimes() + ns2.EXPECT().ID().Return(ident.StringID("ns2")).AnyTimes() + + ns1.EXPECT().Repair(gomock.Any(), tc.expectedNS1Repair.repairRange) + ns2.EXPECT().Repair(gomock.Any(), tc.expectedNS2Repair.repairRange) + + mockDatabase.EXPECT().GetOwnedNamespaces().Return(namespaces, nil) + require.Nil(t, repairer.Repair()) + }) } - - testNs1, closer1 := newTestNamespaceWithIDOpts(t, defaultTestNs1ID, defaultTestNs1Opts) - defer closer1() - res := r.namespaceRepairTimeRanges(testNs1) - expectedRanges := xtime.Ranges{}. - AddRange(xtime.Range{Start: tf2(2), End: tf2(4)}). - AddRange(xtime.Range{Start: tf2(5), End: tf2(6)}). - AddRange(xtime.Range{Start: tf2(7), End: tf2(26)}) - require.Equal(t, expectedRanges, res) - - testNs2, closer2 := newTestNamespaceWithIDOpts(t, defaultTestNs2ID, defaultTestNs2Opts) - defer closer2() - res = r.namespaceRepairTimeRanges(testNs2) - expectedRanges = xtime.Ranges{}. - AddRange(xtime.Range{Start: tf4(1), End: tf4(2)}). - AddRange(xtime.Range{Start: tf4(3), End: tf4(6)}). - AddRange(xtime.Range{Start: tf4(7), End: tf4(13)}) - require.Equal(t, expectedRanges, res) } diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index 36426ee45d..964b81e883 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -730,12 +730,24 @@ func (b *dbBuffer) FetchBlocksMetadata( if opts.IncludeLastRead { resultLastRead = bv.lastRead() } - // NB(r): Ignore if opts.IncludeChecksum because we avoid - // calculating checksum since block is open and is being mutated + + var ( + checksum *uint32 + err error + ) + if opts.IncludeChecksums { + // Checksum calculations are best effort since we can't calculate one if there + // are multiple streams without performing an expensive merge. 
+ checksum, err = bv.checksumIfSingleStream() + if err != nil { + return nil, err + } + } res.Add(block.FetchBlockMetadataResult{ Start: bv.start, Size: resultSize, LastRead: resultLastRead, + Checksum: checksum, }) } @@ -905,6 +917,13 @@ func (b *BufferBucketVersions) streamsLen() int { return res } +func (b *BufferBucketVersions) checksumIfSingleStream() (*uint32, error) { + if len(b.buckets) != 1 { + return nil, nil + } + return b.buckets[0].checksumIfSingleStream() +} + func (b *BufferBucketVersions) write( timestamp time.Time, value float64, @@ -1184,6 +1203,38 @@ func (b *BufferBucket) streamsLen() int { return length } +func (b *BufferBucket) checksumIfSingleStream() (*uint32, error) { + if b.hasJustSingleEncoder() { + enc := b.encoders[0].encoder + stream, ok := enc.Stream(encoding.StreamOptions{}) + if !ok { + return nil, nil + } + + segment, err := stream.Segment() + if err != nil { + return nil, err + } + + if segment.Len() == 0 { + return nil, nil + } + + checksum := digest.SegmentChecksum(segment) + return &checksum, nil + } + + if b.hasJustSingleLoadedBlock() { + checksum, err := b.loadedBlocks[0].Checksum() + if err != nil { + return nil, err + } + return &checksum, nil + } + + return nil, nil +} + func (b *BufferBucket) resetEncoders() { var zeroed inOrderEncoder for i := range b.encoders { diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index 3404002112..bd78bebc39 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -771,11 +771,20 @@ func TestBufferFetchBlocksMetadata(t *testing.T) { require.NoError(t, err) res := metadata.Results() require.Equal(t, 1, len(res)) - assert.Equal(t, b.start, res[0].Start) - assert.Equal(t, expectedSize, res[0].Size) - // checksum is never available for buffer block. - assert.Equal(t, (*uint32)(nil), res[0].Checksum) - assert.True(t, expectedLastRead.Equal(res[0].LastRead)) + require.Equal(t, b.start, res[0].Start) + require.Equal(t, expectedSize, res[0].Size) + // Checksum not available since there are multiple streams. + require.Equal(t, (*uint32)(nil), res[0].Checksum) + require.True(t, expectedLastRead.Equal(res[0].LastRead)) + + // Tick to merge all of the streams into one. + buffer.Tick(ShardBlockStateSnapshot{}, namespace.Context{}) + metadata, err = buffer.FetchBlocksMetadata(ctx, start, end, fetchOpts) + require.NoError(t, err) + res = metadata.Results() + require.Equal(t, 1, len(res)) + // Checksum should be available now since there was only one stream. 
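// Sketch of the "best effort" rule the new checksumIfSingleStream helpers follow:
// a checksum is only reported when there is exactly one non-empty stream, and nil
// is returned otherwise rather than paying for a merge. crc32 stands in for the
// digest package and the stream type is a local simplification; this mirrors the
// buffer test above (nil before the Tick merges streams, non-nil afterwards).
package checksumsketch

import "hash/crc32"

type stream struct{ data []byte }

func checksumIfSingleStream(streams []stream) *uint32 {
	if len(streams) != 1 || len(streams[0].data) == 0 {
		return nil // ambiguous or empty: no checksum
	}
	sum := crc32.ChecksumIEEE(streams[0].data)
	return &sum
}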
+ require.NotNil(t, res[0].Checksum) } func TestBufferTickReordersOutOfOrderBuffers(t *testing.T) { diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 31fef27f6e..6b6616a455 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -417,15 +417,17 @@ func (s *dbSeries) addBlockWithLock(b block.DatabaseBlock) { func (s *dbSeries) Load( opts LoadOptions, - bootstrappedBlocks block.DatabaseSeriesBlocks, + blocksToLoad block.DatabaseSeriesBlocks, blockStates BootstrappedBlockStateSnapshot, ) (LoadResult, error) { if opts.Bootstrap { - bsResult, err := s.bootstrap(bootstrappedBlocks, blockStates) + bsResult, err := s.bootstrap(blocksToLoad, blockStates) return LoadResult{Bootstrap: bsResult}, err } - s.load(bootstrappedBlocks, blockStates) + s.Lock() + s.loadWithLock(false, blocksToLoad, blockStates) + s.Unlock() return LoadResult{}, nil } @@ -448,26 +450,33 @@ func (s *dbSeries) bootstrap( return result, nil } - s.loadWithLock(bootstrappedBlocks, blockStates) + s.loadWithLock(true, bootstrappedBlocks, blockStates) result.NumBlocksMovedToBuffer += int64(bootstrappedBlocks.Len()) return result, nil } -func (s *dbSeries) load( - bootstrappedBlocks block.DatabaseSeriesBlocks, - blockStates BootstrappedBlockStateSnapshot, -) { - s.Lock() - s.loadWithLock(bootstrappedBlocks, blockStates) - s.Unlock() -} - func (s *dbSeries) loadWithLock( - bootstrappedBlocks block.DatabaseSeriesBlocks, + isBootstrap bool, + blocksToLoad block.DatabaseSeriesBlocks, blockStates BootstrappedBlockStateSnapshot, ) { - for _, block := range bootstrappedBlocks.AllBlocks() { + for _, block := range blocksToLoad.AllBlocks() { + if !isBootstrap { + // The data being loaded is not part of the bootstrap process then it needs to be + // loaded as a cold write because the load could be happening concurrently with + // other processes like the flush (as opposed to bootstrap which cannot happen + // concurrently with a flush) and there is no way to know if this series/block + // combination has been warm flushed or not yet since updating the shard block state + // doesn't happen until the entire flush completes. + // + // As a result the only safe operation is to load the block as a cold write which + // ensures that the data will eventually be flushed and merged with the existing data + // on disk in the two scenarios where the Load() API is used (cold writes and repairs). + s.buffer.Load(block, ColdWrite) + continue + } + blStartNano := xtime.ToUnixNano(block.StartTime()) blState := blockStates.Snapshot[blStartNano] if !blState.WarmRetrievable { diff --git a/src/dbnode/storage/series/series_test.go b/src/dbnode/storage/series/series_test.go index 666a5707d1..522c913e6b 100644 --- a/src/dbnode/storage/series/series_test.go +++ b/src/dbnode/storage/series/series_test.go @@ -220,33 +220,26 @@ func TestSeriesWriteFlushRead(t *testing.T) { // TestSeriesLoad tests the behavior the Bootstrap()/Load()s method by ensuring that they actually load // data into the series and that the data (merged with any existing data) can be retrieved. // -// It also ensures that blocks for blockStarts that have not been warm flushed yet are loaded as -// warm write and block for blockStarts that have already been warm flushed are loaded as cold writes. 
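// Minimal sketch of the dispatch described in the loadWithLock comment above,
// with hypothetical local names standing in for the real series/buffer API: only
// the bootstrap path may trust the per-block flush state, because a plain Load()
// can race with an in-flight warm flush, so it always classifies loaded data as
// a cold write.
package loadsketch

type writeType int

const (
	warmWrite writeType = iota
	coldWrite
)

func classifyLoad(isBootstrap, blockAlreadyWarmFlushed bool) writeType {
	if !isBootstrap {
		// Flush state may be stale while a flush is in flight; cold writes are
		// always safe because they are flushed and merged with disk data later.
		return coldWrite
	}
	if blockAlreadyWarmFlushed {
		return coldWrite
	}
	return warmWrite
}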
+// It also ensures that blocks for the bootstrap path blockStarts that have not been warm flushed yet +// are loaded as warm writes and block for blockStarts that have already been warm flushed are loaded as +// cold writes and that for the load path everything is loaded as cold writes. func TestSeriesBootstrapAndLoad(t *testing.T) { testCases := []struct { - title string - f func( + title string + loadOpts LoadOptions + f func( series DatabaseSeries, blocks block.DatabaseSeriesBlocks, blockStates BootstrappedBlockStateSnapshot) }{ { - title: "load", - f: func(series DatabaseSeries, - blocks block.DatabaseSeriesBlocks, - blockStates BootstrappedBlockStateSnapshot, - ) { - series.Load(LoadOptions{}, blocks, blockStates) - }}, + title: "load", + loadOpts: LoadOptions{}, + }, { - title: "bootstrap", - f: func(series DatabaseSeries, - blocks block.DatabaseSeriesBlocks, - blockStates BootstrappedBlockStateSnapshot, - ) { - _, err := series.Load(LoadOptions{Bootstrap: true}, blocks, blockStates) - require.NoError(t, err) - }}, + title: "bootstrap", + loadOpts: LoadOptions{Bootstrap: true}, + }, } for _, tc := range testCases { @@ -309,7 +302,8 @@ func TestSeriesBootstrapAndLoad(t *testing.T) { blocks.AddBlock(dbBlock) } - tc.f(series, blocks, blockStates) + _, err := series.Load(tc.loadOpts, blocks, blockStates) + require.NoError(t, err) t.Run("Data can be read", func(t *testing.T) { ctx := context.NewContext() @@ -322,16 +316,26 @@ func TestSeriesBootstrapAndLoad(t *testing.T) { requireReaderValuesEqual(t, expectedData, results, opts, nsCtx) }) - t.Run("Unflushed blocks loaded as warm writes and flushed blocks loaded as cold writes", func(t *testing.T) { + t.Run("blocks loaded as warm/cold writes correctly", func(t *testing.T) { optimizedTimes := series.ColdFlushBlockStarts(blockStates) - coldFlushBlockStarts := []xtime.UnixNano{} optimizedTimes.ForEach(func(blockStart xtime.UnixNano) { coldFlushBlockStarts = append(coldFlushBlockStarts, blockStart) }) - - expectedColdFlushBlockStarts := []xtime.UnixNano{xtime.ToUnixNano(alreadyWarmFlushedBlockStart)} - require.Equal(t, expectedColdFlushBlockStarts, coldFlushBlockStarts) + if tc.loadOpts.Bootstrap { + // If its a bootstrap then we need to make sure that everything gets loaded as warm/cold writes + // correctly based on the flush state. + expectedColdFlushBlockStarts := []xtime.UnixNano{xtime.ToUnixNano(alreadyWarmFlushedBlockStart)} + require.Equal(t, expectedColdFlushBlockStarts, coldFlushBlockStarts) + } else { + // If its just a regular load then everything should be loaded as cold writes for correctness + // since flushes and loads can happen concurrently. 
+ expectedColdFlushBlockStarts := []xtime.UnixNano{ + xtime.ToUnixNano(alreadyWarmFlushedBlockStart), + xtime.ToUnixNano(notYetWarmFlushedBlockStart), + } + require.Equal(t, expectedColdFlushBlockStarts, coldFlushBlockStarts) + } }) }) } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 731bd31dba..1ff8bffb78 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2421,10 +2421,11 @@ func (s *dbShard) CleanupCompactedFileSets() error { func (s *dbShard) Repair( ctx context.Context, nsCtx namespace.Context, + nsMeta namespace.Metadata, tr xtime.Range, repairer databaseShardRepairer, ) (repair.MetadataComparisonResult, error) { - return repairer.Repair(ctx, nsCtx, tr, s) + return repairer.Repair(ctx, nsCtx, nsMeta, tr, s) } func (s *dbShard) TagsFromSeriesID(seriesID ident.ID) (ident.Tags, bool, error) { diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index bc7c765a06..281a5e7ba1 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1820,18 +1820,18 @@ func (mr *MockdatabaseShardMockRecorder) CleanupCompactedFileSets() *gomock.Call } // Repair mocks base method -func (m *MockdatabaseShard) Repair(ctx context.Context, nsCtx namespace.Context, tr time0.Range, repairer databaseShardRepairer) (repair.MetadataComparisonResult, error) { +func (m *MockdatabaseShard) Repair(ctx context.Context, nsCtx namespace.Context, nsMeta namespace.Metadata, tr time0.Range, repairer databaseShardRepairer) (repair.MetadataComparisonResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Repair", ctx, nsCtx, tr, repairer) + ret := m.ctrl.Call(m, "Repair", ctx, nsCtx, nsMeta, tr, repairer) ret0, _ := ret[0].(repair.MetadataComparisonResult) ret1, _ := ret[1].(error) return ret0, ret1 } // Repair indicates an expected call of Repair -func (mr *MockdatabaseShardMockRecorder) Repair(ctx, nsCtx, tr, repairer interface{}) *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) Repair(ctx, nsCtx, nsMeta, tr, repairer interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseShard)(nil).Repair), ctx, nsCtx, tr, repairer) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseShard)(nil).Repair), ctx, nsCtx, nsMeta, tr, repairer) } // TagsFromSeriesID mocks base method @@ -2457,18 +2457,18 @@ func (mr *MockdatabaseShardRepairerMockRecorder) Options() *gomock.Call { } // Repair mocks base method -func (m *MockdatabaseShardRepairer) Repair(ctx context.Context, nsCtx namespace.Context, tr time0.Range, shard databaseShard) (repair.MetadataComparisonResult, error) { +func (m *MockdatabaseShardRepairer) Repair(ctx context.Context, nsCtx namespace.Context, nsMeta namespace.Metadata, tr time0.Range, shard databaseShard) (repair.MetadataComparisonResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Repair", ctx, nsCtx, tr, shard) + ret := m.ctrl.Call(m, "Repair", ctx, nsCtx, nsMeta, tr, shard) ret0, _ := ret[0].(repair.MetadataComparisonResult) ret1, _ := ret[1].(error) return ret0, ret1 } // Repair indicates an expected call of Repair -func (mr *MockdatabaseShardRepairerMockRecorder) Repair(ctx, nsCtx, tr, shard interface{}) *gomock.Call { +func (mr *MockdatabaseShardRepairerMockRecorder) Repair(ctx, nsCtx, nsMeta, tr, shard interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", 
reflect.TypeOf((*MockdatabaseShardRepairer)(nil).Repair), ctx, nsCtx, tr, shard) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseShardRepairer)(nil).Repair), ctx, nsCtx, nsMeta, tr, shard) } // MockdatabaseRepairer is a mock of databaseRepairer interface diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 6336556736..ec252806df 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -507,6 +507,7 @@ type databaseShard interface { Repair( ctx context.Context, nsCtx namespace.Context, + nsMeta namespace.Metadata, tr xtime.Range, repairer databaseShardRepairer, ) (repair.MetadataComparisonResult, error) @@ -680,6 +681,7 @@ type databaseShardRepairer interface { Repair( ctx context.Context, nsCtx namespace.Context, + nsMeta namespace.Metadata, tr xtime.Range, shard databaseShard, ) (repair.MetadataComparisonResult, error) diff --git a/src/dbnode/x/xio/block_reader.go b/src/dbnode/x/xio/block_reader.go index be977548c2..37d561d02b 100644 --- a/src/dbnode/x/xio/block_reader.go +++ b/src/dbnode/x/xio/block_reader.go @@ -56,3 +56,42 @@ func (b *BlockReader) ResetWindowed(segment ts.Segment, start time.Time, blockSi b.Start = start b.BlockSize = blockSize } + +// FilterEmptyBlockReadersSliceOfSlicesInPlace filters a [][]BlockReader in place (I.E by modifying +// the existing data structures instead of allocating new ones) such that the returned [][]BlockReader +// will only contain BlockReaders that contain non-empty segments. +// +// Note that if any of the Block/Segment readers are backed by async implementations then this function +// will not return until all of the async execution has completed. +func FilterEmptyBlockReadersSliceOfSlicesInPlace(brSliceOfSlices [][]BlockReader) ([][]BlockReader, error) { + filteredSliceOfSlices := brSliceOfSlices[:0] + for _, brSlice := range brSliceOfSlices { + filteredBrSlice, err := FilterEmptyBlockReadersInPlace(brSlice) + if err != nil { + return nil, err + } + if len(filteredBrSlice) > 0 { + filteredSliceOfSlices = append(filteredSliceOfSlices, filteredBrSlice) + } + } + return filteredSliceOfSlices, nil +} + +// FilterEmptyBlockReadersInPlace is the same as FilterEmptyBlockReadersSliceOfSlicesInPlace except for +// one dimensional slices instead of two. 
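// The new block reader helpers above use the standard Go "filter in place"
// idiom: reslice the input to zero length over the same backing array and append
// back only the elements worth keeping, so no new slice is allocated. A plain
// restatement of the idiom with ints (illustration only):
package filtersketch

func filterNonZeroInPlace(in []int) []int {
	out := in[:0] // shares the backing array with in
	for _, v := range in {
		if v != 0 {
			out = append(out, v)
		}
	}
	// Note: the tail of the original backing array still holds the old values,
	// which matters for pointer-typed elements.
	return out
}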
+func FilterEmptyBlockReadersInPlace(brs []BlockReader) ([]BlockReader, error) { + filtered := brs[:0] + for _, br := range brs { + if br.SegmentReader == nil { + continue + } + segment, err := br.Segment() + if err != nil { + return nil, err + } + if segment.Len() > 0 { + filtered = append(filtered, br) + } + } + return filtered, nil +} diff --git a/src/dbnode/x/xio/block_reader_test.go b/src/dbnode/x/xio/block_reader_test.go index ebc986c997..2ed8da4a8a 100644 --- a/src/dbnode/x/xio/block_reader_test.go +++ b/src/dbnode/x/xio/block_reader_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/checked" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -205,3 +206,40 @@ func TestBlockIsNotEmpty(t *testing.T) { }.IsNotEmpty()) assert.True(t, block.IsNotEmpty()) } + +func TestFilterEmptyBlockReadersSliceOfSlicesInPlace(t *testing.T) { + var ( + head = checked.NewBytes([]byte("some-data"), checked.NewBytesOptions()) + segment = ts.NewSegment(head, nil, 0) + segmentReader = NewSegmentReader(segment) + ) + notEmpty := BlockReader{ + SegmentReader: segmentReader, + } + + noneEmpty := []BlockReader{notEmpty} + someEmpty := []BlockReader{notEmpty, EmptyBlockReader} + allEmpty := []BlockReader{EmptyBlockReader} + unfiltered := [][]BlockReader{noneEmpty, someEmpty, allEmpty} + filtered, err := FilterEmptyBlockReadersSliceOfSlicesInPlace(unfiltered) + require.NoError(t, err) + require.Equal(t, 2, len(filtered)) + require.Equal(t, 1, len(filtered[0])) + require.Equal(t, 1, len(filtered[1])) +} + +func TestFilterEmptyBlockReadersInPlace(t *testing.T) { + var ( + head = checked.NewBytes([]byte("some-data"), checked.NewBytesOptions()) + segment = ts.NewSegment(head, nil, 0) + segmentReader = NewSegmentReader(segment) + ) + notEmpty := BlockReader{ + SegmentReader: segmentReader, + } + + unfiltered := []BlockReader{notEmpty, EmptyBlockReader} + filtered, err := FilterEmptyBlockReadersInPlace(unfiltered) + require.NoError(t, err) + require.Equal(t, 1, len(filtered)) +} diff --git a/src/x/time/range.go b/src/x/time/range.go index f2ccd79436..8d67189176 100644 --- a/src/x/time/range.go +++ b/src/x/time/range.go @@ -135,6 +135,26 @@ func (r Range) Subtract(other Range) []Range { return res } +// IterateForward iterates through a time range by step size in the +// forwards direction. +func (r Range) IterateForward(stepSize time.Duration, f func(t time.Time) (shouldContinue bool)) { + for t := r.Start; t.Before(r.End); t = t.Add(stepSize) { + if shouldContinue := f(t); !shouldContinue { + break + } + } +} + +// IterateBackward iterates through a time range by step size in the +// backwards direction. +func (r Range) IterateBackward(stepSize time.Duration, f func(t time.Time) (shouldContinue bool)) { + for t := r.End; t.After(r.Start); t = t.Add(-stepSize) { + if shouldContinue := f(t); !shouldContinue { + break + } + } +} + // String returns the string representation of the range. 
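// Example use of the Range iterators added above. The Range fields and the
// IterateBackward signature are taken from this diff; the import path is the one
// used elsewhere in the diff, and the helper name and step size are illustrative.
package iteratesketch

import (
	"time"

	xtime "github.com/m3db/m3/src/x/time"
)

// mostRecentStepMatching walks a range newest-first and returns the first step
// accepted by keep, using the shouldContinue return value to stop early.
func mostRecentStepMatching(
	r xtime.Range,
	step time.Duration,
	keep func(time.Time) bool,
) (time.Time, bool) {
	var (
		match time.Time
		found bool
	)
	r.IterateBackward(step, func(t time.Time) bool {
		if keep(t) {
			match, found = t, true
			return false // stop iterating
		}
		return true
	})
	return match, found
}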
func (r Range) String() string { return fmt.Sprintf("(%v,%v)", r.Start, r.End) diff --git a/src/x/time/range_test.go b/src/x/time/range_test.go index bf735c46da..b333a834c4 100644 --- a/src/x/time/range_test.go +++ b/src/x/time/range_test.go @@ -21,6 +21,7 @@ package time import ( + "fmt" "testing" "time" @@ -338,6 +339,84 @@ func TestRangeSubtract(t *testing.T) { } } +func TestRangeIterateForward(t *testing.T) { + testCases := []struct { + r Range + stepSize time.Duration + expected []time.Time + }{ + { + r: Range{Start: time.Time{}, End: time.Time{}}, + stepSize: time.Second, + }, + { + r: Range{Start: time.Time{}, End: time.Time{}.Add(time.Millisecond)}, + stepSize: time.Second, + expected: []time.Time{time.Time{}}, + }, + { + r: Range{Start: time.Time{}, End: time.Time{}.Add(time.Second)}, + stepSize: time.Second, + expected: []time.Time{time.Time{}}, + }, + { + r: Range{Start: time.Time{}, End: time.Time{}.Add(3 * time.Second)}, + stepSize: time.Second, + expected: []time.Time{time.Time{}, time.Time{}.Add(time.Second), time.Time{}.Add(2 * time.Second)}, + }, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%s", tc.r.String()), func(t *testing.T) { + var actual []time.Time + tc.r.IterateForward(tc.stepSize, func(currStep time.Time) bool { + actual = append(actual, currStep) + return true + }) + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestRangeIterateBackward(t *testing.T) { + testCases := []struct { + r Range + stepSize time.Duration + expected []time.Time + }{ + { + r: Range{Start: time.Time{}, End: time.Time{}}, + stepSize: time.Second, + }, + { + r: Range{Start: time.Time{}, End: time.Time{}.Add(time.Millisecond)}, + stepSize: time.Second, + expected: []time.Time{time.Time{}.Add(time.Millisecond)}, + }, + { + r: Range{Start: time.Time{}, End: time.Time{}.Add(time.Second)}, + stepSize: time.Second, + expected: []time.Time{time.Time{}.Add(time.Second)}, + }, + { + r: Range{Start: time.Time{}, End: time.Time{}.Add(3 * time.Second)}, + stepSize: time.Second, + expected: []time.Time{time.Time{}.Add(3 * time.Second), time.Time{}.Add(2 * time.Second), time.Time{}.Add(time.Second)}, + }, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%s", tc.r.String()), func(t *testing.T) { + var actual []time.Time + tc.r.IterateBackward(tc.stepSize, func(currStep time.Time) bool { + actual = append(actual, currStep) + return true + }) + require.Equal(t, tc.expected, actual) + }) + } +} + func TestRangeString(t *testing.T) { start := time.Unix(1465430400, 0).UTC() r := Range{Start: start, End: start.Add(2 * time.Hour)}
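// The tables above pin down the (asymmetric) boundary semantics of the new
// iterators: IterateForward visits Start and every step strictly before End,
// while IterateBackward starts at End and visits every step strictly after
// Start. Restated compactly (t0 is an arbitrary time, step is one second):
//
//   r := Range{Start: t0, End: t0.Add(3 * time.Second)}
//   r.IterateForward(time.Second, visit)  // visits t0, t0+1s, t0+2s
//   r.IterateBackward(time.Second, visit) // visits t0+3s, t0+2s, t0+1s
//
// An empty range (Start == End) yields no steps in either direction.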