syntax = "proto3";
package flyteidl.event;
option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event";
import "flyteidl/core/literals.proto";
import "flyteidl/core/compiler.proto";
import "flyteidl/core/execution.proto";
import "flyteidl/core/identifier.proto";
import "flyteidl/core/catalog.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

message WorkflowExecutionEvent {
    // Workflow execution id
    core.WorkflowExecutionIdentifier execution_id = 1;

    // The id of the originator (Propeller) of the event
    string producer_id = 2;

    core.WorkflowExecution.Phase phase = 3;

    // This timestamp represents when the original event occurred; it is generated
    // by the executor of the workflow.
    google.protobuf.Timestamp occurred_at = 4;

    oneof output_result {
        // URL to the output of the execution; it encodes all the information
        // including the cloud source provider, e.g., s3://...
        string output_uri = 5;

        // Error information for the execution
        core.ExecutionError error = 6;

        // Raw output data produced by this workflow execution.
        core.LiteralMap output_data = 7;
    }
}
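
// Illustrative sketch (text-format protobuf; all values are assumptions, not
// taken from a real execution) of a terminal event for the message above,
// assuming a successful run whose outputs were offloaded to blob storage:
//
//   execution_id { project: "flytesnacks" domain: "development" name: "abc123" }
//   producer_id: "propeller"
//   phase: SUCCEEDED
//   occurred_at { seconds: 1600000000 }
//   output_uri: "s3://my-bucket/metadata/outputs.pb"
//
// Exactly one member of the output_result oneof is set; a failed execution
// would carry `error` instead of `output_uri`.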

message NodeExecutionEvent {
    // Unique identifier for this node execution
    core.NodeExecutionIdentifier id = 1;

    // The id of the originator (Propeller) of the event
    string producer_id = 2;

    core.NodeExecution.Phase phase = 3;

    // This timestamp represents when the original event occurred; it is generated
    // by the executor of the node.
    google.protobuf.Timestamp occurred_at = 4;

    oneof input_value {
        string input_uri = 5;

        // Raw input data consumed by this node execution.
        core.LiteralMap input_data = 20;
    }

    oneof output_result {
        // URL to the output of the execution; it encodes all the information
        // including the cloud source provider, e.g., s3://...
        string output_uri = 6;

        // Error information for the execution
        core.ExecutionError error = 7;

        // Raw output data produced by this node execution.
        core.LiteralMap output_data = 15;
    }

    // Additional metadata for this event's node target, based on the node type.
    oneof target_metadata {
        WorkflowNodeMetadata workflow_node_metadata = 8;
        TaskNodeMetadata task_node_metadata = 14;
    }

    // [To be deprecated] Specifies which task (if any) launched this node.
    ParentTaskExecutionMetadata parent_task_metadata = 9;

    // Specifies the parent node of the current node execution. Node executions at level zero will not have a parent node.
    ParentNodeExecutionMetadata parent_node_metadata = 10;

    // Retry group to indicate grouping of nodes by retries
    string retry_group = 11;

    // Identifier of the node in the original workflow/graph.
    // This maps to the value of WorkflowTemplate.nodes[X].id
    string spec_node_id = 12;

    // Friendly readable name for the node
    string node_name = 13;

    int32 event_version = 16;

    // Whether this node launched a subworkflow.
    bool is_parent = 17;

    // Whether this node yielded a dynamic workflow.
    bool is_dynamic = 18;

    // String location uniquely identifying where the deck HTML file is.
    // NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar)
    string deck_uri = 19;

    // This timestamp represents the instant when the event was reported by the executing framework. For example,
    // when first processing a node the `occurred_at` timestamp should be the instant Propeller makes progress, i.e. when
    // literal inputs are initially copied. The event however will not be sent until after the copy completes.
    // Extracting both of these timestamps facilitates a more accurate portrayal of the evaluation time-series.
    google.protobuf.Timestamp reported_at = 21;
}
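
// A small sketch of the occurred_at / reported_at distinction described above
// (timestamps are illustrative assumptions): if Propeller starts copying a
// node's literal inputs at second 1600000000 but only emits the event once the
// copy finishes four seconds later, the event would carry
//
//   occurred_at { seconds: 1600000000 }   // when progress was actually made
//   reported_at { seconds: 1600000004 }   // when the event left the framework
//
// so consumers can reconstruct the evaluation time-series accurately.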

// For Workflow Nodes we need to send information about the workflow that's launched
message WorkflowNodeMetadata {
    core.WorkflowExecutionIdentifier execution_id = 1;
}

message TaskNodeMetadata {
    // Captures the status of caching for this execution.
    core.CatalogCacheStatus cache_status = 1;

    // This structure carries the catalog artifact information
    core.CatalogMetadata catalog_key = 2;

    // Captures the status of cache reservations for this execution.
    core.CatalogReservation.Status reservation_status = 3;

    // The latest checkpoint location
    string checkpoint_uri = 4;

    // In the case this task launched a dynamic workflow we capture its structure here.
    DynamicWorkflowNodeMetadata dynamic_workflow = 16;
}

// For dynamic workflow nodes we send information about the dynamic workflow definition that gets generated.
message DynamicWorkflowNodeMetadata {
    // id represents the unique identifier of the workflow.
    core.Identifier id = 1;

    // Represents the compiled representation of the embedded dynamic workflow.
    core.CompiledWorkflowClosure compiled_workflow = 2;

    // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for this DynamicWorkflow. This is
    // required to correctly recover partially completed executions where the workflow has already been compiled.
    string dynamic_job_spec_uri = 3;
}

message ParentTaskExecutionMetadata {
    core.TaskExecutionIdentifier id = 1;
}

message ParentNodeExecutionMetadata {
    // Unique identifier of the parent node within the execution.
    // This is the value of core.NodeExecutionIdentifier.node_id of the parent node.
    string node_id = 1;
}

// Plugin specific execution event information. For tasks like Python, Hive, Spark, DynamicJob.
message TaskExecutionEvent {
    // ID of the task. In combination with the retry_attempt this will indicate
    // the task execution uniquely for a given parent node execution.
    core.Identifier task_id = 1;

    // A task execution is always kicked off by a node execution; the event consumer
    // will use the parent_node_execution_id to relate the task to its parent node execution.
    core.NodeExecutionIdentifier parent_node_execution_id = 2;

    // Retry attempt number for this task, e.g., 2 for the second attempt
    uint32 retry_attempt = 3;

    // Phase associated with the event
    core.TaskExecution.Phase phase = 4;

    // id of the process that sent this event, mainly for trace debugging
    string producer_id = 5;

    // log information for the task execution
    repeated core.TaskLog logs = 6;

    // This timestamp represents when the original event occurred; it is generated
    // by the executor of the task.
    google.protobuf.Timestamp occurred_at = 7;

    oneof input_value {
        // URI of the input file; it encodes all the information
        // including the cloud source provider, e.g., s3://...
        string input_uri = 8;

        // Raw input data consumed by this task execution.
        core.LiteralMap input_data = 19;
    }

    oneof output_result {
        // URI to the output of the execution; it will be in a format that encodes all the information
        // including the cloud source provider, e.g., s3://...
        string output_uri = 9;

        // Error information for the execution
        core.ExecutionError error = 10;

        // Raw output data produced by this task execution.
        core.LiteralMap output_data = 17;
    }

    // Custom data that the task plugin sends back. This is extensible to allow various plugins in the system.
    google.protobuf.Struct custom_info = 11;

    // Some phases, like RUNNING, can send multiple events with changed metadata (new logs, additional custom_info, etc.)
    // that should be recorded regardless of the lack of a phase change.
    // The version field should be incremented when metadata changes across the duration of an individual phase.
    uint32 phase_version = 12;

    // An optional explanation for the phase transition.
    string reason = 13;

    // A predefined yet extensible Task type identifier. If the task definition is already registered in flyte admin
    // this type will be identical, but not all task executions necessarily use pre-registered definitions and this
    // type is useful to render the task in the UI, filter task executions, etc.
    string task_type = 14;

    // Metadata around how a task was executed.
    TaskExecutionMetadata metadata = 16;

    // The event version is used to indicate versioned changes in how data is reported using this
    // proto message. For example, event_version > 0 means that map tasks report logs using the
    // TaskExecutionMetadata ExternalResourceInfo fields for each subtask rather than the TaskLog
    // in this message.
    int32 event_version = 18;

    // This timestamp represents the instant when the event was reported by the executing framework. For example, a k8s
    // pod task may be marked completed at (i.e. `occurred_at`) the instant the container running user code completes,
    // but this event will not be reported until the pod is marked as completed. Extracting both of these timestamps
    // facilitates a more accurate portrayal of the evaluation time-series.
    google.protobuf.Timestamp reported_at = 20;
}
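
// A hedged sketch of the phase_version semantics described above (values are
// illustrative assumptions): consecutive events for one attempt that stays in
// RUNNING but accumulates a new log link might look like
//
//   event 1: phase: RUNNING    phase_version: 0  (one log link)
//   event 2: phase: RUNNING    phase_version: 1  (same phase, new log link added)
//   event 3: phase: SUCCEEDED                    (actual state transition)
//
// A consumer should record event 2 as a metadata refresh within RUNNING rather
// than as a state transition.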

// This message contains metadata about external resources produced or used by a specific task execution.
message ExternalResourceInfo {
    // Identifier for an external resource created by this task execution, for example a Qubole query ID or Presto query IDs.
    string external_id = 1;

    // A unique index for the external resource with respect to all external resources for this task. Although the
    // identifier may change between task reporting events or retries, this will remain the same to enable aggregating
    // information from multiple reports.
    uint32 index = 2;

    // Retry attempt number for this external resource, e.g., 2 for the second attempt
    uint32 retry_attempt = 3;

    // Phase associated with the external resource
    core.TaskExecution.Phase phase = 4;

    // Captures the status of caching for this external resource execution.
    core.CatalogCacheStatus cache_status = 5;

    // log information for the external resource execution
    repeated core.TaskLog logs = 6;
}
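
// Sketch of how a map task with two subtasks might appear in
// TaskExecutionMetadata.external_resources when event_version > 0 (values are
// illustrative assumptions): `index` identifies the subtask and stays stable
// across retries, while `retry_attempt` advances, e.g.
//
//   external_resources { index: 0 retry_attempt: 0 phase: SUCCEEDED }
//   external_resources { index: 1 retry_attempt: 1 phase: RUNNING }
//
// so a consumer can aggregate each subtask's history by `index` alone.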

// This message holds task execution metadata specific to resource allocation used to manage concurrent
// executions for a project namespace.
message ResourcePoolInfo {
    // Unique resource ID used to identify this execution when allocating a token.
    string allocation_token = 1;

    // Namespace under which this task execution requested an allocation token.
    string namespace = 2;
}

// Holds metadata around how a task was executed.
// As a task transitions across event phases during execution, some attributes, such as its generated name and
// generated external resources, may grow in size but do not necessarily change with the phase transition that
// sparked the event update.
// Metadata is a container for these attributes across the task execution lifecycle.
message TaskExecutionMetadata {
    // Unique, generated name for this task execution used by the backend.
    string generated_name = 1;

    // Additional data on external resources on other back-ends or platforms (e.g. Hive, Qubole, etc.) launched by this task execution.
    repeated ExternalResourceInfo external_resources = 2;

    // Includes additional data on concurrent resource management used during execution.
    // This is a repeated field because a plugin can request multiple resource allocations during execution.
    repeated ResourcePoolInfo resource_pool_info = 3;

    // The identifier of the plugin used to execute this task.
    string plugin_identifier = 4;

    // Includes the broad category of machine used for this specific task execution.
    enum InstanceClass {
        // The default instance class configured for the flyte application platform.
        DEFAULT = 0;

        // The instance class configured for interruptible tasks.
        INTERRUPTIBLE = 1;
    }
    InstanceClass instance_class = 16;
}
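
// Illustrative text-format sketch of this message for a plugin execution that
// acquired one concurrency token (all names and values are assumptions):
//
//   generated_name: "fd0b3f8c-n0-0"
//   plugin_identifier: "container"
//   instance_class: DEFAULT
//   resource_pool_info {
//     allocation_token: "flytesnacks-development-fd0b3f8c"
//     namespace: "flytesnacks-development"
//   }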