Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wForget committed Jun 22, 2022
1 parent 2b6a149 commit 6cc99ec
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

<properties>
<elasticsearch.version>7.6.2</elasticsearch.version>
<fasterxml.jackson.version>2.11.3</fasterxml.jackson.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -88,22 +87,18 @@
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.elasti
wds.linkis.engineconn.support.parallelism=true

# ElasticSearch
wds.linkis.es.cluster=127.0.0.1:9200
linkis.es.cluster=127.0.0.1:9200

Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@ import org.apache.linkis.common.conf.{ByteType, CommonVars}
object ElasticSearchConfiguration {

// es client
val ES_CLUSTER = CommonVars("wds.linkis.es.cluster", "127.0.0.1:9200")
val ES_DATASOURCE_NAME = CommonVars("wds.linkis.datasource", "default_datasource")
val ES_AUTH_CACHE = CommonVars("wds.linkis.es.auth.cache", false)
val ES_USERNAME = CommonVars("wds.linkis.es.username", "")
val ES_PASSWORD = CommonVars("wds.linkis.es.password", "")
val ES_SNIFFER_ENABLE = CommonVars("wds.linkis.es.sniffer.enable", false)
val ES_HTTP_METHOD = CommonVars("wds.linkis.es.http.method", "GET")
val ES_HTTP_ENDPOINT = CommonVars("wds.linkis.es.http.endpoint", "/_search")
val ES_HTTP_SQL_ENDPOINT = CommonVars("wds.linkis.es.sql.endpoint", "/_sql")
val ES_SQL_FORMAT = CommonVars("wds.linkis.es.sql.format", "{\"query\": \"%s\"}")
val ES_HTTP_HEADER_PREFIX = "wds.linkis.es.headers."
val ES_CLUSTER = CommonVars("linkis.es.cluster", "127.0.0.1:9200")
val ES_DATASOURCE_NAME = CommonVars("linkis.datasource", "default_datasource")
val ES_AUTH_CACHE = CommonVars("linkis.es.auth.cache", false)
val ES_USERNAME = CommonVars("linkis.es.username", "")
val ES_PASSWORD = CommonVars("linkis.es.password", "")
val ES_SNIFFER_ENABLE = CommonVars("linkis.es.sniffer.enable", false)
val ES_HTTP_METHOD = CommonVars("linkis.es.http.method", "GET")
val ES_HTTP_ENDPOINT = CommonVars("linkis.es.http.endpoint", "/_search")
val ES_HTTP_SQL_ENDPOINT = CommonVars("linkis.es.sql.endpoint", "/_sql")
val ES_SQL_FORMAT = CommonVars("linkis.es.sql.format", "{\"query\": \"%s\"}")
val ES_HTTP_HEADER_PREFIX = "linkis.es.headers."

// entrance resource
val ENTRANCE_MAX_JOB_INSTANCE = CommonVars("wds.linkis.es.max.job.instance", 100)
val ENTRANCE_PROTECTED_JOB_INSTANCE = CommonVars("wds.linkis.es.protected.job.instance", 20)
val ENGINE_DEFAULT_LIMIT = CommonVars("wds.linkis.es.default.limit", 5000)
val ENTRANCE_MAX_JOB_INSTANCE = CommonVars("linkis.es.max.job.instance", 100)
val ENTRANCE_PROTECTED_JOB_INSTANCE = CommonVars("linkis.es.protected.job.instance", 20)
val ENGINE_DEFAULT_LIMIT = CommonVars("linkis.es.default.limit", 5000)

// resultSet
val ENGINE_RESULT_SET_MAX_CACHE = CommonVars("wds.linkis.resultSet.cache.max", new ByteType("512k"))
val ENGINE_RESULT_SET_MAX_CACHE = CommonVars("linkis.resultSet.cache.max", new ByteType("512k"))

val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("wds.linkis.engineconn.concurrent.limit", 100)
val ENGINE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.concurrent.limit", 100)

val DEFAULT_VERSION = CommonVars[String]("wds.linkis.engineconn.io.version", "7.6.2")
val DEFAULT_VERSION = CommonVars[String]("linkis.engineconn.io.version", "7.6.2")

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ class ResponseHandlerImpl extends ResponseHandler {
cborMapper.readTree(contentBytes)
case "application/smile" =>
smileMapper.readTree(contentBytes)
// case "text/csv" =>
// csvMapper.readTree(contentBytes)
// case "text/tab-separated-values" =>
// csvMapper.readTree(contentBytes)
// val schema = csvMapper.schemaFor(classOf[Array[Byte]]).withColumnSeparator('\t')
// val reader = csvMapper.readerFor(classOf[Array[Byte]]).`with`(schema)
// reader.readValue(contentBytes)
case _ =>
jsonMapper.readTree(contentBytes)
}
Expand Down Expand Up @@ -154,25 +147,4 @@ class ResponseHandlerImpl extends ResponseHandler {
}
}
// scalastyle:on

// def writeText(content: String, storePath: String, alias: String, proxyUser: String): String = {
// val resultSet = ResultSetFactory.getInstance.getResultSetByType(ResultSetFactory.TEXT_TYPE)
// val resultSetPath = resultSet.getResultSetPath(new FsPath(storePath), alias)
// val writer = ResultSetWriter.getResultSetWriter(resultSet, ElasticSearchConfiguration.ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath, proxyUser)
// writer.addMetaData(null)
// content.split("\\n").foreach(item => writer.addRecord(new LineRecord(item)))
// IOUtils.closeQuietly(writer)
// writer.toString()
// }
//
// def writeTable(metaData: TableMetaData, records: ArrayBuffer[TableRecord], storePath: String, alias: String, proxyUser: String): String = {
// val resultSet = ResultSetFactory.getInstance.getResultSetByType(ResultSetFactory.TABLE_TYPE)
// val resultSetPath = resultSet.getResultSetPath(new FsPath(storePath), alias)
// val writer = ResultSetWriter.getResultSetWriter(resultSet, ElasticSearchConfiguration.ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath, proxyUser)
// writer.addMetaData(metaData)
// records.foreach(writer.addRecord)
// IOUtils.closeQuietly(writer)
// writer.toString()
// }

}

0 comments on commit 6cc99ec

Please sign in to comment.