Add LINKIS_CONF_DIR to EC classpath and fix OOM bug for LogWriter #1205

Merged · 7 commits · Dec 15, 2021
@@ -57,25 +57,25 @@ public long getId() {
     @Override
     public ExecuteResponse execute(ExecuteRequest executeRequest) {
         if (event instanceof MessageJob) {
-            TransactionManager txManager = ((MessageJob) event).getContext().getTxManager();
-            Object o = txManager.begin();
+            // TransactionManager txManager = ((MessageJob) event).getContext().getTxManager();
+            // Object o = txManager.begin();
             try {
                 run((MessageJob) event);
-                txManager.commit(o);
+                // txManager.commit(o);
                 return new SuccessExecuteResponse();
             } catch (InterruptedException ie) {
                 // handle InterruptedException
                 logger().error("message job execution interrupted", ie);
-                txManager.rollback(o);
+                // txManager.rollback(o);
                 return new ErrorExecuteResponse("message job execution interrupted", ie);
             } catch (MessageWarnException mwe) {
                 // handle method call failed
                 logger().error("method call normal error return");
-                txManager.rollback(o);
+                // txManager.rollback(o);
                 return new ErrorExecuteResponse("method call failed", mwe);
             } catch (Throwable t) {
                 logger().debug("unexpected error occur", t);
-                txManager.rollback(o);
+                // txManager.rollback(o);
                 return new ErrorExecuteResponse("unexpected error", t);
             }
         }
@@ -139,6 +139,9 @@ public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progress
             // todo check
             updatedProgress = -1 * progress;
         }
+        if (Double.isNaN(updatedProgress)) {
+            return;
+        }
         job.setProgress(updatedProgress);
         EntranceJob entranceJob = (EntranceJob) job;
         entranceJob.getJobRequest().setProgress(String.valueOf(updatedProgress));
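Why this guard helps: job progress is computed as a float ratio, and when both numerator and denominator are zero (for example, no tasks have been generated yet) the result is NaN; without the early return, String.valueOf(updatedProgress) would persist the literal string "NaN" into the job request. A minimal, self-contained Scala sketch of the failure mode (the 0f / 0f trigger is illustrative, not taken from this PR):

object ProgressNaNExample extends App {
  val progress: Float = 0f / 0f       // e.g. zero finished tasks / zero total tasks
  println(progress.isNaN)             // true -> the new guard returns early
  println(String.valueOf(progress))   // "NaN" -- what would have been stored without it
}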
@@ -168,11 +168,18 @@ object CSEntranceHelper extends Logging {
           variableMap.put(linkisVariable.getKey, linkisVariable.getValue)
         }
       }
-      if (variableMap.nonEmpty) {
-        TaskUtils.addVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]], variableMap)
+      if (variableMap.nonEmpty) {
+        // CS variables have low priority: keys that already exist are not overridden
+        val varMap = TaskUtils.getVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]])
+        variableMap.foreach { keyAndValue =>
+          if (!varMap.containsKey(keyAndValue._1)) {
+            varMap.put(keyAndValue._1, keyAndValue._2)
+          }
+        }
+        TaskUtils.addVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]], varMap)
       }

         info(s"parse variable end nodeName:$nodeNameStr")
       }
     }
   }
 }
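The merge order here is the point of the hunk: variables already present on the task (for example, a user-defined run_date) must win over what the context service (CS) supplies, so CS values are only copied in for missing keys before addVariableMap is called. A small sketch of the same merge rule, with hypothetical keys and values:

import java.util

object VariablePriorityExample extends App {
  // varMap: variables already set on the task (high priority).
  val varMap = new util.HashMap[String, Any]()
  varMap.put("run_date", "20211201")                  // user-defined value that must win

  // csVariables: values coming from the context service (low priority).
  val csVariables = Map("run_date" -> "20211215", "flow" -> "demo_flow")
  csVariables.foreach { case (k, v) =>
    if (!varMap.containsKey(k)) varMap.put(k, v)      // only fill missing keys
  }

  println(varMap.get("run_date"))                     // 20211201: CS did not override it
  println(varMap.get("flow"))                         // demo_flow: CS filled the gap
}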
@@ -17,47 +17,53 @@

 package org.apache.linkis.entrance.log

+import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.entrance.conf.EntranceConfiguration
 import org.apache.linkis.entrance.exception.{CacheNotReadyException, EntranceErrorCode}
 import org.apache.linkis.entrance.job.EntranceExecutionJob
 import org.apache.linkis.governance.common.entity.task.RequestPersistTask
 import org.apache.linkis.scheduler.queue.Job

 /**
  * LogManager implementation, implemented as a singleton class
  */
-class CacheLogManager extends LogManager {
+class CacheLogManager extends LogManager with Logging {

   override def getLogReader(execId: String): LogReader = {
-    var retLogReader: LogReader = null
-    this.entranceContext.getOrCreateScheduler().get(execId).foreach {
-      case entranceExecutionJob: EntranceExecutionJob =>
-        retLogReader = entranceExecutionJob.getLogReader.getOrElse({
-          this.synchronized {
-            val logWriter: CacheLogWriter =
-              entranceExecutionJob.getLogWriter.getOrElse(createLogWriter(entranceExecutionJob)).asInstanceOf[CacheLogWriter]
-            val sharedCache: Cache = logWriter.getCache.
-              getOrElse(throw CacheNotReadyException(EntranceErrorCode.CACHE_NOT_READY.getErrCode, EntranceErrorCode.CACHE_NOT_READY.getDesc))
-            val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
-            new CacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, sharedCache, entranceExecutionJob.getUser)
-          }
-        })
-        entranceExecutionJob.setLogReader(retLogReader)
-      case _ => null
-    }
+    var retLogReader: LogReader = null
+    this.entranceContext.getOrCreateScheduler().get(execId).foreach {
+      case entranceExecutionJob: EntranceExecutionJob =>
+        retLogReader = entranceExecutionJob.getLogReader.getOrElse({
+          this.synchronized {
+            val sharedCache: Cache =
+              entranceExecutionJob.getLogWriter.getOrElse(createLogWriter(entranceExecutionJob)) match {
+                case cacheLogWriter: CacheLogWriter =>
+                  cacheLogWriter.getCache.getOrElse(throw CacheNotReadyException(EntranceErrorCode.CACHE_NOT_READY.getErrCode, EntranceErrorCode.CACHE_NOT_READY.getDesc))
+                case _ =>
+                  Cache(1)
+              }
+            val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
+            new CacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, sharedCache, entranceExecutionJob.getUser)
+          }
+        })
+        entranceExecutionJob.setLogReader(retLogReader)
+      case _ => null
+    }
     retLogReader
   }

   override def createLogWriter(job: Job): LogWriter = {
+    if (null != job && job.isCompleted) {
+      return null
+    }
     job match {
       case entranceExecutionJob: EntranceExecutionJob => {
         val cache: Cache = Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getValue)
         val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
         val cacheLogWriter: CacheLogWriter =
           new CacheLogWriter(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, entranceExecutionJob.getUser)
         entranceExecutionJob.setLogWriter(cacheLogWriter)
+        logger.info(s"job ${entranceExecutionJob.getJobRequest.getId} create cacheLogWriter")
         val webSocketCacheLogReader: WebSocketCacheLogReader =
           new WebSocketCacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, entranceExecutionJob.getUser)
         entranceExecutionJob.setWebSocketLogReader(webSocketCacheLogReader)
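Two things change in CacheLogManager: createLogWriter now returns null for jobs that are already completed (so no writer, cache, or file handle is allocated just to read logs back), and getLogReader stops blindly casting the writer to CacheLogWriter. Because the writer can now be null or some other LogWriter subtype, the old asInstanceOf could throw; the pattern match instead falls back to a throwaway single-slot Cache(1). A sketch of that fallback pattern, with hypothetical stand-ins for the Linkis types:

object CacheFallbackExample extends App {
  case class SimpleCache(size: Int)                         // stand-in for Cache
  trait Writer                                              // stand-in for LogWriter
  case class FancyWriter(cache: SimpleCache) extends Writer // stand-in for CacheLogWriter

  def sharedCacheOf(writer: Writer): SimpleCache = writer match {
    case FancyWriter(cache) => cache                 // normal path
    case _                  => SimpleCache(1)        // null or foreign writer: safe fallback
  }

  println(sharedCacheOf(FancyWriter(SimpleCache(500)))) // SimpleCache(500)
  println(sharedCacheOf(null))                          // SimpleCache(1) -- no cast, no NPE
}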
@@ -45,7 +45,12 @@ abstract class LogManager extends LogListener with Logging with EntranceLogListe
     job match {
       case entranceExecutionJob: EntranceExecutionJob =>
         if (entranceExecutionJob.getLogWriter.isEmpty) entranceExecutionJob synchronized {
-          if (entranceExecutionJob.getLogWriter.isEmpty) createLogWriter(entranceExecutionJob)
+          if (entranceExecutionJob.getLogWriter.isEmpty) {
+            val logWriter = createLogWriter(entranceExecutionJob)
+            if (null == logWriter) {
+              return
+            }
+          }
         }
         entranceExecutionJob.getLogWriter.foreach(logWriter => logWriter.write(log))
         entranceExecutionJob.getWebSocketLogWriter.foreach(writer => writer.write(log))
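This is the consumer side of the null contract above: onLogUpdate keeps its check/lock/re-check shape so only one thread creates a writer, but when createLogWriter declines (completed job) it now returns instead of falling through to write. A compact sketch of the idiom, with a hypothetical Holder in place of EntranceExecutionJob:

object WriterGuardExample extends App {
  class Holder {
    @volatile private var writer: Option[String] = None

    def writeLog(line: String, create: () => String): Unit = {
      if (writer.isEmpty) this.synchronized {
        if (writer.isEmpty) {                        // re-check under the lock
          val w = create()
          if (w == null) return                      // creation declined: drop the line
          writer = Some(w)
        }
      }
      writer.foreach(w => println(s"[$w] $line"))
    }
  }

  val holder = new Holder
  holder.writeLog("hello", () => null)               // dropped, no writer created
  holder.writeLog("hello", () => "cacheWriter")      // prints "[cacheWriter] hello"
}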
@@ -31,65 +31,64 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream

 abstract class LogWriter(charset: String) extends Closeable with Flushable with Logging {

   private var firstWrite = true

-  protected val outputStream: OutputStream
+  protected var outputStream: OutputStream

   def write(msg: String): Unit = synchronized {
     val log = if (!firstWrite) "\n" + msg else {
+      logger.info(s"$toString write first one line log")
       firstWrite = false
       msg
     }
-    Utils.tryQuietly({
-      outputStream.write(log.getBytes(charset))
-      outputStream.flush()
-    }, t => {
-      warn("error when write query log to outputStream.", t)
-      info(msg)
-    })
+    Utils.tryAndWarnMsg {
+      outputStream.write(log.getBytes(charset))
+      outputStream.flush()
+    }(s"$toString error when write query log to outputStream.")
   }

   def flush(): Unit = Utils.tryAndWarnMsg[Unit] {
     outputStream match {
       case hdfs: HdfsDataOutputStream =>
         // todo check
-        hdfs.hsync()
+        hdfs.hflush()
       case _ =>
         outputStream.flush()
     }
-  }("Error encounters when flush log, ")
+  }(s"$toString Error encounters when flush log, ")

   def close(): Unit = {
-    info(s" $toString logWriter close")
+    logger.info(s" $toString logWriter close")
     flush()
     if (outputStream != null) {
-      Utils.tryCatch {
-        outputStream.close()
-      } {
-        case t: Throwable => // ignore
-      }
+      Utils.tryQuietly(outputStream.close())
+      outputStream = null
     }
   }
 }

 abstract class AbstractLogWriter(logPath: String,
                                  user: String,
                                  charset: String) extends LogWriter(charset) {
   if (StringUtils.isBlank(logPath)) throw new EntranceErrorException(20301, "logPath cannot be empty.")
-  protected val fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
+  protected var fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
   fileSystem.init(new util.HashMap[String, String]())

-  protected val outputStream: OutputStream = {
+  protected var outputStream: OutputStream = {
     FileSystemUtils.createNewFile(new FsPath(logPath), user, true)
     fileSystem.write(new FsPath(logPath), true)
   }

   override def close(): Unit = {
     super.close()
-    if (fileSystem != null) Utils.tryQuietly(fileSystem.close(), t => {
-      warn("Error encounters when closing fileSystem", t)
-    })
+    if (fileSystem != null) Utils.tryAndWarnMsg {
+      fileSystem.close()
+      fileSystem = null
+    }(s"$toString Error encounters when closing fileSystem")
   }
+
+  override def toString: String = logPath
 }
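This hunk carries the OOM fix named in the title. outputStream and fileSystem were val, so even after close() a completed job kept its stream, file-system handle, and log buffers strongly reachable for as long as the job object lived; turning them into var and nulling them in close() lets the GC reclaim all of it. The switch from hsync() to hflush() is a separate relaxation: hflush only pushes buffered data out to the HDFS datanodes, while hsync additionally forces it to disk, which is expensive when done on every flush. A sketch of the close-and-null pattern (class names are illustrative, not the Linkis ones):

import java.io.{ByteArrayOutputStream, Closeable, OutputStream}

abstract class ReleasableWriter extends Closeable {
  protected var outputStream: OutputStream           // var, so close() can release it

  override def close(): Unit = {
    if (outputStream != null) {
      try outputStream.close()
      catch { case _: Throwable => /* best effort, mirrors tryQuietly */ }
      outputStream = null                            // unpin the buffer for the GC
    }
  }
}

object ReleasableWriterExample extends App {
  class ByteWriter extends ReleasableWriter {
    protected var outputStream: OutputStream = new ByteArrayOutputStream()
  }
  val w = new ByteWriter
  w.close()                                          // closed and dereferenced in one step
}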
@@ -65,7 +65,7 @@ object SparkConfiguration extends Logging {

   val SPARK_DRIVER_CLASSPATH = CommonVars[String]("spark.driver.extraClassPath", "")

-  val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.configuration=linkis-engine.properties " + "\"")
+  val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.server.conf=linkis-engine.properties " + "\"")

   val SPARK_DEFAULT_EXTERNAL_JARS_PATH = CommonVars[String]("spark.external.default.jars", "")
@@ -34,7 +34,7 @@ object EnvConfiguration {
   val ENGINE_CONN_CLASSPATH_FILES = CommonVars("wds.linkis.engineConn.files", "", "engineConn额外的配置文件")

   val ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]("wds.linkis.engineConn.javaOpts.default", s"-XX:+UseG1GC -XX:MaxPermSize=250m -XX:PermSize=128m " +
-    s"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.configuration=linkis-engineconn.properties -Dwds.linkis.gateway.url=${Configuration.getGateWayURL()}")
+    s"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.server.conf=linkis-engineconn.properties -Dwds.linkis.gateway.url=${Configuration.getGateWayURL()}")

   val ENGINE_CONN_MEMORY = CommonVars("wds.linkis.engineConn.memory", new ByteType("2g"), "Specify the memory size of the java client(指定java进程的内存大小)")

@@ -49,5 +49,7 @@

   val LOG4J2_XML_FILE = CommonVars[String]("wds.linkis.engineconn.log4j2.xml.file", "log4j2-engineconn.xml")

-  val LINKIS_PUBLIC_MODULE_PATH = CommonVars("wds.linkis.public_module.path", Configuration.LINKIS_HOME.getValue + "/lib/linkis-commons/public-module")
+  val LINKIS_PUBLIC_MODULE_PATH = CommonVars("wds.linkis.public_module.path", Configuration.getLinkisHome + "/lib/linkis-commons/public-module")
+
+  val LINKIS_CONF_DIR = CommonVars("LINKIS_CONF_DIR", Configuration.getLinkisHome() + "/conf")
 }
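LINKIS_CONF_DIR is declared with the environment-variable name as its key, so a value exported by the deploy scripts should win and $LINKIS_HOME/conf is only the fallback. The exact resolution order of Linkis CommonVars is framework-specific; this sketch only mirrors the fallback idea, with linkisHome standing in for Configuration.getLinkisHome:

object ConfDirExample extends App {
  def confDir(linkisHome: String): String =
    sys.env.getOrElse("LINKIS_CONF_DIR", linkisHome + "/conf")

  println(confDir("/opt/linkis"))   // "/opt/linkis/conf" unless LINKIS_CONF_DIR is exported
}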
@@ -85,7 +85,9 @@ abstract class JavaProcessEngineConnLaunchBuilder extends ProcessEngineConnLaunc
     // addPathToClassPath(environment, variable(PWD))
     // first, add engineconn conf dirs.
     addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_CONF_DIR_NAME))
-    // second, add engineconn libs.
+    // then, add LINKIS_CONF_DIR conf dirs.
+    addPathToClassPath(environment, Seq(EnvConfiguration.LINKIS_CONF_DIR.getValue))
+    // then, add engineconn libs.
     addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_LIB_DIR_NAME + "/*"))
     // then, add public modules.
     if (!enablePublicModule) {
@@ -116,6 +118,7 @@
     environment
   }

+
   override protected def getNecessaryEnvironment(implicit engineConnBuildRequest: EngineConnBuildRequest): Array[String] =
     if (!ifAddHiveConfigPath) Array.empty else Array(HADOOP_CONF_DIR.toString, HIVE_CONF_DIR.toString)
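The comments in this hunk spell out the resulting lookup order: the engineconn's own conf directory stays first, the shared LINKIS_CONF_DIR comes next (presumably so that files under the shared conf dir, such as linkis.properties, become visible to the EC process), and the lib jars follow. A sketch of the assembled path, with purely illustrative directories:

object EcClasspathExample extends App {
  val pwd = "/tmp/engineConn-1"                      // engineconn working directory
  val classpath = Seq(
    s"$pwd/conf",                                    // 1. engineconn conf, highest priority
    "/opt/linkis/conf",                              // 2. LINKIS_CONF_DIR, added by this PR
    s"$pwd/lib/*"                                    // 3. engineconn libs
  ).mkString(":")
  println(classpath)
}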