Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 调度支持k8s job #203 #204

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.tencent.devops.common.time

import java.time.LocalDateTime
import java.time.ZoneId

fun LocalDateTime.toEpochMilli(zoneId: ZoneId = ZoneId.systemDefault()): Long {
return this.atZone(zoneId).toInstant().toEpochMilli()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package com.tencent.devops.schedule.enums
*/
enum class BlockStrategyEnum(
private val code: Int,
private val label: String
): DictItem {
private val label: String,
) : DictItem {

DEFAULT(1, "默认策略");
SERIAL_EXECUTION(1, "串行"),
DISCARD_LATER(2, "丢弃最后"),
COVER_EARLY(3, "覆盖之前"),
;

override fun code() = code
override fun description() = label
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,32 @@ package com.tencent.devops.schedule.enums
*/
enum class JobModeEnum(
private val code: Int,
private val label: String
): DictItem {
private val label: String,
val isContainer: Boolean,
val isScript: Boolean,
) : DictItem {
/**
* Java Bean
*/
BEAN(1, "Java Bean"),
BEAN(1, "Java Bean", false, false),

/**
* Shell
*/
SHELL(2, "Shell");
SHELL(2, "Shell", false, true),

/**
* K8s shell
* */
K8S_SHELL(3, "K8s shell", true, true),
;

override fun code() = code
override fun description() = label

companion object {
const val DEFAULT_IMAGE = "bash"

/**
* 根据[code]查找对应的枚举类型
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package com.tencent.devops.schedule.enums
*/
enum class ScheduleTypeEnum(
private val code: Int,
private val label: String
): DictItem {
private val label: String,
) : DictItem {
/**
* 立即执行
* 立即执行,调度器不会进行调度,需要主动触发才会执行
*/
IMMEDIATELY(1, "立即执行"),

Expand All @@ -25,7 +25,8 @@ enum class ScheduleTypeEnum(
/**
* cron表达式
*/
CRON(4, "Cron表达式");
CRON(4, "Cron表达式"),
;

override fun code() = code
override fun description() = label
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,13 @@ data class JobCreateRequest(
/**
* 最大重试次数
*/
val maxRetryCount: Int
val maxRetryCount: Int,
/**
* 资源内容,可以是脚本,也可以是yaml,使用了basic64编码,使用时需要先解码
* */
val source: String? = null,
/**
* 镜像地址,容器任务需要
* */
val image: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,12 @@ data class JobInfo(
* 下次执行时间
*/
var nextTriggerTime: Long = 0,
/**
* 资源内容,可以是脚本,也可以是yaml
* */
var source: String? = null,
/**
* 镜像地址
* */
var image: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,12 @@ data class JobUpdateRequest(
* 最大重试次数
*/
val maxRetryCount: Int? = null,
/**
* 资源内容,可以是脚本,也可以是yaml,使用了basic64编码,使用时需要先解码
* */
val source: String? = null,
/**
* 镜像地址,容器任务需要
* */
val image: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import java.time.LocalDateTime

data class TriggerParam(
var jobId: String,
var jobMode: Int,
var source: String? = null,
var image: String? = null,
var jobHandler: String,
var jobParam: String,
var blockStrategy: Int,
Expand All @@ -15,5 +18,6 @@ data class TriggerParam(
var triggerTime: LocalDateTime,
var broadcastIndex: Int = 0,
var broadcastTotal: Int = 0,
var workerAddress: String? = null
var workerAddress: String? = null,
var updateTime: LocalDateTime,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ fun TJobInfo.convert(): JobInfo {
maxRetryCount = maxRetryCount,
triggerStatus = triggerStatus,
lastTriggerTime = lastTriggerTime,
nextTriggerTime = nextTriggerTime
nextTriggerTime = nextTriggerTime,
source = source,
image = image,
)
}

Expand All @@ -49,7 +51,9 @@ fun JobInfo.convert(): TJobInfo {
maxRetryCount = maxRetryCount,
triggerStatus = triggerStatus,
lastTriggerTime = lastTriggerTime,
nextTriggerTime = nextTriggerTime
nextTriggerTime = nextTriggerTime,
source = source,
image = image,
)
}

Expand All @@ -59,7 +63,7 @@ fun TWorkerGroup.convert(): WorkerGroup {
name = name,
discoveryType = discoveryType,
updateTime = updateTime,
registryList = addressList.split(",")
registryList = addressList.split(","),
)
}

Expand All @@ -69,7 +73,7 @@ fun WorkerGroup.convert(): TWorkerGroup {
name = name,
discoveryType = discoveryType,
updateTime = updateTime,
addressList = registryList.joinToString(",")
addressList = registryList.joinToString(","),
)
}

Expand All @@ -90,7 +94,7 @@ fun TJobLog.convert(): JobLog {
executionTime = executionTime,
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus
alarmStatus = alarmStatus,
)
}

Expand All @@ -111,7 +115,7 @@ fun JobLog.convert(): TJobLog {
executionTime = executionTime,
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus
alarmStatus = alarmStatus,
)
}

Expand All @@ -120,7 +124,7 @@ fun TWorker.convert(): WorkerInfo {
id = id.orEmpty(),
address = address,
group = group,
updateTime = updateTime
updateTime = updateTime,
)
}

Expand All @@ -129,6 +133,6 @@ fun WorkerInfo.convert(): TWorker {
id = id.orEmpty(),
address = address,
group = group,
updateTime = updateTime
updateTime = updateTime,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,12 @@ data class TJobInfo(
* 下次执行时间
*/
var nextTriggerTime: Long,
/**
* 资源内容,可以是脚本,也可以是yaml
* */
var source: String? = null,
/**
* 镜像地址
* */
var image: String? = null,
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.devops.schedule.mongo.provider

import com.mongodb.MongoServerException
import com.mongodb.client.result.UpdateResult
import com.tencent.devops.schedule.mongo.model.TLockInfo
import com.tencent.devops.schedule.provider.LockProvider
Expand All @@ -17,7 +18,7 @@ import java.util.UUID
* 基于mongodb实现的lock
*/
class MongoLockProvider(
private val mongoTemplate: MongoTemplate
private val mongoTemplate: MongoTemplate,
) : LockProvider {
override fun acquire(key: String, expiration: Long): String? {
val query = Query.query(where(TLockInfo::id).isEqualTo(key))
Expand All @@ -29,38 +30,49 @@ class MongoLockProvider(

val options = FindAndModifyOptions().upsert(true)
.returnNew(true)
val lock = mongoTemplate.findAndModify(
query, update, options,
TLockInfo::class.java
)!!
val locked = lock.token == token
try {
val lock = mongoTemplate.findAndModify(
query,
update,
options,
TLockInfo::class.java,
)!!
val locked = lock.token == token

// 如果已过期
if (!locked && lock.expireAt < System.currentTimeMillis()) {
val deleted = mongoTemplate.remove(
Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(lock.token)
.and(TLockInfo::expireAt).`is`(lock.expireAt)
),
TLockInfo::class.java
)
if (deleted.deletedCount >= 1) {
// 成功释放锁, 再次尝试获取锁
return acquire(key, expiration)
// 如果已过期
if (!locked && lock.expireAt < System.currentTimeMillis()) {
val deleted = mongoTemplate.remove(
Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(lock.token)
.and(TLockInfo::expireAt).`is`(lock.expireAt),
),
TLockInfo::class.java,
)
if (deleted.deletedCount >= 1) {
// 成功释放锁, 再次尝试获取锁
return acquire(key, expiration)
}
}
return if (locked) {
logger.trace("Acquired lock for key {} with token {}", key, token)
return token
} else {
null
}
} catch (e: MongoServerException) {
if (e.code == 11000) { // duplicate key
return null
} else {
throw e
}
}

return if (locked) {
logger.trace("Acquired lock for key {} with token {}", key, token)
return token
} else null
}

override fun release(key: String, token: String): Boolean {
val query = Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(token)
.and(TLockInfo::token).isEqualTo(token),
)
val deleted = mongoTemplate.remove(query, TLockInfo::class.java)
val released = deleted.deletedCount == 1L
Expand All @@ -78,7 +90,7 @@ class MongoLockProvider(
override fun refresh(key: String, token: String, expiration: Long): Boolean {
val query = Query.query(
where(TLockInfo::id).isEqualTo(key)
.and(TLockInfo::token).isEqualTo(token)
.and(TLockInfo::token).isEqualTo(token),
)
val update = Update.update(TLockInfo::expireAt.name, System.currentTimeMillis() + expiration)
val updated: UpdateResult = mongoTemplate.updateFirst(query, update, TLockInfo::class.java)
Expand All @@ -90,16 +102,16 @@ class MongoLockProvider(
} else {
logger.warn(
"Refresh query did not affect any records for key {} with token {}. " +
"This is possible when refresh interval fires for the final time " +
"after the lock has been released",
key, token
"This is possible when refresh interval fires for the final time " +
"after the lock has been released",
key,
token,
)
}

return refreshed
}


companion object {
private val logger = LoggerFactory.getLogger(MongoLockProvider::class.java)

Expand Down
Loading
Loading