Skip to content

Commit

Permalink
add: OapFileSystemType
Browse files Browse the repository at this point in the history
  • Loading branch information
galaxina committed Sep 4, 2023
1 parent 979a80b commit 47f06c1
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 9 deletions.
57 changes: 57 additions & 0 deletions oap-hadoop/src/main/java/oap/hadoop/OapFileSystemType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package oap.hadoop;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;

public enum OapFileSystemType {
FILE( "file://" ) {
@Override
public String root( Configuration configuration ) {
return null;
}
},
/**
* fs.sftp.hostname = host
* fs.sftp.port = 22 // optional
*/
SFTP( "sftp://" ) {
@Override
public String root( Configuration configuration ) {
String host = configuration.get( "fs.sftp.hostname" );
String port = configuration.get( "fs.sftp.port" );

Preconditions.checkNotNull( host, "fs.sftp.hostname" );
return fsDefaultFS + host + ( port != null ? ":" + port : "" ) + "/";
}

},

/**
* fs.s3a.access.key = access key
* fs.s3a.secret.key = secret key
* fs.s3a.backet = backet name
* fs.s3a.region = region
* fs.s3a.aws.credentials.provider = provider
*/
S3A( "s3a://" ) {
@Override
public String root( Configuration configuration ) {
String region = configuration.get( "fs.s3a.region" );
String bucket = configuration.get( "fs.s3a.bucket" );

Preconditions.checkNotNull( region, "fs.s3a.region" );
Preconditions.checkNotNull( configuration.get( bucket, "fs.s3a.bucket" ) );


return fsDefaultFS + "s3." + region + ".amazonaws.com/" + bucket;
}
};

public final String fsDefaultFS;

OapFileSystemType( String fsDefaultFS ) {
this.fsDefaultFS = fsDefaultFS;
}

public abstract String root( Configuration configuration );
}
47 changes: 43 additions & 4 deletions oap-hadoop/src/main/java/oap/hadoop/OapHadoopConfiguration.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,59 @@
package oap.hadoop;

import com.google.common.base.Preconditions;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import oap.io.IoStreams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Map;

@Slf4j
@ToString
public class OapHadoopConfiguration extends Configuration {
public OapHadoopConfiguration( Map<String, String> configuration ) {
private final OapFileSystemType fileSystemType;

public OapHadoopConfiguration( OapFileSystemType fileSystemType, Map<String, String> configuration ) {
super( false );
this.fileSystemType = fileSystemType;

log.info( "hadoop filesystem {} conf {}", fileSystemType, configuration );

Preconditions.checkArgument( !configuration.containsKey( "fs.defaultFS" ) );

log.info( "hadoop conf {}", configuration );
configuration.forEach( this::set );
set( "fs.defaultFS", fileSystemType.fsDefaultFS );
}

public InputStream getInputStream( String path, boolean decode ) throws UncheckedIOException {
try {
FileSystem fileSystem = getFileSystem();

org.apache.hadoop.fs.Path hadoopPath = getPath( path );

InputStream rawStream = fileSystem.open( hadoopPath );
return decode
? IoStreams.in( rawStream, IoStreams.Encoding.from( path ) )
: rawStream;
} catch( IOException e ) {
throw new UncheckedIOException( e );
}
}

public Path getPath( String path ) {
return new Path( fileSystemType.root( this ), path );
}

public OapHadoopConfiguration( Configuration other ) {
super( other );
public FileSystem getFileSystem() throws UncheckedIOException {
try {
return FileSystem.get( this );
} catch( IOException e ) {
throw new UncheckedIOException( e );
}
}
}
17 changes: 12 additions & 5 deletions oap-hadoop/src/main/resources/META-INF/oap-module.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ name = oap-hadoop
services {
oap-hadoop-configuratinon {
implementation = oap.hadoop.OapHadoopConfiguration
parameters.configuration {
// "fs.s3a.access.key" = access key
// "fs.s3a.secret.key" = secret key
// "fs.s3a.aws.credentials.provider" = provider
parameters {
// S3A, SFTP, FILE
fileSystemType = S3A
configuration {
// "fs.s3a.access.key" = access key
// "fs.s3a.secret.key" = secret key
// "fs.s3a.backet" = backet name
// "fs.s3a.region" = region
// "fs.s3a.aws.credentials.provider" = provider

// "fs.azure.account.key.youraccount.blob.core.windows.net" = access key
// "fs.sftp.hostname" = host
// "fs.sftp.port" = 22
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package oap.hadoop;

import org.testng.annotations.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

public class OapHadoopConfigurationTest {
@Test
public void testGetPathS3a() {
OapHadoopConfiguration oapHadoopConfiguration = new OapHadoopConfiguration( OapFileSystemType.S3A,
Map.of( "fs.s3a.region", "us-east-1", "fs.s3a.bucket", "my-bucket" ) );

assertThat( oapHadoopConfiguration.getPath( "folder/file.txt" ).toString() )
.isEqualTo( "s3a://s3.us-east-1.amazonaws.com/my-bucket/folder/file.txt" );
}

@Test
public void testGetPathSftp() {
OapHadoopConfiguration oapHadoopConfiguration = new OapHadoopConfiguration( OapFileSystemType.SFTP,
Map.of( "fs.sftp.hostname", "hostname" ) );

assertThat( oapHadoopConfiguration.getPath( "folder/file.txt" ).toString() )
.isEqualTo( "sftp://hostname/folder/file.txt" );

oapHadoopConfiguration = new OapHadoopConfiguration( OapFileSystemType.SFTP,
Map.of( "fs.sftp.hostname", "hostname", "fs.sftp.port", "33" ) );

assertThat( oapHadoopConfiguration.getPath( "folder/file.txt" ).toString() )
.isEqualTo( "sftp://hostname:33/folder/file.txt" );
}
}

0 comments on commit 47f06c1

Please sign in to comment.