Skip to content

Commit

Permalink
CachedRecordStore should check if the record state is expired (#6783)
Browse files Browse the repository at this point in the history
  • Loading branch information
dannytbecker authored May 1, 2024
1 parent a8a5894 commit 881034a
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
LOG.warn("Couldn't delete State Store record {}: {}", recordName,
record);
}
} else if (record.checkExpired(currentDriverTime)) {
} else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
commitRecords.add(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -49,16 +56,22 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test the basic {@link MembershipStore} membership functionality.
*/
public class TestStateStoreMembershipState extends TestStateStoreBase {

/** Logger shared by the tests in this class; never reassigned. */
private static final Logger LOG = LoggerFactory.getLogger(
    TestStateStoreMembershipState.class);

private static MembershipStore membershipStore;

@BeforeClass
Expand Down Expand Up @@ -529,6 +542,94 @@ public void testRegistrationExpiredAndDeletion()
}, 100, 3000);
}

/**
 * Verifies that a cache refresh holding a stale EXPIRED membership record
 * does not overwrite an ACTIVE record that was written to the State Store
 * while the refresh was in flight.
 *
 * <p>The refresh runs on a background thread against a Mockito spy whose
 * {@code overrideExpiredRecords} call is held back by a {@link DelayAnswer}.
 * While it is held, the test writes an ACTIVE heartbeat, then releases the
 * refresh and checks that the stored record is still ACTIVE.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *     waiting on the delayed call or on the refresh future.
 * @throws IOException if a State Store operation fails.
 * @throws TimeoutException if the cache refresh does not complete within
 *     5 seconds after being released.
 * @throws ExecutionException if the background cache refresh task throws.
 */
@Test
public void testRegistrationExpiredRaceCondition()
    throws InterruptedException, IOException, TimeoutException, ExecutionException {

  // Populate the state store with a single NN element
  // 1) ns0:nn0 - Expired
  // Create a thread to refresh the cached records, pulling the expired record
  // into the thread's memory
  // Then insert an active record, and confirm that the refresh thread does not
  // override the active record with the expired record it has in memory

  // NOTE(review): -1 presumably disables deletion of expired records so the
  // expired entry stays in the store for this test - confirm against
  // MembershipState.
  MembershipState.setDeletionMs(-1);

  MembershipState expiredReport = createRegistration(
      NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
      FederationNamenodeServiceState.ACTIVE);
  expiredReport.setDateModified(Time.monotonicNow() - 5000);
  expiredReport.setState(FederationNamenodeServiceState.EXPIRED);
  assertTrue(namenodeHeartbeat(expiredReport));

  // Load cache through a spy so overrideExpiredRecords can be paused
  MembershipStore memStoreSpy = spy(membershipStore);
  DelayAnswer delayer = new DelayAnswer(LOG);
  doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());

  ExecutorService pool = Executors.newFixedThreadPool(1);
  try {
    Future<Boolean> cacheRefreshFuture = pool.submit(() -> {
      try {
        return memStoreSpy.loadCache(true);
      } catch (IOException e) {
        LOG.error("Exception while loading cache:", e);
      }
      return false;
    });

    // Verify quorum and entry
    MembershipState quorumEntry = getNamenodeRegistration(
        expiredReport.getNameserviceId(), expiredReport.getNamenodeId());
    assertNull(quorumEntry);

    MembershipState record = membershipStore.getDriver()
        .get(MembershipState.class).getRecords().get(0);
    assertNotNull(record);
    assertEquals(ROUTERS[0], record.getRouterId());
    assertEquals(FederationNamenodeServiceState.EXPIRED,
        record.getState());

    // Insert active while the other thread is refreshing its cache
    MembershipState activeReport = createRegistration(
        NAMESERVICES[0], NAMENODES[0], ROUTERS[0],
        FederationNamenodeServiceState.ACTIVE);

    // Wait until the refresh thread has reached overrideExpiredRecords
    delayer.waitForCall();
    assertTrue(namenodeHeartbeat(activeReport));

    record = membershipStore.getDriver()
        .get(MembershipState.class).getRecords().get(0);
    assertNotNull(record);
    assertEquals(ROUTERS[0], record.getRouterId());
    assertEquals(FederationNamenodeServiceState.ACTIVE,
        record.getState());

    quorumEntry = getExpiredNamenodeRegistration(
        expiredReport.getNameserviceId(), expiredReport.getNamenodeId());
    assertNull(quorumEntry);

    // Allow the thread to finish refreshing the cache
    delayer.proceed();
    assertTrue(cacheRefreshFuture.get(5, TimeUnit.SECONDS));

    // The state store should still be the active report
    record = membershipStore.getDriver()
        .get(MembershipState.class).getRecords().get(0);
    assertNotNull(record);
    assertEquals(ROUTERS[0], record.getRouterId());
    assertEquals(FederationNamenodeServiceState.ACTIVE,
        record.getState());

    membershipStore.loadCache(true);

    quorumEntry = getExpiredNamenodeRegistration(
        expiredReport.getNameserviceId(),
        expiredReport.getNamenodeId());
    assertNull(quorumEntry);
  } finally {
    // Always release the worker thread: on success it is idle, and on an
    // early assertion failure it may still be parked inside the delayed
    // call, so interrupt it rather than leak a non-daemon thread.
    pool.shutdownNow();
  }
}

@Test
public void testNamespaceInfoWithUnavailableNameNodeRegistration()
throws IOException {
Expand Down

0 comments on commit 881034a

Please sign in to comment.