From a44c8ec31c0657e272967b8503533012847adf18 Mon Sep 17 00:00:00 2001 From: srlch Date: Thu, 9 Jan 2025 14:14:35 +0800 Subject: [PATCH] fix Signed-off-by: srlch --- .../lake/snapshot/ClusterSnapshot.java | 41 ++++++++++-- .../lake/snapshot/ClusterSnapshotJob.java | 65 ++++++------------- .../lake/snapshot/ClusterSnapshotMgr.java | 17 +++-- 3 files changed, 64 insertions(+), 59 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java index 3e8d48e3eea49e..62667abc28b9da 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java @@ -23,8 +23,8 @@ public class ClusterSnapshot { public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL } - @SerializedName(value = "snapshotId") - private long snapshotId; + @SerializedName(value = "id") + private long id; @SerializedName(value = "snapshotName") private String snapshotName; @SerializedName(value = "type") @@ -42,9 +42,9 @@ public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL } public ClusterSnapshot() {} - public ClusterSnapshot(long snapshotId, String snapshotName, String storageVolumeName, long createdTime, + public ClusterSnapshot(long id, String snapshotName, String storageVolumeName, long createdTime, long finishedTime, long feJournalId, long starMgrJournalId) { - this.snapshotId = snapshotId; + this.id = id; this.snapshotName = snapshotName; this.type = ClusterSnapshotType.AUTOMATED; this.storageVolumeName = storageVolumeName; @@ -54,20 +54,49 @@ public ClusterSnapshot(long snapshotId, String snapshotName, String storageVolum this.starMgrJournalId = starMgrJournalId; } + public void setJournalIds(long feJournalId, long starMgrJournalId) { + this.feJournalId = feJournalId; + this.starMgrJournalId = starMgrJournalId; + } + + public void setFinishedTime(long finishedTime) { + this.finishedTime = finishedTime; + } + public String getSnapshotName() { return snapshotName; } + public String getStorageVolumeName() { + return storageVolumeName; + } + + public long getCreatedTime() { + return createdTime; + } + public long getFinishedTime() { return finishedTime; } + public long getFeJournalId() { + return feJournalId; + } + + public long getStarMgrJournalId() { + return starMgrJournalId; + } + + public long getId() { + return id; + } + public TClusterSnapshotsItem getInfo() { TClusterSnapshotsItem item = new TClusterSnapshotsItem(); item.setSnapshot_name(snapshotName); item.setSnapshot_type(type.name()); - item.setCreated_time(createdTime); - item.setFinished_time(finishedTime); + item.setCreated_time(createdTime / 1000); + item.setFinished_time(finishedTime / 1000); item.setFe_jouranl_id(feJournalId); item.setStarmgr_jouranl_id(starMgrJournalId); item.setProperties(""); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java index f1131af05ceadc..b6bacf6ba70af0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java @@ -27,7 +27,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.time.Instant; public class ClusterSnapshotJob implements Writable { public static final Logger LOG = LogManager.getLogger(ClusterSnapshotJob.class); @@ -40,33 +39,15 @@ public class ClusterSnapshotJob implements Writable { */ public enum ClusterSnapshotJobState { INITIALIZING, SNAPSHOTING, UPLOADING, FINISHED, ERROR } - @SerializedName(value = "jobId") - private long jobId; - @SerializedName(value = "snapshotName") - private String snapshotName; - @SerializedName(value = "storageVolumeName") - private String storageVolumeName; - @SerializedName(value = "createdTime") - private long createdTime; - @SerializedName(value = "finishedTime") - private long finishedTime; - @SerializedName(value = "feJournalId") - private long feJournalId; - @SerializedName(value = "starMgrJournalId") - private long starMgrJournalId; + @SerializedName(value = "snapshot") + private ClusterSnapshot snapshot; @SerializedName(value = "state") private ClusterSnapshotJobState state; @SerializedName(value = "errMsg") private String errMsg; - public ClusterSnapshotJob(long jobId, String snapshotName, String storageVolumeName, long createdTime) { - this.jobId = jobId; - this.snapshotName = snapshotName; - this.storageVolumeName = storageVolumeName; - this.createdTime = createdTime; - this.finishedTime = -1; - this.feJournalId = 0; - this.starMgrJournalId = 0; + public ClusterSnapshotJob(long id, String snapshotName, String storageVolumeName, long createdTime) { + this.snapshot = new ClusterSnapshot(id, snapshotName, storageVolumeName, createdTime, -1, 0, 0); this.state = ClusterSnapshotJobState.INITIALIZING; this.errMsg = ""; } @@ -74,13 +55,12 @@ public ClusterSnapshotJob(long jobId, String snapshotName, String storageVolumeN public void setState(ClusterSnapshotJobState state) { this.state = state; if (state == ClusterSnapshotJobState.FINISHED) { - this.finishedTime = Instant.now().getEpochSecond(); + snapshot.setFinishedTime(System.currentTimeMillis()); } } public void setJournalIds(long feJournalId, long starMgrJournalId) { - this.feJournalId = feJournalId; - this.starMgrJournalId = starMgrJournalId; + snapshot.setJournalIds(feJournalId, starMgrJournalId); } public void setErrMsg(String errMsg) { @@ -88,31 +68,31 @@ public void setErrMsg(String errMsg) { } public String getSnapshotName() { - return snapshotName; + return snapshot.getSnapshotName(); } public String getStorageVolumeName() { - return storageVolumeName; + return snapshot.getStorageVolumeName(); } public long getCreatedTime() { - return createdTime; + return snapshot.getCreatedTime(); } public long getFinishedTime() { - return finishedTime; + return snapshot.getFinishedTime(); } public long getFeJournalId() { - return feJournalId; + return snapshot.getFeJournalId(); } public long getStarMgrJournalId() { - return starMgrJournalId; + return snapshot.getStarMgrJournalId(); } - public long getJobId() { - return jobId; + public long getId() { + return snapshot.getId(); } public ClusterSnapshotJobState getState() { @@ -133,21 +113,18 @@ public void logJob() { public void addAutomatedClusterSnapshot() { if (state == ClusterSnapshotJobState.FINISHED) { - ClusterSnapshot newAutomatedClusterSnapshot = new ClusterSnapshot( - GlobalStateMgr.getCurrentState().getNextId(), snapshotName, storageVolumeName, - createdTime, finishedTime, feJournalId, starMgrJournalId); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().addAutomatedClusterSnapshot(newAutomatedClusterSnapshot); - - LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", jobId, snapshotName); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().addAutomatedClusterSnapshot(this.snapshot); + LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", + getId(), getSnapshotName()); } } public TClusterSnapshotJobsItem getInfo() { TClusterSnapshotJobsItem item = new TClusterSnapshotJobsItem(); - item.setSnapshot_name(snapshotName); - item.setJob_id(jobId); - item.setCreated_time(createdTime); - item.setFinished_time(finishedTime); + item.setSnapshot_name(getSnapshotName()); + item.setJob_id(getId()); + item.setCreated_time(getCreatedTime() / 1000); + item.setFinished_time(getFinishedTime() / 1000); item.setState(state.name()); item.setDetail_info(""); item.setError_message(errMsg); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java index 7041cbce70727b..bdb0706aed11ec 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.time.Instant; import java.util.Map; import java.util.TreeMap; @@ -118,16 +117,16 @@ protected void addAutomatedClusterSnapshot(ClusterSnapshot newAutomatedClusterSn } public ClusterSnapshotJob createAutomatedSnapshotJob() { - long createTime = Instant.now().getEpochSecond(); - long jobId = GlobalStateMgr.getCurrentState().getNextId(); + long createTime = System.currentTimeMillis(); + long id = GlobalStateMgr.getCurrentState().getNextId(); String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTime); String storageVolumeName = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSvName(); - ClusterSnapshotJob job = new ClusterSnapshotJob(jobId, snapshotName, storageVolumeName, createTime); + ClusterSnapshotJob job = new ClusterSnapshotJob(id, snapshotName, storageVolumeName, createTime); job.logJob(); addJob(job); - LOG.info("Create automated cluster snapshot job successfully, job id: {}, snapshot name: {}", jobId, snapshotName); + LOG.info("Create automated cluster snapshot job successfully, job id: {}, snapshot name: {}", id, snapshotName); return job; } @@ -153,7 +152,7 @@ public synchronized void addJob(ClusterSnapshotJob job) { historyAutomatedSnapshotJobs.size() == Config.max_historical_automated_cluster_snapshot_jobs) { historyAutomatedSnapshotJobs.pollFirstEntry(); } - historyAutomatedSnapshotJobs.put(job.getJobId(), job); + historyAutomatedSnapshotJobs.put(job.getId(), job); } public TClusterSnapshotJobsResponse getAllJobsInfo() { @@ -208,9 +207,9 @@ public void replayLog(ClusterSnapshotLog log) { case UPLOADING: case FINISHED: case ERROR: { - if (historyAutomatedSnapshotJobs.containsKey(job.getJobId())) { - historyAutomatedSnapshotJobs.remove(job.getJobId()); - historyAutomatedSnapshotJobs.put(job.getJobId(), job); + if (historyAutomatedSnapshotJobs.containsKey(job.getId())) { + historyAutomatedSnapshotJobs.remove(job.getId()); + historyAutomatedSnapshotJobs.put(job.getId(), job); } break; }