Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 32: Garbage collect completed files #29

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public class FileConfig {
public final boolean exactlyOnce;
public final double transactionTimeoutMinutes;

public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey, String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, String fileType) {
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
public final long minTimeInMillisToUpdateFile;

public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey, String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, String fileType, long minTimeInMillisToUpdateFile) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
Expand All @@ -41,6 +43,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.exactlyOnce = exactlyOnce;
this.transactionTimeoutMinutes = transactionTimeoutMinutes;
this.fileType = fileType;
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final String ROUTING_KEY_KEY = "ROUTING_KEY";
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";

private final FileProcessor processor;
private final ScheduledExecutorService executor;
Expand All @@ -61,7 +62,8 @@ public FileIngestService(DeviceDriverConfig config) {
getDeleteCompletedFiles(),
getExactlyOnce(),
getTransactionTimeoutMinutes(),
config.getClassName());
config.getClassName(),
getMinTimeInMillisToUpdateFile());
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
log.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
log.info("Scope: {}", scopeName);
Expand Down Expand Up @@ -123,6 +125,10 @@ boolean getExactlyOnce() {
return Double.parseDouble(getProperty(TRANSACTION_TIMEOUT_MINUTES_KEY, Double.toString(18.0 * 60.0)));
}

/**
 * Reads the {@code MIN_TIME_IN_MILLIS_TO_UPDATE_FILE} property and parses it as a {@code long}.
 * The string {@code "5000"} is used when the property is not set.
 *
 * @return the configured value in milliseconds
 */
long getMinTimeInMillisToUpdateFile() {
    final String configuredMillis = getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000");
    return Long.parseLong(configuredMillis);
}

protected void watchFiles() {
log.trace("watchFiles: BEGIN");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.channels.FileLock;
import java.nio.file.*;
import java.sql.Connection;
import java.sql.SQLException;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove .* and import only the required classes?

import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -42,13 +43,15 @@ public abstract class FileProcessor {
private final EventWriter<byte[]> writer;
private final TransactionCoordinator transactionCoordinator;
private final EventGenerator eventGenerator;
private final Path movedFilesDirectory;
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Creates a processor for the given configuration.
 *
 * @param config file ingestion settings; {@code stateDatabaseFileName} also determines the directory
 *               used for moved files
 * @param state persistent state database
 * @param writer event writer
 * @param transactionCoordinator transaction coordinator
 */
public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator) {
    this.config = config;
    this.state = state;
    this.writer = writer;
    this.transactionCoordinator = transactionCoordinator;
    this.eventGenerator = getEventGenerator(config);
    // Path.getParent() returns null for a bare relative file name such as "state.db".
    // Resolving to an absolute path first keeps the directory next to the database file
    // and guarantees it is non-null for any regular database file name.
    this.movedFilesDirectory = Paths.get(config.stateDatabaseFileName).toAbsolutePath().getParent();
}

public static FileProcessor create(
Expand Down Expand Up @@ -123,8 +126,8 @@ protected void findAndRecordNewFiles() throws Exception {
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
log.debug("getDirectoryListing: fileSpec={}", config.fileSpec);
//Invalid files will be moved to a separate folder Failed_Files next to the database file
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
Path failedFilesDirectory = Paths.get(config.stateDatabaseFileName).getParent();
final List<FileNameWithOffset> directoryListing = FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension, failedFilesDirectory);
log.info("movedFilesDirectory: {}", movedFilesDirectory);
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
final List<FileNameWithOffset> directoryListing = FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension, movedFilesDirectory, config.minTimeInMillisToUpdateFile);
log.debug("getDirectoryListing: directoryListing={}", directoryListing);
return directoryListing;
}
Expand Down Expand Up @@ -186,7 +189,9 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
final long nextSequenceNumber = result.getLeft();
final long endOffset = result.getRight();
log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName);
state.addCompletedFileRecord(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);

moveCompletedFile(fileNameWithBeginOffset, endOffset, nextSequenceNumber, txnId);

// injectCommitFailure();
try {
// commit fails only if Transaction is not in open state.
Expand Down Expand Up @@ -216,14 +221,31 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
}
}

