New log collector module #54

Merged · 61 commits · Dec 15, 2022

Commits
c5ab969
Support the method to get engine conn resource information.
Davidhua1996 Oct 17, 2022
9f6b0be
Init the structure of log module.
Davidhua1996 Oct 17, 2022
a04b58d
Add the new feature for adding task and stop task for existed jobs.
wushengyeyouya Oct 18, 2022
0576382
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 18, 2022
3afd19c
Use the "\u0001" instead of "\0x001" to act as blank placeholder.
Davidhua1996 Oct 18, 2022
9288a1c
SendBuffer and BucketConfig.
Davidhua1996 Oct 18, 2022
8a753d3
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 19, 2022
a7183e9
Add the new feature for adding task and stop task for existed jobs.
wushengyeyouya Oct 19, 2022
1efbf47
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 19, 2022
ba4189d
Complete the log collector (80%).
Davidhua1996 Oct 19, 2022
25e4646
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 19, 2022
b1cf642
fix the bug for adding task for existed jobs.
wushengyeyouya Oct 20, 2022
e704f63
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 20, 2022
247d93b
1. Add the ability for updating task for existed jobs.
wushengyeyouya Oct 20, 2022
f001612
RPC(Http) module classes in collector
Davidhua1996 Oct 20, 2022
d4bd7c8
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 20, 2022
06a4f63
1. Add the ability for updating task for existed jobs.
wushengyeyouya Oct 20, 2022
f5dc4ec
1. Add the ability for updating task for existed jobs.
wushengyeyouya Oct 20, 2022
321bf48
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 20, 2022
f4e3cf5
The strategy to retry the request and compact the buffer in log colle…
Davidhua1996 Oct 20, 2022
aef0b89
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 20, 2022
6153eb9
Complete the log collector (95%)
Davidhua1996 Oct 20, 2022
7ac4336
Fix the problem in send buffer and consumer.
Davidhua1996 Oct 21, 2022
9588e90
Optimize the architecture of fetch Job details.
wushengyeyouya Oct 21, 2022
467d06f
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 21, 2022
f3bee02
Add config autowired interface to set the log appender params.
Davidhua1996 Oct 21, 2022
58fe439
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 21, 2022
b31a319
fix the compilation bug.
wushengyeyouya Oct 21, 2022
6ac3092
fix the long to int parse bug.
wushengyeyouya Oct 21, 2022
58d8410
Complete the log collector (100%)
Davidhua1996 Oct 22, 2022
f52cc8d
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 22, 2022
80b5003
Add Flink log collector and the transform in streamis which resolves …
Davidhua1996 Oct 22, 2022
1c115d5
Complete stream log server to store the log events.
Davidhua1996 Oct 22, 2022
0da93b0
Fix the problem in job builder.
Davidhua1996 Oct 23, 2022
7b40828
Move the internal group config into JobConfKeyConstants.
Davidhua1996 Oct 23, 2022
7ddb908
Move the internal group config into JobConfKeyConstants.
Davidhua1996 Oct 23, 2022
503ed3f
Make the constraints in transforms be configurable.
Davidhua1996 Oct 23, 2022
e0d9804
Fix serialization and configuration problem.
Davidhua1996 Oct 23, 2022
b2052e1
Optimize the update task and stop task logic.
wushengyeyouya Oct 24, 2022
d49ebc4
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 24, 2022
be6ef36
Fix the problem in fetching user info when the flink application does…
Davidhua1996 Oct 24, 2022
3359556
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 24, 2022
7f449ee
Add new strategy named ThresholdFilter
Davidhua1996 Oct 25, 2022
8b7903e
Fix the problem when closing bucket.
Davidhua1996 Oct 25, 2022
cc67dbb
Split the log collector into log4j1 and log4j2; Avoid the jackson mod…
Davidhua1996 Oct 25, 2022
f8dbed0
Add module to collect spark container log.
Davidhua1996 Oct 26, 2022
8d006e1
Add the new requestBody parameter `projectName` for adding task and s…
wushengyeyouya Oct 26, 2022
c518e10
Add the new requestBody parameter `projectName` for adding task and s…
wushengyeyouya Oct 26, 2022
f40985c
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 26, 2022
49c1f95
Complete the strategy in JsonTool.escapeStrValue().
Davidhua1996 Oct 26, 2022
d882cf4
resolve 413 response status for update task.
wushengyeyouya Oct 29, 2022
86d8ad6
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 29, 2022
d33b0f3
resolve 413 response status for update task.
wushengyeyouya Oct 29, 2022
11cf7c7
Fix the bug in method restoreInvariants of AbstractRpcLogSender.
Davidhua1996 Oct 29, 2022
4e330a2
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 29, 2022
e9c8792
resolve 413 response status for update task.
wushengyeyouya Oct 31, 2022
2004b57
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 31, 2022
b3d60fe
adjust the relative path in pom.xml.
Davidhua1996 Nov 7, 2022
67defce
Fix the problem in pom.xml of stream job log modules.
Davidhua1996 Nov 7, 2022
02dcdc6
resolve the LinkisJobInfo is null for adding task.
wushengyeyouya Nov 15, 2022
a4fc531
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Nov 15, 2022
Changes from all commits
5 changes: 3 additions & 2 deletions pom.xml
@@ -45,14 +45,15 @@
</modules>

<properties>
-<linkis.version>1.1.1</linkis.version>
+<linkis.version>1.1.3</linkis.version>
<junit.version>4.12</junit.version>
<dss.version>1.1.0</dss.version>
<streamis.version>0.2.0</streamis.version>
<scala.version>2.11.12</scala.version>
<jdk.compile.version>1.8</jdk.compile.version>
<maven.version>3.3.3</maven.version>
<gson.version>2.8.5</gson.version>
-<fasterxml.jackson.version>2.11.3</fasterxml.jackson.version>
+<fasterxml.jackson.version>2.13.2</fasterxml.jackson.version>
<math3.version>3.1.1</math3.version>
<httpclient.version>4.5.4</httpclient.version>
<httpmime.version>4.5.4</httpmime.version>
1 change: 1 addition & 0 deletions streamis-jobmanager/pom.xml
@@ -32,6 +32,7 @@
<module>streamis-job-manager</module>
<module>streamis-jobmanager-server</module>
<module>streamis-projectmanager-server</module>
+<module>streamis-job-log</module>
</modules>


@@ -15,7 +15,6 @@

package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.url;

-import org.apache.commons.lang.StringUtils;

import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
@@ -18,6 +18,7 @@ trait FlinkLogIterator extends Iterator[String] with Closeable {
val engineConnLogOperator: EngineConnLogOperator
def init(): Unit
def getLogPath: String
+def getLogDirSuffix: String
def getLogs: util.ArrayList[String]
def getEndLine: Long
}
@@ -28,6 +29,7 @@ class SimpleFlinkJobLogIterator(override val requestPayload: LogRequestPayload,
private var logs: util.ArrayList[String] = _
private var index = 0
private var logPath: String = _
+private var logDirSuffix: String = _
private var isClosed = true
private var endLine = 0

@@ -69,4 +71,8 @@ class SimpleFlinkJobLogIterator(override val requestPayload: LogRequestPayload,
override def getLogs: util.ArrayList[String] = logs

override def getEndLine: Long = endLine

+def setLogDirSuffix(logDirSuffix: String) : Unit = this.logDirSuffix = logDirSuffix

+override def getLogDirSuffix: String = logDirSuffix
}
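For orientation, a minimal sketch of how the extended iterator API might be consumed; "payload" and "operator" are hypothetical stand-ins for a real LogRequestPayload and log operator, and only the members shown in the diff above are assumed.

// Sketch only: payload and operator are placeholders, not values from this PR.
val logIterator = new SimpleFlinkJobLogIterator(payload, operator)
logIterator.init()
try {
  while (logIterator.hasNext) println(logIterator.next())
  // New in this PR: where the engine wrote its logs, needed for history queries.
  val suffix = logIterator.getLogDirSuffix
} finally {
  logIterator.close()
}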
@@ -14,6 +14,7 @@ class LogRequestPayload {
private var onlyKeywords: String = _
private var lastRows = 0
private var logType: String = _
+private var logHistory: Boolean = false
def getPageSize: Int = pageSize
def setPageSize(pageSize: Int): Unit = this.pageSize = pageSize

@@ -32,4 +33,8 @@ class LogRequestPayload {
def getLogType: String = logType

def setLogType(logType: String): Unit = this.logType = logType

+def isLogHistory: Boolean = logHistory

+def setLogHistory(logHistory: Boolean): Unit = this.logHistory = logHistory
}
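A minimal usage sketch of the new flag, using only the setters shown above (values are illustrative):

val payload = new LogRequestPayload
payload.setPageSize(100)     // page of up to 100 log lines
payload.setLogType("client") // route to the client log operator
payload.setLogHistory(true)  // new: ask for persisted logs, which forces
                             // FlinkJobClient to resolve logDirSuffix first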
@@ -23,13 +23,17 @@ import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.core.{FlinkLo
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.entity.LogRequestPayload
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.exception.{FlinkJobLaunchErrorException, FlinkJobStateFetchException, FlinkSavePointException}
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.FlinkJobLaunchManager
-import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.operator.{FlinkTriggerSavepointOperator, FlinkYarnLogOperator}
+import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.operator.{FlinkClientLogOperator, FlinkTriggerSavepointOperator, FlinkYarnLogOperator}
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.state.{Checkpoint, Savepoint}
import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.computation.client.once.OnceJob
-import org.apache.linkis.computation.client.once.simple.SimpleOnceJob
+import org.apache.linkis.computation.client.once.action.ECResourceInfoAction
+import org.apache.linkis.computation.client.once.result.ECResourceInfoResult
+import org.apache.linkis.computation.client.once.{LinkisManagerClient, LinkisManagerClientImpl, OnceJob}
+import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder}
import org.apache.linkis.computation.client.operator.impl.EngineConnLogOperator

+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import java.util
import java.net.URI

class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager: JobStateManager)
@@ -39,9 +43,13 @@ class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager:
* Log operator
*/
private var logOperatorMap = Map(
"client" -> EngineConnLogOperator.OPERATOR_NAME,
"client" -> FlinkClientLogOperator.OPERATOR_NAME,
"yarn" -> FlinkYarnLogOperator.OPERATOR_NAME
)
/**
* The linkis client in onceJob
*/
private var linkisClient: DWSHttpClient = _

override def getJobInfo: FlinkJobInfo = {
getJobInfo(false)
@@ -99,13 +107,38 @@ class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager:
case Some(operator) =>
onceJob.getOperator(operator) match {
case engineConnLogOperator: EngineConnLogOperator =>
+val logIterator = new SimpleFlinkJobLogIterator(requestPayload, engineConnLogOperator)
+engineConnLogOperator match {
+case clientLogOperator: FlinkClientLogOperator =>
+var logDirSuffix = this.jobInfo.getLogDirSuffix
+if (StringUtils.isBlank(logDirSuffix) && requestPayload.isLogHistory){
+// If want to fetch the history log, must get the log directory suffix first
+getLinkisClient match {
+case client: DWSHttpClient =>
+Option(Utils.tryCatch{
+client.execute(ECResourceInfoAction.newBuilder().setUser(jobInfo.getUser)
+.setTicketid(clientLogOperator.getTicketId).build()).asInstanceOf[ECResourceInfoResult]
+}{
+case e: Exception =>
+warn("Fail to query the engine conn resource info from linkis", e)
+null
+}) match {
+case Some(result) => logDirSuffix = Utils.tryAndWarn{result.getData.getOrDefault("ecResourceInfoRecord", new util.HashMap[String, Any]).asInstanceOf[util.Map[String, Any]]
+.getOrDefault("logDirSuffix", "").asInstanceOf[String]}
+case _ =>
+}
+}
+}
+clientLogOperator.setLogDirSuffix(logDirSuffix)
+logIterator.setLogDirSuffix(logDirSuffix)
+case _ =>
+}
engineConnLogOperator match {
case yarnLogOperator: FlinkYarnLogOperator => yarnLogOperator.setApplicationId(jobInfo.getApplicationId)
case _ =>
}
engineConnLogOperator.setECMServiceInstance(jobInfo.getECMInstance)
engineConnLogOperator.setEngineConnType(FlinkJobLaunchManager.FLINK_ENGINE_CONN_TYPE)
-val logIterator = new SimpleFlinkJobLogIterator(requestPayload, engineConnLogOperator)
logIterator.init()
jobInfo match {
case jobInfo: FlinkJobInfo => jobInfo.setLogPath(logIterator.getLogPath)
@@ -161,5 +194,27 @@ class FlinkJobClient(onceJob: OnceJob, var jobInfo: FlinkJobInfo, stateManager:
triggerSavepoint(savepointURI.toString, JobLauncherConfiguration.FLINK_TRIGGER_SAVEPOINT_MODE.getValue)
}

+/**
+* Get linkis client
+* @return
+*/
+def getLinkisClient: DWSHttpClient = {
+Utils.tryAndWarn{
+if (null == this.linkisClient){
+this.synchronized{
+if (null == this.linkisClient){
+this.linkisClient = SimpleOnceJobBuilder.getLinkisManagerClient match {
+case client: LinkisManagerClient =>
+val dwsClientField = classOf[LinkisManagerClientImpl].getDeclaredField("dwsHttpClient")
+dwsClientField.setAccessible(true)
+dwsClientField.get(client).asInstanceOf[DWSHttpClient]
+case _ => null
+}
+
+}
+}
+}
+this.linkisClient
+}
}
}
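getLinkisClient above combines double-checked lazy initialization with reflection, because LinkisManagerClientImpl keeps its dwsHttpClient field private and offers no accessor. A self-contained sketch of the same reflection idiom (the helper name and signature are illustrative, not part of the PR):

import java.lang.reflect.Field

// Read a private field off an implementation class when the API hides it.
def readPrivateField[T](target: AnyRef, declaringClass: Class[_], fieldName: String): T = {
  val field: Field = declaringClass.getDeclaredField(fieldName)
  field.setAccessible(true) // bypass the private modifier
  field.get(target).asInstanceOf[T]
}

// e.g. readPrivateField[DWSHttpClient](client, classOf[LinkisManagerClientImpl], "dwsHttpClient")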
@@ -34,41 +34,53 @@ class FlinkJobInfo extends YarnJobInfo {
private var applicationUrl: String = _
private var status: String = _
private var logPath: String = _
+private var logDirSuffix: String = _
private var resources: java.util.Map[String, Object] = _
private var completedMsg: String = _
private var jobStates: Array[JobStateInfo] = _

override def getApplicationId: String = applicationId

def setApplicationId(applicationId: String): Unit = this.applicationId = applicationId

override def getApplicationUrl: String = applicationUrl

def setApplicationUrl(applicationUrl: String): Unit = this.applicationUrl = applicationUrl

override def getId: String = id
-def setId(id: String): Unit = this.id = id

+def setId(id: String): Unit = this.id = id

override def getECMInstance: ServiceInstance = ecmInstance

def setECMInstance(ecmInstance: ServiceInstance): Unit = this.ecmInstance = ecmInstance

override def getUser: String = user

def setUser(user: String): Unit = this.user = user

override def getStatus: String = status

override def setStatus(status: String): Unit = this.status = status

override def getLogPath: String = logPath

def setLogPath(logPath: String): Unit = this.logPath = logPath

override def getResources: util.Map[String, Object] = resources

def setResources(resources: java.util.Map[String, Object]): Unit = this.resources = resources

def getSavepoint: String = savepoint

def setSavepoint(savepoint: String): Unit = this.savepoint = savepoint

def getCheckpoint: String = checkpoint

def setCheckpoint(checkpoint: String): Unit = this.checkpoint = checkpoint

override def getCompletedMsg: String = completedMsg

def setCompletedMsg(completedMsg: String): Unit = this.completedMsg = completedMsg

override def toString: String = s"FlinkJobInfo(id: $id, status: $status, applicationId: $applicationId, applicationUrl: $applicationUrl, logPath: $logPath)"
@@ -85,6 +97,7 @@ class FlinkJobInfo extends YarnJobInfo {
def setJobStates(jobStates: Array[JobStateInfo]): Unit = {
this.jobStates = jobStates
}

/**
* Job name
*
@@ -95,11 +108,16 @@ class FlinkJobInfo extends YarnJobInfo {
def setName(name: String): Unit = {
this.name = name
}
-}

-object FlinkJobInfo{
-def main(args: Array[String]): Unit = {
-val jobInfo = "{\"jobStates:\":{\"location\":\"xx\"}"
-DWSHttpClient.jacksonJson.readValue(jobInfo, classOf[FlinkJobInfo])
+/**
+* Job log directory suffix
+*
+* @return
+*/
+override def getLogDirSuffix: String = this.logDirSuffix
+
+override def setLogDirSuffix(logDirSuffix: String): Unit = {
+this.logDirSuffix = logDirSuffix
}
}

@@ -11,5 +11,11 @@ trait LinkisJobInfo extends JobInfo {
*/
def getECMInstance: ServiceInstance

+/**
+* Job log directory suffix
+* @return
+*/
+def getLogDirSuffix: String

+def setLogDirSuffix(logDirSuffix: String): Unit
}
@@ -15,16 +15,16 @@

package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager

-import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.{JobState, JobStateInfo}
+import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.JobState
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.{JobClient, LaunchJob}
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLauncherConfiguration
-import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.{FlinkJobClient, FlinkJobInfo, LinkisJobInfo}
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.SimpleFlinkJobLaunchManager.INSTANCE_NAME
-import org.apache.commons.lang.StringEscapeUtils
+import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.{FlinkJobClient, FlinkJobInfo, LinkisJobInfo}
+import org.apache.commons.lang3.StringEscapeUtils
import org.apache.linkis.common.utils.{RetryHandler, Utils}
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SubmittableSimpleOnceJob}
import org.apache.linkis.computation.client.once.{OnceJob, SubmittableOnceJob}
-import org.apache.linkis.computation.client.operator.impl.{EngineConnApplicationInfoOperator, EngineConnLogOperator}
+import org.apache.linkis.computation.client.operator.impl.EngineConnApplicationInfoOperator
import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.ujes.client.exception.UJESJobException

@@ -0,0 +1,34 @@
package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.operator

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.computation.client.once.action.EngineConnOperateAction
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnLogs}

/**
* Append "logDirSuffix" parameter
*/
class FlinkClientLogOperator extends EngineConnLogOperator{

private var logDirSuffix: String = _

def setLogDirSuffix(logDirSuffix: String): Unit = {
this.logDirSuffix = logDirSuffix
}

protected override def addParameters(builder: EngineConnOperateAction.Builder): Unit = {
builder.operatorName(EngineConnLogOperator.OPERATOR_NAME)
if (StringUtils.isNotBlank(this.logDirSuffix)) {
builder.addParameter("logDirSuffix", logDirSuffix)
}
super.addParameters(builder)
}


override def getTicketId: String = super.getTicketId

override def getName: String = FlinkClientLogOperator.OPERATOR_NAME
}

object FlinkClientLogOperator {
val OPERATOR_NAME = "engineConnLog_flink"
}
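Note the split between the two names: getName returns "engineConnLog_flink" so FlinkJobClient can route "client" log requests to this operator, while addParameters still submits the action under the generic EngineConnLogOperator.OPERATOR_NAME, attaching logDirSuffix only when one is set. A sketch of driving it by hand (the suffix value is a placeholder; in the PR the operator is resolved through onceJob.getOperator):

val operator = new FlinkClientLogOperator
operator.setLogDirSuffix("/data/logs/flink/ticket-0001") // hypothetical path
// When the request is built, the action is named EngineConnLogOperator.OPERATOR_NAME
// and carries an extra "logDirSuffix" parameter because the suffix is non-blank.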
@@ -19,9 +19,9 @@ import org.apache.linkis.computation.client.once.action.EngineConnOperateAction
import org.apache.linkis.computation.client.operator.impl.EngineConnLogOperator

/**
-* Extend the engine conn log operator
+* Extend the flink client log operator
*/
-class FlinkYarnLogOperator extends EngineConnLogOperator{
+class FlinkYarnLogOperator extends FlinkClientLogOperator {

private var applicationId: String = _

@@ -30,8 +30,9 @@ class FlinkYarnLogOperator extends EngineConnLogOperator{
}

protected override def addParameters(builder: EngineConnOperateAction.Builder): Unit = {
builder.addParameter("yarnApplicationId", this.applicationId)
super.addParameters(builder)
builder.operatorName(getName)
builder.addParameter("yarnApplicationId", this.applicationId)
}

override def getName: String = FlinkYarnLogOperator.OPERATOR_NAME
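The reordering above matters: super.addParameters must run first so that FlinkClientLogOperator can set the generic operator name (and the optional logDirSuffix), after which the yarn operator overwrites the name with its own and appends the application id. The effective builder calls, as a sketch:

// 1. super.addParameters(builder)  -> operatorName = EngineConnLogOperator.OPERATOR_NAME,
//    plus "logDirSuffix" when non-blank (from FlinkClientLogOperator)
// 2. builder.operatorName(getName) -> overwritten with FlinkYarnLogOperator.OPERATOR_NAME
// 3. builder.addParameter("yarnApplicationId", this.applicationId)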
@@ -23,6 +23,10 @@ import org.apache.linkis.common.conf.CommonVars
*/
object JobConfKeyConstants {

+/**
+* Config group for streamis internal configuration
+*/
+val GROUP_INTERNAL: CommonVars[String] = CommonVars("wds.streamis.job.internal.config.group", "wds.streamis.internal.params")
/**
* Group: Flink extra
*/
Expand Down
Loading