diff --git a/src/main/java/com/pinterest/secor/common/FileRegistry.java b/src/main/java/com/pinterest/secor/common/FileRegistry.java index 3b3d68c70..0f6d0f5ed 100644 --- a/src/main/java/com/pinterest/secor/common/FileRegistry.java +++ b/src/main/java/com/pinterest/secor/common/FileRegistry.java @@ -139,11 +139,6 @@ public FileWriter getOrCreateWriter(LogFilePath path, CompressionCodec codec) mWriters.put(path, writer); mCreationTimes.put(path, System.currentTimeMillis() / 1000L); LOG.debug("created writer for path {}", path.getLogFilePath()); - LOG.debug("Register deleteOnExit for path {}", path.getLogFilePath()); - FileUtil.deleteOnExit(path.getLogFileParentDir()); - FileUtil.deleteOnExit(path.getLogFileDir()); - FileUtil.deleteOnExit(path.getLogFilePath()); - FileUtil.deleteOnExit(path.getLogFileCrcPath()); } return writer; } diff --git a/src/main/java/com/pinterest/secor/common/ShutdownHookRegistry.java b/src/main/java/com/pinterest/secor/common/ShutdownHookRegistry.java new file mode 100644 index 000000000..82b384b4d --- /dev/null +++ b/src/main/java/com/pinterest/secor/common/ShutdownHookRegistry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.pinterest.secor.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Registry for shutdown hooks. + * Allows running shutdown hooks by specific order by executing multiple Runnables on single shutdown hook. + * + * @author Paulius Dambrauskas (p.dambrauskas@gmail.com) + * + */ +public final class ShutdownHookRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ShutdownHookRegistry.class); + private static final Map> HOOKS = new ConcurrentHashMap<>(); + + private ShutdownHookRegistry() { + // static class cannot be initiated + } + + static { + Runtime.getRuntime().addShutdownHook(new Thread(ShutdownHookRegistry::runHooks)); + } + + public static void registerHook(int priority, Runnable hook) { + HOOKS.computeIfAbsent(priority, key -> new ArrayList<>()).add(hook); + LOG.info("Shut down hook with priority {} added to shut down hook registry", priority); + } + + public static void runHooks() { + HOOKS.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { + LOG.info("Running hooks for priority {}", entry.getKey()); + entry.getValue().parallelStream().forEach(Runnable::run); + }); + } + + public static void clear() { + HOOKS.clear(); + } +} diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java index d6731019c..2a11bbef5 100644 --- a/src/main/java/com/pinterest/secor/consumer/Consumer.java +++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java @@ -18,10 +18,7 @@ */ package com.pinterest.secor.consumer; -import com.pinterest.secor.common.DeterministicUploadPolicyTracker; -import com.pinterest.secor.common.FileRegistry; -import com.pinterest.secor.common.OffsetTracker; -import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.*; import com.pinterest.secor.message.Message; import com.pinterest.secor.message.ParsedMessage; import com.pinterest.secor.monitoring.MetricCollector; @@ -116,7 +113,7 @@ private void init() throws Exception { if (mDeterministicUploadPolicyTracker != null) { throw new RuntimeException("Can't set secor.upload.on.shutdown with secor.upload.deterministic!"); } - Runtime.getRuntime().addShutdownHook(this.new FinalUploadShutdownHook()); + ShutdownHookRegistry.registerHook(1, new FinalUploadShutdownHook()); } } diff --git a/src/main/java/com/pinterest/secor/io/StagingDirectoryCleaner.java b/src/main/java/com/pinterest/secor/io/StagingDirectoryCleaner.java new file mode 100644 index 000000000..2293cebc2 --- /dev/null +++ b/src/main/java/com/pinterest/secor/io/StagingDirectoryCleaner.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.pinterest.secor.io; + +import java.io.File; +import java.io.IOException; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Runnable used to delete staging folder content. + * Deletes folders content, while keeping folder itself. + * + * @author Paulius Dambrauskas (p.dambrauskas@gmail.com) + * + */ +public class StagingDirectoryCleaner implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(StagingDirectoryCleaner.class); + + private final File mStagingDir; + + public StagingDirectoryCleaner(String stagingPath) { + this.mStagingDir = new File(stagingPath); + } + + @Override + public void run() { + try { + FileUtils.deleteDirectory(this.mStagingDir); + } catch (IOException e) { + LOG.error("Failed deleting file", e); + } + } +} diff --git a/src/main/java/com/pinterest/secor/main/ConsumerMain.java b/src/main/java/com/pinterest/secor/main/ConsumerMain.java index c3b60acc0..5ac41c4d9 100644 --- a/src/main/java/com/pinterest/secor/main/ConsumerMain.java +++ b/src/main/java/com/pinterest/secor/main/ConsumerMain.java @@ -20,9 +20,12 @@ import com.pinterest.secor.common.OstrichAdminService; import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.common.ShutdownHookRegistry; import com.pinterest.secor.consumer.Consumer; +import com.pinterest.secor.io.StagingDirectoryCleaner; import com.pinterest.secor.tools.LogFileDeleter; import com.pinterest.secor.util.FileUtil; +import com.pinterest.secor.util.IdUtil; import com.pinterest.secor.util.RateLimitUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +57,8 @@ public static void main(String[] args) { } try { SecorConfig config = SecorConfig.load(); + String stagingDirectoryPath = config.getLocalPath() + '/' + IdUtil.getLocalMessageDir(); + ShutdownHookRegistry.registerHook(10, new StagingDirectoryCleaner(stagingDirectoryPath)); OstrichAdminService ostrichService = new OstrichAdminService(config.getOstrichPort()); ostrichService.start(); FileUtil.configure(config); diff --git a/src/main/java/com/pinterest/secor/util/FileUtil.java b/src/main/java/com/pinterest/secor/util/FileUtil.java index de8d18e7f..a81654a91 100644 --- a/src/main/java/com/pinterest/secor/util/FileUtil.java +++ b/src/main/java/com/pinterest/secor/util/FileUtil.java @@ -198,11 +198,6 @@ public static void delete(String path) throws IOException { } } - public static void deleteOnExit(String path) { - File file = new File(path); - file.deleteOnExit(); - } - public static void moveToCloud(String srcLocalPath, String dstCloudPath) throws IOException { Path srcPath = new Path(srcLocalPath); Path dstPath = new Path(dstCloudPath); diff --git a/src/test/java/com/pinterest/secor/common/ShutdownHookRegistryTest.java b/src/test/java/com/pinterest/secor/common/ShutdownHookRegistryTest.java new file mode 100644 index 000000000..7c9962f97 --- /dev/null +++ b/src/test/java/com/pinterest/secor/common/ShutdownHookRegistryTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.pinterest.secor.common; + +import org.junit.After; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ShutdownHookRegistryTest { + + @After + public void cleanup() { + ShutdownHookRegistry.clear(); + } + + @Test + public void testHookExecutionOrder() { + List results = new ArrayList<>(); + ShutdownHookRegistry.registerHook(9, () -> results.add("priority9")); + ShutdownHookRegistry.registerHook(1, () -> results.add("priority1")); + + ShutdownHookRegistry.runHooks(); + + assertEquals("priority1", results.get(0)); + assertEquals("priority9", results.get(1)); + } + +} \ No newline at end of file diff --git a/src/test/java/com/pinterest/secor/io/StagingDirectoryCleanerTest.java b/src/test/java/com/pinterest/secor/io/StagingDirectoryCleanerTest.java new file mode 100644 index 000000000..0e39e4579 --- /dev/null +++ b/src/test/java/com/pinterest/secor/io/StagingDirectoryCleanerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.pinterest.secor.io; + +import java.io.IOException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.*; + +public class StagingDirectoryCleanerTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testCleanFolderContent() throws IOException { + // Given + folder.newFolder("foo"); + folder.newFolder("bar"); + + // When + new StagingDirectoryCleaner(folder.getRoot().getPath()).run(); + + // Then + assertFalse(folder.getRoot().exists()); + } +}