diff --git a/hadoop-ssm-project/pom.xml b/hadoop-ssm-project/pom.xml
index d45d900724af3..397dd0ef8cb11 100644
--- a/hadoop-ssm-project/pom.xml
+++ b/hadoop-ssm-project/pom.xml
@@ -30,6 +30,11 @@
sqlite-jdbc
3.16.1
+
+ com.squareup
+ tape
+ 1.2.3
+
com.google.protobuf
protobuf-java
diff --git a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/AccessCountFetcher.java b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/AccessCountFetcher.java
similarity index 69%
rename from hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/AccessCountFetcher.java
rename to hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/AccessCountFetcher.java
index f99c8d7c7c4b1..e03d10df476cf 100644
--- a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/AccessCountFetcher.java
+++ b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/AccessCountFetcher.java
@@ -15,44 +15,53 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.ssm;
+package org.apache.hadoop.ssm.fetcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.FilesAccessInfo;
import org.apache.hadoop.ssm.sql.tables.AccessCountTableManager;
import java.io.IOException;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
public class AccessCountFetcher {
private static final Long DEFAULT_INTERVAL = 5 * 1000L;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Long fetchInterval;
+ private ScheduledFuture scheduledFuture;
private FetchTask fetchTask;
- private Long fetchInterval;
- private Timer timer;
public AccessCountFetcher(DFSClient client, AccessCountTableManager manager) {
this(DEFAULT_INTERVAL, client, manager);
}
public AccessCountFetcher(Long fetchInterval, DFSClient client,
- AccessCountTableManager manager) {
- this.timer = new Timer();
+ AccessCountTableManager manager) {
+ this(fetchInterval, client, manager, Executors.newSingleThreadScheduledExecutor());
+ }
+
+ public AccessCountFetcher(Long fetchInterval, DFSClient client,
+ AccessCountTableManager manager, ScheduledExecutorService service) {
this.fetchInterval = fetchInterval;
this.fetchTask = new FetchTask(client, manager);
+ this.scheduledExecutorService = service;
}
public void start() {
Long current = System.currentTimeMillis();
Long toWait = fetchInterval - (current % fetchInterval);
- timer.schedule(fetchTask, toWait, fetchInterval);
+ this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
+ fetchTask, toWait, fetchInterval, TimeUnit.MILLISECONDS);
}
public void stop() {
- this.timer.cancel();
+ this.scheduledFuture.cancel(false);
}
- private static class FetchTask extends TimerTask {
+ private static class FetchTask implements Runnable {
private final DFSClient client;
private final AccessCountTableManager manager;
diff --git a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyEventApplier.java b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyEventApplier.java
new file mode 100644
index 0000000000000..fe15fd01b587f
--- /dev/null
+++ b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyEventApplier.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ssm.fetcher;
+
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.ssm.sql.DBAdapter;
+
+public class InotifyEventApplier {
+ private final DBAdapter adapter;
+
+ public InotifyEventApplier(DBAdapter adapter) {
+ this.adapter = adapter;
+ }
+
+ public void apply(Event[] events) {
+
+ }
+}
diff --git a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyEventFetcher.java b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyEventFetcher.java
new file mode 100644
index 0000000000000..82276222be518
--- /dev/null
+++ b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyEventFetcher.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ssm.fetcher;
+
+import com.squareup.tape.QueueFile;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.apache.hadoop.ssm.sql.DBAdapter;
+import org.apache.hadoop.ssm.utils.EventBatchSerializer;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class InotifyEventFetcher {
+ private final DFSClient client;
+ private final DBAdapter adapter;
+ private final NamespaceFetcher nameSpaceFetcher;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final InotifyEventApplier applier;
+ private ScheduledFuture inotifyFetchFuture;
+ private ScheduledFuture fetchAndApplyFuture;
+ private java.io.File inotifyFile;
+ private QueueFile queueFile;
+
+ public InotifyEventFetcher(DFSClient client, DBAdapter adapter,
+ ScheduledExecutorService service, InotifyEventApplier applier) {
+ this.client = client;
+ this.adapter = adapter;
+ this.applier = applier;
+ this.scheduledExecutorService = service;
+ this.nameSpaceFetcher = new NamespaceFetcher(client, adapter, service);
+ }
+
+ public void start() throws IOException, InterruptedException {
+ this.inotifyFile = java.io.File.createTempFile("", ".inotify");
+ this.queueFile = new QueueFile(inotifyFile);
+ this.nameSpaceFetcher.startFetch();
+ this.inotifyFetchFuture = scheduledExecutorService.scheduleAtFixedRate(
+ new InotifyFetchTask(queueFile, client), 0, 100, TimeUnit.MILLISECONDS);
+ EventApplyTask eventApplyTask = new EventApplyTask(nameSpaceFetcher, applier, queueFile);
+ eventApplyTask.start();
+ eventApplyTask.join();
+
+ long lastId = eventApplyTask.getLastId();
+ this.inotifyFetchFuture.cancel(false);
+ this.queueFile.close();
+ InotifyFetchAndApplyTask fetchAndApplyTask =
+ new InotifyFetchAndApplyTask(client, applier, lastId);
+ this.fetchAndApplyFuture = scheduledExecutorService.scheduleAtFixedRate(
+ fetchAndApplyTask, 0, 100, TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ this.fetchAndApplyFuture.cancel(false);
+ }
+
+ private static class InotifyFetchTask implements Runnable {
+ private final QueueFile queueFile;
+ private AtomicLong lastId;
+ private DFSInotifyEventInputStream inotifyEventInputStream;
+
+ public InotifyFetchTask(QueueFile queueFile, DFSClient client) throws IOException {
+ this.queueFile = queueFile;
+ this.lastId = new AtomicLong(-1);
+ this.inotifyEventInputStream = client.getInotifyEventStream();
+ }
+
+ @Override
+ public void run() {
+ try {
+ EventBatch eventBatch = inotifyEventInputStream.poll();
+ while (eventBatch != null) {
+ this.queueFile.add(EventBatchSerializer.serialize(eventBatch));
+ this.lastId.getAndSet(eventBatch.getTxid());
+ eventBatch = inotifyEventInputStream.poll();
+ }
+ } catch (IOException | MissingEventsException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public long getLastId() {
+ return this.lastId.get();
+ }
+ }
+
+ private static class EventApplyTask extends Thread {
+ private final NamespaceFetcher namespaceFetcher;
+ private final InotifyEventApplier applier;
+ private final QueueFile queueFile;
+ private long lastId;
+
+ public EventApplyTask(NamespaceFetcher namespaceFetcher, InotifyEventApplier applier,
+ QueueFile queueFile) {
+ this.namespaceFetcher = namespaceFetcher;
+ this.queueFile = queueFile;
+ this.applier = applier;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ if (!namespaceFetcher.fetchFinished()) {
+ Thread.sleep(100);
+ } else {
+ while (!queueFile.isEmpty()) {
+ EventBatch batch = EventBatchSerializer.deserialize(queueFile.peek());
+ this.applier.apply(batch.getEvents());
+ this.lastId = batch.getTxid();
+ }
+ }
+ }
+ } catch (InterruptedException | IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public long getLastId() {
+ return this.lastId;
+ }
+ }
+}
diff --git a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyFetchAndApplyTask.java b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyFetchAndApplyTask.java
new file mode 100644
index 0000000000000..d0ce8f262ce93
--- /dev/null
+++ b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/InotifyFetchAndApplyTask.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ssm.fetcher;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class InotifyFetchAndApplyTask implements Runnable {
+ private final AtomicLong lastId;
+ private final InotifyEventApplier applier;
+ private DFSInotifyEventInputStream inotifyEventInputStream;
+
+ public InotifyFetchAndApplyTask(DFSClient client, InotifyEventApplier applier, long startId)
+ throws IOException {
+ this.applier = applier;
+ this.lastId = new AtomicLong(-1);
+ this.inotifyEventInputStream = client.getInotifyEventStream(startId);
+ }
+
+ @Override
+ public void run() {
+ try {
+ EventBatch eventBatch = inotifyEventInputStream.poll();
+ while (eventBatch != null) {
+ this.applier.apply(eventBatch.getEvents());
+ this.lastId.getAndSet(eventBatch.getTxid());
+ eventBatch = inotifyEventInputStream.poll();
+ }
+ } catch (IOException | MissingEventsException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public long getLastId() {
+ return this.lastId.get();
+ }
+}
diff --git a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/NamespaceFetcher.java b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/NamespaceFetcher.java
similarity index 78%
rename from hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/NamespaceFetcher.java
rename to hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/NamespaceFetcher.java
index b8126d4707d76..2bf7118c240e2 100644
--- a/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/NamespaceFetcher.java
+++ b/hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/fetcher/NamespaceFetcher.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.ssm;
+package org.apache.hadoop.ssm.fetcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -27,31 +27,47 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
public class NamespaceFetcher {
private static final Long DEFAULT_INTERVAL = 1000L;
+
+ private final ScheduledExecutorService scheduledExecutorService;
private final long fetchInterval;
+ private ScheduledFuture fetchTaskFuture;
+ private ScheduledFuture consumerFuture;
private FileStatusConsumer consumer;
private FetchTask fetchTask;
- private Timer timer;
public NamespaceFetcher(DFSClient client, DBAdapter adapter) {
this(client, adapter, DEFAULT_INTERVAL);
}
+ public NamespaceFetcher(DFSClient client, DBAdapter adapter, ScheduledExecutorService service) {
+ this(client, adapter, DEFAULT_INTERVAL, service);
+ }
+
public NamespaceFetcher(DFSClient client, DBAdapter adapter, long fetchInterval) {
- this.timer = new Timer();
+ this(client, adapter, fetchInterval, Executors.newSingleThreadScheduledExecutor());
+ }
+
+ public NamespaceFetcher(DFSClient client, DBAdapter adapter, long fetchInterval,
+ ScheduledExecutorService service) {
this.fetchTask = new FetchTask(client);
this.consumer = new FileStatusConsumer(adapter, fetchTask);
this.fetchInterval = fetchInterval;
+ this.scheduledExecutorService = service;
}
public void startFetch() throws IOException {
- this.timer.schedule(fetchTask, 0, fetchInterval);
- this.consumer.start();
+ this.fetchTaskFuture = this.scheduledExecutorService.scheduleAtFixedRate(
+ fetchTask, 0, fetchInterval, TimeUnit.MILLISECONDS);
+ this.consumerFuture = this.scheduledExecutorService.scheduleAtFixedRate(
+ consumer, 0, 100, TimeUnit.MILLISECONDS);
}
public boolean fetchFinished() {
@@ -59,11 +75,11 @@ public boolean fetchFinished() {
}
public void stop() {
- this.timer.cancel();
- this.consumer.interrupt();
+ this.fetchTaskFuture.cancel(false);
+ this.consumerFuture.cancel(false);
}
- private static class FetchTask extends TimerTask {
+ private static class FetchTask implements Runnable {
private final static int DEFAULT_BATCH_SIZE = 20;
private final static String ROOT = "/";
private final HdfsFileStatus[] EMPTY_STATUS = new HdfsFileStatus[0];
@@ -165,7 +181,7 @@ private HdfsFileStatus[] listStatus(String src) throws IOException {
}
}
- private static class FileStatusConsumer extends Thread {
+ private static class FileStatusConsumer implements Runnable {
private final DBAdapter dbAdapter;
private final FetchTask fetchTask;
@@ -176,24 +192,16 @@ protected FileStatusConsumer(DBAdapter dbAdapter, FetchTask fetchTask) {
@Override
public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- FileStatusInternalBatch batch = fetchTask.pollBatch();
- if (batch != null) {
- FileStatusInternal[] statuses = batch.getFileStatuses();
- if (statuses.length == batch.actualSize()) {
- this.dbAdapter.insertFiles(batch.getFileStatuses());
- } else {
- FileStatusInternal[] actual = new FileStatusInternal[batch.actualSize()];
- System.arraycopy(statuses, 0, actual, 0, batch.actualSize());
- this.dbAdapter.insertFiles(actual);
- }
- } else {
- Thread.sleep(100);
- }
+ FileStatusInternalBatch batch = fetchTask.pollBatch();
+ if (batch != null) {
+ FileStatusInternal[] statuses = batch.getFileStatuses();
+ if (statuses.length == batch.actualSize()) {
+ this.dbAdapter.insertFiles(batch.getFileStatuses());
+ } else {
+ FileStatusInternal[] actual = new FileStatusInternal[batch.actualSize()];
+ System.arraycopy(statuses, 0, actual, 0, batch.actualSize());
+ this.dbAdapter.insertFiles(actual);
}
- } catch (InterruptedException e) {
- e.printStackTrace();
}
}
}
diff --git a/hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/TestNamespaceFetcher.java b/hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/fetcher/TestNamespaceFetcher.java
similarity index 94%
rename from hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/TestNamespaceFetcher.java
rename to hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/fetcher/TestNamespaceFetcher.java
index 8ee520e0bbd74..eec9f0a84e428 100644
--- a/hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/TestNamespaceFetcher.java
+++ b/hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/fetcher/TestNamespaceFetcher.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.ssm;
+package org.apache.hadoop.ssm.fetcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -23,6 +23,8 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.apache.hadoop.ssm.SSMConfiguration;
import org.apache.hadoop.ssm.sql.DBAdapter;
import org.apache.hadoop.ssm.sql.FileStatusInternal;
import org.junit.Test;
@@ -58,7 +60,7 @@ public boolean matches(Object o) {
}
@Test
- public void testNamespaceFetcher() throws IOException, InterruptedException {
+ public void testNamespaceFetcher() throws IOException, InterruptedException, MissingEventsException {
final Configuration conf = new SSMConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();