Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
[task level retry] add task object collection (#4936)
Browse files Browse the repository at this point in the history
* fix

fix

fix

fix

fix

* fix

* fix
  • Loading branch information
hzy46 authored Sep 27, 2020
1 parent 127bc2a commit a3be56f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 11 deletions.
46 changes: 46 additions & 0 deletions src/database-controller/sdk/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,49 @@ class DatabaseModel {
},
);

class TaskHistory extends Model {}
TaskHistory.init(
{
insertedAt: Sequelize.DATE,
uid: {
type: Sequelize.STRING(36),
primaryKey: true,
},
frameworkName: {
type: Sequelize.STRING(64),
allowNull: false,
},
attemptIndex: Sequelize.INTEGER,
taskroleName: Sequelize.STRING(256),
taskName: Sequelize.STRING(256),
taskIndex: Sequelize.INTEGER,
taskUid: {
type: Sequelize.STRING(36),
allowNull: false,
},
taskAttemptIndex: Sequelize.INTEGER,
podUid: Sequelize.STRING(36),
historyType: {
type: Sequelize.STRING(16),
allowNull: false,
defaultValue: 'retry',
},
snapshot: Sequelize.TEXT,
},
{
sequelize,
modelName: 'task_history',
createdAt: 'insertedAt',
indexes: [
{
unique: false,
fields: ['frameworkName', 'attemptIndex'],
},
],
freezeTableName: true,
},
);

class Pod extends Model {}
Pod.init(
{
Expand Down Expand Up @@ -255,6 +298,7 @@ class DatabaseModel {
);

Framework.hasMany(FrameworkHistory);
Framework.hasMany(TaskHistory);
Framework.hasMany(Pod);
Framework.hasMany(FrameworkEvent);
Framework.hasMany(PodEvent);
Expand All @@ -281,6 +325,7 @@ class DatabaseModel {
this.sequelize = sequelize;
this.Framework = Framework;
this.FrameworkHistory = FrameworkHistory;
this.TaskHistory = TaskHistory;
this.Pod = Pod;
this.FrameworkEvent = FrameworkEvent;
this.PodEvent = PodEvent;
Expand All @@ -296,6 +341,7 @@ class DatabaseModel {
await Promise.all([
this.Framework.sync({ alter: true }),
this.FrameworkHistory.sync({ alter: true }),
this.TaskHistory.sync({ alter: true }),
this.Pod.sync({ alter: true }),
this.FrameworkEvent.sync({ alter: true }),
this.PodEvent.sync({ alter: true }),
Expand Down
18 changes: 11 additions & 7 deletions src/fluentd/deploy/fluentd-config.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,18 @@ data:
enable_ruby
<record>
objectSnapshotTrigger "${\
if record['logMessage'].include? 'Framework Pod Deleted'
'pod_deleted'
elsif record['logMessage'].include? 'Framework will be retried'
'framework_retried'
elsif record['logMessage'].include? 'Task will be retried'
'task_retried'
if record['logMessage'].include? 'OnFrameworkRetry'
'OnFrameworkRetry'
elsif record['logMessage'].include? 'OnFrameworkDeletion'
'OnFrameworkDeletion'
elsif record['logMessage'].include? 'OnTaskRetry'
'OnTaskRetry'
elsif record['logMessage'].include? 'OnTaskDeletion'
'OnTaskDeletion'
elsif record['logMessage'].include? 'OnPodDeletion'
'OnPodDeletion'
else
'unknown'
'Unknown'
end}"
</record>
</filter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,41 @@ def insert_framework(hex_id, time, record)
end
end

def insert_task(hex_id, time, historyType, record)
# This function try to insert the task snapshot into task history table.
# In some cases, the framework controller may have duplicate logs about one task attempt,
# or there has been already successful inserted record before.
# To handle it, `insert_task` executes a "SELECT" first.
# If the uid exists in the table, it will ignore it safely.
# Any error should be raised.
thread = Thread.current
frameworkName = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_NAME"]
attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"]
taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"]
taskName = record["objectSnapshot"]["metadata"]["name"]
taskIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"]
taskUid = record["objectSnapshot"]["metadata"]["uid"]
taskAttemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"]
podUid = record["objectSnapshot"]["status"]["attemptStatus"]["podUID"]
snapshot = record_value(record["objectSnapshot"])
# use taskUid + taskAttemptIndex + historyType to generate a uid
uid = Digest::MD5.hexdigest "#{taskUid}+#{taskAttemptIndex}+#{historyType}"
# select from framework_history, ensure there is no corresponding history object
selectResult = thread[:conn].exec_params('SELECT uid from task_history where uid=$1', [uid])
if selectResult.cmd_tuples == 0
# if there is no existing records, try to insert a new one.
thread[:conn].exec_params("INSERT INTO task_history (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", " +
"\"#{@taskroleName_col}\", \"taskName\", \"taskIndex\", \"taskUid\", \"#{@taskAttemptIndex_col}\", \"podUid\", \"#{@historyType_col}\", \"#{@snapshot_col}\") " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
[time, time, uid, frameworkName, attemptIndex, taskroleName, taskName, taskIndex, taskUid, taskAttemptIndex, podUid, historyType, snapshot]
)
else
# if there is an existing record, ignore it.
log.warn "[pgjson] chunk #{hex_id}: ignored task object as it already exists, uid=#{uid}"
end
end


def insert_pod(hex_id, time, record)
# This function try to insert the pod snapshot into pods table.
# In some cases, the framework controller may have duplicate logs about one pod,
Expand All @@ -192,8 +227,6 @@ def insert_pod(hex_id, time, record)
# if there is an existing record, ignore it.
log.warn "[pgjson] chunk #{hex_id}: ignored pod snapshot object as it already exists, uid=#{uid}"
end


end

def write(chunk)
Expand All @@ -209,9 +242,13 @@ def write(chunk)
kind = record["objectSnapshot"]["kind"]
trigger = record['objectSnapshotTrigger']
log.info "[pgjson] object type #{kind} triggered by #{trigger} found in chunk #{hex_id}"
if trigger == "framework_retried" && kind == "Framework"
if trigger == "OnFrameworkRetry" && kind == "Framework"
insert_framework hex_id, time, record
elsif trigger == "pod_deleted" && kind == "Pod"
elsif trigger == "OnTaskRetry" && kind == "Task"
insert_task hex_id, time, "retry", record
elsif trigger == "OnTaskDeletion" && kind == "Task"
insert_task hex_id, time, "deletion", record
elsif trigger == "OnPodDeletion" && kind == "Pod"
insert_pod hex_id, time, record
else
logMessage = record['logMessage']
Expand Down

0 comments on commit a3be56f

Please sign in to comment.