Skip to content

Commit

Permalink
HDDS-1339. Implement ratis snapshots on OM (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hanisha Koneru authored Apr 4, 2019
1 parent 7b5b783 commit f09a78f
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,7 @@ private OzoneConsts() {

// Dummy OMNodeID for OM Clients to use for a non-HA OM setup
public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";

// OM Ratis snapshot file to store the last applied index
public static final String OM_RATIS_SNAPSHOT_INDEX = "ratisSnapshotIndex";
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void setNodeProperties() {
*
* @return the directory path
*/
private File getCurrentDir() {
public File getCurrentDir() {
  // Resolve the STORAGE_DIR_CURRENT child underneath the storage directory.
  final File currentDir = new File(storageDir, STORAGE_DIR_CURRENT);
  return currentDir;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
*/
ArrayList<Table> listTables() throws IOException;

/**
 * Flush the DB buffer onto persistent storage.
 *
 * @throws IOException if the underlying store fails to flush its data
 */
void flush() throws IOException;

/**
* Compact the entire database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ public ArrayList<Table> listTables() throws IOException {
return returnList;
}

/**
 * Flushes the RocksDB buffers to persistent storage, blocking until the
 * flush has completed.
 *
 * @throws IOException if RocksDB reports an error while flushing
 */
@Override
public void flush() throws IOException {
  // FlushOptions owns a native RocksDB handle; close it deterministically
  // instead of leaving the native memory to be reclaimed by finalization.
  try (FlushOptions flushOptions =
      new FlushOptions().setWaitForFlush(true)) {
    db.flush(flushOptions);
  } catch (RocksDBException e) {
    LOG.error("Unable to Flush RocksDB data", e);
    throw toIOException("Unable to Flush RocksDB data", e);
  }
}

@Override
public DBCheckpoint getCheckpoint(boolean flush) {
final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(flush);
Expand Down
13 changes: 11 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1603,18 +1603,27 @@
<property>
<name>ozone.om.ratis.log.appender.queue.num-elements</name>
<value>1024</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<tag>OZONE, DEBUG, OM, RATIS</tag>
<description>Number of operations pending with Raft's Log Worker.
</description>
</property>
<property>
<name>ozone.om.ratis.log.appender.queue.byte-limit</name>
<value>32MB</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<tag>OZONE, DEBUG, OM, RATIS</tag>
<description>Byte limit for Raft's Log Worker queue.
</description>
</property>

<property>
<name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
<value>400000</value>
<tag>OZONE, DEBUG, OM, RATIS</tag>
<description>The log index threshold after which Ratis will automatically
trigger a snapshot on the OM state machine.
</description>
</property>

<property>
<name>ozone.om.ratis.server.request.timeout</name>
<value>3s</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ private OMConfigKeys() {
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";

// OM Snapshot configurations
// Log index threshold after which Ratis auto-triggers a snapshot on the OM
// state machine. The key and default mirror the
// ozone.om.ratis.snapshot.auto.trigger.threshold entry in ozone-default.xml.
public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
= "ozone.om.ratis.snapshot.auto.trigger.threshold";
public static final long
OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT
= 400000;

// OM Ratis server configurations
public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
*/
public interface OzoneManagerHAProtocol {

/**
 * Store the snapshot index, i.e. the raft log index corresponding to the
 * last transaction applied to the OM RocksDB, in the OM metadata dir on disk.
 *
 * @return the snapshot index that was saved
 * @throws IOException if the snapshot index could not be saved
 */
long saveRatisSnapshot() throws IOException;

/**
* Add a allocate block, it is assumed that the client is having an open
* key session going on. This block will be appended to this open key session.
Expand All @@ -56,7 +64,6 @@ public interface OzoneManagerHAProtocol {
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException;


/**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling
Expand All @@ -81,5 +88,4 @@ void applyOpenKey(KeyArgs omKeyArgs, KeyInfo keyInfo, long clientID)
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public OzoneManager getOzoneManager(int index) {
return this.ozoneManagers.get(index);
}

/**
 * Looks up the OzoneManager registered under the given OM node id.
 *
 * @param omNodeId key used to look up the entry in ozoneManagerMap
 * @return whatever ozoneManagerMap holds for that id
 */
public OzoneManager getOzoneManager(String omNodeId) {
  final OzoneManager om = ozoneManagerMap.get(omNodeId);
  return om;
}

@Override
public void restartOzoneManager() throws IOException {
for (OzoneManager ozoneManager : ozoneManagers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class TestOzoneManagerHA {
private String clusterId;
private String scmId;
private int numOfOMs = 3;
private static final long SNAPSHOT_THRESHOLD = 50;

@Rule
public ExpectedException exception = ExpectedException.none();
Expand All @@ -99,7 +100,9 @@ public void init() throws Exception {
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);

conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
Expand Down Expand Up @@ -326,9 +329,8 @@ private void createKeyTest(boolean checkSuccess) throws Exception {
throw e;
}
}


}

/**
* Create a volume and test its attribute.
*/
Expand Down Expand Up @@ -370,8 +372,6 @@ private void createVolumeTest(boolean checkSuccess) throws Exception {
}
}



/**
* Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster.
Expand Down Expand Up @@ -533,4 +533,84 @@ public void testReadRequest() throws Exception {
proxyProvider.getCurrentProxyOMNodeId());
}
}

/**
 * Verifies that Ratis triggers snapshots on the OM once the applied log
 * index crosses SNAPSHOT_THRESHOLD, and that a later snapshot records a
 * strictly larger index than an earlier one.
 */
@Test
public void testOMRatisSnapshot() throws Exception {
  String userName = "user" + RandomStringUtils.randomNumeric(5);
  String adminName = "admin" + RandomStringUtils.randomNumeric(5);
  String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
  String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);

  VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
      .setOwner(userName)
      .setAdmin(adminName)
      .build();

  objectStore.createVolume(volumeName, createVolumeArgs);
  OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);

  retVolumeinfo.createBucket(bucketName);
  OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);

  String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
      .getCurrentProxyOMNodeId();
  OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId);

  // Send commands to ratis to increase the log index so that ratis
  // triggers a snapshot on the state machine.
  long appliedLogIndex = 0;
  while (appliedLogIndex <= SNAPSHOT_THRESHOLD) {
    createKey(ozoneBucket);
    appliedLogIndex = ozoneManager.getOmRatisServer()
        .getStateMachineLastAppliedIndex();
  }

  // Wait until the first snapshot index has been persisted.
  GenericTestUtils.waitFor(
      () -> ozoneManager.loadRatisSnapshotIndex() > 0, 1000, 100000);

  // The current lastAppliedLogIndex on the state machine should be greater
  // than or equal to the saved snapshot index.
  long smLastAppliedIndex =
      ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex();
  final long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
  Assert.assertTrue("LastAppliedIndex on OM State Machine ("
      + smLastAppliedIndex + ") is less than the saved snapshot index("
      + ratisSnapshotIndex + ").",
      smLastAppliedIndex >= ratisSnapshotIndex);

  // Add more transactions to Ratis to trigger another snapshot
  while (appliedLogIndex <= (smLastAppliedIndex + SNAPSHOT_THRESHOLD)) {
    createKey(ozoneBucket);
    appliedLogIndex = ozoneManager.getOmRatisServer()
        .getStateMachineLastAppliedIndex();
  }

  // Wait for a snapshot that is newer than the one recorded above. Checking
  // only for "> 0" would return immediately, because the first snapshot
  // already satisfies it, and the assertion below would race with the
  // second snapshot being written.
  GenericTestUtils.waitFor(
      () -> ozoneManager.loadRatisSnapshotIndex() > ratisSnapshotIndex,
      1000, 100000);

  // The new snapshot index must be greater than the previous snapshot index
  long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex();
  Assert.assertTrue("Latest snapshot index must be greater than previous " +
      "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex);
}

/**
 * Creates a key with a random name and a small random payload in the given
 * bucket.
 *
 * @param ozoneBucket bucket in which the key is created
 * @throws IOException if creating, writing or closing the key fails
 */
private void createKey(OzoneBucket ozoneBucket) throws IOException {
  String keyName = "key" + RandomStringUtils.randomNumeric(5);
  String data = "data" + RandomStringUtils.randomNumeric(5);
  // Encode with an explicit charset and size the key from the encoded bytes
  // so the declared length always matches what is written.
  byte[] payload = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
  // try-with-resources closes the stream even if the write fails.
  try (OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
      payload.length, ReplicationType.STAND_ALONE,
      ReplicationFactor.ONE, new HashMap<>())) {
    ozoneOutputStream.write(payload, 0, payload.length);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
Expand Down Expand Up @@ -179,6 +180,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;

import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys
Expand Down Expand Up @@ -233,11 +235,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
private OMNodeDetails omNodeDetails;
private List<OMNodeDetails> peerNodes;
private boolean isRatisEnabled;
private OzoneManagerRatisServer omRatisServer;
private OzoneManagerRatisClient omRatisClient;
private final OMMetadataManager metadataManager;
private final VolumeManager volumeManager;
private final BucketManager bucketManager;
Expand Down Expand Up @@ -266,6 +264,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private volatile boolean isOmRpcServerRunning = false;
private String omComponent;

private boolean isRatisEnabled;
private OzoneManagerRatisServer omRatisServer;
private OzoneManagerRatisClient omRatisClient;
private OMNodeDetails omNodeDetails;
private final File ratisSnapshotFile;
private long snapshotIndex;

private KeyProviderCryptoExtension kmsProvider = null;
private static String keyProviderUriKeyName =
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
Expand Down Expand Up @@ -306,6 +311,10 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
startRatisServer();
startRatisClient();

this.ratisSnapshotFile = new File(omStorage.getCurrentDir(),
OM_RATIS_SNAPSHOT_INDEX);
this.snapshotIndex = loadRatisSnapshotIndex();

InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());

Expand Down Expand Up @@ -1307,6 +1316,33 @@ private void startRatisClient() throws IOException {
}
}

/**
 * Reads the saved Ratis snapshot index from the snapshot file.
 *
 * @return the value read from the file, or 0 when the file does not exist
 *         or reading it fails with an IOException (the failure is logged)
 */
@VisibleForTesting
public long loadRatisSnapshotIndex() {
  if (!ratisSnapshotFile.exists()) {
    return 0;
  }

  long savedIndex = 0;
  try {
    savedIndex = PersistentLongFile.readFile(ratisSnapshotFile, 0);
  } catch (IOException e) {
    LOG.error("Unable to read the ratis snapshot index (last applied " +
        "transaction log index)", e);
  }
  return savedIndex;
}

/**
 * Records the state machine's last applied index as the snapshot index,
 * flushes the OM metadata store and then writes the index to the snapshot
 * file.
 *
 * @return the snapshot index that was written
 * @throws IOException if the flush or the file write fails
 */
@Override
public long saveRatisSnapshot() throws IOException {
  // Capture the index before flushing; the same value is persisted below.
  final long lastAppliedIndex =
      omRatisServer.getStateMachineLastAppliedIndex();
  snapshotIndex = lastAppliedIndex;

  // Flush the OM state to disk
  getMetadataManager().getStore().flush();

  PersistentLongFile.writeFile(ratisSnapshotFile, lastAppliedIndex);
  LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
      lastAppliedIndex);

  return lastAppliedIndex;
}

/**
* Stop service.
*/
Expand Down Expand Up @@ -2103,7 +2139,6 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
}
}


@Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException {
Expand Down
Loading

0 comments on commit f09a78f

Please sign in to comment.