Skip to content

Commit

Permalink
fix: fix conflict with dev-1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jackxu2011 committed Jul 15, 2022
2 parents 0c3f00c + c9cb9d7 commit 586056d
Show file tree
Hide file tree
Showing 71 changed files with 750 additions and 439 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.linkis.storage.excel

import org.apache.commons.io.IOUtils
import org.apache.linkis.common.io.{MetaData, Record}
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.storage.domain._
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.poi.ss.usermodel._
Expand Down Expand Up @@ -148,29 +148,29 @@ class StorageExcelWriter(val charset: String, val sheetName: String, val dateFor

private def setCellTypeValue(dataType: DataType, elem: Any, cell: SXSSFCell): Unit = {
if (null == elem) return
dataType match {
case StringType => cell.setCellValue(DataType.valueToString(elem))
case TinyIntType => cell.setCellValue(elem.toString.toInt)
case ShortIntType => cell.setCellValue(elem.toString.toInt)
case IntType => cell.setCellValue(elem.toString.toInt)
case LongType => cell.setCellValue(elem.toString.toLong)
case BigIntType => cell.setCellValue(elem.toString.toLong)
case FloatType => cell.setCellValue(elem.toString.toFloat)
case DoubleType => cell.setCellValue(elem.toString.toDouble)
case CharType => cell.setCellValue(DataType.valueToString(elem))
case VarcharType => cell.setCellValue(DataType.valueToString(elem))
case DateType => cell.setCellValue(getDate(elem))
case TimestampType => cell.setCellValue(getDate(elem))
case DecimalType => cell.setCellValue(DataType.valueToString(elem))
case BigDecimalType => cell.setCellValue(DataType.valueToString(elem))
case _ =>
val value = DataType.valueToString(elem)
cell.setCellValue(value)
if (null != dataType) {
logger.warn(s"Cannot find matched type for dataType : ${dataType.toString}, string value:${value}, value className : ${elem.getClass.getName}, will treat it as string.")
} else {
logger.warn(s"Invalid null dataType. Will treat value string : ${value}, value className : ${elem.getClass.getName} as string.")
}
Utils.tryCatch {
dataType match {
case StringType => cell.setCellValue(DataType.valueToString(elem))
case TinyIntType => cell.setCellValue(elem.toString.toInt)
case ShortIntType => cell.setCellValue(elem.toString.toInt)
case IntType => cell.setCellValue(elem.toString.toInt)
case LongType => cell.setCellValue(elem.toString.toLong)
case BigIntType => cell.setCellValue(elem.toString.toLong)
case FloatType => cell.setCellValue(elem.toString.toFloat)
case DoubleType => cell.setCellValue(elem.toString.toDouble)
case CharType => cell.setCellValue(DataType.valueToString(elem))
case VarcharType => cell.setCellValue(DataType.valueToString(elem))
case DateType => cell.setCellValue(getDate(elem))
case TimestampType => cell.setCellValue(getDate(elem))
case DecimalType => cell.setCellValue(DataType.valueToString(elem))
case BigDecimalType => cell.setCellValue(DataType.valueToString(elem))
case _ =>
val value = DataType.valueToString(elem)
cell.setCellValue(value)
}
} {
case e: Exception =>
cell.setCellValue(DataType.valueToString(elem))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class EngineConnLogOperator extends Operator with Logging {
}
val logDIrSuffix = getAs("logDirSuffix", "")
val (engineConnLogDir, engineConnInstance, ticketId) = if (StringUtils.isNotBlank(logDIrSuffix)) {
val ecLogPath = ECMConfiguration.ENGINECONN_ROOT_DIR + File.pathSeparator + logDIrSuffix
val ecLogPath = ECMConfiguration.ENGINECONN_ROOT_DIR + File.separator + logDIrSuffix
val ticketId = getAs("ticketId", "")
(ecLogPath, "", ticketId)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,8 @@ object ComputationExecutorConf {
val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars("hive.resultset.use.unique.column.names", false)

val JOB_ID_TO_ENV_KEY = CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID").getValue

val TASK_ASYNC_MAX_THREAD_SIZE = CommonVars("linkis.ec.task.execution.async.thread.size", 50).getValue

val TASK_SUBMIT_WAIT_TIME_MS = CommonVars("linkis.ec.task.submit.wait.time.ms", 2L).getValue
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.PostConstruct
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutorService


@Component
Expand All @@ -87,6 +88,8 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re

private val CONCURRENT_TASK_LOCKER = new Object

private val taskAsyncSubmitExecutor: ExecutionContextExecutorService = Utils.newCachedExecutionContext(ComputationExecutorConf.TASK_ASYNC_MAX_THREAD_SIZE, "TaskExecution-Thread-")

@PostConstruct
def init(): Unit = {
LogHelper.setLogListener(this)
Expand Down Expand Up @@ -137,7 +140,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
}
val jobId = JobUtils.getJobIdFromMap(requestTask.getProperties)
if (StringUtils.isNotBlank(jobId)) {
System.getProperties().put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
System.getProperties.put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
logger.info(s"Received job with id ${jobId}.")
}
val task = new CommonEngineConnTask(String.valueOf(taskId), retryAble)
Expand All @@ -149,32 +152,29 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
task.setLabels(labels)
val entranceServerInstance = RPCUtils.getServiceInstanceFromSender(sender)
task.setCallbackServiceInstance(entranceServerInstance)
if (executorManager.containExecutor(labels)) {
submitTaskToExecutor(task, labels)
} else {
logger.info(s"task $taskId need to create executor")
val runnable = new Runnable {
override def run(): Unit = Utils.tryCatch {
submitTaskToExecutor(task, labels) match {
case ErrorExecuteResponse(message, throwable) =>
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
logger.error(message, throwable)
sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
case _ =>
}
} { t =>
logger.warn("Failed to submit task ", t)
sendToEntrance(task, ResponseTaskError(task.getTaskId, ExceptionUtils.getRootCauseMessage(t)))
sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
logger.info(s"task $taskId submit executor to execute")
val runnable = new Runnable {
override def run(): Unit = Utils.tryCatch {
// Waiting to run, preventing task messages from being sent to submit services before SubmitResponse, such as entry
Thread.sleep(ComputationExecutorConf.TASK_SUBMIT_WAIT_TIME_MS)
submitTaskToExecutor(task, labels) match {
case ErrorExecuteResponse(message, throwable) =>
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
logger.error(message, throwable)
sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
case _ =>
}
} { t =>
logger.warn("Failed to submit task ", t)
sendToEntrance(task, ResponseTaskError(task.getTaskId, ExceptionUtils.getRootCauseMessage(t)))
sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
}
val submitTaskToExecutorFuture = Utils.defaultScheduler.submit(runnable)
SubmitResponse(task.getTaskId)
}
val submitTaskToExecutorFuture = taskAsyncSubmitExecutor.submit(runnable)
SubmitResponse(task.getTaskId)
}

private def submitTaskToExecutor(task: CommonEngineConnTask, labels: Array[Label[_]]): ExecuteResponse = {
val newLabels = restExecutorLabels(labels)
val executor = executorManager.getExecutorByLabels(labels)
executor match {
case computationExecutor: ComputationExecutor =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object EngineConnPluginConf {

val JAVA_ENGINE_REQUEST_MEMORY = CommonVars[ByteType]("wds.linkis.engineconn.java.driver.memory", new ByteType("1g"))

val JAVA_ENGINE_REQUEST_CORES = CommonVars[Int]("wds.linkis.engineconn.java.driver.cores", 2)
val JAVA_ENGINE_REQUEST_CORES = CommonVars[Int]("wds.linkis.engineconn.java.driver.cores", 1)

val JAVA_ENGINE_REQUEST_INSTANCE = 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.linkis.entrance.restful;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.instance.label.client.InstanceLabelClient;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.protocol.label.InsLabelRefreshRequest;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;

import org.springframework.web.bind.annotation.*;

Expand All @@ -42,6 +44,10 @@ public class EntranceLabelRestfulApi {

@RequestMapping(path = "/update", method = RequestMethod.POST)
public Message updateRouteLabel(HttpServletRequest req, @RequestBody JsonNode jsonNode) {
String userName = ModuleUserUtils.getOperationUser(req, "updateRouteLabel");
if (!Configuration.isAdmin(userName)) {
return Message.error("Non-administrators cannot update Route Label");
}
String routeLabel = jsonNode.get("routeLabel").textValue();
Map<String, Object> labels = new HashMap<String, Object>();
logger.info("Prepare to update entrance label {}", routeLabel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.linkis.manager.am.vo.ECResourceInfoRecordVo;
import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SecurityFilter;
import org.apache.linkis.server.utils.ModuleUserUtils;

import org.apache.commons.lang.StringUtils;
Expand All @@ -40,7 +41,6 @@
import com.github.pagehelper.PageInfo;

import java.util.*;
import java.util.stream.Collectors;

@RequestMapping(
path = "/linkisManager/ecinfo",
Expand Down Expand Up @@ -99,14 +99,13 @@ public Message queryEcrHistory(
@RequestParam(value = "pageNow", required = false, defaultValue = "1") Integer pageNow,
@RequestParam(value = "pageSize", required = false, defaultValue = "20")
Integer pageSize) {
// String username = SecurityFilter.getLoginUsername(req);
String username = "hadoop";
String username = SecurityFilter.getLoginUsername(req);
// Parameter judgment
instance = ECResourceInfoUtils.strCheckAndDef(instance, null);
creator = ECResourceInfoUtils.strCheckAndDef(creator, null);
String creatorUser = ECResourceInfoUtils.strCheckAndDef(creator, null);
engineType = ECResourceInfoUtils.strCheckAndDef(engineType, null);
if (null != creator && !ECResourceInfoUtils.checkNameValid(creator)) {
return Message.error("Invalid creator : " + creator);
if (null != creatorUser && !ECResourceInfoUtils.checkNameValid(creatorUser)) {
return Message.error("Invalid creator : " + creatorUser);
}
if (null == startDate) {
Calendar calendar = Calendar.getInstance();
Expand All @@ -117,8 +116,8 @@ public Message queryEcrHistory(
}
if (Configuration.isAdmin(username)) {
username = null;
if (StringUtils.isNotBlank(creator)) {
username = creator;
if (StringUtils.isNotBlank(creatorUser)) {
username = creatorUser;
}
}
List<ECResourceInfoRecordVo> list = new ArrayList<>();
Expand All @@ -128,26 +127,21 @@ public Message queryEcrHistory(
try {
queryTasks =
ecResourceInfoService.getECResourceInfoRecordList(
instance, endDate, startDate, username);
if (StringUtils.isNotBlank(engineType)) {
String finalEngineType = engineType;
queryTasks =
queryTasks.stream()
.filter(info -> info.getLabelValue().contains(finalEngineType))
.collect(Collectors.toList());
}
instance, endDate, startDate, username, engineType);
queryTasks.forEach(
info -> {
ECResourceInfoRecordVo ecrHistroryListVo = new ECResourceInfoRecordVo();
BeanUtils.copyProperties(info, ecrHistroryListVo);
ecrHistroryListVo.setEngineType(
info.getLabelValue().split(",")[1].split("-")[0]);
ecrHistroryListVo.setUsedResource(
ECResourceInfoUtils.getStringToMap(info.getUsedResource()));
ECResourceInfoUtils.getStringToMap(info.getUsedResource(), info));
ecrHistroryListVo.setReleasedResource(
ECResourceInfoUtils.getStringToMap(info.getReleasedResource()));
ECResourceInfoUtils.getStringToMap(
info.getReleasedResource(), info));
ecrHistroryListVo.setRequestResource(
ECResourceInfoUtils.getStringToMap(info.getRequestResource()));
ECResourceInfoUtils.getStringToMap(
info.getRequestResource(), info));
list.add(ecrHistroryListVo);
});
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface ECResourceInfoService {
void deleteECResourceInfoRecord(Integer id);

List<ECResourceInfoRecord> getECResourceInfoRecordList(
String instance, Date endDate, Date startDate, String username);
String instance, Date endDate, Date startDate, String username, String engineType);

// TODO add search method

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public void deleteECResourceInfoRecord(Integer id) {

@Override
public List<ECResourceInfoRecord> getECResourceInfoRecordList(
String instance, Date endDate, Date startDate, String username) {
String instance, Date endDate, Date startDate, String username, String engineType) {
return ecResourceRecordMapper.getECResourceInfoHistory(
username, instance, endDate, startDate);
username, instance, endDate, startDate, engineType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@
*/
package org.apache.linkis.manager.am.util;

import org.apache.linkis.common.utils.ByteTimeUtils;
import org.apache.linkis.manager.am.vo.ResourceVo;
import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
import org.apache.linkis.server.BDPJettyServerHelper;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;

import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class ECResourceInfoUtils {

private static Logger logger = LoggerFactory.getLogger(ECResourceInfoUtils.class);

public static String NAME_REGEX = "^[a-zA-Z\\d_\\.]+$";

public static boolean checkNameValid(String creator) {
Expand All @@ -36,10 +45,32 @@ public static String strCheckAndDef(String str, String def) {
return StringUtils.isBlank(str) ? def : str;
}

public static Map<String, Object> getStringToMap(String str) {
Gson gson = new Gson();
Map<String, Object> map = new HashMap<>();
map = gson.fromJson(str, map.getClass());
return map;
public static ResourceVo getStringToMap(String str, ECResourceInfoRecord info) {
ResourceVo resourceVo = null;
Map<String, Object> map =
BDPJettyServerHelper.gson().fromJson(str, new HashMap<>().getClass());
if (MapUtils.isNotEmpty(map)) {
resourceVo = new ResourceVo();
if (info.getLabelValue().contains("spark")
|| (info.getLabelValue().contains("flink"))) {
if (null != map.get("driver")) {
Map<String, Object> divermap = MapUtils.getMap(map, "driver");
resourceVo.setInstance(((Double) divermap.get("instance")).intValue());
resourceVo.setCores(((Double) divermap.get("cpu")).intValue());
resourceVo.setMemory(
ByteTimeUtils.byteStringAsBytes(divermap.get("memory").toString()));
return resourceVo;
} else {
logger.warn("Compatible with old data ,{},{}", info.getLabelValue(), info);
return null; // Compatible with old data
}
}
resourceVo.setInstance(((Double) map.get("instance")).intValue());
resourceVo.setMemory(ByteTimeUtils.byteStringAsBytes((map.get("memory").toString())));
Double core =
null == map.get("cpu") ? (Double) map.get("cores") : (Double) map.get("cpu");
resourceVo.setCores(core.intValue());
}
return resourceVo;
}
}
Loading

0 comments on commit 586056d

Please sign in to comment.