Skip to content

Commit

Permalink
HDDS-1551. Implement Bucket Write Requests to use Cache and DoubleBuf…
Browse files Browse the repository at this point in the history
…fer.
  • Loading branch information
bharatviswa504 committed May 22, 2019
1 parent 7221078 commit 0433ab9
Show file tree
Hide file tree
Showing 36 changed files with 1,762 additions and 413 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hadoop.ozone.om.protocol;

import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
Expand Down Expand Up @@ -168,54 +166,4 @@ void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList,
*/
void applyDeleteVolume(String volume, String owner,
VolumeList newVolumeList) throws IOException;

/**
* Start Create Bucket Transaction.
* @param omBucketInfo
* @return OmBucketInfo
* @throws IOException
*/
OmBucketInfo startCreateBucket(OmBucketInfo omBucketInfo) throws IOException;

/**
* Apply Create Bucket Changes to OM DB.
* @param omBucketInfo
* @throws IOException
*/
void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException;

/**
* Start Delete Bucket Transaction.
* @param volumeName
* @param bucketName
* @throws IOException
*/
void startDeleteBucket(String volumeName, String bucketName)
throws IOException;

/**
* Apply Delete Bucket changes to OM DB.
* @param volumeName
* @param bucketName
* @throws IOException
*/
void applyDeleteBucket(String volumeName, String bucketName)
throws IOException;

/**
* Start SetBucket Property Transaction.
* @param omBucketArgs
* @return OmBucketInfo
* @throws IOException
*/
OmBucketInfo startSetBucketProperty(OmBucketArgs omBucketArgs)
throws IOException;

/**
* Apply SetBucket Property changes to OM DB.
* @param omBucketInfo
* @throws IOException
*/
void applySetBucketProperty(OmBucketInfo omBucketInfo) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ message BucketInfo {
repeated OzoneAclInfo acls = 3;
required bool isVersionEnabled = 4 [default = false];
required StorageTypeProto storageType = 5 [default = DISK];
required uint64 creationTime = 6;
optional uint64 creationTime = 6;
repeated hadoop.hdds.KeyValue metadata = 7;
optional BucketEncryptionInfoProto beinfo = 8;
}
Expand Down Expand Up @@ -490,11 +490,7 @@ message InfoBucketResponse {
}

message SetBucketPropertyRequest {
//TODO: See if we can merge bucketArgs and bucketInfo
optional BucketArgs bucketArgs = 1;
// This will be set during startTransaction, and used to apply to OM DB
// during applyTransaction.
optional BucketInfo bucketInfo = 2;
}

message SetBucketPropertyResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ public void testBucketOps() throws IOException {
Mockito.doNothing().when(mockS3Bm).deleteS3Bucket("random");
Mockito.doReturn(true).when(mockS3Bm).createOzoneVolumeIfNeeded(null);

Mockito.doReturn(null).when(mockBm).createBucket(null);
Mockito.doReturn(null).when(mockBm).createBucket(null);
Mockito.doNothing().when(mockBm).createBucket(null);
Mockito.doNothing().when(mockBm).createBucket(null);
Mockito.doNothing().when(mockBm).deleteBucket(null, null);
Mockito.doReturn(null).when(mockBm).getBucketInfo(null, null);
Mockito.doReturn(null).when(mockBm).setBucketProperty(null);
Mockito.doNothing().when(mockBm).setBucketProperty(null);
Mockito.doReturn(null).when(mockBm).listBuckets(null, null, null, 0);

HddsWhiteboxTestUtils.setInternalState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ private void createKeyTest(boolean checkSuccess) throws Exception {
// last running OM as it would fail to get a quorum.
if (e instanceof RemoteException) {
GenericTestUtils.assertExceptionContains(
"RaftRetryFailureException", e);
"NotLeaderException", e);
}
} else {
throw e;
Expand Down Expand Up @@ -446,7 +446,7 @@ private void createVolumeTest(boolean checkSuccess) throws Exception {
// last running OM as it would fail to get a quorum.
if (e instanceof RemoteException) {
GenericTestUtils.assertExceptionContains(
"RaftRetryFailureException", e);
"NotLeaderException", e);
}
} else {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,7 @@ public interface BucketManager {
* Creates a bucket.
* @param bucketInfo - OmBucketInfo for creating bucket.
*/
OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException;

/**
* Apply Create Bucket changes to OM DB.
* @param omBucketInfo
* @throws IOException
*/
void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException;
void createBucket(OmBucketInfo bucketInfo) throws IOException;


/**
Expand All @@ -53,14 +46,7 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName)
* @param args - BucketArgs.
* @throws IOException
*/
OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException;

/**
* Apply SetBucket Property changes to OM DB.
* @param omBucketInfo
* @throws IOException
*/
void applySetBucketProperty(OmBucketInfo omBucketInfo) throws IOException;
void setBucketProperty(OmBucketArgs args) throws IOException;

/**
* Deletes an existing empty bucket from volume.
Expand All @@ -70,15 +56,6 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName)
*/
void deleteBucket(String volumeName, String bucketName) throws IOException;

/**
* Apply Delete Bucket changes to OM DB.
* @param volumeName
* @param bucketName
* @throws IOException
*/
void applyDeleteBucket(String volumeName, String bucketName)
throws IOException;

/**
* Returns a list of buckets represented by {@link OmBucketInfo}
* in the given volume.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ KeyProviderCryptoExtension getKMSProvider() {
* @param bucketInfo - OmBucketInfo.
*/
@Override
public OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException {
public void createBucket(OmBucketInfo bucketInfo) throws IOException {
Preconditions.checkNotNull(bucketInfo);
String volumeName = bucketInfo.getVolumeName();
String bucketName = bucketInfo.getBucketName();
Expand Down Expand Up @@ -165,11 +165,8 @@ public OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException {
}

OmBucketInfo omBucketInfo = omBucketInfoBuilder.build();
if (!isRatisEnabled) {
commitCreateBucketInfoToDB(omBucketInfo);
}
commitCreateBucketInfoToDB(omBucketInfo);
LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
return omBucketInfo;
} catch (IOException | DBException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Bucket creation failed for bucket:{} in volume:{}",
Expand All @@ -182,18 +179,6 @@ public OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException {
}
}


public void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException {
Preconditions.checkNotNull(omBucketInfo);
try {
commitCreateBucketInfoToDB(omBucketInfo);
} catch (IOException ex) {
LOG.error("Apply CreateBucket Failed for bucket: {}, volume: {}",
omBucketInfo.getBucketName(), omBucketInfo.getVolumeName(), ex);
throw ex;
}
}

private void commitCreateBucketInfoToDB(OmBucketInfo omBucketInfo)
throws IOException {
String dbBucketKey =
Expand Down Expand Up @@ -243,7 +228,7 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName)
* @throws IOException - On Failure.
*/
@Override
public OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException {
public void setBucketProperty(OmBucketArgs args) throws IOException {
Preconditions.checkNotNull(args);
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
Expand Down Expand Up @@ -296,11 +281,7 @@ public OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException {
bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime());

OmBucketInfo omBucketInfo = bucketInfoBuilder.build();

if (!isRatisEnabled) {
commitSetBucketPropertyInfoToDB(omBucketInfo);
}
return omBucketInfo;
commitSetBucketPropertyInfoToDB(omBucketInfo);
} catch (IOException | DBException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
Expand All @@ -312,18 +293,6 @@ public OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException {
}
}

public void applySetBucketProperty(OmBucketInfo omBucketInfo)
throws IOException {
try {
commitSetBucketPropertyInfoToDB(omBucketInfo);
} catch (IOException ex) {
LOG.error("Apply SetBucket property failed for bucket:{} in " +
"volume:{}", omBucketInfo.getBucketName(),
omBucketInfo.getVolumeName(), ex);
throw ex;
}
}

private void commitSetBucketPropertyInfoToDB(OmBucketInfo omBucketInfo)
throws IOException {
commitCreateBucketInfoToDB(omBucketInfo);
Expand Down Expand Up @@ -377,10 +346,7 @@ public void deleteBucket(String volumeName, String bucketName)
throw new OMException("Bucket is not empty",
OMException.ResultCodes.BUCKET_NOT_EMPTY);
}

if (!isRatisEnabled) {
commitDeleteBucketInfoToOMDB(bucketKey);
}
commitDeleteBucketInfoToOMDB(bucketKey);
} catch (IOException ex) {
if (!(ex instanceof OMException)) {
LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName,
Expand All @@ -392,20 +358,6 @@ public void deleteBucket(String volumeName, String bucketName)
}
}

public void applyDeleteBucket(String volumeName, String bucketName)
throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
try {
commitDeleteBucketInfoToOMDB(metadataManager.getBucketKey(volumeName,
bucketName));
} catch (IOException ex) {
LOG.error("Apply DeleteBucket Failed for bucket: {}, volume: {}",
bucketName, volumeName, ex);
throw ex;
}
}

private void commitDeleteBucketInfoToOMDB(String dbBucketKey)
throws IOException {
metadataManager.getBucketTable().delete(dbBucketKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);

metadataManager = new OmMetadataManagerImpl(configuration);
startRatisServer();
startRatisClient();

Expand All @@ -325,7 +326,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,

secConfig = new SecurityConfig(configuration);

metadataManager = new OmMetadataManagerImpl(configuration);
volumeManager = new VolumeManagerImpl(metadataManager, configuration);

// Create the KMS Key Provider
Expand Down Expand Up @@ -1270,7 +1270,7 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {

BlockingService omService = newReflectiveBlockingService(
new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer,
omRatisClient, isRatisEnabled));
isRatisEnabled));
return startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);
Expand Down Expand Up @@ -1709,67 +1709,6 @@ public void applyDeleteVolume(String volume, String owner,
volumeManager.applyDeleteVolume(volume, owner, newVolumeList);
}


