diff --git a/pkg/api/function/api.go b/pkg/api/function/api.go index 06a74bb7..2c6f0331 100644 --- a/pkg/api/function/api.go +++ b/pkg/api/function/api.go @@ -7,6 +7,7 @@ import ( "github.com/fission/fission-workflows/pkg/types/events" "github.com/gogo/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/sirupsen/logrus" ) // Api that servers mainly as a function.Runtime wrapper that deals with the higher-level logic workflow-related logic. @@ -23,6 +24,7 @@ func NewApi(runtime map[string]Runtime, esClient fes.EventStore) *Api { } func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*types.TaskInvocation, error) { + aggregate := aggregates.NewWorkflowInvocationAggregate(invocationId) id := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?) fn := &types.TaskInvocation{ Metadata: &types.ObjectMetadata{ @@ -37,9 +39,9 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ return nil, err } - err = ap.es.HandleEvent(&fes.Event{ + err = ap.es.Append(&fes.Event{ Type: events.Function_TASK_STARTED.String(), - Parent: aggregates.NewWorkflowInvocationAggregate(invocationId), + Parent: aggregate, Aggregate: aggregates.NewTaskInvocationAggregate(id), Timestamp: ptypes.TimestampNow(), Data: fnAny, @@ -50,10 +52,11 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ fnResult, err := ap.runtime[spec.Type.Runtime].Invoke(spec) if err != nil { - // TODO record error message - esErr := ap.es.HandleEvent(&fes.Event{ + // TODO improve error handling here (retries? internal or task related error?) + logrus.WithField("task", invocationId).Infof("Task failed: %v", err) + esErr := ap.es.Append(&fes.Event{ Type: events.Function_TASK_FAILED.String(), - Parent: aggregates.NewWorkflowInvocationAggregate(invocationId), + Parent: aggregate, Aggregate: aggregates.NewTaskInvocationAggregate(id), Timestamp: ptypes.TimestampNow(), Data: fnAny, @@ -70,17 +73,17 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ } if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED { - err = ap.es.HandleEvent(&fes.Event{ + err = ap.es.Append(&fes.Event{ Type: events.Function_TASK_SUCCEEDED.String(), - Parent: aggregates.NewWorkflowInvocationAggregate(invocationId), + Parent: aggregate, Aggregate: aggregates.NewTaskInvocationAggregate(id), Timestamp: ptypes.TimestampNow(), Data: fnStatusAny, }) } else { - err = ap.es.HandleEvent(&fes.Event{ + err = ap.es.Append(&fes.Event{ Type: events.Function_TASK_FAILED.String(), - Parent: aggregates.NewWorkflowInvocationAggregate(invocationId), + Parent: aggregate, Aggregate: aggregates.NewTaskInvocationAggregate(id), Timestamp: ptypes.TimestampNow(), Data: fnStatusAny, @@ -95,7 +98,7 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ } func (ap *Api) Fail(invocationId string, taskId string) error { - return ap.es.HandleEvent(&fes.Event{ + return ap.es.Append(&fes.Event{ Type: events.Function_TASK_FAILED.String(), Parent: aggregates.NewWorkflowInvocationAggregate(invocationId), Aggregate: aggregates.NewTaskInvocationAggregate(taskId), diff --git a/pkg/api/invocation/api.go b/pkg/api/invocation/api.go index e2d1e38d..7b898f67 100644 --- a/pkg/api/invocation/api.go +++ b/pkg/api/invocation/api.go @@ -34,7 +34,7 @@ func (ia *Api) Invoke(invocation *types.WorkflowInvocationSpec) (string, error) return "", err } - err = ia.es.HandleEvent(&fes.Event{ + err = ia.es.Append(&fes.Event{ Type: events.Invocation_INVOCATION_CREATED.String(), Aggregate: aggregates.NewWorkflowInvocationAggregate(id), Timestamp: ptypes.TimestampNow(), @@ -56,8 +56,9 @@ func (ia *Api) Cancel(invocationId string) error { Type: events.Invocation_INVOCATION_CANCELED.String(), Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationId), Timestamp: ptypes.TimestampNow(), + Hints: &fes.EventHints{Completed: true}, } - err := ia.es.HandleEvent(event) + err := ia.es.Append(event) if err != nil { return err } @@ -78,14 +79,13 @@ func (ia *Api) MarkCompleted(invocationId string, output *types.TypedValue) erro return err } - event := &fes.Event{ + err = ia.es.Append(&fes.Event{ Type: events.Invocation_INVOCATION_COMPLETED.String(), Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationId), Timestamp: ptypes.TimestampNow(), Data: data, - } - - err = ia.es.HandleEvent(event) + Hints: &fes.EventHints{Completed: true}, + }) if err != nil { return err } diff --git a/pkg/api/workflow/api.go b/pkg/api/workflow/api.go index ed60a9ec..a9a9cb67 100644 --- a/pkg/api/workflow/api.go +++ b/pkg/api/workflow/api.go @@ -34,7 +34,7 @@ func (wa *Api) Create(workflow *types.WorkflowSpec) (string, error) { return "", err } - err = wa.es.HandleEvent(&fes.Event{ + err = wa.es.Append(&fes.Event{ Type: events.Workflow_WORKFLOW_CREATED.String(), Aggregate: aggregates.NewWorkflowAggregate(id), Timestamp: ptypes.TimestampNow(), @@ -48,10 +48,11 @@ func (wa *Api) Create(workflow *types.WorkflowSpec) (string, error) { } func (wa *Api) Delete(id string) error { - return wa.es.HandleEvent(&fes.Event{ + return wa.es.Append(&fes.Event{ Type: events.Workflow_WORKFLOW_DELETED.String(), Aggregate: aggregates.NewWorkflowAggregate(id), Timestamp: ptypes.TimestampNow(), + Hints: &fes.EventHints{Completed: true}, }) } @@ -66,7 +67,7 @@ func (wa *Api) Parse(workflow *types.Workflow) (*types.WorkflowStatus, error) { return nil, err } - err = wa.es.HandleEvent(&fes.Event{ + err = wa.es.Append(&fes.Event{ Type: events.Workflow_WORKFLOW_PARSED.String(), Aggregate: aggregates.NewWorkflowAggregate(workflow.Metadata.Id), Timestamp: ptypes.TimestampNow(), diff --git a/pkg/apiserver/admin.go b/pkg/apiserver/admin.go index 2a7598c8..0d98b824 100644 --- a/pkg/apiserver/admin.go +++ b/pkg/apiserver/admin.go @@ -1,6 +1,7 @@ package apiserver import ( + "github.com/fission/fission-workflows/pkg/version" "github.com/golang/protobuf/ptypes/empty" "golang.org/x/net/context" ) @@ -14,3 +15,10 @@ func (as *GrpcAdminApiServer) Status(ctx context.Context, _ *empty.Empty) (*Heal Status: "OK!", }, nil } + +func (as *GrpcAdminApiServer) Version(ctx context.Context, _ *empty.Empty) (*VersionResp, error) { + + return &VersionResp{ + Version: version.VERSION, + }, nil +} diff --git a/pkg/apiserver/apiserver.pb.go b/pkg/apiserver/apiserver.pb.go index bd0a1094..8a00b555 100644 --- a/pkg/apiserver/apiserver.pb.go +++ b/pkg/apiserver/apiserver.pb.go @@ -16,6 +16,7 @@ It has these top-level messages: WorkflowInvocationList WorkflowInvocationGetReponse Health + VersionResp */ package apiserver @@ -154,6 +155,22 @@ func (m *Health) GetStatus() string { return "" } +type VersionResp struct { + Version string `protobuf:"bytes,1,opt,name=version" json:"version,omitempty"` +} + +func (m *VersionResp) Reset() { *m = VersionResp{} } +func (m *VersionResp) String() string { return proto.CompactTextString(m) } +func (*VersionResp) ProtoMessage() {} +func (*VersionResp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *VersionResp) GetVersion() string { + if m != nil { + return m.Version + } + return "" +} + func init() { proto.RegisterType((*WorkflowIdentifier)(nil), "apiserver.WorkflowIdentifier") proto.RegisterType((*SearchWorkflowRequest)(nil), "apiserver.SearchWorkflowRequest") @@ -163,6 +180,7 @@ func init() { proto.RegisterType((*WorkflowInvocationList)(nil), "apiserver.WorkflowInvocationList") proto.RegisterType((*WorkflowInvocationGetReponse)(nil), "apiserver.WorkflowInvocationGetReponse") proto.RegisterType((*Health)(nil), "apiserver.Health") + proto.RegisterType((*VersionResp)(nil), "apiserver.VersionResp") } // Reference imports to suppress errors if they are not otherwise used. @@ -560,6 +578,7 @@ var _WorkflowInvocationAPI_serviceDesc = grpc.ServiceDesc{ type AdminAPIClient interface { Status(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*Health, error) + Version(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*VersionResp, error) } type adminAPIClient struct { @@ -579,10 +598,20 @@ func (c *adminAPIClient) Status(ctx context.Context, in *google_protobuf1.Empty, return out, nil } +func (c *adminAPIClient) Version(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*VersionResp, error) { + out := new(VersionResp) + err := grpc.Invoke(ctx, "/apiserver.AdminAPI/Version", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for AdminAPI service type AdminAPIServer interface { Status(context.Context, *google_protobuf1.Empty) (*Health, error) + Version(context.Context, *google_protobuf1.Empty) (*VersionResp, error) } func RegisterAdminAPIServer(s *grpc.Server, srv AdminAPIServer) { @@ -607,6 +636,24 @@ func _AdminAPI_Status_Handler(srv interface{}, ctx context.Context, dec func(int return interceptor(ctx, in, info, handler) } +func _AdminAPI_Version_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf1.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AdminAPIServer).Version(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/apiserver.AdminAPI/Version", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AdminAPIServer).Version(ctx, req.(*google_protobuf1.Empty)) + } + return interceptor(ctx, in, info, handler) +} + var _AdminAPI_serviceDesc = grpc.ServiceDesc{ ServiceName: "apiserver.AdminAPI", HandlerType: (*AdminAPIServer)(nil), @@ -615,6 +662,10 @@ var _AdminAPI_serviceDesc = grpc.ServiceDesc{ MethodName: "Status", Handler: _AdminAPI_Status_Handler, }, + { + MethodName: "Version", + Handler: _AdminAPI_Version_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/apiserver/apiserver.proto", @@ -623,42 +674,45 @@ var _AdminAPI_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("pkg/apiserver/apiserver.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 589 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xdf, 0x6a, 0x13, 0x41, - 0x14, 0xc6, 0xd9, 0xb4, 0x6c, 0xbb, 0x27, 0xb4, 0xb6, 0xa7, 0x36, 0x09, 0xb1, 0x95, 0x38, 0x08, - 0xd6, 0x80, 0x3b, 0xd0, 0x82, 0x17, 0xb9, 0x2b, 0x55, 0x6a, 0xa1, 0x17, 0x31, 0x91, 0x0a, 0x82, - 0x17, 0x9b, 0xdd, 0x49, 0x32, 0x26, 0xd9, 0x59, 0x77, 0x66, 0x53, 0x82, 0x78, 0x23, 0xbe, 0x81, - 0xaf, 0xe5, 0x9d, 0xaf, 0xe0, 0x83, 0x48, 0x66, 0xff, 0x45, 0x77, 0x53, 0xea, 0x4d, 0xb2, 0x73, - 0x66, 0xe6, 0x77, 0xbe, 0x73, 0xbe, 0x33, 0x70, 0x1c, 0x4c, 0x46, 0xd4, 0x09, 0xb8, 0x64, 0xe1, - 0x9c, 0x85, 0xf9, 0x97, 0x1d, 0x84, 0x42, 0x09, 0xb4, 0xb2, 0x40, 0xb3, 0x33, 0xe2, 0x6a, 0x1c, - 0x0d, 0x6c, 0x57, 0xcc, 0xe8, 0x90, 0x4b, 0xc9, 0x85, 0x9f, 0xfe, 0xbf, 0xb8, 0x15, 0xe1, 0x64, - 0x38, 0x15, 0xb7, 0x92, 0x2e, 0x71, 0x6a, 0x11, 0x30, 0x19, 0xff, 0xc6, 0x98, 0xe6, 0xa3, 0x91, - 0x10, 0xa3, 0x29, 0xa3, 0x7a, 0x35, 0x88, 0x86, 0x94, 0xcd, 0x02, 0xb5, 0x48, 0x36, 0x8f, 0x92, - 0x4d, 0x27, 0xe0, 0xd4, 0xf1, 0x7d, 0xa1, 0x1c, 0xc5, 0x85, 0x9f, 0x5c, 0x25, 0x4f, 0x01, 0xdf, - 0x27, 0xf4, 0x2b, 0x8f, 0xf9, 0x8a, 0x0f, 0x39, 0x0b, 0x71, 0x17, 0x2a, 0xdc, 0x6b, 0x18, 0x2d, - 0xe3, 0xc4, 0xea, 0x55, 0xb8, 0x47, 0xea, 0x70, 0xd8, 0x67, 0x4e, 0xe8, 0x8e, 0xd3, 0xb3, 0x3d, - 0xf6, 0x39, 0x62, 0x52, 0x91, 0x97, 0x50, 0xfb, 0x77, 0x43, 0x06, 0xc2, 0x97, 0x0c, 0x8f, 0xc0, - 0xca, 0x64, 0x37, 0x8c, 0xd6, 0xc6, 0x89, 0xd5, 0xcb, 0x03, 0xc4, 0x86, 0xa3, 0x2c, 0xad, 0x3f, - 0x17, 0xae, 0xd6, 0x74, 0x87, 0x80, 0x33, 0xa8, 0x17, 0xcf, 0xbf, 0x8d, 0x58, 0xb8, 0xc0, 0x06, - 0x6c, 0xc9, 0x68, 0xf0, 0x89, 0xb9, 0x2a, 0x39, 0x9f, 0x2e, 0x49, 0x07, 0x6a, 0xc5, 0x4b, 0xd7, - 0x5c, 0x2a, 0x6c, 0x41, 0x95, 0x67, 0x91, 0x54, 0xde, 0x6a, 0x88, 0x3c, 0x2e, 0x13, 0x78, 0xc9, - 0x54, 0x8f, 0xe9, 0xf2, 0x48, 0x0b, 0xcc, 0x37, 0xcc, 0x99, 0xaa, 0x31, 0xd6, 0xc0, 0x94, 0xca, - 0x51, 0x91, 0x4c, 0xd2, 0x27, 0xab, 0xd3, 0x9f, 0x15, 0xa8, 0xa6, 0x88, 0xf3, 0xee, 0x15, 0x5e, - 0x83, 0x79, 0x11, 0x32, 0x47, 0x31, 0xdc, 0xb1, 0xd3, 0x78, 0x3f, 0x60, 0x6e, 0xf3, 0xd8, 0xce, - 0xc7, 0xa2, 0xe8, 0x05, 0x79, 0xf8, 0xed, 0xd7, 0xef, 0x1f, 0x95, 0x5d, 0x62, 0xd1, 0xb4, 0x7d, - 0x1d, 0xa3, 0x8d, 0xef, 0x60, 0x53, 0x57, 0x52, 0xb3, 0x63, 0x7b, 0xed, 0xd4, 0x7b, 0xfb, 0xf5, - 0xd2, 0xfb, 0xe6, 0x93, 0x15, 0x68, 0xb9, 0x43, 0x64, 0x5f, 0x83, 0xab, 0x98, 0x83, 0xf1, 0x12, - 0x36, 0x2e, 0x99, 0xc2, 0xbb, 0x15, 0x35, 0xad, 0x2c, 0x48, 0x6a, 0x9a, 0xb1, 0x87, 0xbb, 0x19, - 0x83, 0x7e, 0xe1, 0xde, 0x57, 0xec, 0xc2, 0xf6, 0x8d, 0x33, 0xe5, 0x5e, 0x49, 0xb9, 0x6b, 0x14, - 0x93, 0x63, 0x8d, 0xaa, 0x13, 0xcc, 0x51, 0xf3, 0x04, 0xd1, 0x31, 0xda, 0xa7, 0xdf, 0x37, 0xe1, - 0xb0, 0xe8, 0xc8, 0xb2, 0xb1, 0x03, 0x30, 0x97, 0x81, 0x09, 0xc3, 0xba, 0x5d, 0x3c, 0xa1, 0x73, - 0x3e, 0x2b, 0x2b, 0xa8, 0x64, 0xee, 0xd2, 0x7a, 0x48, 0x95, 0xe6, 0xc3, 0xb0, 0x6c, 0xf7, 0x04, - 0x20, 0xce, 0xd1, 0x5f, 0xf8, 0xee, 0xfa, 0x3c, 0x07, 0x25, 0x1b, 0x84, 0x6a, 0xe6, 0x73, 0xb2, - 0xb7, 0xc2, 0xa4, 0x72, 0xe1, 0xbb, 0x1d, 0xa3, 0xfd, 0x01, 0xb1, 0x10, 0x46, 0x17, 0xcc, 0x0b, - 0xc7, 0x77, 0xd9, 0x14, 0xef, 0xab, 0x7b, 0x6d, 0x53, 0x1b, 0x3a, 0x37, 0xb6, 0xff, 0x4a, 0xa2, - 0x1d, 0xba, 0xf9, 0x8f, 0x01, 0x2a, 0x7f, 0x45, 0xe4, 0x40, 0xc3, 0x77, 0x70, 0xb5, 0x59, 0xf8, - 0x31, 0x1e, 0xa1, 0x7b, 0x2b, 0x2f, 0x6d, 0x59, 0x22, 0x1b, 0x0b, 0xb2, 0x4f, 0xbb, 0xb0, 0x7d, - 0xee, 0xcd, 0xb8, 0x36, 0xfe, 0x15, 0x98, 0x7d, 0xfd, 0xd6, 0xd6, 0x16, 0xb1, 0xbf, 0xa2, 0x22, - 0x7e, 0xae, 0xe4, 0x81, 0x46, 0x5b, 0xb8, 0x45, 0xe3, 0x77, 0x3a, 0x30, 0xf5, 0x9d, 0xb3, 0x3f, - 0x01, 0x00, 0x00, 0xff, 0xff, 0x40, 0x18, 0xbe, 0xdc, 0xab, 0x05, 0x00, 0x00, + // 631 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xcf, 0x6e, 0xd3, 0x4e, + 0x10, 0xc7, 0xe5, 0xb4, 0x72, 0xeb, 0x89, 0xda, 0x5f, 0x3b, 0xfd, 0x35, 0x89, 0x42, 0x8b, 0xc2, + 0x0a, 0xa9, 0x25, 0x12, 0x5e, 0xa9, 0x95, 0x38, 0xe4, 0x56, 0x15, 0x54, 0x2a, 0x7a, 0x28, 0x09, + 0x2a, 0x12, 0x12, 0x07, 0xc7, 0xd9, 0xa4, 0x4b, 0x52, 0xaf, 0xf1, 0x6e, 0x52, 0x45, 0x88, 0x0b, + 0xe2, 0x0d, 0x38, 0xf3, 0x46, 0xdc, 0x78, 0x05, 0x1e, 0x04, 0x79, 0xbd, 0x8e, 0x0d, 0x76, 0xaa, + 0x72, 0x49, 0x3c, 0xb3, 0xbb, 0x9f, 0xf9, 0xce, 0x3f, 0xd8, 0x0f, 0xc7, 0x23, 0xea, 0x85, 0x5c, + 0xb2, 0x68, 0xc6, 0xa2, 0xec, 0xcb, 0x0d, 0x23, 0xa1, 0x04, 0x3a, 0x0b, 0x47, 0xb3, 0x33, 0xe2, + 0xea, 0x7a, 0xda, 0x77, 0x7d, 0x71, 0x43, 0x87, 0x5c, 0x4a, 0x2e, 0x82, 0xf4, 0xff, 0xe9, 0xad, + 0x88, 0xc6, 0xc3, 0x89, 0xb8, 0x95, 0x34, 0xc6, 0xa9, 0x79, 0xc8, 0x64, 0xf2, 0x9b, 0x60, 0x9a, + 0x0f, 0x46, 0x42, 0x8c, 0x26, 0x8c, 0x6a, 0xab, 0x3f, 0x1d, 0x52, 0x76, 0x13, 0xaa, 0xb9, 0x39, + 0xdc, 0x33, 0x87, 0x5e, 0xc8, 0xa9, 0x17, 0x04, 0x42, 0x79, 0x8a, 0x8b, 0xc0, 0x3c, 0x25, 0x8f, + 0x01, 0xdf, 0x1a, 0xfa, 0xf9, 0x80, 0x05, 0x8a, 0x0f, 0x39, 0x8b, 0x70, 0x13, 0x2a, 0x7c, 0xd0, + 0xb0, 0x5a, 0xd6, 0xa1, 0xd3, 0xad, 0xf0, 0x01, 0xa9, 0xc3, 0x6e, 0x8f, 0x79, 0x91, 0x7f, 0x9d, + 0xde, 0xed, 0xb2, 0x8f, 0x53, 0x26, 0x15, 0x79, 0x06, 0xb5, 0xbf, 0x0f, 0x64, 0x28, 0x02, 0xc9, + 0x70, 0x0f, 0x9c, 0x85, 0xec, 0x86, 0xd5, 0x5a, 0x39, 0x74, 0xba, 0x99, 0x83, 0xb8, 0xb0, 0xb7, + 0x08, 0x1b, 0xcc, 0x84, 0xaf, 0x35, 0xdd, 0x21, 0xe0, 0x18, 0xea, 0xc5, 0xfb, 0xaf, 0xa7, 0x2c, + 0x9a, 0x63, 0x03, 0xd6, 0xe4, 0xb4, 0xff, 0x81, 0xf9, 0xca, 0xdc, 0x4f, 0x4d, 0xd2, 0x81, 0x5a, + 0xf1, 0xd1, 0x05, 0x97, 0x0a, 0x5b, 0x50, 0xe5, 0x0b, 0x4f, 0x2a, 0x2f, 0xef, 0x22, 0x0f, 0xcb, + 0x04, 0x9e, 0x31, 0xd5, 0x65, 0x3a, 0x3d, 0xd2, 0x02, 0xfb, 0x25, 0xf3, 0x26, 0xea, 0x1a, 0x6b, + 0x60, 0x4b, 0xe5, 0xa9, 0xa9, 0x34, 0xe1, 0x8d, 0x45, 0x0e, 0xa0, 0x7a, 0xc5, 0xa2, 0xb8, 0x7f, + 0x71, 0x4d, 0x62, 0x99, 0xb3, 0xc4, 0x4c, 0x65, 0x1a, 0xf3, 0xe8, 0x47, 0x05, 0xaa, 0x69, 0xac, + 0x93, 0xcb, 0x73, 0xbc, 0x00, 0xfb, 0x34, 0x62, 0x9e, 0x62, 0xb8, 0xe1, 0xa6, 0xfe, 0x5e, 0xc8, + 0xfc, 0xe6, 0xbe, 0x9b, 0xcd, 0x4f, 0xb1, 0x69, 0xe4, 0xff, 0x2f, 0x3f, 0x7f, 0x7d, 0xab, 0x6c, + 0x12, 0x87, 0xa6, 0x75, 0xee, 0x58, 0x6d, 0x7c, 0x03, 0xab, 0x3a, 0xe5, 0x9a, 0x9b, 0xcc, 0x81, + 0x9b, 0x0e, 0x89, 0xfb, 0x22, 0x1e, 0x92, 0xe6, 0xa3, 0x1c, 0xb4, 0xbc, 0x95, 0x64, 0x5b, 0x83, + 0xab, 0x98, 0x81, 0xf1, 0x0c, 0x56, 0xce, 0x98, 0xc2, 0xbb, 0x15, 0x35, 0x9d, 0x85, 0x93, 0xd4, + 0x34, 0x63, 0x0b, 0x37, 0x17, 0x0c, 0xfa, 0x89, 0x0f, 0x3e, 0xe3, 0x25, 0xac, 0x5f, 0x79, 0x13, + 0x3e, 0x28, 0x49, 0x77, 0x89, 0x62, 0xb2, 0xaf, 0x51, 0x75, 0x82, 0x19, 0x6a, 0x66, 0x10, 0x1d, + 0xab, 0x7d, 0xf4, 0x75, 0x15, 0x76, 0x8b, 0xad, 0x8b, 0x0b, 0xdb, 0x07, 0x3b, 0x76, 0x8c, 0x19, + 0xd6, 0xdd, 0xe2, 0x0d, 0x1d, 0xf3, 0xa0, 0x2c, 0xa1, 0x92, 0x01, 0x4d, 0xf3, 0x21, 0x55, 0x9a, + 0x4d, 0x4d, 0x5c, 0xee, 0x31, 0x40, 0x12, 0xa3, 0x37, 0x0f, 0xfc, 0xe5, 0x71, 0x76, 0x4a, 0x0e, + 0x08, 0xd5, 0xcc, 0x27, 0x64, 0x2b, 0xc7, 0xa4, 0x72, 0x1e, 0xf8, 0x1d, 0xab, 0xfd, 0x0e, 0xb1, + 0xe0, 0x46, 0x1f, 0xec, 0x53, 0x2f, 0xf0, 0xd9, 0x04, 0xef, 0xab, 0x7b, 0x69, 0x51, 0x1b, 0x3a, + 0x36, 0xb6, 0xff, 0x08, 0xa2, 0x3b, 0x74, 0xf5, 0x0f, 0x03, 0x54, 0xbe, 0x6e, 0x64, 0x47, 0xc3, + 0x37, 0x30, 0x5f, 0x2c, 0x7c, 0x9f, 0x8c, 0xd0, 0xbd, 0x95, 0x97, 0x96, 0xcc, 0xc8, 0xc6, 0x82, + 0xec, 0xa3, 0xef, 0x16, 0xac, 0x9f, 0x0c, 0x6e, 0xb8, 0xee, 0xfc, 0x73, 0xb0, 0x7b, 0x7a, 0x2b, + 0x97, 0x66, 0xb1, 0x9d, 0x93, 0x91, 0x2c, 0x36, 0xf9, 0x4f, 0xb3, 0x1d, 0x5c, 0xa3, 0xc9, 0x46, + 0xe3, 0x2b, 0x58, 0x33, 0x1b, 0xbd, 0x14, 0x53, 0xcb, 0x61, 0x72, 0xdb, 0x4f, 0xb6, 0x34, 0x0b, + 0x70, 0x9d, 0x9a, 0xad, 0xef, 0xdb, 0xfa, 0xe5, 0xf1, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdb, + 0x64, 0x77, 0x46, 0x22, 0x06, 0x00, 0x00, } diff --git a/pkg/apiserver/apiserver.pb.gw.go b/pkg/apiserver/apiserver.pb.gw.go index a03364ad..5f7a4aa2 100644 --- a/pkg/apiserver/apiserver.pb.gw.go +++ b/pkg/apiserver/apiserver.pb.gw.go @@ -207,6 +207,15 @@ func request_AdminAPI_Status_0(ctx context.Context, marshaler runtime.Marshaler, } +func request_AdminAPI_Version_0(ctx context.Context, marshaler runtime.Marshaler, client AdminAPIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq empty.Empty + var metadata runtime.ServerMetadata + + msg, err := client.Version(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + // RegisterWorkflowAPIHandlerFromEndpoint is same as RegisterWorkflowAPIHandler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterWorkflowAPIHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { @@ -235,10 +244,18 @@ func RegisterWorkflowAPIHandlerFromEndpoint(ctx context.Context, mux *runtime.Se // RegisterWorkflowAPIHandler registers the http handlers for service WorkflowAPI to "mux". // The handlers forward requests to the grpc endpoint over "conn". func RegisterWorkflowAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { - client := NewWorkflowAPIClient(conn) + return RegisterWorkflowAPIHandlerClient(ctx, mux, NewWorkflowAPIClient(conn)) +} + +// RegisterWorkflowAPIHandler registers the http handlers for service WorkflowAPI to "mux". +// The handlers forward requests to the grpc endpoint over the given implementation of "WorkflowAPIClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "WorkflowAPIClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "WorkflowAPIClient" to call the correct interceptors. +func RegisterWorkflowAPIHandlerClient(ctx context.Context, mux *runtime.ServeMux, client WorkflowAPIClient) error { mux.Handle("POST", pattern_WorkflowAPI_Create_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -267,7 +284,7 @@ func RegisterWorkflowAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn }) mux.Handle("GET", pattern_WorkflowAPI_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -296,7 +313,7 @@ func RegisterWorkflowAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn }) mux.Handle("GET", pattern_WorkflowAPI_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -325,7 +342,7 @@ func RegisterWorkflowAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn }) mux.Handle("POST", pattern_WorkflowAPI_Validate_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -404,10 +421,18 @@ func RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx context.Context, mux * // RegisterWorkflowInvocationAPIHandler registers the http handlers for service WorkflowInvocationAPI to "mux". // The handlers forward requests to the grpc endpoint over "conn". func RegisterWorkflowInvocationAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { - client := NewWorkflowInvocationAPIClient(conn) + return RegisterWorkflowInvocationAPIHandlerClient(ctx, mux, NewWorkflowInvocationAPIClient(conn)) +} + +// RegisterWorkflowInvocationAPIHandler registers the http handlers for service WorkflowInvocationAPI to "mux". +// The handlers forward requests to the grpc endpoint over the given implementation of "WorkflowInvocationAPIClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "WorkflowInvocationAPIClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "WorkflowInvocationAPIClient" to call the correct interceptors. +func RegisterWorkflowInvocationAPIHandlerClient(ctx context.Context, mux *runtime.ServeMux, client WorkflowInvocationAPIClient) error { mux.Handle("POST", pattern_WorkflowInvocationAPI_Invoke_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -436,7 +461,7 @@ func RegisterWorkflowInvocationAPIHandler(ctx context.Context, mux *runtime.Serv }) mux.Handle("POST", pattern_WorkflowInvocationAPI_InvokeSync_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -465,7 +490,7 @@ func RegisterWorkflowInvocationAPIHandler(ctx context.Context, mux *runtime.Serv }) mux.Handle("GET", pattern_WorkflowInvocationAPI_InvokeSync_1, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -494,7 +519,7 @@ func RegisterWorkflowInvocationAPIHandler(ctx context.Context, mux *runtime.Serv }) mux.Handle("DELETE", pattern_WorkflowInvocationAPI_Cancel_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -523,7 +548,7 @@ func RegisterWorkflowInvocationAPIHandler(ctx context.Context, mux *runtime.Serv }) mux.Handle("GET", pattern_WorkflowInvocationAPI_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -552,7 +577,7 @@ func RegisterWorkflowInvocationAPIHandler(ctx context.Context, mux *runtime.Serv }) mux.Handle("GET", pattern_WorkflowInvocationAPI_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -639,10 +664,18 @@ func RegisterAdminAPIHandlerFromEndpoint(ctx context.Context, mux *runtime.Serve // RegisterAdminAPIHandler registers the http handlers for service AdminAPI to "mux". // The handlers forward requests to the grpc endpoint over "conn". func RegisterAdminAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { - client := NewAdminAPIClient(conn) + return RegisterAdminAPIHandlerClient(ctx, mux, NewAdminAPIClient(conn)) +} + +// RegisterAdminAPIHandler registers the http handlers for service AdminAPI to "mux". +// The handlers forward requests to the grpc endpoint over the given implementation of "AdminAPIClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "AdminAPIClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "AdminAPIClient" to call the correct interceptors. +func RegisterAdminAPIHandlerClient(ctx context.Context, mux *runtime.ServeMux, client AdminAPIClient) error { mux.Handle("GET", pattern_AdminAPI_Status_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(req.Context()) defer cancel() if cn, ok := w.(http.CloseNotifier); ok { go func(done <-chan struct{}, closed <-chan bool) { @@ -670,13 +703,46 @@ func RegisterAdminAPIHandler(ctx context.Context, mux *runtime.ServeMux, conn *g }) + mux.Handle("GET", pattern_AdminAPI_Version_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + if cn, ok := w.(http.CloseNotifier); ok { + go func(done <-chan struct{}, closed <-chan bool) { + select { + case <-done: + case <-closed: + cancel() + } + }(ctx.Done(), cn.CloseNotify()) + } + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_AdminAPI_Version_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_AdminAPI_Version_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } var ( pattern_AdminAPI_Status_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0}, []string{"status"}, "")) + + pattern_AdminAPI_Version_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0}, []string{"version"}, "")) ) var ( forward_AdminAPI_Status_0 = runtime.ForwardResponseMessage + + forward_AdminAPI_Version_0 = runtime.ForwardResponseMessage ) diff --git a/pkg/apiserver/apiserver.proto b/pkg/apiserver/apiserver.proto index 6ba3d0f6..9b069c3a 100644 --- a/pkg/apiserver/apiserver.proto +++ b/pkg/apiserver/apiserver.proto @@ -118,8 +118,18 @@ service AdminAPI { get: "/status" }; } + + rpc Version (google.protobuf.Empty) returns (VersionResp) { + option (google.api.http) = { + get: "/version" + }; + } } message Health { string status = 1; } + +message VersionResp { + string version = 1; +} diff --git a/pkg/apiserver/invocation.go b/pkg/apiserver/invocation.go index ecfbe4ad..46249443 100644 --- a/pkg/apiserver/invocation.go +++ b/pkg/apiserver/invocation.go @@ -14,8 +14,8 @@ import ( ) const ( - INVOKE_SYNC_TIMEOUT = time.Duration(10) * time.Minute - INVOKE_SYNC_POLLING_INTERVAL = time.Duration(1) * time.Second + invokeSyncTimeout = time.Duration(10) * time.Minute + invokeSyncPollingInterval = time.Duration(100) * time.Millisecond ) type grpcInvocationApiServer struct { @@ -43,7 +43,7 @@ func (gi *grpcInvocationApiServer) InvokeSync(ctx context.Context, spec *types.W return nil, err } - timeout, _ := context.WithTimeout(ctx, INVOKE_SYNC_TIMEOUT) + timeout, _ := context.WithTimeout(ctx, invokeSyncTimeout) var result *types.WorkflowInvocation for { wi := aggregates.NewWorkflowInvocation(wfiId, &types.WorkflowInvocation{}) @@ -65,7 +65,7 @@ func (gi *grpcInvocationApiServer) InvokeSync(ctx context.Context, spec *types.W return nil, errors.New("timeout occurred") default: // TODO polling is a temporary shortcut; needs optimizing. - time.Sleep(INVOKE_SYNC_POLLING_INTERVAL) + time.Sleep(invokeSyncPollingInterval) } } @@ -91,10 +91,10 @@ func (gi *grpcInvocationApiServer) Get(ctx context.Context, invocationId *Workfl } func (gi *grpcInvocationApiServer) List(context.Context, *empty.Empty) (*WorkflowInvocationList, error) { - invocations := []string{} + var invocations []string as := gi.wfiCache.List() for _, a := range as { - if a.Type != aggregates.TYPE_WORKFLOW_INVOCATION { + if a.Type != aggregates.TypeWorkflowInvocation { return nil, errors.New("invalid type in invocation cache") } diff --git a/pkg/apiserver/workflow.go b/pkg/apiserver/workflow.go index 20dbd4a2..c62f848d 100644 --- a/pkg/apiserver/workflow.go +++ b/pkg/apiserver/workflow.go @@ -57,7 +57,7 @@ func (ga *GrpcWorkflowApiServer) Get(ctx context.Context, workflowId *WorkflowId } func (ga *GrpcWorkflowApiServer) List(ctx context.Context, req *empty.Empty) (*SearchWorkflowResponse, error) { - results := []string{} + var results []string wfs := ga.cache.List() for _, result := range wfs { results = append(results, result.Id) diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index f9beb244..5bd2cedb 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -124,7 +124,7 @@ func NewSubscribedCache(ctx context.Context, cache CacheReaderWriter, target fun continue } logrus.WithField("msg", msg.Labels()).Debug("Cache received new event.") - err := c.HandleEvent(event) + err := c.ApplyEvent(event) if err != nil { logrus.WithField("err", err).Error("Failed to handle event") } @@ -135,7 +135,7 @@ func NewSubscribedCache(ctx context.Context, cache CacheReaderWriter, target fun return c } -func (uc *SubscribedCache) HandleEvent(event *Event) error { +func (uc *SubscribedCache) ApplyEvent(event *Event) error { logrus.WithFields(logrus.Fields{ "event.id": event.Id, "aggregate.id": event.Aggregate.Id, diff --git a/pkg/fes/eventstore/nats/client.go b/pkg/fes/eventstore/nats/client.go index 2139ecd2..f9a9875a 100644 --- a/pkg/fes/eventstore/nats/client.go +++ b/pkg/fes/eventstore/nats/client.go @@ -2,7 +2,6 @@ package nats import ( "fmt" - "strings" "github.com/fission/fission-workflows/pkg/fes" @@ -28,7 +27,6 @@ func NewEventStore(conn *WildcardConn) *EventStore { // Watch a aggregate type func (es *EventStore) Watch(aggregate fes.Aggregate) error { - subject := fmt.Sprintf("%s.>", aggregate.Type) sub, err := es.conn.Subscribe(subject, func(msg *stan.Msg) { event, err := toEvent(msg) @@ -41,7 +39,7 @@ func (es *EventStore) Watch(aggregate fes.Aggregate) error { "aggregate.id": event.Aggregate.Id, "event.type": event.Type, "event.id": event.Id, - "nats.subject": msg.Subject, + "nats.Subject": msg.Subject, }).Info("Publishing aggregate event to subscribers.") err = es.Publisher.Publish(event) @@ -62,8 +60,8 @@ func (es *EventStore) Close() error { return es.conn.Close() } -func (es *EventStore) HandleEvent(event *fes.Event) error { - // TODO make generic / configurable whether to fold event into parent's subject +func (es *EventStore) Append(event *fes.Event) error { + // TODO make generic / configurable whether to fold event into parent's Subject subject := toSubject(event.Aggregate) if event.Parent != nil { subject = toSubject(event.Parent) @@ -78,21 +76,20 @@ func (es *EventStore) HandleEvent(event *fes.Event) error { "event.type": event.Type, "aggregate.id": event.Aggregate.Id, "aggregate.type": event.Aggregate.Type, - "nats.subject": subject, + "nats.Subject": subject, }).Info("EventStore client appending event.") return es.conn.Publish(subject, data) } func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { - //logrus.WithField("subject", aggregateType).Debug("GET events from event store") subject := toSubject(aggregate) - msgs, err := es.conn.MsgSeqRange(subject, FIRST_MSG, MOST_RECENT_MSG) + msgs, err := es.conn.MsgSeqRange(subject, firstMsg, mostRecentMsg) if err != nil { return nil, err } - results := []*fes.Event{} + var results []*fes.Event for _, msg := range msgs { event, err := toEvent(msg) if err != nil { @@ -109,7 +106,7 @@ func (es *EventStore) List(matcher fes.StringMatcher) ([]fes.Aggregate, error) { if err != nil { return nil, err } - results := []fes.Aggregate{} + var results []fes.Aggregate for _, subject := range subjects { a := toAggregate(subject) results = append(results, *a) diff --git a/pkg/fes/eventstore/nats/nats.go b/pkg/fes/eventstore/nats/nats.go index 66288835..f5463278 100644 --- a/pkg/fes/eventstore/nats/nats.go +++ b/pkg/fes/eventstore/nats/nats.go @@ -1,18 +1,36 @@ package nats import ( + "encoding/json" + "errors" "fmt" - "time" - "strings" - - "encoding/json" + "time" "github.com/fission/fission-workflows/pkg/fes" "github.com/nats-io/go-nats-streaming" "github.com/sirupsen/logrus" ) +const ( + subjectActivity = "_activity" + mostRecentMsg uint64 = 0 + firstMsg uint64 = 1 + rangeFetchTimeout = time.Duration(1) * time.Minute +) + +type eventType int32 + +const ( + noop eventType = iota + deleted +) + +type subjectEvent struct { + Subject string `json:"Subject,omitempty"` + Type eventType `json:"type,omitempty"` +} + // Conn is a wrapper of 'stan.Conn' struct to augment the API with bounded subscriptions and channel-based subscriptions type Conn struct { stan.Conn @@ -28,13 +46,6 @@ func (cn *Conn) SubscribeChan(subject string, msgChan chan *stan.Msg, opts ...st }, opts...) } -const ( - SUBJECT_ACTIVITY = "_activity" -) - -const MOST_RECENT_MSG uint64 = 0 -const FIRST_MSG uint64 = 1 - // Msg has a python style element selector (-1 = len(events)-1) func (cn *Conn) Msg(subject string, seqId uint64) (*stan.Msg, error) { msgRange, err := cn.MsgSeqRange(subject, seqId, seqId) @@ -64,7 +75,7 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* select { case seqEnd = <-rightBound: case <-time.After(time.Duration(10) * time.Second): - return nil, fmt.Errorf("timed out while finding boundary for subject '%s'", subject) + return nil, fmt.Errorf("timed out while finding boundary for Subject '%s'", subject) } } @@ -80,15 +91,23 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* leftBoundOptions = append(leftBoundOptions, stan.StartAtSequence(seqStart)) } - result := []*stan.Msg{} - c := make(chan *stan.Msg) + var result []*stan.Msg + elementC := make(chan *stan.Msg) + errC := make(chan error) sub, err := cn.Subscribe(subject, func(msg *stan.Msg) { defer msg.Ack() - // TODO add a timeout here - c <- msg - if msg.Sequence == seqEnd { - msg.Sub.Close() - close(c) + + select { + case <-time.After(rangeFetchTimeout): + errC <- errors.New("range fetch timeout") + close(elementC) + close(errC) + case elementC <- msg: + if msg.Sequence == seqEnd { + msg.Sub.Close() + close(elementC) + close(errC) + } } }, leftBoundOptions...) if err != nil { @@ -96,11 +115,14 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* } defer sub.Close() - for msg := range c { - result = append(result, msg) + for { + select { + case err := <-errC: + return result, err + case msg := <-elementC: + result = append(result, msg) + } } - - return result, nil } // WildcardConn is an abstraction on top of Conn that provides wildcard support @@ -124,8 +146,8 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op sources: map[string]stan.Subscription{}, } - metaSub, err := wc.Conn.Subscribe(SUBJECT_ACTIVITY, func(msg *stan.Msg) { - subjectEvent := &SubjectEvent{} + metaSub, err := wc.Conn.Subscribe(subjectActivity, func(msg *stan.Msg) { + subjectEvent := &subjectEvent{} err := json.Unmarshal(msg.Data, subjectEvent) if err != nil { logrus.WithFields(logrus.Fields{ @@ -146,8 +168,11 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op "event": subjectEvent, }).Debug("NatsClient received activity.") + // TODO add semaphore for ws.sources manipulation switch subjectEvent.Type { - case ACTIVITY_CREATED: + case noop: + // Create a new listener if event is of a new subject + // TODO issue: if resumed previous event will be replayed, check if these are ignored by the event store! if _, ok := ws.sources[subject]; !ok { sub, err := wc.Subscribe(subject, cb, opts...) if err != nil { @@ -155,9 +180,16 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op } ws.sources[subject] = sub } + case deleted: + // Delete the current listener of the subject of the event + if _, ok := ws.sources[subject]; ok { + err := ws.sources[subject].Close() + if err != nil { + logrus.Errorf("Failed to close (sub)listener: %v", err) + } + } default: - // TODO notify subscription that wildcardSubject has been closed and close channel - panic(fmt.Sprintf("Unknown ActivityEvent: %v", subjectEvent)) + panic(fmt.Sprintf("Unknown eventType: %v", subjectEvent)) } }, stan.DeliverAllAvailable()) if err != nil { @@ -175,32 +207,32 @@ func (wc *WildcardConn) Publish(subject string, data []byte) error { return err } - // Announce subject activity on notification thread, because of missing wildcards in NATS streaming - activityEvent := &SubjectEvent{ + // Announce Subject activity on notification thread, because of missing wildcards in NATS streaming + activityEvent := &subjectEvent{ Subject: subject, - Type: ACTIVITY_CREATED, // TODO infer from context if created or closed + Type: noop, // TODO infer from context if noop or closed } err = wc.publishActivity(activityEvent) if err != nil { - logrus.Warnf("Failed to publish subject '%s': %v", subject, err) + logrus.Warnf("Failed to publish Subject '%s': %v", subject, err) } return nil } -func (wc *WildcardConn) publishActivity(activity *SubjectEvent) error { +func (wc *WildcardConn) publishActivity(activity *subjectEvent) error { subjectData, err := json.Marshal(activity) if err != nil { return err } - err = wc.Conn.Publish(SUBJECT_ACTIVITY, subjectData) + err = wc.Conn.Publish(subjectActivity, subjectData) if err != nil { return err } logrus.WithFields(logrus.Fields{ - "subject": SUBJECT_ACTIVITY, + "Subject": subjectActivity, "event": activity, }).Debug("Published activity event to event store.") @@ -209,18 +241,18 @@ func (wc *WildcardConn) publishActivity(activity *SubjectEvent) error { func (wc *WildcardConn) List(matcher fes.StringMatcher) ([]string, error) { - msgs, err := wc.Conn.MsgSeqRange(SUBJECT_ACTIVITY, FIRST_MSG, MOST_RECENT_MSG) + msgs, err := wc.Conn.MsgSeqRange(subjectActivity, firstMsg, mostRecentMsg) if err != nil { return nil, err } subjectCount := map[string]int{} for _, msg := range msgs { - subjectEvent := &SubjectEvent{} + subjectEvent := &subjectEvent{} err := json.Unmarshal(msg.Data, subjectEvent) if err != nil { logrus.WithFields(logrus.Fields{ "msg": subjectEvent, - "activitySubject": SUBJECT_ACTIVITY, + "activitySubject": subjectActivity, }).Warnf("Failed to parse subjectEvent.") continue } @@ -235,7 +267,7 @@ func (wc *WildcardConn) List(matcher fes.StringMatcher) ([]string, error) { } } - results := []string{} + var results []string for subject := range subjectCount { results = append(results, subject) } @@ -243,6 +275,7 @@ func (wc *WildcardConn) List(matcher fes.StringMatcher) ([]string, error) { return results, nil } +// WildcardSub is an abstraction on top of stan.Subscription that provides wildcard support type WildcardSub struct { subject string sources map[string]stan.Subscription @@ -294,15 +327,3 @@ func queryMatches(subject string, query string) bool { } return true } - -type ActivityEvent int32 - -const ( - ACTIVITY_CREATED ActivityEvent = iota - ACTIVITY_DELETED -) - -type SubjectEvent struct { - Subject string `json:"subject,omitempty"` - Type ActivityEvent `json:"type,omitempty"` -} diff --git a/pkg/fes/fes.pb.go b/pkg/fes/fes.pb.go index cd2aace4..00809f23 100644 --- a/pkg/fes/fes.pb.go +++ b/pkg/fes/fes.pb.go @@ -10,6 +10,7 @@ It is generated from these files: It has these top-level messages: Aggregate Event + EventHints */ package fes @@ -60,6 +61,7 @@ type Event struct { Timestamp *google_protobuf.Timestamp `protobuf:"bytes,4,opt,name=timestamp" json:"timestamp,omitempty"` Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` Parent *Aggregate `protobuf:"bytes,6,opt,name=parent" json:"parent,omitempty"` + Hints *EventHints `protobuf:"bytes,7,opt,name=hints" json:"hints,omitempty"` } func (m *Event) Reset() { *m = Event{} } @@ -109,26 +111,56 @@ func (m *Event) GetParent() *Aggregate { return nil } +func (m *Event) GetHints() *EventHints { + if m != nil { + return m.Hints + } + return nil +} + +// EventHints is a collection of optional metadata that help components in the event store to improve performance. +type EventHints struct { + Completed bool `protobuf:"varint,1,opt,name=completed" json:"completed,omitempty"` +} + +func (m *EventHints) Reset() { *m = EventHints{} } +func (m *EventHints) String() string { return proto.CompactTextString(m) } +func (*EventHints) ProtoMessage() {} +func (*EventHints) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *EventHints) GetCompleted() bool { + if m != nil { + return m.Completed + } + return false +} + func init() { - proto.RegisterType((*Aggregate)(nil), "Aggregate") - proto.RegisterType((*Event)(nil), "Event") + proto.RegisterType((*Aggregate)(nil), "fission.workflows.eventstore.Aggregate") + proto.RegisterType((*Event)(nil), "fission.workflows.eventstore.Event") + proto.RegisterType((*EventHints)(nil), "fission.workflows.eventstore.EventHints") } func init() { proto.RegisterFile("pkg/fes/fes.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 203 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0xc8, 0x4e, 0xd7, - 0x4f, 0x4b, 0x2d, 0x06, 0x61, 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x29, 0xf9, 0xf4, 0xfc, 0xfc, - 0xf4, 0x9c, 0x54, 0x7d, 0x30, 0x2f, 0xa9, 0x34, 0x4d, 0xbf, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, - 0x31, 0xb7, 0x00, 0xa2, 0x40, 0x49, 0x9f, 0x8b, 0xd3, 0x31, 0x3d, 0xbd, 0x28, 0x35, 0x3d, 0xb1, - 0x24, 0x55, 0x88, 0x8f, 0x8b, 0x29, 0x33, 0x45, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x88, 0x29, - 0x33, 0x45, 0x48, 0x88, 0x8b, 0xa5, 0xa4, 0xb2, 0x20, 0x55, 0x82, 0x09, 0x2c, 0x02, 0x66, 0x2b, - 0x1d, 0x67, 0xe4, 0x62, 0x75, 0x2d, 0x4b, 0xcd, 0x2b, 0x21, 0x46, 0xb5, 0x90, 0x06, 0x17, 0x67, - 0x22, 0xcc, 0x78, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x6e, 0x23, 0x2e, 0x3d, 0xb8, 0x85, 0x41, 0x08, - 0x49, 0x21, 0x0b, 0x2e, 0x4e, 0xb8, 0xdb, 0x24, 0x58, 0xc0, 0x2a, 0xa5, 0xf4, 0x20, 0xae, 0xd7, - 0x83, 0xb9, 0x5e, 0x2f, 0x04, 0xa6, 0x22, 0x08, 0xa1, 0x18, 0x64, 0x6f, 0x4a, 0x62, 0x49, 0xa2, - 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0x2d, 0xa4, 0xc4, 0xc5, 0x56, 0x90, 0x58, 0x94, - 0x9a, 0x57, 0x22, 0xc1, 0x86, 0x61, 0x29, 0x54, 0x26, 0x89, 0x0d, 0x6c, 0xac, 0x31, 0x20, 0x00, - 0x00, 0xff, 0xff, 0xa5, 0xd3, 0x70, 0x72, 0x37, 0x01, 0x00, 0x00, + // 280 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x90, 0x41, 0x4b, 0xfc, 0x30, + 0x10, 0xc5, 0x69, 0x77, 0xbb, 0xff, 0x7f, 0x47, 0x11, 0xcc, 0x29, 0x2c, 0x0b, 0x2e, 0xbd, 0x58, + 0x3c, 0xa4, 0xa0, 0x17, 0x4f, 0x8a, 0xc2, 0x82, 0xe7, 0xe0, 0xc9, 0x5b, 0xd6, 0x4e, 0x63, 0xd8, + 0xb6, 0x09, 0xcd, 0xe8, 0xe2, 0xc7, 0xf3, 0x9b, 0x49, 0x53, 0xbb, 0xbd, 0x2d, 0x7a, 0x08, 0x0c, + 0x8f, 0xf7, 0x9b, 0xbc, 0x37, 0x70, 0xee, 0x76, 0xba, 0xa8, 0xd0, 0xf7, 0x4f, 0xb8, 0xce, 0x92, + 0x65, 0xab, 0xca, 0x78, 0x6f, 0x6c, 0x2b, 0xf6, 0xb6, 0xdb, 0x55, 0xb5, 0xdd, 0x7b, 0x81, 0x1f, + 0xd8, 0x92, 0x27, 0xdb, 0xe1, 0xf2, 0x42, 0x5b, 0xab, 0x6b, 0x2c, 0x82, 0x77, 0xfb, 0x5e, 0x15, + 0x64, 0x1a, 0xf4, 0xa4, 0x1a, 0x37, 0xe0, 0x59, 0x01, 0xe9, 0x83, 0xd6, 0x1d, 0x6a, 0x45, 0xc8, + 0xce, 0x20, 0x36, 0x25, 0x8f, 0xd6, 0x51, 0x9e, 0xca, 0xd8, 0x94, 0x8c, 0xc1, 0x9c, 0x3e, 0x1d, + 0xf2, 0x38, 0x28, 0x61, 0xce, 0xbe, 0x62, 0x48, 0x36, 0xfd, 0x07, 0xbf, 0x71, 0xb3, 0x0d, 0xa4, + 0x6a, 0x5c, 0xcf, 0x67, 0xeb, 0x28, 0x3f, 0xb9, 0xbe, 0x14, 0xc7, 0x12, 0x8b, 0x43, 0x1a, 0x39, + 0x91, 0xec, 0x16, 0xd2, 0x43, 0x70, 0x3e, 0x0f, 0x6b, 0x96, 0x62, 0xa8, 0x26, 0xc6, 0x6a, 0xe2, + 0x79, 0x74, 0xc8, 0xc9, 0xdc, 0x87, 0x2a, 0x15, 0x29, 0x9e, 0xac, 0xa3, 0xfc, 0x54, 0x86, 0x99, + 0xdd, 0xc3, 0xc2, 0xa9, 0x0e, 0x5b, 0xe2, 0x8b, 0xbf, 0x25, 0xfa, 0xc1, 0xd8, 0x1d, 0x24, 0x6f, + 0xa6, 0x25, 0xcf, 0xff, 0x05, 0x3e, 0x3f, 0xce, 0x87, 0x6b, 0x3d, 0xf5, 0x7e, 0x39, 0x60, 0xd9, + 0x15, 0xc0, 0x24, 0xb2, 0x15, 0xa4, 0xaf, 0xb6, 0x71, 0x35, 0x12, 0x0e, 0xe7, 0xfc, 0x2f, 0x27, + 0xe1, 0x31, 0x79, 0x99, 0x55, 0xe8, 0xb7, 0x8b, 0x50, 0xf3, 0xe6, 0x3b, 0x00, 0x00, 0xff, 0xff, + 0x09, 0xe1, 0x0f, 0x3d, 0x02, 0x02, 0x00, 0x00, } diff --git a/pkg/fes/fes.proto b/pkg/fes/fes.proto index a895c09a..b0ecc06e 100644 --- a/pkg/fes/fes.proto +++ b/pkg/fes/fes.proto @@ -1,5 +1,8 @@ syntax = "proto3"; +package fission.workflows.eventstore; +option go_package = "fes"; + import "google/protobuf/timestamp.proto"; message Aggregate { @@ -13,5 +16,11 @@ message Event { Aggregate aggregate = 3; google.protobuf.Timestamp timestamp = 4; bytes data = 5; - Aggregate parent = 6; + Aggregate parent = 6; // TODO what is the difference between a parent and a aggregate ?? + EventHints hints = 7; +} + +// EventHints is a collection of optional metadata that help components in the event store to improve performance. +message EventHints { + bool completed = 1; } diff --git a/pkg/fes/types.go b/pkg/fes/types.go index 4d6bf36a..cf42b83a 100644 --- a/pkg/fes/types.go +++ b/pkg/fes/types.go @@ -2,14 +2,16 @@ package fes import "github.com/fission/fission-workflows/pkg/util/pubsub" -// Aggregator is a entity that can be update +// Aggregator is a entity that can be updated +// TODO we need to keep more event-related information (such as current index) type Aggregator interface { // Entity-specific + // TODO can we avoid mutability here? ApplyEvent(event *Event) error // Aggregate provides type information about the entity, such as the aggregate id and the aggregate type. // - // Implemented by AggregatorMixin + // This is implemented by AggregatorMixin Aggregate() Aggregate // UpdateState mutates the current entity to the provided target state @@ -18,23 +20,17 @@ type Aggregator interface { UpdateState(targetState Aggregator) error } -type EventHandler interface { - HandleEvent(event *Event) error +type EventAppender interface { + Append(event *Event) error } // EventStore is a persistent store for events type EventStore interface { - EventHandler + EventAppender Get(aggregate *Aggregate) ([]*Event, error) List(matcher StringMatcher) ([]Aggregate, error) } -// EventBus is the volatile reactive store that processes, stores events, and notifies subscribers -//type Dispatcher interface { -// EventHandler -// //pubsub.Publisher -//} - // Projector projects events into an entity type Projector interface { Project(target Aggregator, events ...*Event) error diff --git a/pkg/types/aggregates/invocation.go b/pkg/types/aggregates/invocation.go index cf5dacd0..c311acde 100644 --- a/pkg/types/aggregates/invocation.go +++ b/pkg/types/aggregates/invocation.go @@ -15,7 +15,7 @@ import ( ) const ( - TYPE_WORKFLOW_INVOCATION = "invocation" + TypeWorkflowInvocation = "invocation" ) type WorkflowInvocation struct { @@ -35,13 +35,13 @@ func NewWorkflowInvocation(invocationId string, wi *types.WorkflowInvocation) *W func NewWorkflowInvocationAggregate(invocationId string) *fes.Aggregate { return &fes.Aggregate{ Id: invocationId, - Type: TYPE_WORKFLOW_INVOCATION, + Type: TypeWorkflowInvocation, } } func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error { // If the event is a function event, use the Function Aggregate to resolve it. - if event.Aggregate.Type == TYPE_FUNCTION_INVOCATION { + if event.Aggregate.Type == TypeTaskInvocation { return wi.applyTaskEvent(event) } diff --git a/pkg/types/aggregates/function.go b/pkg/types/aggregates/task.go similarity index 97% rename from pkg/types/aggregates/function.go rename to pkg/types/aggregates/task.go index 77898ff2..ef66ad65 100644 --- a/pkg/types/aggregates/function.go +++ b/pkg/types/aggregates/task.go @@ -11,7 +11,7 @@ import ( ) const ( - TYPE_FUNCTION_INVOCATION = "function" + TypeTaskInvocation = "function" ) type TaskInvocation struct { @@ -32,7 +32,7 @@ func NewTaskInvocation(id string, fi *types.TaskInvocation) *TaskInvocation { func NewTaskInvocationAggregate(id string) *fes.Aggregate { return &fes.Aggregate{ Id: id, - Type: TYPE_FUNCTION_INVOCATION, + Type: TypeTaskInvocation, } } diff --git a/pkg/types/aggregates/workflow.go b/pkg/types/aggregates/workflow.go index 37a43cd3..c9953472 100644 --- a/pkg/types/aggregates/workflow.go +++ b/pkg/types/aggregates/workflow.go @@ -9,7 +9,7 @@ import ( ) const ( - TYPE_WORKFLOW = "workflow" + TypeWorkflow = "workflow" ) type Workflow struct { @@ -29,7 +29,7 @@ func NewWorkflow(workflowId string, wi *types.Workflow) *Workflow { func NewWorkflowAggregate(workflowId string) *fes.Aggregate { return &fes.Aggregate{ Id: workflowId, - Type: TYPE_WORKFLOW, + Type: TypeWorkflow, } } diff --git a/pkg/types/events/util.go b/pkg/types/events/util.go index c7ffb0f1..560326ac 100644 --- a/pkg/types/events/util.go +++ b/pkg/types/events/util.go @@ -5,14 +5,14 @@ import ( ) var ( - ErrUnkownEvent = errors.New("unknown event") + ErrUnknownEvent = errors.New("unknown event") ) // Resolve attempts to convert a string-based flag to the appropriate InvocationEvent. func ParseInvocation(event string) (Invocation, error) { val, ok := Invocation_value[event] if !ok { - return -1, ErrUnkownEvent + return -1, ErrUnknownEvent } return Invocation(val), nil } @@ -20,7 +20,7 @@ func ParseInvocation(event string) (Invocation, error) { func ParseWorkflow(flag string) (Workflow, error) { val, ok := Workflow_value[flag] if !ok { - return -1, ErrUnkownEvent + return -1, ErrUnknownEvent } return Workflow(val), nil } @@ -28,7 +28,7 @@ func ParseWorkflow(flag string) (Workflow, error) { func ParseFunction(event string) (Function, error) { val, ok := Function_value[event] if !ok { - return -1, ErrUnkownEvent + return -1, ErrUnknownEvent } return Function(val), nil }