From b92c78ce5df2888c7a73abe59772530353376080 Mon Sep 17 00:00:00 2001 From: ghollins Date: Tue, 1 Dec 2020 23:54:04 -0800 Subject: [PATCH 1/2] * added basic SQS thread throttling * cleanup of AWS client resources * bumped AWS SDK library version to near latest --- .../process/initiation/aws/S3DataManager.java | 29 ++++--- .../process/initiation/aws/S3Initiator.java | 75 ++++++++++--------- .../initiation/aws/SQSDispatcherThread.java | 64 +++++++++++----- install/cws-engine/cws-engine.properties | 3 +- install/cws-ui/cws-ui.properties | 1 + pom.xml | 2 +- 6 files changed, 107 insertions(+), 67 deletions(-) diff --git a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java index 852e6882..4dfcdcf3 100644 --- a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java +++ b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java @@ -26,25 +26,30 @@ * @author ghollins, jwood, ztaylor */ public class S3DataManager { - private static final Logger log = LoggerFactory.getLogger(S3DataManager.class); + private static final Logger log = LoggerFactory.getLogger(S3DataManager.class); - public static final int OLD_SECONDS_BACK_THRESHOLD = 120; + public static final int OLD_SECONDS_BACK_THRESHOLD = 120; - private Region regionUsed; + private Region regionUsed; - private S3Client s3; + private S3Client s3; - public S3DataManager(String region) { - init(region); // sets defaults - } + public S3DataManager(String region) { + init(region); // sets defaults + } - public void init(String region) { - s3 = S3Client.builder().region(Region.of(region)).build(); - regionUsed = Region.of(region); - } + public void init(String region) { + s3 = S3Client.builder().region(Region.of(region)).build(); + regionUsed = Region.of(region); + } + public void close() { + if (s3 != null) { + s3.close(); + } + } - public S3Client getClient() { + public S3Client 
getClient() { return s3; } diff --git a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java index 5c5dfa60..68d85de2 100644 --- a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java +++ b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java @@ -406,46 +406,49 @@ private void scheduleIfNotAlready(String initiationTrigger, Map p protected synchronized boolean skipScheduling(Map partners) { S3DataManager s3 = new S3DataManager(aws_default_region); - // Get sorted list of s3ObjKey/etags for all partners - List eTagList = new ArrayList<>(); - for (String s3ObjKey : partners.values()) { - HeadObjectResponse metaData = s3.getObjectMetadata(s3BucketName, s3ObjKey); - if (metaData != null) { - eTagList.add(s3ObjKey + metaData.eTag()); - } - else { - log.warn("Skipping scheduling process for inputs: " + partners + - ", since they don't all exist for this initiator (" + initiatorId + ")"); - return true; // not all objects exist, so skip scheduling + try { + // Get sorted list of s3ObjKey/etags for all partners + List eTagList = new ArrayList<>(); + for (String s3ObjKey : partners.values()) { + HeadObjectResponse metaData = s3.getObjectMetadata(s3BucketName, s3ObjKey); + if (metaData != null) { + eTagList.add(s3ObjKey + metaData.eTag()); + } else { + log.warn("Skipping scheduling process for inputs: " + partners + + ", since they don't all exist for this initiator (" + initiatorId + ")"); + return true; // not all objects exist, so skip scheduling + } } - } - // hashCode is different, depending on order, so sort - Collections.sort(eTagList); + // hashCode is different, depending on order, so sort + Collections.sort(eTagList); - // get hashcode - int hashCode = ( initiatorId + eTagList.toString() ).hashCode(); - if (recentlyProcessedInputs.get(hashCode) != null) { - log.info("Skipping scheduling process for inputs: " + partners + - ", 
since they have been recently scheduled (" + hashCode + ") for this initiator (" + initiatorId + ") " + - "within the last " + DUPLICATE_PREVENTION_PERIOD + " seconds."); - return true; // already processed this set of inputs - } - else { - recentlyProcessedInputs.put(hashCode, hashCode); - // also add in, for each partner, a hashcode into another (new) TTL map - // Then check this map in other parts of code . - // If none are found in mem in other part of code, then schedule immediately. - // this avoids the false positive of "old". - // Also, cleanup models , like XYZ to produce new RDR versions... - log.debug("added hash code: " + hashCode + ", to recentlyProcessedInputs. " + - recentlyProcessedInputs.size() + " (initiatorId = " + initiatorId + ")"); - - for (String partner : partners.values()) { - hashCode = (initiatorId + partner).hashCode(); - individualProcessedInputs.put(hashCode, hashCode); + // get hashcode + int hashCode = (initiatorId + eTagList.toString()).hashCode(); + if (recentlyProcessedInputs.get(hashCode) != null) { + log.info("Skipping scheduling process for inputs: " + partners + + ", since they have been recently scheduled (" + hashCode + ") for this initiator (" + initiatorId + ") " + + "within the last " + DUPLICATE_PREVENTION_PERIOD + " seconds."); + return true; // already processed this set of inputs + } else { + recentlyProcessedInputs.put(hashCode, hashCode); + // also add in, for each partner, a hashcode into another (new) TTL map + // Then check this map in other parts of code . + // If none are found in mem in other part of code, then schedule immediately. + // this avoids the false positive of "old". + // Also, cleanup models , like XYZ to produce new RDR versions... + log.debug("added hash code: " + hashCode + ", to recentlyProcessedInputs. 
" + + recentlyProcessedInputs.size() + " (initiatorId = " + initiatorId + ")"); + + for (String partner : partners.values()) { + hashCode = (initiatorId + partner).hashCode(); + individualProcessedInputs.put(hashCode, hashCode); + } + return false; } - return false; + } + finally { + s3.close(); } } diff --git a/cws-service/src/main/java/jpl/cws/process/initiation/aws/SQSDispatcherThread.java b/cws-service/src/main/java/jpl/cws/process/initiation/aws/SQSDispatcherThread.java index b85b8ec4..fe06b0e4 100644 --- a/cws-service/src/main/java/jpl/cws/process/initiation/aws/SQSDispatcherThread.java +++ b/cws-service/src/main/java/jpl/cws/process/initiation/aws/SQSDispatcherThread.java @@ -9,6 +9,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; @@ -21,6 +22,7 @@ import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; /** * This thread subscribes to an AWS SQS URL, and when a new message arrives, @@ -56,7 +58,7 @@ public class SQSDispatcherThread extends Thread implements InitializingBean { private SqsClient sqs; private long lastClientRefreshTime; - private static final int TOKEN_REFRESH_FREQUENCY = 60 * 30 * 1000; // 30 minutes in milliseconds + private static final int TOKEN_REFRESH_FREQUENCY = 60 * 10 * 1000; // 10 minutes in milliseconds private static final Integer SQS_CLIENT_WAIT_TIME_SECONDS = 20; private Gson gson; @@ -67,9 +69,20 @@ public class SQSDispatcherThread extends Thread implements InitializingBean { private Map> dispatcherMap; static final Object dispatcherMapLock = new 
Object(); + // Maximum number of simultaneous threads that may be dispatched before throttling occurs. + // If this value is exceeded, the SQS receive loop sleeps for a computed delay before polling again, until the + // system can catch up. + // + @Value("${aws.sqs.dispatcher.maxThreads}") private Integer maxThreads; + + // number of threads in messageHandlerThreadExecutor running at a given moment + private AtomicInteger numberThreads = new AtomicInteger(0); + private ExecutorService messageDeleterThreadExecutor = Executors.newFixedThreadPool(10); private ExecutorService messageHandlerThreadExecutor = Executors.newFixedThreadPool(20); + private long avgMsgHandleTimeMillis = 100; + public SQSDispatcherThread() { log.debug("SQSDispatcherThread ctor..........................................."); } @@ -95,6 +108,9 @@ public void run() { log.debug("SQSDispatcherThread STARTING..."); gson = new Gson(); + // See: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html + java.security.Security.setProperty("networkaddress.cache.ttl" , "60"); + refreshAwsClient(true); ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() @@ -121,14 +137,23 @@ public void run() { } try { + // Will throttle looping if max number of threads has been exceeded + if (numberThreads.get() > maxThreads) { + long actualThrottleMillis = (long)(1.1 * avgMsgHandleTimeMillis) * (numberThreads.get() - maxThreads); + log.warn("Throttling by {} ms ({}/{}) avgMsgHandleTime={}", actualThrottleMillis, numberThreads, maxThreads, avgMsgHandleTimeMillis); + Thread.sleep(actualThrottleMillis); + avgMsgHandleTimeMillis += 10; // backoff + continue; + } + log.trace("about to receive message..."); long t0 = System.currentTimeMillis(); - // - // FIXME: This creates a new thread that doesn't get cleaned up!! 
- // + refreshAwsClient(false); List messages = sqs.receiveMessage(receiveMessageRequest).messages(); long t1 = System.currentTimeMillis(); - log.debug("bufferedSqs.receiveMessage (in " + (t1 - t0) + "ms) [" + messages.size() + " messages]"); + log.debug("bufferedSqs.receiveMessage (in " + (t1 - t0) + "ms) [" + + messages.size() + " messages, " + + numberThreads.get() + " handlerThreads]"); if (messages.isEmpty()) { log.trace("GOT " + messages.size() + " MESSAGE(S)"); @@ -140,6 +165,7 @@ public void run() { // For each received message // for (Message msg : messages) { + numberThreads.incrementAndGet(); handleMessageOnSeparateThread(msg); } } @@ -219,6 +245,7 @@ public void run() { } catch (Exception e) { log.error("Unable to parse message as JSON. Deleting this message from queue, and moving on to next message...", e); deleteMessageFromQueueOnSeparateThread(msg); + numberThreads.decrementAndGet(); return; } @@ -262,16 +289,23 @@ public void run() { } } catch (Exception e) { log.error("error while processing message", e); + numberThreads.decrementAndGet(); return; } finally { deleteMessageFromQueueOnSeparateThread(msg); } - - if ((System.currentTimeMillis() - d0) > 100) { - log.debug("Handled message (in " + (System.currentTimeMillis() - d0) + " ms)"); + int curThreads = numberThreads.decrementAndGet(); + long handleDuration = (System.currentTimeMillis() - d0); + if (handleDuration > 100) { + log.debug("Handled message (in " + (System.currentTimeMillis() - d0) + " ms) " + + curThreads + " threads now active)"); } + + // keep track of avg message handling duration... 
+ if (avgMsgHandleTimeMillis > handleDuration) { avgMsgHandleTimeMillis--; } else { avgMsgHandleTimeMillis++; } + if (avgMsgHandleTimeMillis < 30) { avgMsgHandleTimeMillis = 30; } // floor } }); @@ -320,20 +354,16 @@ private void refreshAwsClient(boolean forceRefresh) { lastClientRefreshTime == 0 || ((System.currentTimeMillis() - lastClientRefreshTime) > TOKEN_REFRESH_FREQUENCY)) { - log.debug("About to refresh AWS SQS client..."); + log.debug("About to refresh AWS SQS client..."); + + if (sqs != null) { + sqs.close(); + } sqs = SqsClient.builder() .region(Region.of(aws_default_region)) .build(); - // See: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html - log.debug("networkaddress.cache.ttl = " + java.security.Security.getProperty("networkaddress.cache.ttl")); - java.security.Security.setProperty("networkaddress.cache.ttl" , "60"); - log.debug("networkaddress.cache.ttl = " + java.security.Security.getProperty("networkaddress.cache.ttl")); - - // Create the buffered SQS client - //bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync); - lastClientRefreshTime = System.currentTimeMillis(); // update timestamp log.debug("AWS credentials / client refreshed."); } diff --git a/install/cws-engine/cws-engine.properties b/install/cws-engine/cws-engine.properties index a8342345..3f903b99 100644 --- a/install/cws-engine/cws-engine.properties +++ b/install/cws-engine/cws-engine.properties @@ -30,4 +30,5 @@ cws.db.username=__CWS_DB_USERNAME__ cws.db.password=__CWS_DB_PASSWORD__ aws.default.region=__AWS_DEFAULT_REGION__ aws.sqs.dispatcher.sqsUrl=__AWS_SQS_DISPATCHER_SQS_URL__ -aws.sqs.dispatcher.msgFetchLimit=__AWS_SQS_DISPATCHER_MSG_FETCH_LIMIT__ \ No newline at end of file +aws.sqs.dispatcher.msgFetchLimit=__AWS_SQS_DISPATCHER_MSG_FETCH_LIMIT__ +aws.sqs.dispatcher.maxThreads=100 \ No newline at end of file diff --git a/install/cws-ui/cws-ui.properties b/install/cws-ui/cws-ui.properties index ac95cdb2..63581a55 100644 --- 
a/install/cws-ui/cws-ui.properties +++ b/install/cws-ui/cws-ui.properties @@ -50,3 +50,4 @@ cws.metrics.publishing.interval=__CWS_METRICS_PUBLISHING_INTERVAL__ aws.default.region=__AWS_DEFAULT_REGION__ aws.sqs.dispatcher.sqsUrl=__AWS_SQS_DISPATCHER_SQS_URL__ aws.sqs.dispatcher.msgFetchLimit=__AWS_SQS_DISPATCHER_MSG_FETCH_LIMIT__ +aws.sqs.dispatcher.maxThreads=100 \ No newline at end of file diff --git a/pom.xml b/pom.xml index a3b1b612..080bc24e 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 5.14.5 3.1.4 - 2.8.7 + 2.15.27 7.13.0 1.5 From acc5bd6a7465aab77b43d81acdad7a3780129dd2 Mon Sep 17 00:00:00 2001 From: ghollins Date: Mon, 14 Dec 2020 06:34:33 -0800 Subject: [PATCH 2/2] Delta changes per PR suggestions from ztaylor54 --- .../jpl/cws/process/initiation/aws/S3DataManager.java | 5 +++-- .../java/jpl/cws/process/initiation/aws/S3Initiator.java | 8 ++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java index 4dfcdcf3..ef037327 100644 --- a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java +++ b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3DataManager.java @@ -25,7 +25,7 @@ /** * @author ghollins, jwood, ztaylor */ -public class S3DataManager { +public class S3DataManager implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(S3DataManager.class); public static final int OLD_SECONDS_BACK_THRESHOLD = 120; @@ -43,7 +43,8 @@ public void init(String region) { regionUsed = Region.of(region); } - public void close() { + @Override + public void close() throws Exception { if (s3 != null) { s3.close(); } diff --git a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java index 68d85de2..ff701fe7 100644 --- 
a/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java +++ b/cws-service/src/main/java/jpl/cws/process/initiation/aws/S3Initiator.java @@ -403,10 +403,9 @@ private void scheduleIfNotAlready(String initiationTrigger, Map p * The ETag reflects changes only to the contents of an object, * not its metadata */ - protected synchronized boolean skipScheduling(Map partners) { - S3DataManager s3 = new S3DataManager(aws_default_region); + protected synchronized boolean skipScheduling(Map partners) throws Exception { - try { + try ( S3DataManager s3 = new S3DataManager(aws_default_region) ) { // Get sorted list of s3ObjKey/etags for all partners List eTagList = new ArrayList<>(); for (String s3ObjKey : partners.values()) { @@ -447,9 +446,6 @@ protected synchronized boolean skipScheduling(Map partners) { return false; } } - finally { - s3.close(); - } }