diff --git a/devops-boot-project/devops-boot-core/devops-common/src/main/kotlin/com/tencent/devops/common/time/TimeExtensions.kt b/devops-boot-project/devops-boot-core/devops-common/src/main/kotlin/com/tencent/devops/common/time/TimeExtensions.kt new file mode 100644 index 0000000..dc5652e --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-common/src/main/kotlin/com/tencent/devops/common/time/TimeExtensions.kt @@ -0,0 +1,8 @@ +package com.tencent.devops.common.time + +import java.time.LocalDateTime +import java.time.ZoneId + +fun LocalDateTime.toEpochMilli(zoneId: ZoneId = ZoneId.systemDefault()): Long { + return this.atZone(zoneId).toInstant().toEpochMilli() +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/BlockStrategyEnum.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/BlockStrategyEnum.kt index 987e49c..701a0ec 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/BlockStrategyEnum.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/BlockStrategyEnum.kt @@ -5,10 +5,13 @@ package com.tencent.devops.schedule.enums */ enum class BlockStrategyEnum( private val code: Int, - private val label: String -): DictItem { + private val label: String, +) : DictItem { - DEFAULT(1, "默认策略"); + SERIAL_EXECUTION(1, "串行"), + DISCARD_LATER(2, "丢弃最后"), + COVER_EARLY(3, "覆盖之前"), + ; override fun code() = code override fun description() = label diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/JobModeEnum.kt 
b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/JobModeEnum.kt index c084518..8462bcb 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/JobModeEnum.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/JobModeEnum.kt @@ -5,23 +5,32 @@ package com.tencent.devops.schedule.enums */ enum class JobModeEnum( private val code: Int, - private val label: String -): DictItem { + private val label: String, + val isContainer: Boolean, + val isScript: Boolean, +) : DictItem { /** * Java Bean */ - BEAN(1, "Java Bean"), + BEAN(1, "Java Bean", false, false), /** * Shell */ - SHELL(2, "Shell"); + SHELL(2, "Shell", false, true), + /** + * K8s shell + * */ + K8S_SHELL(3, "K8s shell", true, true), + ; override fun code() = code override fun description() = label companion object { + const val DEFAULT_IMAGE = "bash" + /** * 根据[code]查找对应的枚举类型 */ diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/ScheduleTypeEnum.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/ScheduleTypeEnum.kt index 75290d9..7c3e8dd 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/ScheduleTypeEnum.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/enums/ScheduleTypeEnum.kt @@ -5,10 +5,10 @@ package com.tencent.devops.schedule.enums */ enum class ScheduleTypeEnum( private val code: Int, - private val label: String -): DictItem { + private val label: String, +) : DictItem { /** - * 立即执行 + * 立即执行,调度器不会进行调度,需要主动触发才会执行 */ IMMEDIATELY(1, "立即执行"), 
@@ -25,7 +25,8 @@ enum class ScheduleTypeEnum( /** * cron表达式 */ - CRON(4, "Cron表达式"); + CRON(4, "Cron表达式"), + ; override fun code() = code override fun description() = label diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobCreateRequest.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobCreateRequest.kt index 7c7fd58..c89115c 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobCreateRequest.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobCreateRequest.kt @@ -64,5 +64,13 @@ data class JobCreateRequest( /** * 最大重试次数 */ - val maxRetryCount: Int + val maxRetryCount: Int, + /** + * Resource content: either a script or YAML. It is Base64-encoded and must be decoded before use. + * */ + val source: String? = null, + /** + * Image address; needed by container jobs. + * */ + val image: String? = null, ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobInfo.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobInfo.kt index 95b978a..048db81 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobInfo.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobInfo.kt @@ -98,4 +98,12 @@ data class JobInfo( * 下次执行时间 */ var nextTriggerTime: Long = 0, + /** + * Resource content: either a script or YAML. + * */ + var source: String? = null, + /** + * Image address. + * */ + var image: String?
= null, ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt index d82d440..e316619 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt @@ -55,4 +55,12 @@ data class JobUpdateRequest( * 最大重试次数 */ val maxRetryCount: Int? = null, + /** + * Resource content: either a script or YAML. It is Base64-encoded and must be decoded before use. + * */ + val source: String? = null, + /** + * Image address; needed by container jobs. + * */ + val image: String? = null, ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/trigger/TriggerParam.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/trigger/TriggerParam.kt index 8dc890c..cf1fbf8 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/trigger/TriggerParam.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/trigger/TriggerParam.kt @@ -6,6 +6,9 @@ import java.time.LocalDateTime data class TriggerParam( var jobId: String, + var jobMode: Int, + var source: String? = null, + var image: String? = null, var jobHandler: String, var jobParam: String, var blockStrategy: Int, @@ -15,5 +18,6 @@ data class TriggerParam( var triggerTime: LocalDateTime, var broadcastIndex: Int = 0, var broadcastTotal: Int = 0, - var workerAddress: String? = null + var workerAddress: String?
= null, + var updateTime: LocalDateTime, ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/Conventions.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/Conventions.kt index ea516ac..c93f938 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/Conventions.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/Conventions.kt @@ -25,7 +25,9 @@ fun TJobInfo.convert(): JobInfo { maxRetryCount = maxRetryCount, triggerStatus = triggerStatus, lastTriggerTime = lastTriggerTime, - nextTriggerTime = nextTriggerTime + nextTriggerTime = nextTriggerTime, + source = source, + image = image, ) } @@ -49,7 +51,9 @@ fun JobInfo.convert(): TJobInfo { maxRetryCount = maxRetryCount, triggerStatus = triggerStatus, lastTriggerTime = lastTriggerTime, - nextTriggerTime = nextTriggerTime + nextTriggerTime = nextTriggerTime, + source = source, + image = image, ) } @@ -59,7 +63,7 @@ fun TWorkerGroup.convert(): WorkerGroup { name = name, discoveryType = discoveryType, updateTime = updateTime, - registryList = addressList.split(",") + registryList = addressList.split(","), ) } @@ -69,7 +73,7 @@ fun WorkerGroup.convert(): TWorkerGroup { name = name, discoveryType = discoveryType, updateTime = updateTime, - addressList = registryList.joinToString(",") + addressList = registryList.joinToString(","), ) } @@ -90,7 +94,7 @@ fun TJobLog.convert(): JobLog { executionTime = executionTime, executionCode = executionCode, executionMsg = executionMsg, - alarmStatus = alarmStatus + alarmStatus = alarmStatus, ) } @@ -111,7 +115,7 @@ fun JobLog.convert(): TJobLog { executionTime = executionTime, executionCode = executionCode, 
executionMsg = executionMsg, - alarmStatus = alarmStatus + alarmStatus = alarmStatus, ) } @@ -120,7 +124,7 @@ fun TWorker.convert(): WorkerInfo { id = id.orEmpty(), address = address, group = group, - updateTime = updateTime + updateTime = updateTime, ) } @@ -129,6 +133,6 @@ fun WorkerInfo.convert(): TWorker { id = id.orEmpty(), address = address, group = group, - updateTime = updateTime + updateTime = updateTime, ) } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt index 10118aa..82174f2 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt @@ -100,4 +100,12 @@ data class TJobInfo( * 下次执行时间 */ var nextTriggerTime: Long, + /** + * 资源内容,可以是脚本,也可以是yaml + * */ + var source: String? = null, + /** + * 镜像地址 + * */ + var image: String? 
= null, ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt index 95252a5..da21182 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt @@ -1,5 +1,6 @@ package com.tencent.devops.schedule.mongo.provider +import com.mongodb.MongoServerException import com.mongodb.client.result.UpdateResult import com.tencent.devops.schedule.mongo.model.TLockInfo import com.tencent.devops.schedule.provider.LockProvider @@ -17,7 +18,7 @@ import java.util.UUID * 基于mongodb实现的lock */ class MongoLockProvider( - private val mongoTemplate: MongoTemplate + private val mongoTemplate: MongoTemplate, ) : LockProvider { override fun acquire(key: String, expiration: Long): String? { val query = Query.query(where(TLockInfo::id).isEqualTo(key)) @@ -29,38 +30,49 @@ class MongoLockProvider( val options = FindAndModifyOptions().upsert(true) .returnNew(true) - val lock = mongoTemplate.findAndModify( - query, update, options, - TLockInfo::class.java - )!! - val locked = lock.token == token + try { + val lock = mongoTemplate.findAndModify( + query, + update, + options, + TLockInfo::class.java, + )!! 
+ val locked = lock.token == token - // 如果已过期 - if (!locked && lock.expireAt < System.currentTimeMillis()) { - val deleted = mongoTemplate.remove( - Query.query( - where(TLockInfo::id).isEqualTo(key) - .and(TLockInfo::token).isEqualTo(lock.token) - .and(TLockInfo::expireAt).`is`(lock.expireAt) - ), - TLockInfo::class.java - ) - if (deleted.deletedCount >= 1) { - // 成功释放锁, 再次尝试获取锁 - return acquire(key, expiration) + // The lock belongs to another token but has already expired + if (!locked && lock.expireAt < System.currentTimeMillis()) { + val deleted = mongoTemplate.remove( + Query.query( + where(TLockInfo::id).isEqualTo(key) + .and(TLockInfo::token).isEqualTo(lock.token) + .and(TLockInfo::expireAt).`is`(lock.expireAt), + ), + TLockInfo::class.java, + ) + if (deleted.deletedCount >= 1) { + // Expired lock removed; try to acquire the lock again + return acquire(key, expiration) + } + } + return if (locked) { + logger.trace("Acquired lock for key {} with token {}", key, token) + token + } else { + null + } + } catch (e: MongoServerException) { + if (e.code == 11000) { // duplicate key + return null + } else { + throw e } } - - return if (locked) { - logger.trace("Acquired lock for key {} with token {}", key, token) - return token - } else null } override fun release(key: String, token: String): Boolean { val query = Query.query( where(TLockInfo::id).isEqualTo(key) - .and(TLockInfo::token).isEqualTo(token) + .and(TLockInfo::token).isEqualTo(token), ) val deleted = mongoTemplate.remove(query, TLockInfo::class.java) val released = deleted.deletedCount == 1L @@ -78,7 +90,7 @@ class MongoLockProvider( override fun refresh(key: String, token: String, expiration: Long): Boolean { val query = Query.query( where(TLockInfo::id).isEqualTo(key) - .and(TLockInfo::token).isEqualTo(token) + .and(TLockInfo::token).isEqualTo(token), ) val update = Update.update(TLockInfo::expireAt.name, System.currentTimeMillis() + expiration) val updated: UpdateResult = mongoTemplate.updateFirst(query, update, TLockInfo::class.java) @@ -90,16 +102,16 @@ class MongoLockProvider( }
else { logger.warn( "Refresh query did not affect any records for key {} with token {}. " + - "This is possible when refresh interval fires for the final time " + - "after the lock has been released", - key, token + "This is possible when refresh interval fires for the final time " + + "after the lock has been released", + key, + token, ) } return refreshed } - companion object { private val logger = LoggerFactory.getLogger(MongoLockProvider::class.java) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt index 39e5fba..4ff5837 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt @@ -4,10 +4,13 @@ import com.tencent.devops.schedule.constants.DATE_TIME_FORMATTER import com.tencent.devops.schedule.constants.MAX_LOG_MESSAGE_SIZE import com.tencent.devops.schedule.constants.PRE_LOAD_TIME import com.tencent.devops.schedule.enums.BlockStrategyEnum +import com.tencent.devops.schedule.enums.JobModeEnum +import com.tencent.devops.schedule.enums.JobModeEnum.Companion.DEFAULT_IMAGE import com.tencent.devops.schedule.enums.MisfireStrategyEnum import com.tencent.devops.schedule.enums.RouteStrategyEnum import com.tencent.devops.schedule.enums.ScheduleTypeEnum import com.tencent.devops.schedule.enums.TriggerStatusEnum +import com.tencent.devops.schedule.enums.TriggerTypeEnum import com.tencent.devops.schedule.pojo.job.JobCreateRequest import com.tencent.devops.schedule.pojo.job.JobInfo import com.tencent.devops.schedule.pojo.job.JobQueryParam @@ -17,10 +20,13 @@ 
import com.tencent.devops.schedule.pojo.log.LogQueryParam import com.tencent.devops.schedule.pojo.page.Page import com.tencent.devops.schedule.provider.JobProvider import com.tencent.devops.schedule.provider.WorkerProvider +import com.tencent.devops.schedule.scheduler.JobScheduler import com.tencent.devops.schedule.utils.computeNextTriggerTime import com.tencent.devops.schedule.utils.validate import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.json.BasicJsonParser +import org.springframework.context.annotation.Lazy import org.springframework.scheduling.support.CronExpression import java.time.LocalDateTime import java.time.temporal.ChronoUnit @@ -30,11 +36,15 @@ import java.time.temporal.ChronoUnit */ open class DefaultJobManager( private val jobProvider: JobProvider, - private val workerProvider: WorkerProvider + private val workerProvider: WorkerProvider, ) : JobManager { private val jsonParser = BasicJsonParser() + @Lazy + @Autowired + private lateinit var jobScheduler: JobScheduler + override fun listJobPage(param: JobQueryParam): Page { return jobProvider.listJobPage(param) } @@ -70,7 +80,7 @@ open class DefaultJobManager( request.jobParam = "{}" } validate { jsonParser.parseMap(request.jobParam) } - require(request.jobHandler.isNotBlank()) + // 验证路由策略 requireNotNull(RouteStrategyEnum.ofCode(routeStrategy)) // 验证过期策略 @@ -81,6 +91,14 @@ open class DefaultJobManager( val workerGroup = workerProvider.findGroupById(groupId) requireNotNull(workerGroup) + // 验证任务类型 + val jobModeEnum = JobModeEnum.ofCode(jobMode) + requireNotNull(jobModeEnum) + val finalImage = if (jobModeEnum.isContainer && image == null) DEFAULT_IMAGE else image + if (jobModeEnum == JobModeEnum.BEAN) { + require(request.jobHandler.isNotBlank()) + } + val jobInfo = JobInfo( name = name, description = description, @@ -97,16 +115,21 @@ open class DefaultJobManager( maxRetryCount = maxRetryCount, lastTriggerTime = 0, 
nextTriggerTime = 0, - triggerStatus = TriggerStatusEnum.RUNNING.code(), + triggerStatus = TriggerStatusEnum.STOP.code(), createTime = LocalDateTime.now(), - updateTime = LocalDateTime.now() + updateTime = LocalDateTime.now(), + source = source, + image = finalImage, ) - // 生成下次执行时间 - val from = LocalDateTime.now().plus(PRE_LOAD_TIME, ChronoUnit.MILLIS) - val nextTriggerTime = computeNextTriggerTime(jobInfo, from) - requireNotNull(nextTriggerTime) - jobInfo.nextTriggerTime = nextTriggerTime + // 一次性任务,不主动触发 + if (scheduleTypeEnum != ScheduleTypeEnum.IMMEDIATELY) { + // 生成下次执行时间 + val from = LocalDateTime.now().plus(PRE_LOAD_TIME, ChronoUnit.MILLIS) + val nextTriggerTime = computeNextTriggerTime(jobInfo, from) + requireNotNull(nextTriggerTime) + jobInfo.nextTriggerTime = nextTriggerTime + } return jobProvider.addJob(jobInfo).also { jobInfo.id = it @@ -152,6 +175,13 @@ open class DefaultJobManager( requireNotNull(BlockStrategyEnum.ofCode(it)) jobInfo.blockStrategy = it } + image?.let { + jobInfo.image = image + } + source?.let { + jobInfo.source = it + } + jobInfo.updateTime = LocalDateTime.now() jobProvider.updateJob(jobInfo).also { logger.info("update job[${jobInfo.id}] success") } @@ -161,6 +191,9 @@ open class DefaultJobManager( override fun startJob(id: String) { val job = jobProvider.findJobById(id) requireNotNull(job) + if (job.scheduleType == ScheduleTypeEnum.IMMEDIATELY.code()) { + throw IllegalArgumentException("IMMEDIATELY schedule type limit start.") + } // 生成下次执行时间, 延后一段时间执行,避开预读周期 val from = LocalDateTime.now().plus(PRE_LOAD_TIME, ChronoUnit.MILLIS) val nextTriggerTime = computeNextTriggerTime(job, from) @@ -194,6 +227,12 @@ open class DefaultJobManager( logger.info("delete job[$id] success") } + override fun triggerJob(id: String, executorParam: String?) 
{ + val job = jobProvider.findJobById(id) + requireNotNull(job) + jobScheduler.trigger(job.id.orEmpty(), TriggerTypeEnum.MANUAL, jobParam = executorParam) + } + override fun updateJobSchedule(job: JobInfo) { jobProvider.updateJobSchedule(job) logger.debug("update job schedule[${job.id}] success") @@ -249,9 +288,11 @@ open class DefaultJobManager( val triggerTime = validate { LocalDateTime.parse(scheduleConf, DATE_TIME_FORMATTER) } require(triggerTime.isAfter(LocalDateTime.now())) } + ScheduleTypeEnum.FIX_RATE -> { validate { scheduleConf.toLong() > 0 } } + ScheduleTypeEnum.CRON -> { validate { CronExpression.parse(scheduleConf) } } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt index 6f40d30..b29a99f 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt @@ -45,6 +45,13 @@ interface JobManager { */ fun deleteJob(id: String) + /** + * 触发任务 + * @param id 任务id + * @param executorParam 任务参数 + * */ + fun triggerJob(id: String, executorParam: String?) 
+ /** * 更新任务调度信息 * 只更新调度状态、上次调度时间、下次调度时间 diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt index 0630570..aba06a8 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt @@ -6,7 +6,6 @@ import com.tencent.devops.schedule.enums.BlockStrategyEnum import com.tencent.devops.schedule.enums.ExecutionCodeEnum import com.tencent.devops.schedule.enums.RouteStrategyEnum import com.tencent.devops.schedule.enums.TriggerCodeEnum -import com.tencent.devops.schedule.enums.TriggerStatusEnum import com.tencent.devops.schedule.enums.TriggerTypeEnum import com.tencent.devops.schedule.manager.JobManager import com.tencent.devops.schedule.manager.WorkerManager @@ -35,13 +34,14 @@ class DefaultJobScheduler( private val workerManager: WorkerManager, private val lockProvider: LockProvider, private val scheduleServerProperties: ScheduleServerProperties, - private val workerRpcClient: WorkerRpcClient + private val workerRpcClient: WorkerRpcClient, ) : JobScheduler, InitializingBean, DisposableBean { private lateinit var triggerThreadPool: ThreadPoolExecutor private lateinit var jobTodoMonitor: JobTodoMonitor private lateinit var jobRetryMonitor: JobRetryMonitor - //private lateinit var jobLostMonitor: JobLostMonitor + + // private lateinit var jobLostMonitor: JobLostMonitor private lateinit var workerStatusMonitor: WorkerStatusMonitor override fun getJobManager() = jobManager @@ -84,7 +84,7 @@ class DefaultJobScheduler( triggerType: TriggerTypeEnum, retryCount: 
Int?, jobParam: String?, - shardingParam: String? + shardingParam: String?, ) { logger.debug("prepare trigger job[$jobId]") triggerThreadPool.execute { @@ -93,10 +93,6 @@ class DefaultJobScheduler( logger.warn("trigger job[$jobId] failed: job not exists.") return@execute } - if (job.triggerStatus == TriggerStatusEnum.STOP.code()) { - logger.warn("trigger job[$jobId] failed: job is stopped.") - return@execute - } jobParam?.let { job.jobParam = it } val finalRetryCount = retryCount ?: job.maxRetryCount val group = workerManager.findGroupById(job.groupId) ?: run { @@ -133,7 +129,7 @@ class DefaultJobScheduler( triggerType: TriggerTypeEnum, retryCount: Int, index: Int, - total: Int + total: Int, ) { val blockStrategy = BlockStrategyEnum.ofCode(job.blockStrategy) val routeStrategy = RouteStrategyEnum.ofCode(job.routeStrategy) @@ -146,7 +142,7 @@ class DefaultJobScheduler( jobId = job.id.orEmpty(), groupId = group.id.orEmpty(), triggerType = triggerType.code(), - triggerTime = LocalDateTime.now() + triggerTime = LocalDateTime.now(), ) val logId = jobManager.addJobLog(jobLog) // 2. 构造trigger param @@ -159,7 +155,11 @@ class DefaultJobScheduler( logId = logId, triggerTime = jobLog.triggerTime, broadcastIndex = index, - broadcastTotal = total + broadcastTotal = total, + updateTime = job.updateTime, + source = job.source, + image = job.image, + jobMode = job.jobMode, ) // 3. 
选择worker地址 require(group.registryList.isNotEmpty()) { "没有可用的worker地址" } @@ -199,7 +199,7 @@ class DefaultJobScheduler( scheduleServerProperties.maxTriggerPoolSize, 60L, TimeUnit.SECONDS, - LinkedBlockingQueue(1000) + LinkedBlockingQueue(1000), ) { runnable -> Thread(runnable, "job-trigger-${runnable.hashCode()}") } @@ -213,7 +213,9 @@ class DefaultJobScheduler( return try { if (parts.size == 2) { Pair(parts[0].toInt(), parts[1].toInt()) - } else null + } else { + null + } } catch (ignored: Exception) { null } @@ -223,4 +225,3 @@ class DefaultJobScheduler( private val logger = LoggerFactory.getLogger(DefaultJobScheduler::class.java) } } - diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/web/JobController.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/web/JobController.kt index 5200796..2f3e45f 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/web/JobController.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/web/JobController.kt @@ -22,7 +22,7 @@ import org.springframework.web.bind.annotation.RestController @RestController @RequestMapping("$SERVER_BASE_PATH$SERVER_API_V1") class JobController( - private val jobManager: JobManager + private val jobManager: JobManager, ) { @GetMapping("/job/list") @@ -59,6 +59,12 @@ class JobController( return Response.success() } + @PostMapping("/job/trigger") + fun trigger(@RequestParam id: String, @RequestBody(required = false) executorParam: String?): Response { + jobManager.triggerJob(id, executorParam) + return Response.success() + } + @GetMapping("/log/list") fun listLog(param: LogQueryParam): Response> { val page = jobManager.listLogPage(param) diff --git 
a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/build.gradle.kts b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/build.gradle.kts index 698340d..e3000d0 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/build.gradle.kts +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/build.gradle.kts @@ -2,5 +2,7 @@ description = "DevOps Boot Schedule Worker" dependencies { api(project(":devops-boot-project:devops-boot-core:devops-schedule:devops-schedule-common")) + api(project(":devops-boot-project:devops-boot-core:devops-common")) + api("io.kubernetes:client-java") compileOnly("org.springframework.cloud:spring-cloud-starter") } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt index 2dba1a1..9788184 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt @@ -2,6 +2,7 @@ package com.tencent.devops.schedule.config import com.tencent.devops.schedule.config.ScheduleWorkerProperties.Companion.PREFIX import com.tencent.devops.schedule.constants.WorkerRegistryMode +import com.tencent.devops.schedule.k8s.K8sProperties import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties(prefix = PREFIX) @@ -18,6 +19,10 @@ data class ScheduleWorkerProperties( * 自身地址,为空则自动获取。仅当mode=AUTO时有效 */ var address: String = "", + /** + * 资源存放路径 + * */ + var sourcePath: String = 
System.getProperty("java.io.tmpdir"), /** * 执行器配置 */ @@ -25,7 +30,11 @@ data class ScheduleWorkerProperties( /** * 调度中心配置 */ - var server: ScheduleWorkerServerProperties = ScheduleWorkerServerProperties() + var server: ScheduleWorkerServerProperties = ScheduleWorkerServerProperties(), + /** + * k8s环境配置 + * */ + var k8s: K8sProperties = K8sProperties(), ) { companion object { const val PREFIX = "devops.schedule.worker" @@ -59,7 +68,6 @@ data class ScheduleWorkerProperties( /** * 调度中心地址,DISCOVERY模式下可填写服务名称 */ - var address: String = "http://localhost:8080" + var address: String = "http://localhost:8080", ) - } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt index 3dd45f8..998dff7 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt @@ -2,91 +2,137 @@ package com.tencent.devops.schedule.executor import com.tencent.devops.schedule.api.ServerRpcClient import com.tencent.devops.schedule.config.ScheduleWorkerProperties -import com.tencent.devops.schedule.pojo.job.JobExecutionResult +import com.tencent.devops.schedule.enums.BlockStrategyEnum +import com.tencent.devops.schedule.enums.JobModeEnum +import com.tencent.devops.schedule.enums.JobModeEnum.BEAN +import com.tencent.devops.schedule.enums.JobModeEnum.K8S_SHELL +import com.tencent.devops.schedule.enums.JobModeEnum.SHELL +import com.tencent.devops.schedule.handler.K8sShellHandler +import com.tencent.devops.schedule.handler.ShellHandler +import com.tencent.devops.schedule.k8s.K8sHelper import 
/**
 * Default [JobExecutor] implementation.
 *
 * Each job id is served by one [JobThread] kept in a process-wide registry (see the companion
 * object). A trigger is matched to a handler by its job mode and then queued on that thread,
 * subject to the block strategy carried by the trigger.
 */
class DefaultJobExecutor(
    private val workerProperties: ScheduleWorkerProperties,
    private val serverRpcClient: ServerRpcClient,
) : JobExecutor, DisposableBean {

    init {
        // The registry helpers live in the companion object and need the RPC client there.
        Companion.serverRpcClient = serverRpcClient
    }

    // Built on first use; only the K8S_SHELL branch reads it.
    private val k8sShellHandler: K8sShellHandler by lazy { createK8sHandler() }
    private val shellHandler: ShellHandler = ShellHandler(workerProperties.sourcePath)

    /**
     * Validates [param], resolves the handler for its job mode and queues the trigger on the
     * job's thread.
     *
     * When a thread for the job already exists and is busy:
     * - `DISCARD_LATER` drops this trigger;
     * - `COVER_EARLY` replaces the thread with a fresh one;
     * - any other strategy queues the trigger behind the running one.
     *
     * @throws IllegalArgumentException if a required field is blank or the job mode is unsupported
     * @throws RuntimeException if no bean is found for a BEAN job
     */
    override fun execute(param: TriggerParam) {
        val jobId = param.jobId
        val logId = param.logId
        logger.debug("prepare to execute job[$jobId], log[$logId]: {}", param)
        require(jobId.isNotBlank()) { "jobId must not be blank" }
        require(logId.isNotBlank()) { "logId must not be blank" }
        require(param.jobParam.isNotBlank()) { "jobParam must not be blank" }

        val handler = resolveHandler(param)

        var jobThread = loadJobThread(jobId)
        if (jobThread != null) {
            when (BlockStrategyEnum.ofCode(param.blockStrategy)) {
                BlockStrategyEnum.DISCARD_LATER -> {
                    if (jobThread.running.get()) {
                        logger.warn("discard task $logId")
                        return
                    }
                }

                BlockStrategyEnum.COVER_EARLY -> {
                    if (jobThread.running.get()) {
                        logger.warn("cover early $logId")
                        // Forces registration of a replacement thread below.
                        jobThread = null
                    }
                }

                else -> {
                    // Serial execution: fall through and enqueue.
                }
            }
        }
        val target = jobThread ?: registerJobThread(jobId, handler)
        target.pushTriggerQueue(param)
    }

    /**
     * Stops and joins every registered job thread, then empties the registry.
     */
    override fun destroy() {
        for (jobThread in jobThreadRepository.values) {
            logger.info("Destroying job thread ${jobThread.name}")
            val removed = removeJobThread(jobThread.jobId) ?: continue
            try {
                removed.join()
            } catch (e: InterruptedException) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt()
                logger.error("JobThread destroy(join) error, jobId:{}", removed.jobId, e)
            } catch (e: Exception) {
                logger.error("JobThread destroy(join) error, jobId:{}", removed.jobId, e)
            }
        }
        jobThreadRepository.clear()
    }

    /** Picks the handler for the trigger's job mode. */
    private fun resolveHandler(param: TriggerParam): JobHandler {
        val jobMode = param.jobMode
        return when (JobModeEnum.ofCode(jobMode)) {
            BEAN -> try {
                SpringContextHolder.getBean(JobHandler::class.java, param.jobHandler)
            } catch (e: BeansException) {
                // Keep the original lookup failure as the cause.
                throw RuntimeException("未找到jobHandler[${param.jobHandler}]", e)
            }

            SHELL -> shellHandler

            K8S_SHELL -> k8sShellHandler

            else -> throw IllegalArgumentException("Job mode [$jobMode] not supported")
        }
    }

    /** Builds the Kubernetes handler from the worker's `k8s` configuration. */
    private fun createK8sHandler(): K8sShellHandler {
        val k8sProperties = workerProperties.k8s
        val k8sClient = K8sHelper.createClient(k8sProperties)
        return K8sShellHandler(k8sClient, k8sProperties.namespace, k8sProperties.limit)
    }

    companion object {
        private val logger = LoggerFactory.getLogger(DefaultJobExecutor::class.java)
        private val jobThreadRepository = ConcurrentHashMap<String, JobThread>()
        private lateinit var serverRpcClient: ServerRpcClient

        /**
         * Starts a new [JobThread] for [jobId] and registers it, replacing and stopping any
         * thread previously registered under the same id.
         *
         * `put` is required here: with `putIfAbsent` an existing entry would stay in the
         * registry after being stopped, while the new thread would run unregistered, so later
         * triggers would be queued on a thread that no longer polls.
         */
        fun registerJobThread(jobId: String, handler: JobHandler): JobThread {
            val newJobThread = JobThread(jobId, handler, serverRpcClient)
            newJobThread.start()
            jobThreadRepository.put(jobId, newJobThread)?.toStop()
            return newJobThread
        }

        /**
         * Unregisters the thread for [jobId] and asks it to stop.
         *
         * @return the removed thread, or `null` if none was registered
         */
        fun removeJobThread(jobId: String): JobThread? {
            return jobThreadRepository.remove(jobId)?.also { it.toStop() }
        }

        /** Returns the thread currently registered for [jobId], if any. */
        fun loadJobThread(jobId: String): JobThread? {
            return jobThreadRepository[jobId]
        }
    }
}
"DEVOPS_SCHEDULE_BROADCAST_TOTAL" +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/handler/K8sShellHandler.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/handler/K8sShellHandler.kt new file mode 100644 index 0000000..85c598a --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/handler/K8sShellHandler.kt @@ -0,0 +1,183 @@ +package com.tencent.devops.schedule.handler + +import com.tencent.devops.common.time.toEpochMilli +import com.tencent.devops.schedule.executor.JobContext +import com.tencent.devops.schedule.executor.JobHandler +import com.tencent.devops.schedule.handler.JobSystemEnv.BROADCAST_INDEX +import com.tencent.devops.schedule.handler.JobSystemEnv.BROADCAST_TOTAL +import com.tencent.devops.schedule.handler.JobSystemEnv.JOB_ID +import com.tencent.devops.schedule.handler.JobSystemEnv.JOB_PARAMETERS +import com.tencent.devops.schedule.handler.JobSystemEnv.LOG_ID +import com.tencent.devops.schedule.handler.JobSystemEnv.TRIGGER_TIME +import com.tencent.devops.schedule.handler.ShellHandler.Companion.BASH_CMD +import com.tencent.devops.schedule.k8s.ResourceLimitProperties +import com.tencent.devops.schedule.k8s.V1ConfigMap +import com.tencent.devops.schedule.k8s.V1Pod +import com.tencent.devops.schedule.k8s.buildMessage +import com.tencent.devops.schedule.k8s.configMap +import com.tencent.devops.schedule.k8s.containers +import com.tencent.devops.schedule.k8s.exec +import com.tencent.devops.schedule.k8s.limits +import com.tencent.devops.schedule.k8s.metadata +import com.tencent.devops.schedule.k8s.newEnvVar +import com.tencent.devops.schedule.k8s.newKeyToPath +import com.tencent.devops.schedule.k8s.requests +import com.tencent.devops.schedule.k8s.resources +import com.tencent.devops.schedule.k8s.spec +import 
/**
 * Runs a shell job on Kubernetes.
 *
 * The script is stored in a ConfigMap, mounted read-only into a single-container Pod and run
 * with bash. The handler polls the Pod until it leaves the Pending/Running phases, logs its
 * output and deletes it.
 */
class K8sShellHandler(
    private val client: ApiClient,
    private val namespace: String,
    private val limitProperties: ResourceLimitProperties,
) : JobHandler {

    /**
     * Executes the job described by [context] in a Pod named after its log id.
     *
     * @throws IllegalArgumentException if the context carries no script source
     * @throws IllegalStateException if the Pod does not end in phase `Succeeded`
     * @throws ApiException on Kubernetes API failures other than 404
     */
    override fun execute(context: JobContext): JobExecutionResult {
        // Same precondition as ShellHandler: without it a null script would be put in the ConfigMap.
        val script = requireNotNull(context.source) { "job[${context.jobId}] has no script source" }
        val podName = "schedule-shell-${context.logId}"
        val api = CoreV1Api(client)
        var createdPod = false
        try {
            val configMapName = ensureConfigMap(api, context, script)
            api.createNamespacedPod(namespace, buildPod(context, podName, configMapName), null, null, null)
            logger.info("Created pod $podName")
            createdPod = true

            val phase = awaitTermination(api, podName)
            // Goes through exec so that a vanished Pod yields null and the phase check below
            // reports the failure.
            val log = api.exec {
                api.readNamespacedPodLog(
                    podName,
                    namespace,
                    context.logId,
                    true,
                    null,
                    null,
                    null,
                    null,
                    null,
                    null,
                    null,
                )
            }
            logger.info("Pod log: $log")
            check(phase == "Succeeded") { "pod $podName finished in phase [$phase]" }
        } catch (e: ApiException) {
            logger.error(e.buildMessage())
            throw e
        } finally {
            if (createdPod) {
                deletePodQuietly(api, podName)
            }
        }
        return JobExecutionResult.success()
    }

    /**
     * Creates the ConfigMap holding [script] unless one with the same name already exists.
     * The name combines the job id and the job's update time.
     *
     * @return the ConfigMap name
     */
    private fun ensureConfigMap(api: CoreV1Api, context: JobContext, script: String): String {
        val configMapName = "schedule-shell-${context.jobId}-${context.updateTime.toEpochMilli()}"
        val existing = api.exec { api.readNamespacedConfigMap(configMapName, namespace, null, null, null) }
        if (existing == null) {
            val configMapBody = V1ConfigMap {
                metadata {
                    name = configMapName
                }
                data = mapOf(CMD to script)
            }
            val configMap = api.createNamespacedConfigMap(namespace, configMapBody, null, null, null)
            logger.info("Created configmap $configMap")
        }
        return configMapName
    }

    /** Builds the Pod spec: one container running the script mounted from the ConfigMap. */
    private fun buildPod(context: JobContext, podName: String, configMapName: String): V1Pod {
        val volumeName = "shell-${context.logId}"
        return V1Pod {
            metadata {
                name = podName
            }
            spec {
                containers {
                    name = context.logId
                    image = context.image
                    command = listOf(BASH_CMD, CMD)
                    workingDir = WORK_SPACE
                    setEnv(context)
                    volumeMounts {
                        name = volumeName
                        mountPath = WORK_SPACE
                        readOnly = true
                    }
                    resources {
                        limits(
                            cpu = limitProperties.limitCpu,
                            memory = limitProperties.limitMem,
                            ephemeralStorage = limitProperties.limitStorage,
                        )
                        requests(
                            cpu = limitProperties.requestCpu,
                            memory = limitProperties.requestMem,
                            ephemeralStorage = limitProperties.requestStorage,
                        )
                    }
                }
                volumes {
                    name = volumeName
                    configMap {
                        name = configMapName
                        items = listOf(
                            newKeyToPath {
                                key = CMD
                                path = CMD
                            },
                        )
                    }
                }
                restartPolicy = "Never"
            }
        }
    }

    /**
     * Polls the Pod once per second while it is Pending or Running.
     *
     * @return the last observed phase; empty if the Pod could not be found
     */
    private fun awaitTermination(api: CoreV1Api, podName: String): String {
        var pod = api.exec { api.readNamespacedPod(podName, namespace, null, null, null) }
        var phase = pod?.status?.phase.orEmpty()
        logger.info("Pod status: $phase")
        while (pod != null && (phase == "Running" || phase == "Pending")) {
            Thread.sleep(POLL_INTERVAL_MILLIS)
            pod = api.exec { api.readNamespacedPod(podName, namespace, null, null, null) }
            phase = pod?.status?.phase.orEmpty()
        }
        logger.info("Pod status: $phase")
        return phase
    }

    /**
     * Deletes the Pod; a failure is logged instead of thrown so that cleanup in `finally`
     * cannot replace the exception (or the result) of the job itself.
     */
    private fun deletePodQuietly(api: CoreV1Api, podName: String) {
        try {
            api.exec { api.deleteNamespacedPod(podName, namespace, null, null, null, null, null, null) }
            logger.info("Delete pod $podName.")
        } catch (e: Exception) {
            logger.warn("Failed to delete pod $podName", e)
        }
    }

    /** Exposes the job's identifiers and parameters to the container as environment variables. */
    private fun V1Container.setEnv(context: JobContext) {
        val variables = listOf(
            JOB_ID to context.jobId,
            LOG_ID to context.logId,
            JOB_PARAMETERS to context.jobParamMap.toJsonString(),
            TRIGGER_TIME to context.triggerTime.toEpochMilli().toString(),
            BROADCAST_INDEX to context.broadcastIndex.toString(),
            BROADCAST_TOTAL to context.broadcastTotal.toString(),
        )
        env = variables.map { (key, text) ->
            newEnvVar {
                name = key
                value = text
            }
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(K8sShellHandler::class.java)
        private const val CMD = "run.sh"
        private const val WORK_SPACE = "/data/workspace"
        private const val POLL_INTERVAL_MILLIS = 1000L
    }
}
/**
 * Runs a script job on the local host.
 *
 * The script is written under [path] to a file named after the job id and the job's update
 * time, then executed with bash. An existing file with that name is reused.
 */
class ShellHandler(private val path: String) : JobHandler {

    /**
     * Materialises the script for [context] and runs it, logging its combined stdout/stderr.
     *
     * @return success when the script exits with 0, otherwise a failed result carrying the exit value
     * @throws IllegalArgumentException if the context carries no script source
     */
    override fun execute(context: JobContext): JobExecutionResult {
        val script = requireNotNull(context.source) { "job[${context.jobId}] has no script source" }
        logger.info("Prepare script")
        val scriptFilePath = prepareScriptFile(context, script)

        logger.info("Execute script")
        val pb = ProcessBuilder(BASH_CMD, scriptFilePath.toString())
        setEnv(pb, context)
        pb.redirectErrorStream(true)
        val process = pb.start()
        val logPump = thread { showLog(process.inputStream) }
        val exitValue = try {
            process.waitFor()
        } catch (e: InterruptedException) {
            // Do not leave the child running when the job thread is interrupted.
            process.destroyForcibly()
            throw e
        }
        // Let the pump finish so output is logged before the result is returned. The wait is
        // bounded because a background child of the script could keep the pipe open.
        logPump.join(LOG_DRAIN_TIMEOUT_MILLIS)
        return if (exitValue == 0) {
            JobExecutionResult.success()
        } else {
            JobExecutionResult.failed("script exit value($exitValue) is failed")
        }
    }

    /**
     * Returns the script file for this job version, creating it if needed.
     *
     * The content goes to a temporary file in the same directory and is then moved into place,
     * so a failed write cannot leave an empty or truncated script that later runs would treat
     * as the cached one.
     */
    private fun prepareScriptFile(context: JobContext, script: String): java.nio.file.Path {
        val scriptFileName = "${context.jobId}_${context.updateTime.toEpochMilli()}.sh"
        val scriptFilePath = Paths.get(path, scriptFileName)
        if (Files.exists(scriptFilePath)) {
            logger.info("Find file $scriptFilePath")
            return scriptFilePath
        }
        val directory = scriptFilePath.toAbsolutePath().parent
        Files.createDirectories(directory)
        val tempFile = Files.createTempFile(directory, "${context.jobId}_", ".tmp")
        try {
            Files.write(tempFile, script.toByteArray(StandardCharsets.UTF_8))
            Files.move(tempFile, scriptFilePath, java.nio.file.StandardCopyOption.ATOMIC_MOVE)
        } finally {
            // No-op after a successful move; removes the leftover when writing or moving failed.
            Files.deleteIfExists(tempFile)
        }
        logger.info("Generate script file $scriptFilePath")
        return scriptFilePath
    }

    /** Copies the process output, line by line, to the logger. */
    private fun showLog(inputStream: InputStream) {
        BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8)).useLines { lines ->
            lines.forEach { logger.info(it) }
        }
    }

    /** Passes the job's identifiers and parameters to the script through its environment. */
    private fun setEnv(pb: ProcessBuilder, context: JobContext) {
        with(pb.environment()) {
            put(JobSystemEnv.JOB_ID, context.jobId)
            put(JobSystemEnv.JOB_PARAMETERS, context.jobParamMap.toJsonString())
            put(JobSystemEnv.LOG_ID, context.logId)
            put(JobSystemEnv.TRIGGER_TIME, context.triggerTime.toEpochMilli().toString())
            put(JobSystemEnv.BROADCAST_INDEX, context.broadcastIndex.toString())
            put(JobSystemEnv.BROADCAST_TOTAL, context.broadcastTotal.toString())
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(ShellHandler::class.java)
        private const val LOG_DRAIN_TIMEOUT_MILLIS = 5000L
        const val BASH_CMD = "bash"
    }
}
/** Configures the Pod's metadata, creating it first when absent. */
fun V1Pod.metadata(configuration: V1ObjectMeta.() -> Unit) {
    val meta = metadata ?: V1ObjectMeta().also { metadata = it }
    meta.configuration()
}

/** Configures the ConfigMap's metadata, creating it first when absent. */
fun V1ConfigMap.metadata(configuration: V1ObjectMeta.() -> Unit) {
    val meta = metadata ?: V1ObjectMeta().also { metadata = it }
    meta.configuration()
}

/** Builder-style factory for a [V1ConfigMap]. */
fun V1ConfigMap(configuration: V1ConfigMap.() -> Unit): V1ConfigMap = V1ConfigMap().apply(configuration)

/** Builder-style factory for a [V1Pod]. */
fun V1Pod(configuration: V1Pod.() -> Unit): V1Pod = V1Pod().apply(configuration)

/** Configures the Pod's spec, creating it first when absent. */
fun V1Pod.spec(configuration: V1PodSpec.() -> Unit) {
    val podSpec = spec ?: V1PodSpec().also { spec = it }
    podSpec.configuration()
}

/** Appends one configured container to the spec. */
fun V1PodSpec.containers(configuration: V1Container.() -> Unit) {
    val container = V1Container()
    container.configuration()
    addContainersItem(container)
}

/** Appends one configured volume to the spec. */
fun V1PodSpec.volumes(configuration: V1Volume.() -> Unit) {
    val volume = V1Volume()
    volume.configuration()
    addVolumesItem(volume)
}

/** Appends one configured volume mount to the container. */
fun V1Container.volumeMounts(configuration: V1VolumeMount.() -> Unit) {
    val mount = V1VolumeMount()
    mount.configuration()
    addVolumeMountsItem(mount)
}

/** Sets the volume's ConfigMap source from the given configuration. */
fun V1Volume.configMap(configuration: V1ConfigMapVolumeSource.() -> Unit) {
    val source = V1ConfigMapVolumeSource()
    source.configuration()
    configMap(source)
}

/** Creates a standalone, configured [V1VolumeMount]. */
fun newVolumeMounts(configuration: V1VolumeMount.() -> Unit): V1VolumeMount = V1VolumeMount().apply(configuration)

/** Creates a configured [V1EnvVar]. */
fun newEnvVar(configuration: V1EnvVar.() -> Unit): V1EnvVar = V1EnvVar().apply(configuration)

/** Creates a configured [V1KeyToPath]. */
fun newKeyToPath(configuration: V1KeyToPath.() -> Unit): V1KeyToPath = V1KeyToPath().apply(configuration)

/** Configures the container's resource requirements, creating them first when absent. */
fun V1Container.resources(configuration: V1ResourceRequirements.() -> Unit) {
    val requirements = resources ?: V1ResourceRequirements().also { resources = it }
    requirements.configuration()
}

/** Sets the cpu, memory and ephemeral-storage limits. */
fun V1ResourceRequirements.limits(cpu: Double, memory: Long, ephemeralStorage: Long) {
    val quantities = mapOf(
        "cpu" to Quantity("$cpu"),
        "memory" to Quantity("$memory"),
        "ephemeral-storage" to Quantity("$ephemeralStorage"),
    )
    limits(quantities)
}

/** Sets the cpu, memory and ephemeral-storage requests. */
fun V1ResourceRequirements.requests(cpu: Double, memory: Long, ephemeralStorage: Long) {
    val quantities = mapOf(
        "cpu" to Quantity("$cpu"),
        "memory" to Quantity("$memory"),
        "ephemeral-storage" to Quantity("$ephemeralStorage"),
    )
    requests(quantities)
}
/** Renders the status code, message, response headers and response body of an [ApiException]. */
fun ApiException.buildMessage(): String {
    val builder = StringBuilder().append(code)
        .appendLine("[$message]")
        .appendLine(responseHeaders)
        .appendLine(responseBody)
    return builder.toString()
}

/**
 * Runs [block] and maps a 404 from the Kubernetes API to `null`.
 *
 * @return the block's result, or `null` when the API answered NOT_FOUND
 * @throws ApiException for any other API failure
 */
fun <T> CoreV1Api.exec(block: () -> T?): T? {
    return try {
        block()
    } catch (e: ApiException) {
        if (e.code != HttpStatus.NOT_FOUND.value()) {
            throw e
        }
        null
    }
}

/**
 * Kubernetes helper.
 */
object K8sHelper {
    /**
     * Creates a client from [k8sProps]. When both an API server and a token are configured they
     * are used; otherwise the library's default client is returned.
     *
     * Blank values count as "not configured", so an empty property does not produce a client
     * with an empty base path or token.
     */
    fun createClient(k8sProps: K8sProperties): ApiClient {
        val apiServer = k8sProps.apiServer
        val token = k8sProps.token
        return if (!apiServer.isNullOrBlank() && !token.isNullOrBlank()) {
            ClientBuilder()
                .setBasePath(apiServer)
                .setAuthentication(AccessTokenAuthentication(token))
                .build()
        } else {
            Config.defaultClient()
        }
    }
}

/**
 * Kubernetes settings of the worker: target namespace, optional API server/token pair and the
 * resource limits applied to job Pods.
 */
data class K8sProperties(
    var namespace: String = "default",
    var apiServer: String? = null,
    var token: String? = null,
    var limit: ResourceLimitProperties = ResourceLimitProperties(),
)

/**
 * Resource limits and requests for job Pods. Memory and storage values are in bytes.
 *
 * Properties are `var`, like the sibling properties classes.
 * NOTE(review): presumably these are bound through setters like [K8sProperties]; with `val`
 * the defaults could not be overridden that way. Confirm against the binding mode in use.
 */
data class ResourceLimitProperties(
    var limitMem: Long = 32 * GB,
    var limitStorage: Long = 128 * GB,
    var limitCpu: Double = 16.0,
    var requestMem: Long = 16 * GB,
    var requestStorage: Long = 16 * GB,
    var requestCpu: Double = 4.0,
) {
    companion object {
        private const val GB = 1024 * 1024 * 1024L
    }
}
/**
 * Worker thread dedicated to one job id.
 *
 * Triggers are queued with [pushTriggerQueue] and executed one at a time. After every execution
 * the result is reported through the RPC client. A thread that stays idle for more than
 * [MAX_IDLE_TIMES] consecutive polls unregisters itself from [DefaultJobExecutor].
 */
class JobThread(
    val jobId: String,
    private val jobHandler: JobHandler,
    private val serverRpcClient: ServerRpcClient,
) : Thread() {
    // Touched by both the submitting threads and this thread, hence a concurrent set.
    private val triggerLogIdSet: MutableSet<String> = java.util.concurrent.ConcurrentHashMap.newKeySet()
    private val triggerQueue = LinkedBlockingQueue<TriggerParam>()
    private val stop = AtomicBoolean(false)
    private var idleTimes = 0
    private val base64Decoder = Base64.getDecoder()

    /** True while a trigger is being executed. */
    var running = AtomicBoolean(false)

    init {
        name = "JobThread-$jobId"
    }

    override fun run() {
        logger.info("$name started")
        try {
            while (!stop.get()) {
                idleTimes++
                running.set(false)
                try {
                    val triggerParam = triggerQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                    if (triggerParam != null) {
                        running.set(true)
                        idleTimes = 0
                        triggerLogIdSet.remove(triggerParam.logId)
                        process(triggerParam)
                    } else if (idleTimes > MAX_IDLE_TIMES && triggerQueue.isEmpty()) {
                        // Idle limit exceeded: unregister (which also flags this thread to stop).
                        logger.info("executor idle times over limit")
                        DefaultJobExecutor.removeJobThread(jobId)
                    }
                } catch (e: Exception) {
                    if (!stop.get()) {
                        throw e
                    }
                }
            }
        } finally {
            // Never leave the flag set when the loop ends abnormally; toStop() and the
            // block-strategy checks rely on it.
            running.set(false)
        }
        logger.info("$name stopped")
    }

    /**
     * Queues [triggerParam] for execution.
     *
     * @return `false` if a trigger with the same log id is already queued, `true` otherwise
     */
    fun pushTriggerQueue(triggerParam: TriggerParam): Boolean {
        // Record the id before enqueueing: the worker removes the id when it polls, so the
        // reverse order could leave a stale id behind.
        if (!triggerLogIdSet.add(triggerParam.logId)) {
            return false
        }
        triggerQueue.add(triggerParam)
        return true
    }

    /**
     * Flags the thread to stop and, when called from another thread, waits until the trigger
     * currently being executed has finished.
     */
    fun toStop() {
        logger.info("Stopping $name")
        stop.set(true)
        if (currentThread() === this) {
            // Called from inside run(): waiting for ourselves would never end.
            return
        }
        if (running.get()) {
            logger.info("$name is running now,waiting...")
            try {
                while (running.get()) {
                    // Sleep between checks instead of spinning on the flag.
                    sleep(STOP_CHECK_INTERVAL_MILLIS)
                }
            } catch (e: InterruptedException) {
                // Stop waiting but keep the interrupt visible to the caller.
                currentThread().interrupt()
            }
        }
    }

    /**
     * Executes one trigger and reports its result. Any failure, including one raised while
     * building the job context, becomes a failed result instead of terminating the thread.
     */
    private fun process(triggerParam: TriggerParam) {
        val logId = triggerParam.logId
        val result = try {
            jobHandler.execute(buildJobContext(triggerParam))
        } catch (e: Throwable) {
            logger.error("execute job log[$logId] error: ${e.message}", e)
            JobExecutionResult.failed(e.message.orEmpty())
        }
        result.logId = logId
        logger.info("complete job log[$logId]: $result")
        try {
            serverRpcClient.submitResult(result)
            logger.info("submit job log[$logId] result success")
        } catch (e: Exception) {
            logger.error("submit job log[$logId] result error: ${e.message}", e)
        }
    }

    /**
     * Maps a trigger to the context handed to the handler. The Base64-encoded source is decoded
     * as UTF-8 rather than with the platform default charset.
     * NOTE(review): assumes the sender encodes the source as UTF-8 — confirm.
     */
    private fun buildJobContext(param: TriggerParam): JobContext {
        return JobContext(
            jobId = param.jobId,
            jobParamMap = param.jobParam.readJsonString(),
            logId = param.logId,
            triggerTime = param.triggerTime,
            broadcastIndex = param.broadcastIndex,
            broadcastTotal = param.broadcastTotal,
            source = param.source?.let { String(base64Decoder.decode(it), Charsets.UTF_8) },
            image = param.image,
            updateTime = param.updateTime,
        )
    }

    companion object {
        private val logger = LoggerFactory.getLogger(JobThread::class.java)
        private const val POLL_TIMEOUT_SECONDS = 3L
        private const val MAX_IDLE_TIMES = 30
        private const val STOP_CHECK_INTERVAL_MILLIS = 50L
    }
}