Skip to content

Commit

Permalink
[BugFix] fix the CachingFileIO had wrong length and support pin/unpin…
Browse files Browse the repository at this point in the history
… for disk cache (#18892)

Signed-off-by: zombee0 <[email protected]>
(cherry picked from commit 4a3be4c)
  • Loading branch information
zombee0 authored and wanpengfei-git committed Mar 10, 2023
1 parent 23e5b36 commit c9cca9f
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 74 deletions.
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1740,6 +1740,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static long iceberg_metadata_disk_cache_capacity = 2147483648L;

/**
* iceberg metadata disk cache expire after access
*/
@ConfField
public static long iceberg_metadata_disk_cache_expiration_seconds = 7L * 24L * 60L * 60L;

/**
* iceberg metadata cache max entry size, default 8MB
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,29 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.security.SecureRandom;

import static com.starrocks.connector.iceberg.io.IcebergCachingFileIO.METADATA_CACHE_DISK_PATH;

public class IOUtil {
private static final Logger LOG = LogManager.getLogger(IOUtil.class);
public static final String FILE_PREFIX = "file://";
public static final String FILE_SIMPLIFIED_PREFIX = "file:/";
public static final String EMPTY_STRING = "";
public static final String SLASH_STRING = "/";
public static final Configuration DEFAULT_CONF = new Configuration();

public static SecureRandom rand = new SecureRandom();

Expand Down Expand Up @@ -88,16 +97,20 @@ public static Path getLocalDiskDirPath(String localDir) {

public static OutputFile getTmpOutputFile(String localDir, String path) {
String newPath = remoteToLocalTmpFilePath(localDir, path);
return HadoopOutputFile.fromLocation(newPath, new Configuration());
return HadoopOutputFile.fromLocation(newPath, DEFAULT_CONF);
}

public static OutputFile getOutputFile(Path path) {
return HadoopOutputFile.fromPath(path, new Configuration());
return HadoopOutputFile.fromPath(path, DEFAULT_CONF);
}

public static InputFile getInputFile(Path path) {
return HadoopInputFile.fromPath(path, DEFAULT_CONF);
}

public static OutputFile getOutputFile(String localDir, String path) {
Path newPath = new Path(remoteToLocalFilePath(localDir, path));
return HadoopOutputFile.fromPath(newPath, new Configuration());
return HadoopOutputFile.fromPath(newPath, DEFAULT_CONF);
}

public static String localFileToRemote(Path localFile, String localDir) {
Expand Down Expand Up @@ -129,4 +142,30 @@ public static String remoteToLocalFilePath(String localDir, String path) {
String newPath = path.substring(path.indexOf("/"));
return Paths.get(FILE_PREFIX + localDir, prefix, newPath).toString();
}

public static void deleteLocalFileWithRemotePath(String key) {
HadoopOutputFile hadoopOutputFile = (HadoopOutputFile) IOUtil.getOutputFile(
METADATA_CACHE_DISK_PATH, key);
try {
hadoopOutputFile.getFileSystem().delete(hadoopOutputFile.getPath(), false);
} catch (Exception e) {
LOG.warn("failed on deleting file: {}. msg: {}", hadoopOutputFile.getPath(), e);
}
}

public static void closeInputStreamIgnoreException(InputStream stream) {
try {
stream.close();
} catch (IOException e) {
//ignored
}
}

public static void closeOutputStreamIgnoreException(OutputStream stream) {
try {
stream.close();
} catch (IOException e) {
//ignored
}
}
}
Loading

0 comments on commit c9cca9f

Please sign in to comment.