
feat: support load data for hive with sql option #3380

Merged (5 commits) on Sep 14, 2023
docs/zh/integration/offline_data_sources/hive.md (9 additions, 1 deletion)
@@ -102,14 +102,22 @@ CREATE TABLE db1.t1 LIKE HIVE 'hive://hive_db.t1';

- Both the offline and the online engine can import Hive data sources
- Hive import supports soft links, which reduce hard copying and guarantee that OpenMLDB always reads the latest data in Hive. To import data via a soft link, use the parameter `deep_copy=false`
- Only `deep_copy` and `mode` are valid `OPTIONS` parameters
- Only `deep_copy`, `mode`, and `sql` are valid `OPTIONS` parameters

Example:

```sql
LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false);
```
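
The write `mode` can be combined in the same statement. A hedged example (`overwrite` is one of the write modes alongside the default `error_if_exists` visible in the implementation below):

```sql
LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1 OPTIONS(deep_copy=false, mode='overwrite');
```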

Loading data also supports using a SQL statement to filter specific rows from a Hive table. Note that the SQL must conform to SparkSQL syntax, and the table referenced is the registered table name, without the `hive://` prefix.

Example:

```sql
LOAD DATA INFILE 'hive://db1.t1' INTO TABLE db1.t1 OPTIONS(deep_copy=true, sql='SELECT * FROM db1.t1 where key=\"foo\"')
```

## Exporting OpenMLDB Data to Hive

Exporting to a Hive data source is supported through the [`SELECT INTO`](../../openmldb_sql/dql/SELECT_INTO_STATEMENT.md) statement, using the dedicated URI format `hive://[db].table` to export data into the Hive warehouse. Note:
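
For orientation, a minimal sketch of such an export (the table names are hypothetical, and `mode='overwrite'` is one write mode alongside the default `error_if_exists` seen in the implementation below):

```sql
SELECT * FROM t1 INTO OUTFILE 'hive://db1.t1' OPTIONS(mode='overwrite');
```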
java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/LoadDataPlan.scala
@@ -47,9 +47,12 @@
logger.info("table info: {}", info)
require(info != null && info.getName.nonEmpty, s"table $db.$table info is not existed(no table name): $info")

val loadDataSql = extra.get("sql").get

// we read the input file even in soft copy,
// because we want to check if "the input file schema == openmldb table schema"
val df = HybridseUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList)
val df = HybridseUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options, info.getColumnDescList,
loadDataSql)

// write
logger.info("write data to storage {}, writer[mode {}], is deep? {}", storage, mode, deepCopy.toString)
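For orientation, a hedged sketch (variable names and the `parseOptions` call are assumed from the surrounding hunks) of how the new `sql` option flows from the parsed statement into the loader. The PR itself reads the key with `extra.get("sql").get`, which relies on `parseOptions` always registering `sql`; the sketch spells out that default with `getOrElse`:

```scala
// Sketch, not the PR's exact code: pull the filtering SQL out of the parsed
// extra options and forward it to autoLoad.
val (format, options, mode, extra) = HybridseUtil.parseOptions(inputFile, node)
val loadDataSql = extra.getOrElse("sql", "") // "" means: no filtering SQL
val df = HybridseUtil.autoLoad(ctx.getOpenmldbSession, inputFile, format, options,
  info.getColumnDescList, loadDataSql)
```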
java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala
@@ -233,6 +233,7 @@
updateOptionsMap(options, getOptionFromNode(node, "null_value"), "nullValue", getStr)
updateOptionsMap(options, getOptionFromNode(node, "quote"), "quote", getStr)
}

// load data: write mode (load data may write to offline or online storage, so it needs mode too)
// select into: write mode
val modeStr = parseOption(getOptionFromNode(node, "mode"), "error_if_exists", getStringOrDefault).toLowerCase
@@ -250,8 +251,9 @@

// only for select into, "" means N/A
extraOptions += ("coalesce" -> parseOption(getOptionFromNode(node, "coalesce"), "0", getIntOrDefault))

extraOptions += ("sql" -> parseOption(getOptionFromNode(node, "sql"), "", getStringOrDefault))
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single", getStringOrDefault)

(format, options.toMap, mode, extraOptions.toMap)
}
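
To make the new option concrete, a hypothetical illustration (statement and values invented for this sketch) of what the parsed result carries:

```scala
// Given:
//   LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1
//     OPTIONS(deep_copy=false, sql='SELECT * FROM db1.t1 WHERE key="foo"');
// parseOptions should yield extraOptions containing
//   "sql" -> "SELECT * FROM db1.t1 WHERE key=\"foo\""
// and, when the option is absent, the parsed default "" (meaning: no filter).
```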

@@ -314,7 +316,12 @@

def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String],
columns: util.List[Common.ColumnDesc]): DataFrame = {
autoLoad(openmldbSession, file, List.empty[String], format, options, columns)
autoLoad(openmldbSession, file, List.empty[String], format, options, columns, "")
}

def autoLoad(openmldbSession: OpenmldbSession, file: String, format: String, options: Map[String, String],
columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = {
autoLoad(openmldbSession, file, List.empty[String], format, options, columns, loadDataSql)
}
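// Usage sketch (hypothetical session `sess` and column list `cols`):
//   autoLoad(sess, "hive://db1.t1", "hive", Map.empty, cols)   // unfiltered load
//   autoLoad(sess, "hive://db1.t1", "hive", Map.empty, cols,
//     "SELECT * FROM db1.t1 WHERE key = 'foo'")                // filtered load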

// Load df from file **and** symbol paths; they should be in the same format and options.
@@ -327,24 +334,25 @@
// We use OpenmldbSession for running sparksql in hiveLoad. If in 4pd Spark distribution, SparkSession.sql
// will do openmldbSql first, and if DISABLE_OPENMLDB_FALLBACK, we can't use sparksql.
def autoLoad(openmldbSession: OpenmldbSession, file: String, symbolPaths: List[String], format: String,
options: Map[String, String], columns: util.List[Common.ColumnDesc]): DataFrame = {
options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String = "")
: DataFrame = {
val fmt = format.toLowerCase
if (fmt.equals("hive")) {
logger.info(s"load data from hive table $file & $symbolPaths")
if (file.isEmpty) {
var outputDf: DataFrame = null
symbolPaths.zipWithIndex.foreach { case (path, index) =>
if (index == 0) {
outputDf = HybridseUtil.hiveLoad(openmldbSession, path, columns);
outputDf = HybridseUtil.hiveLoad(openmldbSession, path, columns, loadDataSql)
} else {
outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns))
outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns, loadDataSql))
}
}
outputDf
} else {
var outputDf = HybridseUtil.hiveLoad(openmldbSession, file, columns)
var outputDf = HybridseUtil.hiveLoad(openmldbSession, file, columns, loadDataSql)
for (path: String <- symbolPaths) {
outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns))
outputDf = outputDf.union(HybridseUtil.hiveLoad(openmldbSession, path, columns, loadDataSql))
}
outputDf
}
@@ -355,16 +363,18 @@
var outputDf: DataFrame = null
symbolPaths.zipWithIndex.foreach { case (path, index) =>
if (index == 0) {
outputDf = HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns);
outputDf = HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns, loadDataSql)
} else {
outputDf = outputDf.union(HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns))
outputDf = outputDf.union(HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns,
loadDataSql))
}
}
outputDf
} else {
var outputDf = HybridseUtil.autoFileLoad(openmldbSession, file, fmt, options, columns)
var outputDf = HybridseUtil.autoFileLoad(openmldbSession, file, fmt, options, columns, loadDataSql)
for (path: String <- symbolPaths) {
outputDf = outputDf.union(HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns))
outputDf = outputDf.union(HybridseUtil.autoFileLoad(openmldbSession, path, fmt, options, columns,
loadDataSql))
}
outputDf
}
@@ -376,15 +386,21 @@
// 2. spark read may change the df schema to all nullable
// So we should fix it.
private def autoFileLoad(openmldbSession: OpenmldbSession, file: String, format: String,
options: Map[String, String], columns: util.List[Common.ColumnDesc]): DataFrame = {
options: Map[String, String], columns: util.List[Common.ColumnDesc], loadDataSql: String): DataFrame = {
require(format.equals("csv") || format.equals("parquet"))
val reader = openmldbSession.getSparkSession.read.options(options)

val (oriSchema, readSchema, tsCols) = HybridseUtil.extractOriginAndReadSchema(columns)
var df = if (format.equals("parquet")) {
// When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
// ref https://spark.apache.org/docs/3.2.1/sql-data-sources-parquet.html
val df = reader.format(format).load(file)
val df = if (loadDataSql != null && loadDataSql.nonEmpty) {
reader.format(format).load(file).createOrReplaceTempView("file")
openmldbSession.sparksql(loadDataSql)
} else {
reader.format(format).load(file)
}

require(checkSchemaIgnoreNullable(df.schema, oriSchema),
s"schema mismatch(ignore nullable), loaded ${df.schema}!= table $oriSchema, check $file")
// reset nullable property
@@ -404,6 +420,11 @@
}
}

if (loadDataSql != null && loadDataSql.nonEmpty) {
df.createOrReplaceTempView("file")
df = openmldbSession.sparksql(loadDataSql)
}
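// Note: for csv/parquet sources the filtering SQL runs against a temp view
// registered as "file" (createOrReplaceTempView above), e.g.
//   OPTIONS(sql='SELECT * FROM file WHERE key="foo"')
// while hive sources run the SQL against the hive table itself (see hiveLoad).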

if (logger.isDebugEnabled()) {
logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}")
df.show(10)
@@ -430,14 +451,19 @@
path.substring(tableStartPos)
}

private def hiveLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc]):
DataFrame = {
private def hiveLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc],
loadDataSql: String = ""): DataFrame = {
if (logger.isDebugEnabled()) {
logger.debug("session catalog {}", openmldbSession.getSparkSession.sessionState.catalog)
openmldbSession.sparksql("show tables").show()
}
// use sparksql to read hive, no need to try openmldbsql and then fallback to sparksql
val df = openmldbSession.sparksql(s"SELECT * FROM ${hiveDest(file)}")
val df = if (loadDataSql != null && loadDataSql.nonEmpty) {
logger.debug("Try to execute custom SQL for hive: " + loadDataSql)
openmldbSession.sparksql(loadDataSql)
} else {
openmldbSession.sparksql(s"SELECT * FROM ${hiveDest(file)}")
}
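// Note: the user-supplied SQL here references the registered hive table name
// directly (e.g. db1.t1, without the hive:// prefix), matching the docs above.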
if (logger.isDebugEnabled()) {
logger.debug(s"read dataframe schema: ${df.schema}, count: ${df.count()}")
df.show(10)