-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
This commit will fix defect 28687 #59
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,12 @@ | ||
/******************************************************************************* | ||
* (c) Copyright 2014 Hewlett-Packard Development Company, L.P. | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Apache License v2.0 which accompany this distribution. | ||
* | ||
* The Apache License is available at | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*******************************************************************************/ | ||
* (c) Copyright 2014 Hewlett-Packard Development Company, L.P. | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Apache License v2.0 which accompany this distribution. | ||
* | ||
* The Apache License is available at | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
*******************************************************************************/ | ||
|
||
package io.cloudslang.worker.management.services; | ||
|
||
|
@@ -41,38 +41,41 @@ public class InBuffer implements WorkerRecoveryListener, ApplicationListener, Ru | |
private static final Logger logger = Logger.getLogger(InBuffer.class); | ||
|
||
private final static long MEMORY_THRESHOLD = 50000000; // 50 Mega byte | ||
private final static int MINIMUM_GC_DELTA = 10000; // minimum delta between garbage collections in milliseconds | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this must be configured (optionally) by env var, so user can customize this... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
@Autowired | ||
private QueueDispatcherService queueDispatcher; | ||
@Autowired | ||
private QueueDispatcherService queueDispatcher; | ||
|
||
@Resource | ||
private String workerUuid; | ||
@Resource | ||
private String workerUuid; | ||
|
||
@Autowired | ||
@Qualifier("inBufferCapacity") | ||
private Integer capacity; | ||
@Autowired | ||
@Qualifier("inBufferCapacity") | ||
private Integer capacity; | ||
|
||
@Autowired(required = false) | ||
@Qualifier("coolDownPollingMillis") | ||
private Integer coolDownPollingMillis = 200; | ||
@Autowired(required = false) | ||
@Qualifier("coolDownPollingMillis") | ||
private Integer coolDownPollingMillis = 200; | ||
|
||
private Thread fillBufferThread = new Thread(this); | ||
private Thread fillBufferThread = new Thread(this); | ||
|
||
private boolean inShutdown; | ||
private boolean inShutdown; | ||
|
||
private boolean endOfInit = false; | ||
private boolean endOfInit = false; | ||
|
||
@Autowired | ||
private WorkerManager workerManager; | ||
private long gcTimer = System.currentTimeMillis(); | ||
|
||
@Autowired | ||
private SimpleExecutionRunnableFactory simpleExecutionRunnableFactory; | ||
@Autowired | ||
private WorkerManager workerManager; | ||
|
||
@Autowired | ||
private SimpleExecutionRunnableFactory simpleExecutionRunnableFactory; | ||
|
||
@Autowired | ||
private OutboundBuffer outBuffer; | ||
|
||
@Autowired | ||
private SynchronizationManager syncManager; | ||
@Autowired | ||
private SynchronizationManager syncManager; | ||
|
||
@Autowired(required = false) | ||
private ExecutionsActivityListener executionsActivityListener; | ||
|
@@ -133,7 +136,7 @@ private void fillBufferPeriodically() { | |
|
||
syncManager.finishGetMessages(); //release all locks before going to sleep!!! | ||
|
||
Thread.sleep(coolDownPollingMillis/8); //cool down - sleep a while | ||
Thread.sleep(coolDownPollingMillis/8); //cool down - sleep a while | ||
} | ||
else { | ||
syncManager.finishGetMessages(); //release all locks before going to sleep!!! | ||
|
@@ -144,7 +147,7 @@ private void fillBufferPeriodically() { | |
else { | ||
syncManager.finishGetMessages(); //release all locks before going to sleep!!! | ||
|
||
Thread.sleep(coolDownPollingMillis); //if the buffer is not empty enough yet or in recovery - sleep a while | ||
Thread.sleep(coolDownPollingMillis); //if the buffer is not empty enough yet or in recovery - sleep a while | ||
} | ||
} | ||
} catch (InterruptedException ex) { | ||
|
@@ -211,28 +214,34 @@ private void addExecutionMessageInner(ExecutionMessage msg) { | |
} | ||
|
||
@Override | ||
public void onApplicationEvent(ApplicationEvent applicationEvent) { | ||
if (applicationEvent instanceof ContextRefreshedEvent && ! endOfInit) { | ||
endOfInit = true; | ||
inShutdown = false; | ||
fillBufferThread.setName("WorkerFillBufferThread"); | ||
fillBufferThread.start(); | ||
} else if (applicationEvent instanceof ContextClosedEvent) { | ||
inShutdown = true; | ||
} | ||
} | ||
|
||
@Override | ||
public void run() { | ||
fillBufferPeriodically(); | ||
} | ||
public void onApplicationEvent(ApplicationEvent applicationEvent) { | ||
if (applicationEvent instanceof ContextRefreshedEvent && ! endOfInit) { | ||
endOfInit = true; | ||
inShutdown = false; | ||
fillBufferThread.setName("WorkerFillBufferThread"); | ||
fillBufferThread.start(); | ||
} else if (applicationEvent instanceof ContextClosedEvent) { | ||
inShutdown = true; | ||
} | ||
} | ||
|
||
@Override | ||
public void run() { | ||
fillBufferPeriodically(); | ||
} | ||
|
||
public boolean checkFreeMemorySpace(long threshold){ | ||
double allocatedMemory = Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory(); | ||
double presumableFreeMemory = Runtime.getRuntime().maxMemory() - allocatedMemory; | ||
boolean result = presumableFreeMemory > threshold; | ||
if (! result) { | ||
logger.warn("InBuffer would not poll messages, because there is not enough free memory."); | ||
if (System.currentTimeMillis() > (gcTimer + MINIMUM_GC_DELTA)){ | ||
logger.warn("Trying to initiate garbage collection"); | ||
//Todo find a better solution than manually triggering GC | ||
System.gc(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe add a log msg after the GC ended? at least in debug... |
||
gcTimer = System.currentTimeMillis(); | ||
} | ||
} | ||
return result; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this must be configured (optionally) by env var, so user can customize this..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the original inBuffer code, nothing new here, and i don't this particular parameter should be left for the user discretion