[Improve][Connector-v2] The Hive connector supports multiple filesystems #6648

Merged 6 commits on Apr 18, 2024
177 changes: 177 additions & 0 deletions docs/en/connector-v2/sink/Hive.md
@@ -37,6 +37,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| hive.hadoop.conf | Map | no | - |
| hive.hadoop.conf-path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
@@ -57,6 +59,16 @@ The path of `hdfs-site.xml`, used to load ha configuration of namenodes

### hive_site_path [string]

The path of `hive-site.xml`

### hive.hadoop.conf [map]

Properties from the Hadoop configuration files (`core-site.xml`, `hdfs-site.xml`, `hive-site.xml`), provided as a map.

### hive.hadoop.conf-path [string]

The directory from which to load the `core-site.xml`, `hdfs-site.xml`, and `hive-site.xml` files.
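
The two options can be combined: `hive.hadoop.conf-path` points to a directory containing the XML files, while `hive.hadoop.conf` overrides individual properties inline. A minimal sketch (the metastore URI, conf path, and bucket below are placeholder values, adjust them to your environment):

```hocon
sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://metastore-host:9083"
    hive.hadoop.conf-path = "/path/to/hadoop-conf"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
    }
  }
}
```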

### krb5_path [string]

The path of `krb5.conf`, used for Kerberos authentication
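
For a kerberized cluster, the Kerberos options from the table above are set alongside the connection options; a hypothetical example (principal and keytab path are placeholders):

```hocon
sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://metastore-host:9083"
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "hive/metastore-host@EXAMPLE.COM"
    kerberos_keytab_path = "/path/to/hive.keytab"
  }
}
```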
@@ -162,6 +174,171 @@ sink {
Hive {
table_name = "test_hive.test_hive_sink_text_simple"
metastore_uri = "thrift://ctyun7:9083"
hive.hadoop.conf = {
bucket = "s3a://mybucket"
}
}
```

## Hive on s3

### Step 1

Create the lib dir for the Hive connector on EMR.

```shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 2

Download the required jars from Maven Central into the lib dir.

```shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
```

### Step 3

Copy the jars from your EMR environment into the lib dir.

```shell
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 4

Run the job with the following configuration.

```shell
env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}

sink {
Hive {
table_name = "test_hive.test_hive_sink_on_s3"
metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
}
}
}
```

## Hive on oss

### Step 1

Create the lib dir for the Hive connector on EMR.

```shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 2

Download the required jars from Maven Central into the lib dir.

```shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
```

### Step 3

Copy the JindoSDK jars from your EMR environment into the lib dir and remove the conflicting hadoop-aliyun jar.

```shell
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
```

### Step 4

Run the job with the following configuration.

```shell
env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}

sink {
Hive {
table_name = "test_hive.test_hive_sink_on_oss"
metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
hive.hadoop.conf = {
bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
}
}
}
```
@@ -29,6 +29,8 @@
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import java.util.Objects;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;

public abstract class BaseHdfsFileSink extends BaseFileSink {
@@ -44,7 +46,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
getPluginName(), PluginType.SINK, result.getMsg()));
}
super.prepare(pluginConfig);
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
// Avoid overwriting a hadoopConf that a subclass has already initialized;
// only create the default HadoopConf here when it is still null.
if (Objects.isNull(hadoopConf)) {
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
}
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}
@@ -37,6 +37,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;

import java.io.IOException;
import java.util.Objects;

public abstract class BaseHdfsFileSource extends BaseFileSource {

@@ -56,8 +57,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
String path = pluginConfig.getString(HdfsSourceConfigOptions.FILE_PATH.key());
hadoopConf =
new HadoopConf(pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
// Avoid overwriting a hadoopConf that a subclass has already initialized;
// only create the default HadoopConf here when it is still null.
if (Objects.isNull(hadoopConf)) {
hadoopConf =
new HadoopConf(
pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
}
if (pluginConfig.hasPath(HdfsSourceConfigOptions.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(HdfsSourceConfigOptions.HDFS_SITE_PATH.key()));
@@ -60,22 +60,50 @@ public String getSchema() {

public void setExtraOptionsForConfiguration(Configuration configuration) {
if (!extraOptions.isEmpty()) {
removeUnwantedOverwritingProps(extraOptions);
extraOptions.forEach(configuration::set);
}
if (hdfsSitePath != null) {
configuration.addResource(new Path(hdfsSitePath));
Configuration hdfsSiteConfiguration = new Configuration();
hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
configuration.addResource(hdfsSiteConfiguration);
}
}

private void removeUnwantedOverwritingProps(Map<String, String> extraOptions) {
extraOptions.remove(getFsDefaultNameKey());
extraOptions.remove(getHdfsImplKey());
extraOptions.remove(getHdfsImplDisableCacheKey());
}

public void unsetUnwantedOverwritingProps(Configuration hdfsSiteConfiguration) {
hdfsSiteConfiguration.unset(getFsDefaultNameKey());
hdfsSiteConfiguration.unset(getHdfsImplKey());
hdfsSiteConfiguration.unset(getHdfsImplDisableCacheKey());
}

public Configuration toConfiguration() {
Configuration configuration = new Configuration();
configuration.setBoolean(READ_INT96_AS_FIXED, true);
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
configuration.setBoolean(getHdfsImplDisableCacheKey(), true);
configuration.set(getFsDefaultNameKey(), getHdfsNameKey());
configuration.set(getHdfsImplKey(), getFsHdfsImpl());
return configuration;
}

public String getFsDefaultNameKey() {
return CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
}

public String getHdfsImplKey() {
return String.format("fs.%s.impl", getSchema());
}

public String getHdfsImplDisableCacheKey() {
return String.format("fs.%s.impl.disable.cache", getSchema());
}
}
@@ -29,8 +29,8 @@
public class S3Conf extends HadoopConf {
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_SCHEMA = "s3a";
private static final String DEFAULT_SCHEMA = "s3n";
protected static final String S3A_SCHEMA = "s3a";
protected static final String DEFAULT_SCHEMA = "s3n";
private String schema = DEFAULT_SCHEMA;

@Override
@@ -47,7 +47,7 @@ public void setSchema(String schema) {
this.schema = schema;
}

private S3Conf(String hdfsNameKey) {
protected S3Conf(String hdfsNameKey) {
super(hdfsNameKey);
}

@@ -80,7 +80,7 @@ public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig)
return buildWithConfig(config);
}

private String switchHdfsImpl() {
protected String switchHdfsImpl() {
switch (this.schema) {
case S3A_SCHEMA:
return HDFS_S3A_IMPL;
15 changes: 15 additions & 0 deletions seatunnel-connectors-v2/connector-hive/pom.xml
@@ -46,6 +46,21 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-s3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-oss</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-cos</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
@@ -26,6 +26,9 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.HashMap;
import java.util.Map;

public class HiveConfig {
public static final Option<String> TABLE_NAME =
Options.key("table_name")
@@ -51,6 +54,19 @@ public class HiveConfig {
.noDefaultValue()
.withDescription("The path of hive-site.xml");

public static final Option<Map<String, String>> HADOOP_CONF =
Options.key("hive.hadoop.conf")
.mapType()
.defaultValue(new HashMap<>())
.withDescription("Properties in hadoop conf");

public static final Option<String> HADOOP_CONF_PATH =
Options.key("hive.hadoop.conf-path")
.stringType()
.noDefaultValue()
.withDescription(
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml' files");

public static final String TEXT_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.mapred.TextInputFormat";
public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =