diff --git a/worker/worker-manager/score-worker-manager-impl/src/main/java/io/cloudslang/worker/management/services/InBuffer.java b/worker/worker-manager/score-worker-manager-impl/src/main/java/io/cloudslang/worker/management/services/InBuffer.java index 6fc0500ce8..9a309350ad 100644 --- a/worker/worker-manager/score-worker-manager-impl/src/main/java/io/cloudslang/worker/management/services/InBuffer.java +++ b/worker/worker-manager/score-worker-manager-impl/src/main/java/io/cloudslang/worker/management/services/InBuffer.java @@ -1,12 +1,12 @@ /******************************************************************************* -* (c) Copyright 2014 Hewlett-Packard Development Company, L.P. -* All rights reserved. This program and the accompanying materials -* are made available under the terms of the Apache License v2.0 which accompany this distribution. -* -* The Apache License is available at -* http://www.apache.org/licenses/LICENSE-2.0 -* -*******************************************************************************/ + * (c) Copyright 2014 Hewlett-Packard Development Company, L.P. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License v2.0 which accompany this distribution. + * + * The Apache License is available at + * http://www.apache.org/licenses/LICENSE-2.0 + * + *******************************************************************************/ package io.cloudslang.worker.management.services; @@ -41,38 +41,41 @@ public class InBuffer implements WorkerRecoveryListener, ApplicationListener, Ru private static final Logger logger = Logger.getLogger(InBuffer.class); private final static long MEMORY_THRESHOLD = 50000000; // 50 Mega byte + private final static int MINIMUM_GC_DELTA = 10000; // minimum delta between garbage collections in milliseconds - @Autowired - private QueueDispatcherService queueDispatcher; + @Autowired + private QueueDispatcherService queueDispatcher; - @Resource - private String workerUuid; + @Resource + private String workerUuid; - @Autowired - @Qualifier("inBufferCapacity") - private Integer capacity; + @Autowired + @Qualifier("inBufferCapacity") + private Integer capacity; - @Autowired(required = false) - @Qualifier("coolDownPollingMillis") - private Integer coolDownPollingMillis = 200; + @Autowired(required = false) + @Qualifier("coolDownPollingMillis") + private Integer coolDownPollingMillis = 200; - private Thread fillBufferThread = new Thread(this); + private Thread fillBufferThread = new Thread(this); - private boolean inShutdown; + private boolean inShutdown; - private boolean endOfInit = false; + private boolean endOfInit = false; - @Autowired - private WorkerManager workerManager; + private long gcTimer = System.currentTimeMillis(); - @Autowired - private SimpleExecutionRunnableFactory simpleExecutionRunnableFactory; + @Autowired + private WorkerManager workerManager; + + @Autowired + private SimpleExecutionRunnableFactory simpleExecutionRunnableFactory; @Autowired private OutboundBuffer outBuffer; - @Autowired - private SynchronizationManager syncManager; + @Autowired + private SynchronizationManager syncManager; @Autowired(required = false) private ExecutionsActivityListener executionsActivityListener; @@ -133,7 +136,7 @@ private void fillBufferPeriodically() { syncManager.finishGetMessages(); //release all locks before going to sleep!!! - Thread.sleep(coolDownPollingMillis/8); //cool down - sleep a while + Thread.sleep(coolDownPollingMillis/8); //cool down - sleep a while } else { syncManager.finishGetMessages(); //release all locks before going to sleep!!! @@ -144,7 +147,7 @@ private void fillBufferPeriodically() { else { syncManager.finishGetMessages(); //release all locks before going to sleep!!! - Thread.sleep(coolDownPollingMillis); //if the buffer is not empty enough yet or in recovery - sleep a while + Thread.sleep(coolDownPollingMillis); //if the buffer is not empty enough yet or in recovery - sleep a while } } } catch (InterruptedException ex) { @@ -211,21 +214,21 @@ private void addExecutionMessageInner(ExecutionMessage msg) { } @Override - public void onApplicationEvent(ApplicationEvent applicationEvent) { - if (applicationEvent instanceof ContextRefreshedEvent && ! endOfInit) { - endOfInit = true; - inShutdown = false; - fillBufferThread.setName("WorkerFillBufferThread"); - fillBufferThread.start(); - } else if (applicationEvent instanceof ContextClosedEvent) { - inShutdown = true; - } - } - - @Override - public void run() { - fillBufferPeriodically(); - } + public void onApplicationEvent(ApplicationEvent applicationEvent) { + if (applicationEvent instanceof ContextRefreshedEvent && ! endOfInit) { + endOfInit = true; + inShutdown = false; + fillBufferThread.setName("WorkerFillBufferThread"); + fillBufferThread.start(); + } else if (applicationEvent instanceof ContextClosedEvent) { + inShutdown = true; + } + } + + @Override + public void run() { + fillBufferPeriodically(); + } public boolean checkFreeMemorySpace(long threshold){ double allocatedMemory = Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory(); @@ -233,6 +236,12 @@ public boolean checkFreeMemorySpace(long threshold){ boolean result = presumableFreeMemory > threshold; if (! result) { logger.warn("InBuffer would not poll messages, because there is not enough free memory."); + if (System.currentTimeMillis() > (gcTimer + MINIMUM_GC_DELTA)){ + logger.warn("Trying to initiate garbage collection"); + //Todo find a better solution than manually triggering GC + System.gc(); + gcTimer = System.currentTimeMillis(); + } } return result; }