void moveCompletedFile(FileNameWithOffset fileNameWithBeginOffset, long endOffset, long nextSequenceNumber, Optional<UUID> txnId) throws SQLException, IOException {
FileUtils.moveCompletedFile(fileNameWithBeginOffset, movedFilesDirectory);
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
state.addCompletedFileRecord(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);
}

void deleteCompletedFiles() throws Exception {
final List<FileNameWithOffset> completedFiles = state.getCompletedFileRecords();
completedFiles.forEach(file -> {
//Obtain a lock on file
try(FileChannel channel = FileChannel.open(Paths.get(file.fileName), StandardOpenOption.WRITE)){
Path completedFilesPath = movedFilesDirectory.resolve(FileUtils.COMPLETED_FILES);
String completedFileName = FileUtils.createCompletedFileName(completedFilesPath, file.fileName);
Path filePath = completedFilesPath.resolve(completedFileName);
if(Files.notExists(filePath)) {
try {
state.deleteCompletedFileRecord(file.fileName);
} catch (SQLException e) {
throw new RuntimeException(e);
}
log.warn("deleteCompletedFiles: File {} does not exist.", filePath);
return;
}
try(FileChannel channel = FileChannel.open(filePath, StandardOpenOption.WRITE)){
try(FileLock lock = channel.tryLock()) {
if(lock!=null){
Files.deleteIfExists(Paths.get(file.fileName));
Files.deleteIfExists(filePath);
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
log.info("deleteCompletedFiles: Deleted file {}", file.fileName);
lock.release();
// Only remove from database if we could delete file.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.pravega.sensor.collector.util;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
Expand All @@ -9,6 +10,8 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.List;
import java.util.ArrayList;

Expand All @@ -19,7 +22,8 @@ public class FileUtils {

private static final Logger log = LoggerFactory.getLogger(FileUtils.class);
final static String separator = ",";
final static String failedFiles = "Failed_Files";
public static final String FAILED_FILES = "Failed_Files";
public static final String COMPLETED_FILES = "Completed_Files";

/**
* @return list of file name and file size in bytes
Expand All @@ -29,7 +33,7 @@ public class FileUtils {
* 3. check for empty file, log the message and continue with valid files
*
*/
static public List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension, Path failedFilesDirectory) throws IOException {
static public List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension, Path movedFilesDirectory, long minTimeInMillisToUpdateFile) throws IOException {
String[] directories= fileSpec.split(separator);
List<FileNameWithOffset> directoryListing = new ArrayList<>();
for (String directory : directories) {
Expand All @@ -38,25 +42,26 @@ static public List<FileNameWithOffset> getDirectoryListing(String fileSpec, Stri
log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
throw new IOException("Directory does not exist or spec is not valid");
}
getDirectoryFiles(pathSpec, fileExtension, directoryListing, failedFilesDirectory);
getDirectoryFiles(pathSpec, fileExtension, directoryListing, movedFilesDirectory, minTimeInMillisToUpdateFile);
}
return directoryListing;
}

/**
* @return get all files in directory(including subdirectories) and their respective file size in bytes
* get all files in directory(including subdirectories) and their respective file size in bytes
*/
static protected void getDirectoryFiles(Path pathSpec, String fileExtension, List<FileNameWithOffset> directoryListing, Path failedFilesDirectory) throws IOException{
try(DirectoryStream<Path> dirStream=Files.newDirectoryStream(pathSpec)){
static protected void getDirectoryFiles(Path pathSpec, String fileExtension, List<FileNameWithOffset> directoryListing, Path movedFilesDirectory, long minTimeInMillisToUpdateFile) throws IOException{
DirectoryStream.Filter<Path> lastModifiedTimeFilter = getLastModifiedTimeFilter(minTimeInMillisToUpdateFile);
try(DirectoryStream<Path> dirStream=Files.newDirectoryStream(pathSpec, lastModifiedTimeFilter)){
for(Path path: dirStream){
if(Files.isDirectory(path)) //traverse subdirectories
getDirectoryFiles(path, fileExtension, directoryListing, failedFilesDirectory);
getDirectoryFiles(path, fileExtension, directoryListing, movedFilesDirectory, minTimeInMillisToUpdateFile);
else {
FileNameWithOffset fileEntry = new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length());
if(isValidFile(fileEntry, fileExtension))
directoryListing.add(fileEntry);
else //move failed file to different folder
moveFailedFile(fileEntry, failedFilesDirectory);
moveFailedFile(fileEntry, movedFilesDirectory);
}
}
} catch(Exception ex){
Expand All @@ -68,15 +73,30 @@ static protected void getDirectoryFiles(Path pathSpec, String fileExtension, Lis
throw new IOException(ex);
}
}
return;
}

/**
 * Builds a directory-stream filter based on the last-modified time of each entry.
 * Directories are always accepted. A file is accepted only when its last-modified time is at least
 * {@code minTimeInMillisToUpdateFile} milliseconds before the current time.
 * This filter helps to skip files that are still being written into the lookup directory by external services.
 */
private static DirectoryStream.Filter<Path> getLastModifiedTimeFilter(long minTimeInMillisToUpdateFile) {
    log.debug("getLastModifiedTimeFilter: minTimeInMillisToUpdateFile: {}", minTimeInMillisToUpdateFile);
    return candidate -> {
        final BasicFileAttributes attributes = Files.readAttributes(candidate, BasicFileAttributes.class);
        if (!attributes.isDirectory()) {
            final long modifiedAtMillis = attributes.lastModifiedTime().toMillis();
            final long newestAcceptedMillis = System.currentTimeMillis() - minTimeInMillisToUpdateFile;
            return modifiedAtMillis <= newestAcceptedMillis;
        }
        return true;
    };
}

/*
Validates a file before ingestion:
1. The file must not be empty.
2. If the configured extension is empty, or it matches the file's extension, the file is accepted.
*/
public static boolean isValidFile(FileNameWithOffset fileEntry, String fileExtension) throws Exception{
public static boolean isValidFile(FileNameWithOffset fileEntry, String fileExtension) {

if(fileEntry.offset<=0){
log.warn("isValidFile: Empty file {} can not be processed",fileEntry.fileName);
Expand All @@ -90,26 +110,51 @@ else if(fileExtension.isEmpty() || fileExtension.equals(fileEntry.fileName.subst
return false;
}

/**
 * Moves a file that failed validation into the {@code Failed_Files} directory under
 * {@code filesDirectory}, keeping its original file name.
 */
static void moveFailedFile(FileNameWithOffset fileEntry, Path filesDirectory) throws IOException {
    final Path source = Paths.get(fileEntry.fileName);
    final Path failedFilesPath = filesDirectory.resolve(FAILED_FILES);
    moveFile(source, failedFilesPath.resolve(source.getFileName()));
}

/**
 * Moves a processed file into the {@code Completed_Files} directory under {@code filesDirectory}.
 * The target file name is produced by {@link #createCompletedFileName(Path, String)}.
 *
 * @param fileEntry entry whose {@code fileName} is the path of the file to move
 * @param filesDirectory directory under which {@code Completed_Files} is resolved
 * @throws IOException declared by the underlying move operation
 */
public static void moveCompletedFile(FileNameWithOffset fileEntry, Path filesDirectory) throws IOException {
    Path sourcePath = Paths.get(fileEntry.fileName);
    Path completedFilesPath = filesDirectory.resolve(COMPLETED_FILES);
    // The name must be derived from the Completed_Files directory itself, not from its parent:
    // the truncation applied to long names depends on the length of the directory passed in, and
    // the deletion of completed files looks the file up using the Completed_Files directory.
    // Using a different directory here would produce a name that deletion can never find.
    String completedFileName = createCompletedFileName(completedFilesPath, fileEntry.fileName);
    Path targetPath = completedFilesPath.resolve(completedFileName);
    moveFile(sourcePath, targetPath);
}

public static String createCompletedFileName(Path completedFilesDir, String fileName) {
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
if(fileName==null || fileName.isEmpty() || completedFilesDir==null) {
return fileName;
}

int validFileNameLength = 255 - completedFilesDir.toString().length();

if(fileName.length() > validFileNameLength) {
fileName = fileName.substring(fileName.indexOf(File.separator, fileName.length() - validFileNameLength-1));
}
return fileName.replace(File.separator,"_");
}

/*
Move failed files to different directory
*/
static void moveFailedFile(FileNameWithOffset fileEntry, Path failedFilesDirectory) throws IOException {
Path targetPath = failedFilesDirectory.resolve(failedFiles);
Files.createDirectories(targetPath);
static void moveFile(Path sourcePath, Path targetPath) throws IOException {
Files.createDirectories(targetPath.getParent());
//Obtain a lock on file before moving
try(FileChannel channel = FileChannel.open(Paths.get(fileEntry.fileName), StandardOpenOption.WRITE)) {
try(FileChannel channel = FileChannel.open(sourcePath, StandardOpenOption.WRITE)) {
try(FileLock lock = channel.tryLock()) {
if(lock!=null){
Path failedFile = targetPath.resolve(Paths.get(fileEntry.fileName).getFileName());
Files.move(Paths.get(fileEntry.fileName), failedFile, StandardCopyOption.REPLACE_EXISTING);
log.info("moveFailedFiles: Moved file to {}", failedFile);
Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
log.info("movedFile: Moved file from {} to {}", sourcePath, targetPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change log level to debug

lock.release();
}
else{
log.warn("Unable to obtain lock on file {} for moving. File is locked by another process.", fileEntry.fileName);
log.warn("Unable to obtain lock on file {} for moving. File is locked by another process.", sourcePath);
throw new Exception();
}
}
}
} catch (Exception e) {
log.warn("Unable to move failed file {}", e.getMessage());
log.warn("Failed file will be moved on the next iteration.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# This file can be used to manually test RawFileIngestService.
PRAVEGA_SENSOR_COLLECTOR_RAW1_CLASS=io.pravega.sensor.collector.file.rawfile.RawFileIngestService
PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_SPEC=/home/dada/dada/pravega/pravega-sensor-collector/parquet-file-sample-data/files
PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_EXTENSION=parquet
PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE=/home/dada/dada/pravega/pravega-sensor-collector/datafile.db
PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI=tls://pravega-controller.texas-twister.ns.sdp.hop.lab.emc.com:443
PRAVEGA_SENSOR_COLLECTOR_RAW1_SCOPE=test-psc
PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM=test-psc-stream
PRAVEGA_SENSOR_COLLECTOR_RAW1_ROUTING_KEY=$(hostname)
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES=false
PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=false
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this param to other file properties

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void setup(){
MockitoAnnotations.initMocks(this);
config = new FileConfig("tset.db","/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0,"RawFileIngestService");
true,20.0,"RawFileIngestService", 5000);
/*writer = EventWriter.create(
clientFactory,
"writerId",
Expand Down Expand Up @@ -79,7 +79,7 @@ public void getNewFilesTest() {
@Test
public void getDirectoryListingTest() throws IOException {
final List<FileNameWithOffset> actual = FileUtils.getDirectoryListing(
"../log-file-sample-data/","csv",Paths.get("."));
"../log-file-sample-data/","csv",Paths.get("."), 5000);
log.info("actual={}", actual);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -25,7 +26,7 @@ public class ParquetEventGeneratorTests {
@Test
public void TestFile() throws IOException {
final EventGenerator eventGenerator = ParquetEventGenerator.create("routingKey1",100);
final List<FileNameWithOffset> files = FileUtils.getDirectoryListing("../parquet-file-sample-data","parquet",Paths.get("."));
final List<FileNameWithOffset> files = FileUtils.getDirectoryListing("../parquet-file-sample-data","parquet", Paths.get("."), 5000);
File parquetData= new File(files.get(0).fileName);

final CountingInputStream inputStream = new CountingInputStream(new FileInputStream(parquetData));
Expand Down
1 change: 1 addition & 0 deletions scripts/run-with-gradle-raw-file.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ export PRAVEGA_SENSOR_COLLECTOR_RAW1_ROUTING_KEY=$(hostname)
export PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES=false
export PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
export PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=false
export PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000

./gradlew --no-daemon run
1 change: 1 addition & 0 deletions windows-service/PravegaSensorCollectorApp.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES" value="false" />
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES" value="2.0" />
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE" value="false" />
<env name="PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE" value="5000" />

<onfailure action="restart"/>

Expand Down
Loading