@Override
public OmBucketInfo startCreateBucket(OmBucketInfo omBucketInfo)
throws IOException {
Preconditions.checkNotNull(omBucketInfo);
if(isAclEnabled) {
checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE,
omBucketInfo.getVolumeName(), omBucketInfo.getBucketName(), null);
}

return bucketManager.createBucket(omBucketInfo);
}

@Override
public void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
bucketManager.applyCreateBucket(omBucketInfo);
}


@Override
public void startDeleteBucket(String volumeName, String bucketName)
throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
if(isAclEnabled) {
checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE,
volumeName, bucketName, null);
}

bucketManager.deleteBucket(volumeName, bucketName);
}


@Override
public void applyDeleteBucket(String volumeName, String bucketName)
throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
bucketManager.applyDeleteBucket(volumeName, bucketName);
}


@Override
public OmBucketInfo startSetBucketProperty(OmBucketArgs omBucketArgs)
throws IOException {
Preconditions.checkNotNull(omBucketArgs);
// TODO: Need to add metrics and Audit log for HA requests
if(isAclEnabled) {
checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE,
omBucketArgs.getVolumeName(), omBucketArgs.getBucketName(), null);
}
return bucketManager.setBucketProperty(omBucketArgs);
}


@Override
public void applySetBucketProperty(OmBucketInfo omBucketInfo)
throws IOException {
// TODO: Need to add metrics and Audit log for HA requests
bucketManager.applySetBucketProperty(omBucketInfo);
}

/**
* Checks if current caller has acl permissions.
*
Expand Down Expand Up @@ -3037,4 +2976,8 @@ public String getComponent() {
public OMFailoverProxyProvider getOMFailoverProxyProvider() {
return null;
}

public OMMetrics getOmMetrics() {
return metrics;
}
}
Loading

0 comments on commit 0433ab9

Please sign in to comment.