From 7d13b75412c0d7cbbbfef10d02d5608171b3a54d Mon Sep 17 00:00:00 2001 From: xiangguangyxg Date: Thu, 12 Dec 2024 14:33:57 +0800 Subject: [PATCH] [Feature] Support restoring from a cluster snapshot for shared-data (part 1) Signed-off-by: xiangguangyxg --- bin/start_fe.sh | 7 +- build.sh | 1 + conf/restore_snapshot.yaml | 39 +++ .../main/java/com/starrocks/StarRocksFE.java | 6 + .../common/RestoreSnapshotConfig.java | 266 ++++++++++++++++++ .../starrocks/server/RestoreSnapshotMgr.java | 122 ++++++++ .../starrocks/system/SystemInfoService.java | 9 +- .../common/RestoreSnapshotConfigTest.java | 90 ++++++ .../server/RestoreSnapshotMgrTest.java | 36 +++ .../test/resources/conf/restore_snapshot.yaml | 39 +++ 10 files changed, 607 insertions(+), 8 deletions(-) create mode 100644 conf/restore_snapshot.yaml create mode 100644 fe/fe-core/src/main/java/com/starrocks/common/RestoreSnapshotConfig.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/server/RestoreSnapshotMgrTest.java create mode 100644 fe/fe-core/src/test/resources/conf/restore_snapshot.yaml diff --git a/bin/start_fe.sh b/bin/start_fe.sh index 72133eb1256aa4..bd94d5121717ec 100755 --- a/bin/start_fe.sh +++ b/bin/start_fe.sh @@ -25,6 +25,7 @@ OPTS=$(getopt \ -l 'daemon' \ -l 'helper:' \ -l 'host_type:' \ + -l 'restore_snapshot' \ -l 'debug' \ -l 'logconsole' \ -- "$@") @@ -34,6 +35,7 @@ eval set -- "$OPTS" RUN_DAEMON=0 HELPER= HOST_TYPE= +RESTORE_SNAPSHOT= ENABLE_DEBUGGER=0 RUN_LOG_CONSOLE=${SYS_LOG_TO_CONSOLE:-0} # min jdk version required @@ -43,6 +45,7 @@ while true; do --daemon) RUN_DAEMON=1 ; shift ;; --helper) HELPER=$2 ; shift 2 ;; --host_type) HOST_TYPE=$2 ; shift 2 ;; + --restore_snapshot) RESTORE_SNAPSHOT="-restore_snapshot" ; shift ;; --debug) ENABLE_DEBUGGER=1 ; shift ;; --logconsole) RUN_LOG_CONSOLE=1 ; shift ;; --) shift ; break ;; @@ -228,7 +231,7 @@ echo "start time: $(date), server uptime: $(uptime)" # StarRocksFE java process will write its process id into $pidfile if [ ${RUN_DAEMON} -eq 1 ]; then - nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" > { + + @Override + public Map deserialize(JsonParser parser, DeserializationContext context) + throws IOException, JsonProcessingException { + ObjectMapper mapper = (ObjectMapper) parser.getCodec(); + List> list = mapper.readValue(parser, + new TypeReference>>() { + }); + + Map properties = new HashMap<>(); + for (Map entry : list) { + String key = entry.get("key"); + String value = entry.get("value"); + if (key == null || key.isEmpty() || value == null || value.isEmpty()) { + throw new JsonProcessingException("Missing 'key' or 'value' in properties entry", + parser.getTokenLocation()) { + }; + } + properties.put(key, value); + } + return properties; + } + } + + @JsonProperty("name") + private String name; + + @JsonProperty("type") + private StorageVolumeType type; + + @JsonProperty("location") + private String location; + + @JsonProperty("comment") + private String comment; + + @JsonProperty("properties") + @JsonDeserialize(using = PropertiesDeserializer.class) + private Map properties; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public StorageVolumeType getType() { + return type; + } + + public void setType(StorageVolumeType type) { + this.type = type; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + } + + @JsonProperty("frontends") + private List frontends; + + @JsonProperty("compute_nodes") + private List computeNodes; + + @JsonProperty("storage_volumes") + private List storageVolumes; + + public List getFrontends() { + return frontends; + } + + public List getComputeNodes() { + return computeNodes; + } + + public List getStorageVolumes() { + return storageVolumes; + } + + public static RestoreSnapshotConfig load(String restoreSnapshotYamlFile) { + try { + ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); + RestoreSnapshotConfig config = objectMapper.readValue(new File(restoreSnapshotYamlFile), + RestoreSnapshotConfig.class); + return config; + } catch (Exception e) { + LOG.warn("Failed to load restore snapshot config {} ", restoreSnapshotYamlFile, e); + throw new RuntimeException(e); + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java new file mode 100644 index 00000000000000..c7cad63559c26d --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java @@ -0,0 +1,122 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.server; + +import com.starrocks.common.Config; +import com.starrocks.common.DdlException; +import com.starrocks.common.RestoreSnapshotConfig; +import com.starrocks.ha.FrontendNodeType; +import com.starrocks.system.Backend; +import com.starrocks.system.ComputeNode; +import com.starrocks.system.Frontend; +import com.starrocks.system.SystemInfoService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class RestoreSnapshotMgr { + private static final Logger LOG = LogManager.getLogger(RestoreSnapshotMgr.class); + + private static RestoreSnapshotMgr instance; + + private RestoreSnapshotConfig config; + private boolean oldStartWithIncompleteMeta; + private boolean oldResetElectionGroup; + + private RestoreSnapshotMgr(String restoreSnapshotYamlFile) { + config = RestoreSnapshotConfig.load(restoreSnapshotYamlFile); + // Save the old config + oldStartWithIncompleteMeta = Config.start_with_incomplete_meta; + // Allow starting with only image and no log + Config.start_with_incomplete_meta = true; + // Save the old config + oldResetElectionGroup = Config.bdbje_reset_election_group; + Config.bdbje_reset_election_group = true; + } + + public static void init(String restoreSnapshotYamlFile, String[] args) { + for (String arg : args) { + if (arg.equalsIgnoreCase("-restore_snapshot")) { + LOG.info("FE start to restore from a snapshot"); + instance = new RestoreSnapshotMgr(restoreSnapshotYamlFile); + return; + } + } + } + + public static boolean isRestoring() { + return instance != null; + } + + public static void finishRestoring() throws DdlException { + RestoreSnapshotMgr self = instance; + if (self == null) { + return; + } + + try { + List frontends = self.config.getFrontends(); + if (frontends != null) { + NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr(); + // Drop old frontends + for (Frontend frontend : nodeMgr.getOtherFrontends()) { + LOG.info("Drop old frontend {}", frontend); + nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort()); + } + + // Add new frontends + for (RestoreSnapshotConfig.Frontend frontend : frontends) { + LOG.info("Add new frontend {}", frontend); + nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER, + frontend.getHost(), frontend.getEditLogPort()); + } + } + + List computeNodes = self.config.getComputeNodes(); + if (computeNodes != null) { + SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + for (Backend be : systemInfoService.getIdToBackend().values()) { + LOG.info("Drop old backend {}", be); + systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(), + WarehouseManager.DEFAULT_WAREHOUSE_NAME, false); + } + + // Drop old compute nodes + for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) { + LOG.info("Drop old compute node {}", cn); + systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); + } + + // Add new compute nodes + for (RestoreSnapshotConfig.ComputeNode cn : computeNodes) { + LOG.info("Add new compute node {}", cn); + systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); + } + } + + // TODO: Update storage volume + + } finally { + // Rollback config + Config.start_with_incomplete_meta = self.oldStartWithIncompleteMeta; + Config.bdbje_reset_election_group = self.oldResetElectionGroup; + + instance = null; + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java index 9c8912e646bc2d..faba846206a973 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java @@ -131,11 +131,6 @@ public SystemInfoService() { public void addComputeNodes(AddComputeNodeClause addComputeNodeClause) throws DdlException { - - for (Pair pair : addComputeNodeClause.getHostPortPairs()) { - checkSameNodeExist(pair.first, pair.second); - } - for (Pair pair : addComputeNodeClause.getHostPortPairs()) { addComputeNode(pair.first, pair.second, addComputeNodeClause.getWarehouse()); } @@ -169,7 +164,9 @@ public void dropComputeNode(ComputeNode computeNode) { } // Final entry of adding compute node - private void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { + public void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { + checkSameNodeExist(host, heartbeatPort); + ComputeNode newComputeNode = new ComputeNode(GlobalStateMgr.getCurrentState().getNextId(), host, heartbeatPort); idToComputeNodeRef.put(newComputeNode.getId(), newComputeNode); setComputeNodeOwner(newComputeNode); diff --git a/fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java b/fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java new file mode 100644 index 00000000000000..12f3165228a860 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.common; + +import org.junit.Assert; +import org.junit.Test; + +public class RestoreSnapshotConfigTest { + + @Test + public void testLoadFromFile() { + RestoreSnapshotConfig config = RestoreSnapshotConfig.load("src/test/resources/conf/restore_snapshot.yaml"); + Assert.assertEquals(2, config.getFrontends().size()); + Assert.assertEquals(2, config.getComputeNodes().size()); + Assert.assertEquals(2, config.getStorageVolumes().size()); + + RestoreSnapshotConfig.Frontend frontend1 = config.getFrontends().get(0); + Assert.assertEquals("172.26.92.1", frontend1.getHost()); + Assert.assertEquals(9010, frontend1.getEditLogPort()); + Assert.assertEquals(RestoreSnapshotConfig.Frontend.FrontendType.FOLLOWER, frontend1.getType()); + Assert.assertTrue(frontend1.isFollower()); + Assert.assertFalse(frontend1.isObserver()); + + RestoreSnapshotConfig.Frontend frontend2 = config.getFrontends().get(1); + Assert.assertEquals("172.26.92.2", frontend2.getHost()); + Assert.assertEquals(9010, frontend2.getEditLogPort()); + Assert.assertEquals(RestoreSnapshotConfig.Frontend.FrontendType.OBSERVER, frontend2.getType()); + Assert.assertFalse(frontend2.isFollower()); + Assert.assertTrue(frontend2.isObserver()); + + frontend1.toString(); + frontend1.setHost(frontend2.getHost()); + frontend1.setEditLogPort(frontend2.getEditLogPort()); + frontend1.setType(frontend2.getType()); + + RestoreSnapshotConfig.ComputeNode computeNode1 = config.getComputeNodes().get(0); + Assert.assertEquals("172.26.92.11", computeNode1.getHost()); + Assert.assertEquals(9050, computeNode1.getHeartbeatServicePort()); + + RestoreSnapshotConfig.ComputeNode computeNode2 = config.getComputeNodes().get(1); + Assert.assertEquals("172.26.92.12", computeNode2.getHost()); + Assert.assertEquals(9050, computeNode2.getHeartbeatServicePort()); + + computeNode1.toString(); + computeNode1.setHost(computeNode2.getHost()); + computeNode1.setHeartbeatServicePort(computeNode2.getHeartbeatServicePort()); + + RestoreSnapshotConfig.StorageVolume storageVolume1 = config.getStorageVolumes().get(0); + Assert.assertEquals("my_s3_volume", storageVolume1.getName()); + Assert.assertEquals(RestoreSnapshotConfig.StorageVolume.StorageVolumeType.S3, storageVolume1.getType()); + Assert.assertEquals("s3://defaultbucket/test/", storageVolume1.getLocation()); + Assert.assertEquals("my s3 volume", storageVolume1.getComment()); + Assert.assertEquals(4, storageVolume1.getProperties().size()); + Assert.assertEquals("us-west-2", storageVolume1.getProperties().get("aws.s3.region")); + Assert.assertEquals("https://s3.us-west-2.amazonaws.com", + storageVolume1.getProperties().get("aws.s3.endpoint")); + Assert.assertEquals("xxxxxxxxxx", storageVolume1.getProperties().get("aws.s3.access_key")); + Assert.assertEquals("yyyyyyyyyy", storageVolume1.getProperties().get("aws.s3.secret_key")); + + RestoreSnapshotConfig.StorageVolume storageVolume2 = config.getStorageVolumes().get(1); + Assert.assertEquals("my_hdfs_volume", storageVolume2.getName()); + Assert.assertEquals(RestoreSnapshotConfig.StorageVolume.StorageVolumeType.HDFS, storageVolume2.getType()); + Assert.assertEquals("hdfs://127.0.0.1:9000/sr/test/", storageVolume2.getLocation()); + Assert.assertEquals("my hdfs volume", storageVolume2.getComment()); + Assert.assertEquals(2, storageVolume2.getProperties().size()); + Assert.assertEquals("simple", storageVolume2.getProperties().get("hadoop.security.authentication")); + Assert.assertEquals("starrocks", storageVolume2.getProperties().get("username")); + + storageVolume1.setName(storageVolume2.getName()); + storageVolume1.setType(storageVolume2.getType()); + storageVolume1.setLocation(storageVolume2.getLocation()); + storageVolume1.setComment(storageVolume2.getComment()); + storageVolume1.setProperties(storageVolume2.getProperties()); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/server/RestoreSnapshotMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/server/RestoreSnapshotMgrTest.java new file mode 100644 index 00000000000000..037ececa751df2 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/server/RestoreSnapshotMgrTest.java @@ -0,0 +1,36 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.server; + +import com.starrocks.utframe.UtFrameUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class RestoreSnapshotMgrTest { + @BeforeClass + public static void beforeClass() throws Exception { + UtFrameUtils.createMinStarRocksCluster(); + } + + @Test + public void testRestoreSnapshotMgr() throws Exception { + RestoreSnapshotMgr.init("src/test/resources/conf/restore_snapshot.yaml", new String[] { "-restore_snapshot" }); + Assert.assertTrue(RestoreSnapshotMgr.isRestoring()); + + RestoreSnapshotMgr.finishRestoring(); + Assert.assertFalse(RestoreSnapshotMgr.isRestoring()); + } +} diff --git a/fe/fe-core/src/test/resources/conf/restore_snapshot.yaml b/fe/fe-core/src/test/resources/conf/restore_snapshot.yaml new file mode 100644 index 00000000000000..76bb62c37759de --- /dev/null +++ b/fe/fe-core/src/test/resources/conf/restore_snapshot.yaml @@ -0,0 +1,39 @@ +# do not include leader fe +frontends: + - host: 172.26.92.1 + edit_log_port: 9010 + type: follower #default follower + - host: 172.26.92.2 + edit_log_port: 9010 + type: observer + +compute_nodes: + - host: 172.26.92.11 + heartbeat_service_port: 9050 + - host: 172.26.92.12 + heartbeat_service_port: 9050 + +# used for restoring a cloned snapshot +storage_volumes: + - name: my_s3_volume + type: S3 + location: s3://defaultbucket/test/ + comment: my s3 volume + properties: + - key: aws.s3.region + value: us-west-2 + - key: aws.s3.endpoint + value: https://s3.us-west-2.amazonaws.com + - key: aws.s3.access_key + value: xxxxxxxxxx + - key: aws.s3.secret_key + value: yyyyyyyyyy + - name: my_hdfs_volume + type: HDFS + location: hdfs://127.0.0.1:9000/sr/test/ + comment: my hdfs volume + properties: + - key: hadoop.security.authentication + value: simple + - key: username + value: starrocks