-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial cut at thread throttling / memory management #2
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -26,25 +26,30 @@ | |||||||
* @author ghollins, jwood, ztaylor | ||||||||
*/ | ||||||||
public class S3DataManager { | ||||||||
private static final Logger log = LoggerFactory.getLogger(S3DataManager.class); | ||||||||
private static final Logger log = LoggerFactory.getLogger(S3DataManager.class); | ||||||||
|
||||||||
public static final int OLD_SECONDS_BACK_THRESHOLD = 120; | ||||||||
public static final int OLD_SECONDS_BACK_THRESHOLD = 120; | ||||||||
|
||||||||
private Region regionUsed; | ||||||||
private Region regionUsed; | ||||||||
|
||||||||
private S3Client s3; | ||||||||
private S3Client s3; | ||||||||
|
||||||||
public S3DataManager(String region) { | ||||||||
init(region); // sets defaults | ||||||||
} | ||||||||
public S3DataManager(String region) { | ||||||||
init(region); // sets defaults | ||||||||
} | ||||||||
|
||||||||
public void init(String region) { | ||||||||
s3 = S3Client.builder().region(Region.of(region)).build(); | ||||||||
regionUsed = Region.of(region); | ||||||||
} | ||||||||
public void init(String region) { | ||||||||
s3 = S3Client.builder().region(Region.of(region)).build(); | ||||||||
regionUsed = Region.of(region); | ||||||||
} | ||||||||
|
||||||||
public void close() { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Add override annotation for AutoClosable interface |
||||||||
if (s3 != null) { | ||||||||
s3.close(); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
public S3Client getClient() { | ||||||||
public S3Client getClient() { | ||||||||
return s3; | ||||||||
} | ||||||||
|
||||||||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -406,46 +406,49 @@ private void scheduleIfNotAlready(String initiationTrigger, Map<String,String> p | |||||||
protected synchronized boolean skipScheduling(Map<String,String> partners) { | ||||||||
S3DataManager s3 = new S3DataManager(aws_default_region); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Define below in |
||||||||
|
||||||||
// Get sorted list of s3ObjKey/etags for all partners | ||||||||
List<String> eTagList = new ArrayList<>(); | ||||||||
for (String s3ObjKey : partners.values()) { | ||||||||
HeadObjectResponse metaData = s3.getObjectMetadata(s3BucketName, s3ObjKey); | ||||||||
if (metaData != null) { | ||||||||
eTagList.add(s3ObjKey + metaData.eTag()); | ||||||||
} | ||||||||
else { | ||||||||
log.warn("Skipping scheduling process for inputs: " + partners + | ||||||||
", since they don't all exist for this initiator (" + initiatorId + ")"); | ||||||||
return true; // not all objects exist, so skip scheduling | ||||||||
try { | ||||||||
// Get sorted list of s3ObjKey/etags for all partners | ||||||||
List<String> eTagList = new ArrayList<>(); | ||||||||
for (String s3ObjKey : partners.values()) { | ||||||||
HeadObjectResponse metaData = s3.getObjectMetadata(s3BucketName, s3ObjKey); | ||||||||
if (metaData != null) { | ||||||||
eTagList.add(s3ObjKey + metaData.eTag()); | ||||||||
} else { | ||||||||
log.warn("Skipping scheduling process for inputs: " + partners + | ||||||||
", since they don't all exist for this initiator (" + initiatorId + ")"); | ||||||||
return true; // not all objects exist, so skip scheduling | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
// hashCode is different, depending on order, so sort | ||||||||
Collections.sort(eTagList); | ||||||||
// hashCode is different, depending on order, so sort | ||||||||
Collections.sort(eTagList); | ||||||||
|
||||||||
// get hashcode | ||||||||
int hashCode = ( initiatorId + eTagList.toString() ).hashCode(); | ||||||||
if (recentlyProcessedInputs.get(hashCode) != null) { | ||||||||
log.info("Skipping scheduling process for inputs: " + partners + | ||||||||
", since they have been recently scheduled (" + hashCode + ") for this initiator (" + initiatorId + ") " + | ||||||||
"within the last " + DUPLICATE_PREVENTION_PERIOD + " seconds."); | ||||||||
return true; // already processed this set of inputs | ||||||||
} | ||||||||
else { | ||||||||
recentlyProcessedInputs.put(hashCode, hashCode); | ||||||||
// also add in, for each partner, a hashcode into another (new) TTL map | ||||||||
// Then check this map in other parts of code . | ||||||||
// If none are found in mem in other part of code, then schedule immediately. | ||||||||
// this avoids the false positive of "old". | ||||||||
// Also, cleanup models , like XYZ to produce new RDR versions... | ||||||||
log.debug("added hash code: " + hashCode + ", to recentlyProcessedInputs. " + | ||||||||
recentlyProcessedInputs.size() + " (initiatorId = " + initiatorId + ")"); | ||||||||
|
||||||||
for (String partner : partners.values()) { | ||||||||
hashCode = (initiatorId + partner).hashCode(); | ||||||||
individualProcessedInputs.put(hashCode, hashCode); | ||||||||
// get hashcode | ||||||||
int hashCode = (initiatorId + eTagList.toString()).hashCode(); | ||||||||
if (recentlyProcessedInputs.get(hashCode) != null) { | ||||||||
log.info("Skipping scheduling process for inputs: " + partners + | ||||||||
", since they have been recently scheduled (" + hashCode + ") for this initiator (" + initiatorId + ") " + | ||||||||
"within the last " + DUPLICATE_PREVENTION_PERIOD + " seconds."); | ||||||||
return true; // already processed this set of inputs | ||||||||
} else { | ||||||||
recentlyProcessedInputs.put(hashCode, hashCode); | ||||||||
// also add in, for each partner, a hashcode into another (new) TTL map | ||||||||
// Then check this map in other parts of code . | ||||||||
// If none are found in mem in other part of code, then schedule immediately. | ||||||||
// this avoids the false positive of "old". | ||||||||
// Also, cleanup models , like XYZ to produce new RDR versions... | ||||||||
log.debug("added hash code: " + hashCode + ", to recentlyProcessedInputs. " + | ||||||||
recentlyProcessedInputs.size() + " (initiatorId = " + initiatorId + ")"); | ||||||||
|
||||||||
for (String partner : partners.values()) { | ||||||||
hashCode = (initiatorId + partner).hashCode(); | ||||||||
individualProcessedInputs.put(hashCode, hashCode); | ||||||||
} | ||||||||
return false; | ||||||||
} | ||||||||
return false; | ||||||||
} | ||||||||
finally { | ||||||||
s3.close(); | ||||||||
} | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} | ||||||||
|
||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest we instead make this class AutoClosable, so that it'll get cleaned up even in sneaky places we don't expect. It also allows us to use the try with resources paradigm instead of
try...finally
.