From f09a78f73fbb5bed5bac87ab86354b63bb605e5b Mon Sep 17 00:00:00 2001 From: Hanisha Koneru Date: Wed, 3 Apr 2019 22:50:28 -0700 Subject: [PATCH] HDDS-1339. Implement ratis snapshots on OM (#651) --- .../org/apache/hadoop/ozone/OzoneConsts.java | 3 + .../apache/hadoop/ozone/common/Storage.java | 2 +- .../org/apache/hadoop/utils/db/DBStore.java | 6 + .../org/apache/hadoop/utils/db/RDBStore.java | 11 ++ .../src/main/resources/ozone-default.xml | 13 +- .../apache/hadoop/ozone/om/OMConfigKeys.java | 6 + .../om/protocol/OzoneManagerHAProtocol.java | 10 +- .../hadoop/ozone/MiniOzoneHAClusterImpl.java | 4 + .../hadoop/ozone/om/TestOzoneManagerHA.java | 90 +++++++++++++- .../apache/hadoop/ozone/om/OzoneManager.java | 45 ++++++- .../om/ratis/OzoneManagerRatisServer.java | 28 +++-- .../om/ratis/OzoneManagerStateMachine.java | 115 +++++++++++------- 12 files changed, 265 insertions(+), 68 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 8e3b02a9507ac..3e15241e73c1e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -279,4 +279,7 @@ private OzoneConsts() { // Dummy OMNodeID for OM Clients to use for a non-HA OM setup public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy"; + + // OM Ratis snapshot file to store the last applied index + public static final String OM_RATIS_SNAPSHOT_INDEX = "ratisSnapshotIndex"; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java index 9ad87ae740c25..f393ed9c3a4f4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java @@ -145,7 +145,7 @@ private void setNodeProperties() { * * @return the directory path */ - private File getCurrentDir() { + public File getCurrentDir() { return new File(storageDir, STORAGE_DIR_CURRENT); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java index 0bc30d0f4c2fd..56166ab9ffc8d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java @@ -65,6 +65,12 @@ Table getTable(String name, */ ArrayList listTables() throws IOException; + /** + * Flush the DB buffer onto persistent storage. + * @throws IOException + */ + void flush() throws IOException; + /** * Compact the entire database. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java index 9a7119e5c0059..5bb0fa41399ba 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java @@ -272,6 +272,17 @@ public ArrayList
listTables() throws IOException { return returnList; } + @Override + public void flush() throws IOException { + final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true); + try { + db.flush(flushOptions); + } catch (RocksDBException e) { + LOG.error("Unable to Flush RocksDB data", e); + throw toIOException("Unable to Flush RocksDB data", e); + } + } + @Override public DBCheckpoint getCheckpoint(boolean flush) { final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(flush); diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index cbd249a0a0efc..5580548bcccf7 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1603,18 +1603,27 @@ ozone.om.ratis.log.appender.queue.num-elements 1024 - OZONE, DEBUG, CONTAINER, RATIS + OZONE, DEBUG, OM, RATIS Number of operation pending with Raft's Log Worker. ozone.om.ratis.log.appender.queue.byte-limit 32MB - OZONE, DEBUG, CONTAINER, RATIS + OZONE, DEBUG, OM, RATIS Byte limit for Raft's Log Worker queue. + + ozone.om.ratis.snapshot.auto.trigger.threshold + 400000 + OZONE, DEBUG, OM, RATIS + The log index threshold after ratis will auto trigger + snapshot on the OM state machine. + + + ozone.om.ratis.server.request.timeout 3s diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 7b13471534832..60dde441196b6 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -136,6 +136,12 @@ private OMConfigKeys() { public static final String OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB"; + // OM Snapshot configurations + public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY + = "ozone.om.ratis.snapshot.auto.trigger.threshold"; + public static final long + OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT + = 400000; // OM Ratis server configurations public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java index 8357df22e0169..f5989979cf6b9 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -36,6 +36,14 @@ */ public interface OzoneManagerHAProtocol { + /** + * Store the snapshot index i.e. the raft log index, corresponding to the + * last transaction applied to the OM RocksDB, in OM metadata dir on disk. + * @return the snapshot index + * @throws IOException + */ + long saveRatisSnapshot() throws IOException; + /** * Add a allocate block, it is assumed that the client is having an open * key session going on. This block will be appended to this open key session. @@ -56,7 +64,6 @@ public interface OzoneManagerHAProtocol { OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, KeyLocation keyLocation) throws IOException; - /** * Add the openKey entry with given keyInfo and clientID in to openKeyTable. * This will be called only from applyTransaction, once after calling @@ -81,5 +88,4 @@ void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID) */ OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs, String multipartUploadID) throws IOException; - } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index f84f95ea8e792..03c2a2c590e6e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -87,6 +87,10 @@ public OzoneManager getOzoneManager(int index) { return this.ozoneManagers.get(index); } + public OzoneManager getOzoneManager(String omNodeId) { + return this.ozoneManagerMap.get(omNodeId); + } + @Override public void restartOzoneManager() throws IOException { for (OzoneManager ozoneManager : ozoneManagers) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 93c120a7a999d..06009e20803b8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -76,6 +76,7 @@ public class TestOzoneManagerHA { private String clusterId; private String scmId; private int numOfOMs = 3; + private static final long SNAPSHOT_THRESHOLD = 50; @Rule public ExpectedException exception = ExpectedException.none(); @@ -99,7 +100,9 @@ public void init() throws Exception { conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10); conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10); - + conf.setLong( + OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, + SNAPSHOT_THRESHOLD); cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) .setClusterId(clusterId) .setScmId(scmId) @@ -326,9 +329,8 @@ private void createKeyTest(boolean checkSuccess) throws Exception { throw e; } } - - } + /** * Create a volume and test its attribute. */ @@ -370,8 +372,6 @@ private void createVolumeTest(boolean checkSuccess) throws Exception { } } - - /** * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the * cluster. @@ -533,4 +533,84 @@ public void testReadRequest() throws Exception { proxyProvider.getCurrentProxyOMNodeId()); } } + + @Test + public void testOMRatisSnapshot() throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + + objectStore.createVolume(volumeName, createVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + retVolumeinfo.createBucket(bucketName); + OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); + + String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider() + .getCurrentProxyOMNodeId(); + OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId); + + // Send commands to ratis to increase the log index so that ratis + // triggers a snapshot on the state machine. + + long appliedLogIndex = 0; + while (appliedLogIndex <= SNAPSHOT_THRESHOLD) { + createKey(ozoneBucket); + appliedLogIndex = ozoneManager.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + } + + GenericTestUtils.waitFor(() -> { + if (ozoneManager.loadRatisSnapshotIndex() > 0) { + return true; + } + return false; + }, 1000, 100000); + + // The current lastAppliedLogIndex on the state machine should be greater + // than or equal to the saved snapshot index. + long smLastAppliedIndex = + ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex(); + long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex(); + Assert.assertTrue("LastAppliedIndex on OM State Machine (" + + smLastAppliedIndex + ") is less than the saved snapshot index(" + + ratisSnapshotIndex + ").", + smLastAppliedIndex >= ratisSnapshotIndex); + + // Add more transactions to Ratis to trigger another snapshot + while (appliedLogIndex <= (smLastAppliedIndex + SNAPSHOT_THRESHOLD)) { + createKey(ozoneBucket); + appliedLogIndex = ozoneManager.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + } + + GenericTestUtils.waitFor(() -> { + if (ozoneManager.loadRatisSnapshotIndex() > 0) { + return true; + } + return false; + }, 1000, 100000); + + // The new snapshot index must be greater than the previous snapshot index + long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex(); + Assert.assertTrue("Latest snapshot index must be greater than previous " + + "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex); + + } + + private void createKey(OzoneBucket ozoneBucket) throws IOException { + String keyName = "key" + RandomStringUtils.randomNumeric(5); + String data = "data" + RandomStringUtils.randomNumeric(5); + OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, + data.length(), ReplicationType.STAND_ALONE, + ReplicationFactor.ONE, new HashMap<>()); + ozoneOutputStream.write(data.getBytes(), 0, data.length()); + ozoneOutputStream.close(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index b8da717a04399..7a87b537f24b9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.util.PersistentLongFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client; @@ -179,6 +180,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE; import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE; +import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX; import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys @@ -233,11 +235,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private RPC.Server omRpcServer; private InetSocketAddress omRpcAddress; private String omId; - private OMNodeDetails omNodeDetails; private List peerNodes; - private boolean isRatisEnabled; - private OzoneManagerRatisServer omRatisServer; - private OzoneManagerRatisClient omRatisClient; private final OMMetadataManager metadataManager; private final VolumeManager volumeManager; private final BucketManager bucketManager; @@ -266,6 +264,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private volatile boolean isOmRpcServerRunning = false; private String omComponent; + private boolean isRatisEnabled; + private OzoneManagerRatisServer omRatisServer; + private OzoneManagerRatisClient omRatisClient; + private OMNodeDetails omNodeDetails; + private final File ratisSnapshotFile; + private long snapshotIndex; + private KeyProviderCryptoExtension kmsProvider = null; private static String keyProviderUriKeyName = CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; @@ -306,6 +311,10 @@ private OzoneManager(OzoneConfiguration conf) throws IOException, startRatisServer(); startRatisClient(); + this.ratisSnapshotFile = new File(omStorage.getCurrentDir(), + OM_RATIS_SNAPSHOT_INDEX); + this.snapshotIndex = loadRatisSnapshotIndex(); + InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress(); omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString()); @@ -1307,6 +1316,33 @@ private void startRatisClient() throws IOException { } } + @VisibleForTesting + public long loadRatisSnapshotIndex() { + if (ratisSnapshotFile.exists()) { + try { + return PersistentLongFile.readFile(ratisSnapshotFile, 0); + } catch (IOException e) { + LOG.error("Unable to read the ratis snapshot index (last applied " + + "transaction log index)", e); + } + } + return 0; + } + + @Override + public long saveRatisSnapshot() throws IOException { + snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex(); + + // Flush the OM state to disk + getMetadataManager().getStore().flush(); + + PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex); + LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}", + snapshotIndex); + + return snapshotIndex; + } + /** * Stop service. */ @@ -2103,7 +2139,6 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID, } } - @Override public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID, KeyLocation keyLocation) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 01979e4dcd267..b16f9f23ca313 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMNodeDetails; -import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.RaftClientConfigKeys; @@ -60,7 +59,6 @@ import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.SizeInBytes; @@ -83,6 +81,7 @@ public final class OzoneManagerRatisServer { private final RaftPeerId raftPeerId; private final OzoneManagerServerProtocol ozoneManager; + private final OzoneManagerStateMachine omStateMachine; private final ClientId clientId = ClientId.randomId(); private final ScheduledExecutorService scheduledRoleChecker; @@ -130,11 +129,13 @@ private OzoneManagerRatisServer(Configuration conf, LOG.info("Instantiating OM Ratis server with GroupID: {} and " + "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2)); + this.omStateMachine = getStateMachine(); + this.server = RaftServer.newBuilder() .setServerId(this.raftPeerId) .setGroup(this.raftGroup) .setProperties(serverProperties) - .setStateMachine(getStateMachine(this.raftGroupId)) + .setStateMachine(omStateMachine) .build(); // Run a scheduler to check and update the server role on the leader @@ -156,7 +157,7 @@ public void run() { * Creates an instance of OzoneManagerRatisServer. */ public static OzoneManagerRatisServer newOMRatisServer( - Configuration ozoneConf, OzoneManager om, + Configuration ozoneConf, OzoneManagerServerProtocol omProtocol, OMNodeDetails omNodeDetails, List peerNodes) throws IOException { @@ -186,7 +187,7 @@ public static OzoneManagerRatisServer newOMRatisServer( raftPeers.add(raftPeer); } - return new OzoneManagerRatisServer(ozoneConf, om, omServiceId, + return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId, localRaftPeerId, ratisAddr, raftPeers); } @@ -197,7 +198,7 @@ public RaftGroup getRaftGroup() { /** * Returns OzoneManager StateMachine. */ - private BaseStateMachine getStateMachine(RaftGroupId gid) { + private OzoneManagerStateMachine getStateMachine() { return new OzoneManagerStateMachine(this); } @@ -382,10 +383,13 @@ private RaftProperties newRaftProperties(Configuration conf) { this.roleCheckInitialDelayMs = leaderElectionMinTimeout .toLong(TimeUnit.MILLISECONDS); - /** - * TODO: when ratis snapshots are implemented, set snapshot threshold and - * queue size. - */ + long snapshotAutoTriggerThreshold = conf.getLong( + OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, + OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT); + RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled( + properties, true); + RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold( + properties, snapshotAutoTriggerThreshold); return properties; } @@ -517,4 +521,8 @@ public static String getOMRatisDirectory(Configuration conf) { private UUID getRaftGroupIdFromOmServiceId(String omServiceId) { return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8)); } + + public long getStateMachineLastAppliedIndex() { + return omStateMachine.getLastAppliedIndex(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 420ffb5a79a06..2f3445af3066c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -69,6 +69,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private final OzoneManagerServerProtocol ozoneManager; private RequestHandler handler; private RaftGroupId raftGroupId; + private long lastAppliedIndex = 0; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { this.omRatisServer = ratisServer; @@ -95,6 +96,7 @@ public void initialize( * should be rejected. * @throws IOException thrown by the state machine while validating */ + @Override public TransactionContext startTransaction( RaftClientRequest raftClientRequest) throws IOException { ByteString messageContent = raftClientRequest.getMessage().getContent(); @@ -115,7 +117,63 @@ public TransactionContext startTransaction( return ctxt; } return handleStartTransactionRequests(raftClientRequest, omRequest); + } + /* + * Apply a committed log entry to the state machine. + */ + @Override + public CompletableFuture applyTransaction(TransactionContext trx) { + try { + OMRequest request = OMRatisHelper.convertByteStringToOMRequest( + trx.getStateMachineLogEntry().getLogData()); + long trxLogIndex = trx.getLogEntry().getIndex(); + CompletableFuture future = CompletableFuture + .supplyAsync(() -> runCommand(request, trxLogIndex)); + return future; + } catch (IOException e) { + return completeExceptionally(e); + } + } + + /** + * Query the state machine. The request must be read-only. + */ + @Override + public CompletableFuture query(Message request) { + try { + OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest( + request.getContent()); + return CompletableFuture.completedFuture(queryCommand(omRequest)); + } catch (IOException e) { + return completeExceptionally(e); + } + } + + /** + * Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index + * is the log index corresponding to the last applied transaction on the OM + * State Machine. + * + * @return the last applied index on the state machine which has been + * stored in the snapshot file. + */ + @Override + public long takeSnapshot() throws IOException { + LOG.info("Saving Ratis snapshot on the OM."); + if (ozoneManager != null) { + return ozoneManager.saveRatisSnapshot(); + } + return 0; + } + + /** + * Notifies the state machine that the raft peer is no longer leader. + */ + @Override + public void notifyNotLeader(Collection pendingEntries) + throws IOException { + omRatisServer.updateServerRole(); } /** @@ -142,10 +200,8 @@ private TransactionContext handleStartTransactionRequests( .setLogData(raftClientRequest.getMessage().getContent()) .build(); } - } - private TransactionContext handleInitiateMultipartUpload( RaftClientRequest raftClientRequest, OMRequest omRequest) { @@ -237,7 +293,6 @@ private TransactionContext handleCreateKeyRequest( .build(); } - /** * Handle AllocateBlock Request, which needs a special handling. This * request needs to be executed on the leader, where it connects to SCM and @@ -250,7 +305,6 @@ private TransactionContext handleAllocateBlock( RaftClientRequest raftClientRequest, OMRequest omRequest) { OMResponse omResponse = handler.handle(omRequest); - // If request is failed, no need to proceed further. // Setting the exception with omResponse message and code. @@ -270,7 +324,6 @@ private TransactionContext handleAllocateBlock( return transactionContext; } - // Get original request OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest = omRequest.getAllocateBlockRequest(); @@ -294,7 +347,6 @@ private TransactionContext handleAllocateBlock( .setServerRole(RaftProtos.RaftPeerRole.LEADER) .setLogData(messageContent) .build(); - } /** @@ -308,56 +360,33 @@ private IOException constructExceptionForFailedRequest( STATUS_CODE + omResponse.getStatus()); } - /* - * Apply a committed log entry to the state machine. - */ - @Override - public CompletableFuture applyTransaction(TransactionContext trx) { - try { - OMRequest request = OMRatisHelper.convertByteStringToOMRequest( - trx.getStateMachineLogEntry().getLogData()); - CompletableFuture future = CompletableFuture - .supplyAsync(() -> runCommand(request)); - return future; - } catch (IOException e) { - return completeExceptionally(e); - } - } - - /** - * Query the state machine. The request must be read-only. - */ - @Override - public CompletableFuture query(Message request) { - try { - OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest( - request.getContent()); - return CompletableFuture.completedFuture(runCommand(omRequest)); - } catch (IOException e) { - return completeExceptionally(e); - } - } - /** - * Notifies the state machine that the raft peer is no longer leader. + * Submits write request to OM and returns the response Message. + * @param request OMRequest + * @return response from OM + * @throws ServiceException */ - @Override - public void notifyNotLeader(Collection pendingEntries) - throws IOException { - omRatisServer.updateServerRole(); + private Message runCommand(OMRequest request, long trxLogIndex) { + OMResponse response = handler.handle(request); + lastAppliedIndex = trxLogIndex; + return OMRatisHelper.convertResponseToMessage(response); } /** - * Submits request to OM and returns the response Message. + * Submits read request to OM and returns the response Message. * @param request OMRequest * @return response from OM * @throws ServiceException */ - private Message runCommand(OMRequest request) { + private Message queryCommand(OMRequest request) { OMResponse response = handler.handle(request); return OMRatisHelper.convertResponseToMessage(response); } + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + private static CompletableFuture completeExceptionally(Exception e) { final CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e);