From 05f7c298d4e943bb02bcdaf9ae42066e28aea72d Mon Sep 17 00:00:00 2001
From: Josh van Leeuwen
Date: Tue, 15 Oct 2024 00:35:05 +0100
Subject: [PATCH] Failure Policy: Adds FailurePolicy API field to Job (#52)

* Failure Policy: Adds FailurePolicy API field to Job

Based on https://github.com/dapr/proposals/pull/66

Adds a `FailurePolicy` option to the `Job` API to allow re-triggering jobs
which are marked as failed by the caller. Adds two types of policy: `Drop`
and `Constant`. `Drop` has no retry policy; `Constant` will constantly retry
the job trigger with a configurable delay, up to a configurable maximum
number of retries (which could be infinite).

Note that the failure policy retry cadence has no effect on the actual Job
schedule, meaning that if a job were to be retried and eventually succeeded,
the Job would continue to trigger at the originally configured schedule.

By default, all Jobs will have a `Constant` policy with a delay of 1s and 3
maximum retries.

Signed-off-by: joshvanl

* Linter

Signed-off-by: joshvanl

* Rename `Constant` FailurePolicy `delay` to `interval`

Signed-off-by: joshvanl

---------

Signed-off-by: joshvanl
---
 .golangci.yaml | 3 +
 README.md | 8 +-
 api/failurepolicy.pb.go | 340 ++++
 api/job.pb.go | 82 +-
 cron/cron.go | 5 +-
 cron/cron_test.go | 208 ++++-
 flake.lock | 12 +-
 internal/api/api.go | 4 +-
 internal/api/api_test.go | 30 +-
 internal/api/serve_test.go | 44 +-
 internal/api/stored/counter.pb.go | 24 +-
 internal/api/validator/validator_test.go | 7 +-
 internal/client/client.go | 38 +-
 internal/client/client_test.go | 67 +-
 internal/client/fake/fake.go | 117 +++
 internal/client/fake/fake_test.go | 17 +
 internal/counter/counter.go | 130 +++-
 internal/counter/counter_test.go | 941 ++++++++++++++++++++++-
 internal/garbage/collector_test.go | 24 +-
 internal/grave/yard_test.go | 4 +-
 internal/informer/informer_test.go | 2 +-
 internal/leadership/leadership_test.go | 8 +-
 internal/partitioner/zero_test.go | 2 +-
 internal/queue/queue.go | 31 +-
 internal/queue/queue_test.go | 11 +-
 internal/scheduler/builder.go | 25 +-
 internal/scheduler/builder_test.go | 88 ++-
 nix/ci.nix | 3 +-
 proto/api/failurepolicy.proto | 37 +
 proto/api/job.proto | 9 +-
 proto/stored/counter.proto | 5 +
 tests/cron/cron.go | 4 +
 tests/tests.go | 6 +-
 33 files changed, 2074 insertions(+), 262 deletions(-)
 create mode 100644 api/failurepolicy.pb.go
 create mode 100644 internal/client/fake/fake.go
 create mode 100644 internal/client/fake/fake_test.go
 create mode 100644 proto/api/failurepolicy.proto

diff --git a/.golangci.yaml b/.golangci.yaml
index 1589e1e..f6c1577 100644
--- a/.golangci.yaml
+++ b/.golangci.yaml
@@ -19,6 +19,9 @@ linters:
 - funlen
 - maintidx
 - containedctx
+ - mnd
+ - contextcheck
+ - err113
 linters-settings:
 goimports:
 local-prefixes: github.com/diagridio
diff --git a/README.md b/README.md
index 5e86590..0059fd9 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,8 @@ func main() {
 TriggerFn: func(context.Context, *api.TriggerRequest) bool {
 // Do something with your trigger here.
 // Return true if the trigger was successful, false otherwise.
- // Note, returning false will cause the job to be retried *immediately*.
+ // Note, returning false will cause the job to be retried according to
+ // the Job's configurable FailurePolicy.
 return true
 },
 })
@@ -70,6 +71,11 @@ A Job itself is made up of the following fields:
 Optional.
- `Payload`: A protobuf Any message that can be used to store the main payload
 of the job which will be passed to the trigger function. Optional.
+- `FailurePolicy`: Controls whether the Job should be retried if the trigger
+  function returns false. `Drop` doesn't retry the job; `Constant` will
+  constantly retry the job trigger at a configurable interval, up to a
+  configurable maximum number of retries (which could be infinite). By default,
+  Jobs have a `Constant` policy, with a 1s interval and 3 maximum retries.
 
 A job must have *at least* either a `Schedule` or a `DueTime` set.
 
diff --git a/api/failurepolicy.pb.go b/api/failurepolicy.pb.go
new file mode 100644
index 0000000..b5d6139
--- /dev/null
+++ b/api/failurepolicy.pb.go
@@ -0,0 +1,340 @@
+//
+//Copyright (c) 2024 Diagrid Inc.
+//Licensed under the MIT License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.33.0
+// protoc v4.25.4
+// source: proto/api/failurepolicy.proto
+
+package api
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ durationpb "google.golang.org/protobuf/types/known/durationpb"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// FailurePolicy defines the policy to apply when a job fails to trigger.
+type FailurePolicy struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // policy is the policy to apply when a job fails to trigger.
+ //
+ // Types that are assignable to Policy:
+ //
+ // *FailurePolicy_Drop
+ // *FailurePolicy_Constant
+ Policy isFailurePolicy_Policy `protobuf_oneof:"policy"`
+}
+
+func (x *FailurePolicy) Reset() {
+ *x = FailurePolicy{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_proto_api_failurepolicy_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FailurePolicy) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FailurePolicy) ProtoMessage() {}
+
+func (x *FailurePolicy) ProtoReflect() protoreflect.Message {
+ mi := &file_proto_api_failurepolicy_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FailurePolicy.ProtoReflect.Descriptor instead.
+func (*FailurePolicy) Descriptor() ([]byte, []int) {
+ return file_proto_api_failurepolicy_proto_rawDescGZIP(), []int{0}
+}
+
+func (m *FailurePolicy) GetPolicy() isFailurePolicy_Policy {
+ if m != nil {
+ return m.Policy
+ }
+ return nil
+}
+
+func (x *FailurePolicy) GetDrop() *FailurePolicyDrop {
+ if x, ok := x.GetPolicy().(*FailurePolicy_Drop); ok {
+ return x.Drop
+ }
+ return nil
+}
+
+func (x *FailurePolicy) GetConstant() *FailurePolicyConstant {
+ if x, ok := x.GetPolicy().(*FailurePolicy_Constant); ok {
+ return x.Constant
+ }
+ return nil
+}
+
+type isFailurePolicy_Policy interface {
+ isFailurePolicy_Policy()
+}
+
+type FailurePolicy_Drop struct {
+ Drop *FailurePolicyDrop `protobuf:"bytes,1,opt,name=drop,proto3,oneof"`
+}
+
+type FailurePolicy_Constant struct {
+ Constant *FailurePolicyConstant `protobuf:"bytes,2,opt,name=constant,proto3,oneof"`
+}
+
+func (*FailurePolicy_Drop) isFailurePolicy_Policy() {}
+
+func (*FailurePolicy_Constant) isFailurePolicy_Policy() {}
+
+// FailurePolicyDrop is a policy which drops the job tick when the job fails to
+// trigger.
+type FailurePolicyDrop struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *FailurePolicyDrop) Reset() {
+ *x = FailurePolicyDrop{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_proto_api_failurepolicy_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FailurePolicyDrop) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FailurePolicyDrop) ProtoMessage() {}
+
+func (x *FailurePolicyDrop) ProtoReflect() protoreflect.Message {
+ mi := &file_proto_api_failurepolicy_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FailurePolicyDrop.ProtoReflect.Descriptor instead.
+func (*FailurePolicyDrop) Descriptor() ([]byte, []int) {
+ return file_proto_api_failurepolicy_proto_rawDescGZIP(), []int{1}
+}
+
+// FailurePolicyConstant is a policy which retries the job at a consistent
+// interval when the job fails to trigger.
+type FailurePolicyConstant struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // interval is the constant delay to wait before retrying the job.
+ Interval *durationpb.Duration `protobuf:"bytes,1,opt,name=interval,proto3" json:"interval,omitempty"`
+ // max_retries is the optional maximum number of retries to attempt before
+ // giving up.
+ // If unset, the Job will be retried indefinitely.
+ MaxRetries *uint32 `protobuf:"varint,2,opt,name=max_retries,json=maxRetries,proto3,oneof" json:"max_retries,omitempty"` +} + +func (x *FailurePolicyConstant) Reset() { + *x = FailurePolicyConstant{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_api_failurepolicy_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FailurePolicyConstant) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FailurePolicyConstant) ProtoMessage() {} + +func (x *FailurePolicyConstant) ProtoReflect() protoreflect.Message { + mi := &file_proto_api_failurepolicy_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FailurePolicyConstant.ProtoReflect.Descriptor instead. +func (*FailurePolicyConstant) Descriptor() ([]byte, []int) { + return file_proto_api_failurepolicy_proto_rawDescGZIP(), []int{2} +} + +func (x *FailurePolicyConstant) GetInterval() *durationpb.Duration { + if x != nil { + return x.Interval + } + return nil +} + +func (x *FailurePolicyConstant) GetMaxRetries() uint32 { + if x != nil && x.MaxRetries != nil { + return *x.MaxRetries + } + return 0 +} + +var File_proto_api_failurepolicy_proto protoreflect.FileDescriptor + +var file_proto_api_failurepolicy_proto_rawDesc = []byte{ + 0x0a, 0x1d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x66, 0x61, 0x69, 0x6c, + 0x75, 0x72, 0x65, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x03, 0x61, 0x70, 0x69, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x81, 0x01, 0x0a, 0x0d, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, + 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x12, 0x2c, 0x0a, 0x04, 0x64, 0x72, 0x6f, 0x70, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, + 0x72, 0x65, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x44, 0x72, 0x6f, 0x70, 0x48, 0x00, 0x52, 0x04, + 0x64, 0x72, 0x6f, 0x70, 0x12, 0x38, 0x0a, 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x46, 0x61, 0x69, + 0x6c, 0x75, 0x72, 0x65, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x43, 0x6f, 0x6e, 0x73, 0x74, 0x61, + 0x6e, 0x74, 0x48, 0x00, 0x52, 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x42, 0x08, + 0x0a, 0x06, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x22, 0x13, 0x0a, 0x11, 0x46, 0x61, 0x69, 0x6c, + 0x75, 0x72, 0x65, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x44, 0x72, 0x6f, 0x70, 0x22, 0x84, 0x01, + 0x0a, 0x15, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x43, + 0x6f, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x74, 0x12, 0x35, 0x0a, 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x24, + 0x0a, 0x0b, 0x6d, 0x61, 0x78, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x0a, 0x6d, 0x61, 0x78, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, + 0x73, 0x88, 0x01, 0x01, 
0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x6d, 0x61, 0x78, 0x5f, 0x72, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x73, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, + 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_api_failurepolicy_proto_rawDescOnce sync.Once + file_proto_api_failurepolicy_proto_rawDescData = file_proto_api_failurepolicy_proto_rawDesc +) + +func file_proto_api_failurepolicy_proto_rawDescGZIP() []byte { + file_proto_api_failurepolicy_proto_rawDescOnce.Do(func() { + file_proto_api_failurepolicy_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_api_failurepolicy_proto_rawDescData) + }) + return file_proto_api_failurepolicy_proto_rawDescData +} + +var file_proto_api_failurepolicy_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_proto_api_failurepolicy_proto_goTypes = []interface{}{ + (*FailurePolicy)(nil), // 0: api.FailurePolicy + (*FailurePolicyDrop)(nil), // 1: api.FailurePolicyDrop + (*FailurePolicyConstant)(nil), // 2: api.FailurePolicyConstant + (*durationpb.Duration)(nil), // 3: google.protobuf.Duration +} +var file_proto_api_failurepolicy_proto_depIdxs = []int32{ + 1, // 0: api.FailurePolicy.drop:type_name -> api.FailurePolicyDrop + 2, // 1: api.FailurePolicy.constant:type_name -> api.FailurePolicyConstant + 3, // 2: api.FailurePolicyConstant.interval:type_name -> google.protobuf.Duration + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_proto_api_failurepolicy_proto_init() } +func file_proto_api_failurepolicy_proto_init() { + if File_proto_api_failurepolicy_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_api_failurepolicy_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FailurePolicy); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_api_failurepolicy_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FailurePolicyDrop); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_api_failurepolicy_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FailurePolicyConstant); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_api_failurepolicy_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*FailurePolicy_Drop)(nil), + (*FailurePolicy_Constant)(nil), + } + file_proto_api_failurepolicy_proto_msgTypes[2].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_api_failurepolicy_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_api_failurepolicy_proto_goTypes, + DependencyIndexes: file_proto_api_failurepolicy_proto_depIdxs, + MessageInfos: file_proto_api_failurepolicy_proto_msgTypes, + }.Build() + 
File_proto_api_failurepolicy_proto = out.File + file_proto_api_failurepolicy_proto_rawDesc = nil + file_proto_api_failurepolicy_proto_goTypes = nil + file_proto_api_failurepolicy_proto_depIdxs = nil +} diff --git a/api/job.pb.go b/api/job.pb.go index 7b311b4..15dd2ca 100644 --- a/api/job.pb.go +++ b/api/job.pb.go @@ -14,7 +14,6 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" anypb "google.golang.org/protobuf/types/known/anypb" - _ "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -72,6 +71,12 @@ type Job struct { // payload is the serialized job payload that will be sent to the recipient // when the job is triggered. Payload *anypb.Any `protobuf:"bytes,6,opt,name=payload,proto3" json:"payload,omitempty"` + // failure_policy is the optional policy to apply when a job fails to + // trigger. + // By default, the failure policy is FailurePolicyConstant with a 1s interval + // and 3 maximum retries. + // See `failurepolicy.proto` for more information. + FailurePolicy *FailurePolicy `protobuf:"bytes,7,opt,name=failure_policy,json=failurePolicy,proto3,oneof" json:"failure_policy,omitempty"` } func (x *Job) Reset() { @@ -148,35 +153,47 @@ func (x *Job) GetPayload() *anypb.Any { return nil } +func (x *Job) GetFailurePolicy() *FailurePolicy { + if x != nil { + return x.FailurePolicy + } + return nil +} + var File_proto_api_job_proto protoreflect.FileDescriptor var file_proto_api_job_proto_rawDesc = []byte{ 0x0a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6a, 0x6f, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x61, 0x70, 0x69, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8c, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x1f, - 0x0a, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x48, 0x00, 0x52, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x88, 0x01, 0x01, 0x12, - 0x1e, 0x0a, 0x08, 0x64, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x48, 0x01, 0x52, 0x07, 0x64, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x88, 0x01, 0x01, 0x12, - 0x15, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x02, 0x52, 0x03, - 0x74, 0x74, 0x6c, 0x88, 0x01, 0x01, 0x12, 0x1d, 0x0a, 0x07, 0x72, 0x65, 0x70, 0x65, 0x61, 0x74, - 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x03, 0x52, 0x07, 0x72, 0x65, 0x70, 0x65, 0x61, - 0x74, 0x73, 0x88, 0x01, 0x01, 0x12, 0x30, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x6d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, - 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, - 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x73, 0x63, 0x68, 0x65, - 0x64, 0x75, 0x6c, 0x65, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 
0x64, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x42, 0x06, 0x0a, 0x04, 0x5f, 0x74, 0x74, 0x6c, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x72, 0x65, - 0x70, 0x65, 0x61, 0x74, 0x73, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, - 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdf, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x1f, 0x0a, 0x08, + 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, + 0x52, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x88, 0x01, 0x01, 0x12, 0x1e, 0x0a, + 0x08, 0x64, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, + 0x01, 0x52, 0x07, 0x64, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x88, 0x01, 0x01, 0x12, 0x15, 0x0a, + 0x03, 0x74, 0x74, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x02, 0x52, 0x03, 0x74, 0x74, + 0x6c, 0x88, 0x01, 0x01, 0x12, 0x1d, 0x0a, 0x07, 0x72, 0x65, 0x70, 0x65, 0x61, 0x74, 0x73, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x03, 0x52, 0x07, 0x72, 0x65, 0x70, 0x65, 0x61, 0x74, 0x73, + 0x88, 0x01, 0x01, 0x12, 0x30, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x3e, 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, + 0x5f, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, + 0x61, 0x70, 0x69, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x50, 0x6f, 0x6c, 0x69, 0x63, + 0x79, 0x48, 0x04, 0x52, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x50, 0x6f, 0x6c, 0x69, + 0x63, 0x79, 0x88, 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, + 0x6c, 0x65, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x64, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x42, + 0x06, 0x0a, 0x04, 0x5f, 0x74, 0x74, 0x6c, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x72, 0x65, 0x70, 0x65, + 0x61, 0x74, 0x73, 0x42, 0x11, 0x0a, 0x0f, 0x5f, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, + 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, + 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x61, 0x70, 0x69, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -193,17 +210,19 @@ func file_proto_api_job_proto_rawDescGZIP() []byte { var file_proto_api_job_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_proto_api_job_proto_goTypes = []interface{}{ - (*Job)(nil), // 0: api.Job - (*anypb.Any)(nil), // 1: google.protobuf.Any + (*Job)(nil), // 0: api.Job + (*anypb.Any)(nil), // 1: google.protobuf.Any + (*FailurePolicy)(nil), // 2: 
api.FailurePolicy } var file_proto_api_job_proto_depIdxs = []int32{ 1, // 0: api.Job.metadata:type_name -> google.protobuf.Any 1, // 1: api.Job.payload:type_name -> google.protobuf.Any - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 2, // 2: api.Job.failure_policy:type_name -> api.FailurePolicy + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_proto_api_job_proto_init() } @@ -211,6 +230,7 @@ func file_proto_api_job_proto_init() { if File_proto_api_job_proto != nil { return } + file_proto_api_failurepolicy_proto_init() if !protoimpl.UnsafeEnabled { file_proto_api_job_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Job); i { diff --git a/cron/cron.go b/cron/cron.go index a60205d..d4642de 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -109,7 +109,10 @@ func New(opts Options) (api.Interface, error) { log = log.WithName("diagrid-cron") } - client := client.New(opts.Client) + client := client.New(client.Options{ + Log: log, + Client: opts.Client, + }) collector, err := garbage.New(garbage.Options{ Log: log, diff --git a/cron/cron_test.go b/cron/cron_test.go index bc01176..7841cd7 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -20,6 +20,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -53,7 +54,7 @@ func Test_retry(t *testing.T) { }, 5*time.Second, 10*time.Millisecond) lock.Lock() triggered := helper.triggered.Load() - triggered += 1 + triggered++ ok = true lock.Unlock() assert.EventuallyWithT(t, func(c *assert.CollectT) { @@ -141,7 +142,7 @@ func Test_patition(t *testing.T) { helper := testCron(t, 100) - for i := 0; i < 100; i++ { + for i := range 100 { job := &api.Job{ DueTime: ptr.Of(time.Now().Add(time.Second).Format(time.RFC3339)), } @@ -310,7 +311,7 @@ func Test_parallel(t *testing.T) { }, }) - for i := 0; i < 100; i++ { + for i := range 100 { require.NoError(t, helper.api.Add(helper.ctx, strconv.Itoa(i), &api.Job{ DueTime: ptr.Of("0s"), })) @@ -506,7 +507,7 @@ func Test_jobWithSpace(t *testing.T) { DueTime: ptr.Of(time.Now().Add(2).Format(time.RFC3339)), })) resp, err := cron.api.Get(context.Background(), "hello world") - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, resp) assert.EventuallyWithT(t, func(c *assert.CollectT) { @@ -520,18 +521,193 @@ func Test_jobWithSpace(t *testing.T) { Schedule: ptr.Of("@every 1s"), })) resp, err = cron.api.Get(context.Background(), "another hello world") - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, resp) listresp, err := cron.api.List(context.Background(), "") - assert.NoError(t, err) - assert.Len(t, listresp.Jobs, 1) + require.NoError(t, err) + assert.Len(t, listresp.GetJobs(), 1) require.NoError(t, cron.api.Delete(context.Background(), "another hello world")) resp, err = cron.api.Get(context.Background(), "another hello world") - assert.NoError(t, err) + require.NoError(t, err) 
assert.Nil(t, resp) listresp, err = cron.api.List(context.Background(), "") - assert.NoError(t, err) - assert.Empty(t, listresp.Jobs) + require.NoError(t, err) + assert.Empty(t, listresp.GetJobs()) +} + +func Test_FailurePolicy(t *testing.T) { + t.Parallel() + + t.Run("default policy should retry 3 times with a 1sec interval", func(t *testing.T) { + t.Parallel() + + gotCh := make(chan *api.TriggerRequest, 1) + var got atomic.Uint32 + cron := testCronWithOptions(t, testCronOptions{ + total: 1, + client: tests.EmbeddedETCDBareClient(t), + triggerFn: func(*api.TriggerRequest) bool { + assert.GreaterOrEqual(t, uint32(8), got.Add(1)) + return false + }, + gotCh: gotCh, + }) + + require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + Repeats: ptr.Of(uint32(2)), + })) + + for range 8 { + resp, err := cron.api.Get(context.Background(), "test") + require.NoError(t, err) + assert.NotNil(t, resp) + select { + case <-gotCh: + case <-time.After(time.Second * 3): + assert.Fail(t, "timeout waiting for trigger") + } + } + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + resp, err := cron.api.Get(context.Background(), "test") + assert.NoError(c, err) + assert.Nil(c, resp) + }, time.Second*5, time.Millisecond*10) + }) + + t.Run("drop policy should not retry triggering", func(t *testing.T) { + t.Parallel() + + gotCh := make(chan *api.TriggerRequest, 1) + var got atomic.Uint32 + cron := testCronWithOptions(t, testCronOptions{ + total: 1, + client: tests.EmbeddedETCDBareClient(t), + triggerFn: func(*api.TriggerRequest) bool { + assert.GreaterOrEqual(t, uint32(2), got.Add(1)) + return false + }, + gotCh: gotCh, + }) + + require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + Repeats: ptr.Of(uint32(2)), + FailurePolicy: &api.FailurePolicy{ + Policy: new(api.FailurePolicy_Drop), + }, + })) + + for range 2 { + resp, err := cron.api.Get(context.Background(), "test") + require.NoError(t, err) + assert.NotNil(t, resp) + select { + case <-gotCh: + case <-time.After(time.Second * 3): + assert.Fail(t, "timeout waiting for trigger") + } + } + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + resp, err := cron.api.Get(context.Background(), "test") + assert.NoError(c, err) + assert.Nil(c, resp) + }, time.Second*5, time.Millisecond*10) + }) + + t.Run("constant policy should only retry when it fails ", func(t *testing.T) { + t.Parallel() + + gotCh := make(chan *api.TriggerRequest, 1) + var got atomic.Uint32 + cron := testCronWithOptions(t, testCronOptions{ + total: 1, + client: tests.EmbeddedETCDBareClient(t), + triggerFn: func(*api.TriggerRequest) bool { + assert.GreaterOrEqual(t, uint32(5), got.Add(1)) + return got.Load() == 3 + }, + gotCh: gotCh, + }) + + require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + Repeats: ptr.Of(uint32(3)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Millisecond), MaxRetries: ptr.Of(uint32(1)), + }, + }, + }, + })) + + for range 5 { + resp, err := cron.api.Get(context.Background(), "test") + require.NoError(t, err) + assert.NotNil(t, resp) + select { + case <-gotCh: + case <-time.After(time.Second * 3): + assert.Fail(t, "timeout waiting for trigger") + } + } + + 
assert.EventuallyWithT(t, func(c *assert.CollectT) { + resp, err := cron.api.Get(context.Background(), "test") + assert.NoError(c, err) + assert.Nil(c, resp) + }, time.Second*5, time.Millisecond*10) + }) + + t.Run("constant policy can retry forever until it succeeds", func(t *testing.T) { + t.Parallel() + + gotCh := make(chan *api.TriggerRequest, 1) + var got atomic.Uint32 + cron := testCronWithOptions(t, testCronOptions{ + total: 1, + client: tests.EmbeddedETCDBareClient(t), + triggerFn: func(*api.TriggerRequest) bool { + assert.GreaterOrEqual(t, uint32(100), got.Add(1)) + return got.Load() == 100 + }, + gotCh: gotCh, + }) + + require.NoError(t, cron.api.Add(context.Background(), "test", &api.Job{ + DueTime: ptr.Of(time.Now().Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Millisecond), + }, + }, + }, + })) + + for range 100 { + resp, err := cron.api.Get(context.Background(), "test") + require.NoError(t, err) + assert.NotNil(t, resp) + select { + case <-gotCh: + case <-time.After(time.Second * 3): + assert.Fail(t, "timeout waiting for trigger") + } + } + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + resp, err := cron.api.Get(context.Background(), "test") + assert.NoError(c, err) + assert.Nil(c, resp) + }, time.Second*5, time.Millisecond*10) + }) } type testCronOptions struct { @@ -560,7 +736,7 @@ func testCron(t *testing.T, total uint32) *helper { func testCronWithOptions(t *testing.T, opts testCronOptions) *helper { t.Helper() - require.Greater(t, opts.total, uint32(0)) + require.Positive(t, opts.total) cl := opts.client if cl == nil { cl = tests.EmbeddedETCDBareClient(t) @@ -569,12 +745,12 @@ func testCronWithOptions(t *testing.T, opts testCronOptions) *helper { var triggered atomic.Int64 var a api.Interface allCrns := make([]api.Interface, opts.total) - for i := 0; i < int(opts.total); i++ { + for i := range opts.total { c, err := New(Options{ Log: logr.Discard(), Client: cl, Namespace: "abc", - PartitionID: uint32(i), + PartitionID: i, PartitionTotal: opts.total, TriggerFn: func(_ context.Context, req *api.TriggerRequest) bool { defer func() { triggered.Add(1) }() @@ -601,7 +777,7 @@ func testCronWithOptions(t *testing.T, opts testCronOptions) *helper { closeOnce := sync.OnceFunc(func() { cancel() - for i := 0; i < int(opts.total); i++ { + for range opts.total { select { case err := <-errCh: require.NoError(t, err) @@ -611,7 +787,7 @@ func testCronWithOptions(t *testing.T, opts testCronOptions) *helper { } }) t.Cleanup(closeOnce) - for i := uint32(0); i < opts.total; i++ { + for i := range opts.total { go func(i uint32) { errCh <- allCrns[i].Run(ctx) }(i) @@ -619,7 +795,7 @@ func testCronWithOptions(t *testing.T, opts testCronOptions) *helper { return &helper{ ctx: ctx, - client: client.New(cl), + client: client.New(client.Options{Client: cl, Log: logr.Discard()}), api: a, allCrons: allCrns, triggered: &triggered, diff --git a/flake.lock b/flake.lock index 6970584..4e4aa5e 100644 --- a/flake.lock +++ b/flake.lock @@ -26,11 +26,11 @@ ] }, "locked": { - "lastModified": 1725515722, - "narHash": "sha256-+gljgHaflZhQXtr3WjJrGn8NXv7MruVPAORSufuCFnw=", + "lastModified": 1728509152, + "narHash": "sha256-tQo1rg3TlwgyI8eHnLvZSlQx9d/o2Rb4oF16TfaTOw0=", "owner": "nix-community", "repo": "gomod2nix", - "rev": "1c6fd4e862bf2f249c9114ad625c64c6c29a8a08", + "rev": "d5547e530464c562324f171006fc8f639aa01c9f", "type": "github" }, "original": { @@ -41,11 +41,11 @@ }, 
"nixpkgs": { "locked": { - "lastModified": 1727122398, - "narHash": "sha256-o8VBeCWHBxGd4kVMceIayf5GApqTavJbTa44Xcg5Rrk=", + "lastModified": 1728492678, + "narHash": "sha256-9UTxR8eukdg+XZeHgxW5hQA9fIKHsKCdOIUycTryeVw=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "30439d93eb8b19861ccbe3e581abf97bdc91b093", + "rev": "5633bcff0c6162b9e4b5f1264264611e950c8ec7", "type": "github" }, "original": { diff --git a/internal/api/api.go b/internal/api/api.go index 97ab4c0..1a961c4 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -23,9 +23,7 @@ import ( "github.com/diagridio/go-etcd-cron/internal/scheduler" ) -var ( - errAPIClosed = errors.New("api is closed") -) +var errAPIClosed = errors.New("api is closed") type Options struct { Client client.Interface diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 0fa96e7..94e238d 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" cronapi "github.com/diagridio/go-etcd-cron/api" "github.com/diagridio/go-etcd-cron/internal/garbage" @@ -24,9 +25,7 @@ import ( "github.com/diagridio/go-etcd-cron/tests" ) -var ( - errCancel = errors.New("custom cancel") -) +var errCancel = errors.New("custom cancel") func Test_CRUD(t *testing.T) { t.Parallel() @@ -47,16 +46,31 @@ func Test_CRUD(t *testing.T) { require.NoError(t, err) assert.Equal(t, &cronapi.Job{ DueTime: ptr.Of(now.Add(time.Hour).Format(time.RFC3339)), + FailurePolicy: &cronapi.FailurePolicy{Policy: &cronapi.FailurePolicy_Constant{ + Constant: &cronapi.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }, + }}, }, resp) newNow := time.Now() require.NoError(t, api.Add(context.Background(), "def", &cronapi.Job{ DueTime: ptr.Of(newNow.Add(time.Hour).Format(time.RFC3339)), + FailurePolicy: &cronapi.FailurePolicy{Policy: &cronapi.FailurePolicy_Constant{ + Constant: &cronapi.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }, + }}, })) resp, err = api.Get(context.Background(), "def") require.NoError(t, err) assert.Equal(t, &cronapi.Job{ DueTime: ptr.Of(newNow.Add(time.Hour).Format(time.RFC3339)), + FailurePolicy: &cronapi.FailurePolicy{Policy: &cronapi.FailurePolicy_Constant{ + Constant: &cronapi.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }, + }}, }, resp) require.NoError(t, api.Delete(context.Background(), "def")) @@ -67,12 +81,22 @@ func Test_CRUD(t *testing.T) { require.NoError(t, api.Add(context.Background(), "def", &cronapi.Job{ DueTime: ptr.Of(now.Add(time.Hour).Format(time.RFC3339)), + FailurePolicy: &cronapi.FailurePolicy{Policy: &cronapi.FailurePolicy_Constant{ + Constant: &cronapi.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }, + }}, })) resp, err = api.Get(context.Background(), "def") require.NoError(t, err) assert.Equal(t, &cronapi.Job{ DueTime: ptr.Of(now.Add(time.Hour).Format(time.RFC3339)), + FailurePolicy: &cronapi.FailurePolicy{Policy: &cronapi.FailurePolicy_Constant{ + Constant: &cronapi.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }, + }}, }, resp) } diff --git a/internal/api/serve_test.go b/internal/api/serve_test.go index ce14c0d..ddd4c2e 100644 --- 
a/internal/api/serve_test.go +++ b/internal/api/serve_test.go @@ -10,11 +10,11 @@ import ( "testing" "time" + "github.com/dapr/kit/ptr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "github.com/dapr/kit/ptr" "github.com/diagridio/go-etcd-cron/api" "github.com/diagridio/go-etcd-cron/tests/cron" ) @@ -52,7 +52,7 @@ func Test_Get(t *testing.T) { resp, err = c.Get(context.Background(), "xxx") require.NoError(t, err) - assert.Equal(t, ptr.Of("@every 1s"), resp.Schedule) + assert.Equal(t, "@every 1s", resp.GetSchedule()) } func Test_Delete(t *testing.T) { @@ -78,7 +78,7 @@ func Test_Delete(t *testing.T) { require.NoError(t, c.Delete(context.Background(), "abc")) assert.EventuallyWithT(t, func(co *assert.CollectT) { - assert.Len(co, c.Counters(t).Kvs, 0) + assert.Empty(co, c.Counters(t).Kvs) }, time.Second*3, time.Millisecond*10) current := c.Calls.Load() @@ -92,7 +92,7 @@ func Test_Delete(t *testing.T) { cr := cron.TripplePartitionRun(t) - for i := 0; i < 100; i++ { + for i := range 100 { require.NoError(t, cr.Add(context.Background(), "a"+strconv.Itoa(i), &api.Job{Schedule: ptr.Of("@every 1s")})) } @@ -101,7 +101,7 @@ func Test_Delete(t *testing.T) { assert.Len(c, cr.Counters(t).Kvs, 100) }, time.Second*10, time.Millisecond*10) - for i := 0; i < 100; i++ { + for i := range 100 { require.NoError(t, cr.Delete(context.Background(), "a"+strconv.Itoa(i))) } @@ -135,7 +135,7 @@ func Test_DeletePrefixes(t *testing.T) { assert.Len(t, cr.Jobs(t).Kvs, 2) assert.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Greater(t, cr.Calls.Load(), int64(0)) + assert.Positive(t, cr.Calls.Load()) }, time.Second*3, time.Millisecond*10) assert.EventuallyWithT(t, func(c *assert.CollectT) { @@ -185,7 +185,7 @@ func Test_DeletePrefixes(t *testing.T) { cr := cron.TripplePartitionRun(t) - for i := 0; i < 100; i++ { + for i := range 100 { require.NoError(t, cr.Add(context.Background(), "a"+strconv.Itoa(i), &api.Job{Schedule: ptr.Of("@every 1s")})) } @@ -238,7 +238,7 @@ func Test_List(t *testing.T) { resp, err := cron.List(context.Background(), "") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) }) t.Run("List should return jobs which are in the namespace", func(t *testing.T) { @@ -248,7 +248,7 @@ func Test_List(t *testing.T) { resp, err := cron.List(context.Background(), "") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) now := time.Now() require.NoError(t, cron.Add(context.Background(), "a123", &api.Job{ @@ -260,46 +260,46 @@ func Test_List(t *testing.T) { resp, err = cron.List(context.Background(), "") require.NoError(t, err) - assert.Len(t, resp.Jobs, 2) + assert.Len(t, resp.GetJobs(), 2) resp, err = cron.List(context.Background(), "a") require.NoError(t, err) - assert.Len(t, resp.Jobs, 2) + assert.Len(t, resp.GetJobs(), 2) resp, err = cron.List(context.Background(), "a1") require.NoError(t, err) - assert.Len(t, resp.Jobs, 1) + assert.Len(t, resp.GetJobs(), 1) resp, err = cron.List(context.Background(), "a123") require.NoError(t, err) - assert.Len(t, resp.Jobs, 1) + assert.Len(t, resp.GetJobs(), 1) resp, err = cron.List(context.Background(), "a345") require.NoError(t, err) - assert.Len(t, resp.Jobs, 1) + assert.Len(t, resp.GetJobs(), 1) resp, err = cron.List(context.Background(), "1") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) resp, err = cron.List(context.Background(), "b123") 
require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) require.NoError(t, cron.Delete(context.Background(), "a123")) resp, err = cron.List(context.Background(), "a") require.NoError(t, err) - assert.Len(t, resp.Jobs, 1) + assert.Len(t, resp.GetJobs(), 1) resp, err = cron.List(context.Background(), "a123") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) resp, err = cron.List(context.Background(), "a345") require.NoError(t, err) - assert.Len(t, resp.Jobs, 1) + assert.Len(t, resp.GetJobs(), 1) require.NoError(t, cron.Delete(context.Background(), "a345")) resp, err = cron.List(context.Background(), "a") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) resp, err = cron.List(context.Background(), "a123") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) resp, err = cron.List(context.Background(), "a345") require.NoError(t, err) - assert.Empty(t, resp.Jobs) + assert.Empty(t, resp.GetJobs()) }) } diff --git a/internal/api/stored/counter.pb.go b/internal/api/stored/counter.pb.go index be0c22d..3cbf13a 100644 --- a/internal/api/stored/counter.pb.go +++ b/internal/api/stored/counter.pb.go @@ -41,6 +41,10 @@ type Counter struct { // last_trigger is the timestamp the job was last triggered. Used to // determine the next time the job should be triggered. LastTrigger *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_trigger,json=lastTrigger,proto3" json:"last_trigger,omitempty"` + // attempts is the number of times the job has been attempted to be triggered + // at this count. Used by failure policy to track how many times the Job + // trigger should be retried. + Attempts uint32 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"` } func (x *Counter) Reset() { @@ -96,6 +100,13 @@ func (x *Counter) GetLastTrigger() *timestamppb.Timestamp { return nil } +func (x *Counter) GetAttempts() uint32 { + if x != nil { + return x.Attempts + } + return 0 +} + var File_proto_stored_counter_proto protoreflect.FileDescriptor var file_proto_stored_counter_proto_rawDesc = []byte{ @@ -103,7 +114,7 @@ var file_proto_stored_counter_proto_rawDesc = []byte{ 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x88, 0x01, 0x0a, 0x07, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x65, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x01, 0x0a, 0x07, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x28, 0x0a, 0x10, 0x6a, 0x6f, 0x62, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x6a, 0x6f, 0x62, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, @@ -112,11 +123,12 @@ var file_proto_stored_counter_proto_rawDesc = []byte{ 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, - 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, - 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 
0x63, 0x64, - 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, - 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x42, 0x37, 0x5a, 0x35, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, + 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, + 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/api/validator/validator_test.go b/internal/api/validator/validator_test.go index ac72fc1..19a8f43 100644 --- a/internal/api/validator/validator_test.go +++ b/internal/api/validator/validator_test.go @@ -101,11 +101,10 @@ func Test_JobName(t *testing.T) { } for _, test := range tests { - name := test.name - expErr := test.expErr t.Run(test.name, func(t *testing.T) { - err := New(Options{}).JobName(name) - assert.Equal(t, expErr, err != nil, "%v", err) + t.Parallel() + err := New(Options{}).JobName(test.name) + assert.Equal(t, test.expErr, err != nil, "%v", err) }) } } diff --git a/internal/client/client.go b/internal/client/client.go index 1668e8d..6e46809 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -10,6 +10,7 @@ import ( "errors" "time" + "github.com/go-logr/logr" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "k8s.io/utils/clock" @@ -23,45 +24,52 @@ type Interface interface { DeleteMulti(keys ...string) error } +type Options struct { + Client *clientv3.Client + Log logr.Logger +} + type client struct { *clientv3.Client kv clientv3.KV + log logr.Logger clock clock.Clock } -func New(cl *clientv3.Client) Interface { +func New(opts Options) Interface { var kv clientv3.KV - if cl != nil { - kv = cl.KV + if opts.Client != nil { + kv = opts.Client.KV } return &client{ - Client: cl, + Client: opts.Client, kv: kv, + log: opts.Log, clock: clock.RealClock{}, } } func (c *client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { - return genericPP[string, string, clientv3.OpOption, clientv3.PutResponse](ctx, c, c.kv.Put, key, val, opts...) + return genericPP[string, string, clientv3.OpOption, clientv3.PutResponse](ctx, c.log, c, c.kv.Put, key, val, opts...) } func (c *client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { - return genericP[string, clientv3.OpOption, clientv3.GetResponse](ctx, c, c.kv.Get, key, opts...) + return genericP[string, clientv3.OpOption, clientv3.GetResponse](ctx, c.log, c, c.kv.Get, key, opts...) } func (c *client) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { - return genericP[string, clientv3.OpOption, clientv3.DeleteResponse](ctx, c, c.kv.Delete, key, opts...) + return genericP[string, clientv3.OpOption, clientv3.DeleteResponse](ctx, c.log, c, c.kv.Delete, key, opts...) 
} func (c *client) DeleteMulti(keys ...string) error { for i := 0; i < len(keys); i += 128 { batchI := min(128, len(keys)-i) ops := make([]clientv3.Op, batchI) - for j := 0; j < batchI; j++ { + for j := range batchI { ops[j] = clientv3.OpDelete(keys[i+j]) } - err := generic(context.Background(), c, func(ctx context.Context) error { + err := generic(context.Background(), c.log, c, func(ctx context.Context) error { _, terr := c.kv.Txn(ctx).Then(ops...).Commit() return terr }) @@ -75,10 +83,10 @@ func (c *client) DeleteMulti(keys ...string) error { type genericPPFunc[T any, K any, O any, R any] func(context.Context, T, K, ...O) (*R, error) -func genericPP[T any, K any, O any, R any](ctx context.Context, c *client, op genericPPFunc[T, K, O, R], t T, k K, o ...O) (*R, error) { +func genericPP[T any, K any, O any, R any](ctx context.Context, log logr.Logger, c *client, op genericPPFunc[T, K, O, R], t T, k K, o ...O) (*R, error) { var r *R var err error - return r, generic(ctx, c, func(ctx context.Context) error { + return r, generic(ctx, log, c, func(ctx context.Context) error { r, err = op(ctx, t, k, o...) return err }) @@ -86,16 +94,16 @@ func genericPP[T any, K any, O any, R any](ctx context.Context, c *client, op ge type genericPFunc[T any, O any, R any] func(context.Context, T, ...O) (*R, error) -func genericP[T any, O any, R any](ctx context.Context, c *client, op genericPFunc[T, O, R], t T, o ...O) (*R, error) { +func genericP[T any, O any, R any](ctx context.Context, log logr.Logger, c *client, op genericPFunc[T, O, R], t T, o ...O) (*R, error) { var r *R var err error - return r, generic(ctx, c, func(ctx context.Context) error { + return r, generic(ctx, log, c, func(ctx context.Context) error { r, err = op(ctx, t, o...) return err }) } -func generic(ctx context.Context, c *client, op func(context.Context) error) error { +func generic(ctx context.Context, log logr.Logger, c *client, op func(context.Context) error) error { for { ctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() @@ -109,6 +117,8 @@ func generic(ctx context.Context, c *client, op func(context.Context) error) err return err } + log.Error(err, "etcd client request rate limited, waiting before retrying") + select { case <-c.clock.After(time.Second): case <-ctx.Done(): diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 363debe..3ce5afc 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -8,58 +8,16 @@ package client import ( "context" "errors" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - clientv3 "go.etcd.io/etcd/client/v3" clocktesting "k8s.io/utils/clock/testing" -) - -type mock struct { - clientv3.KV - calls atomic.Uint32 - err error -} - -func (m *mock) If(...clientv3.Cmp) clientv3.Txn { - return m -} -func (m *mock) Else(...clientv3.Op) clientv3.Txn { - return m -} - -func (m *mock) Then(...clientv3.Op) clientv3.Txn { - return m -} - -func (m *mock) Txn(context.Context) clientv3.Txn { - return m -} - -func (m *mock) Put(context.Context, string, string, ...clientv3.OpOption) (*clientv3.PutResponse, error) { - m.calls.Add(1) - return nil, m.err -} - -func (m *mock) Get(context.Context, string, ...clientv3.OpOption) (*clientv3.GetResponse, error) { - m.calls.Add(1) - return nil, m.err -} - -func (m *mock) Delete(context.Context, string, ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { - m.calls.Add(1) - return nil, 
m.err -} - -func (m *mock) Commit() (*clientv3.TxnResponse, error) { - m.calls.Add(1) - return nil, m.err -} + "github.com/diagridio/go-etcd-cron/internal/client/fake" +) func Test_Delete(t *testing.T) { t.Parallel() @@ -90,25 +48,24 @@ func Test_Delete(t *testing.T) { t.Run("Successful should return nil", func(t *testing.T) { t.Parallel() - kv := new(mock) + kv := fake.New() require.NoError(t, testInLoop(&client{kv: kv})) - assert.Equal(t, uint32(1), kv.calls.Load()) + assert.Equal(t, uint32(1), kv.Calls()) }) t.Run("With error should return error", func(t *testing.T) { t.Parallel() - kv := new(mock) - kv.err = errors.New("this is an error") + kv := fake.New().WithError(errors.New("this is an error")) require.Error(t, testInLoop(&client{kv: kv})) - assert.Equal(t, uint32(1), kv.calls.Load()) + assert.Equal(t, uint32(1), kv.Calls()) }) t.Run("Too many request errors should be retried until successful", func(t *testing.T) { t.Parallel() clock := clocktesting.NewFakeClock(time.Now()) - kv := &mock{err: rpctypes.ErrTooManyRequests} + kv := fake.New().WithError(rpctypes.ErrTooManyRequests) errCh := make(chan error) t.Cleanup(func() { select { @@ -117,7 +74,7 @@ func Test_Delete(t *testing.T) { case <-time.After(time.Second): assert.Fail(t, "timeout waiting for err") } - assert.Equal(t, uint32(4), kv.calls.Load()) + assert.Equal(t, uint32(4), kv.Calls()) }) go func() { @@ -129,7 +86,7 @@ func Test_Delete(t *testing.T) { assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10) clock.Sleep(time.Second) assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10) - kv.err = nil + kv.WithError(nil) clock.Sleep(time.Second) }) @@ -137,7 +94,7 @@ func Test_Delete(t *testing.T) { t.Parallel() clock := clocktesting.NewFakeClock(time.Now()) - kv := &mock{err: rpctypes.ErrTooManyRequests} + kv := fake.New().WithError(rpctypes.ErrTooManyRequests) errCh := make(chan error) t.Cleanup(func() { select { @@ -146,7 +103,7 @@ func Test_Delete(t *testing.T) { case <-time.After(time.Second): assert.Fail(t, "timeout waiting for err") } - assert.Equal(t, uint32(4), kv.calls.Load()) + assert.Equal(t, uint32(4), kv.Calls()) }) go func() { @@ -158,7 +115,7 @@ func Test_Delete(t *testing.T) { assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10) clock.Sleep(time.Second) assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10) - kv.err = errors.New("this is an error") + kv.WithError(errors.New("this is an error")) clock.Sleep(time.Second) }) }) diff --git a/internal/client/fake/fake.go b/internal/client/fake/fake.go new file mode 100644 index 0000000..3856126 --- /dev/null +++ b/internal/client/fake/fake.go @@ -0,0 +1,117 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. 
+*/ + +package fake + +import ( + "context" + "sync/atomic" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +type Fake struct { + clientv3.KV + clientv3.Watcher + clientv3.Lease + + calls atomic.Uint32 + putFn func(context.Context, string, string, ...clientv3.OpOption) (*clientv3.PutResponse, error) + getFn func(context.Context, string, ...clientv3.OpOption) (*clientv3.GetResponse, error) + delFn func(context.Context, string, ...clientv3.OpOption) (*clientv3.DeleteResponse, error) + delMu func(...string) error + + err error +} + +func New() *Fake { + return new(Fake) +} + +func (f *Fake) WithPutFn(fn func(context.Context, string, string, ...clientv3.OpOption) (*clientv3.PutResponse, error)) *Fake { + f.putFn = fn + return f +} + +func (f *Fake) WithGetFn(fn func(context.Context, string, ...clientv3.OpOption) (*clientv3.GetResponse, error)) *Fake { + f.getFn = fn + return f +} + +func (f *Fake) WithDeleteFn(fn func(context.Context, string, ...clientv3.OpOption) (*clientv3.DeleteResponse, error)) *Fake { + f.delFn = fn + return f +} + +func (f *Fake) WithDeleteMultiFn(fn func(...string) error) *Fake { + f.delMu = fn + return f +} + +func (f *Fake) WithError(err error) *Fake { + f.err = err + return f +} + +func (f *Fake) If(...clientv3.Cmp) clientv3.Txn { + return f +} + +func (f *Fake) Else(...clientv3.Op) clientv3.Txn { + return f +} + +func (f *Fake) Then(...clientv3.Op) clientv3.Txn { + return f +} + +func (f *Fake) Txn(context.Context) clientv3.Txn { + return f +} + +func (f *Fake) Put(_ context.Context, k string, b string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) { + f.calls.Add(1) + if f.putFn != nil { + return f.putFn(context.Background(), k, b) + } + return nil, f.err +} + +func (f *Fake) Get(_ context.Context, k string, _ ...clientv3.OpOption) (*clientv3.GetResponse, error) { + f.calls.Add(1) + if f.getFn != nil { + return f.getFn(context.Background(), k) + } + return nil, f.err +} + +func (f *Fake) Delete(_ context.Context, k string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + f.calls.Add(1) + if f.delFn != nil { + return f.delFn(context.Background(), k) + } + return nil, f.err +} + +func (f *Fake) Commit() (*clientv3.TxnResponse, error) { + f.calls.Add(1) + return nil, f.err +} + +func (f *Fake) DeleteMulti(keys ...string) error { + f.calls.Add(1) + if f.delMu != nil { + return f.delMu(keys...) + } + return f.err +} + +func (f *Fake) Close() error { + return f.err +} + +func (f *Fake) Calls() uint32 { + return f.calls.Load() +} diff --git a/internal/client/fake/fake_test.go b/internal/client/fake/fake_test.go new file mode 100644 index 0000000..0309c19 --- /dev/null +++ b/internal/client/fake/fake_test.go @@ -0,0 +1,17 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package fake_test + +import ( + "testing" + + "github.com/diagridio/go-etcd-cron/internal/client" + "github.com/diagridio/go-etcd-cron/internal/client/fake" +) + +func Test_Fake(*testing.T) { + var _ client.Interface = fake.New() +} diff --git a/internal/counter/counter.go b/internal/counter/counter.go index 69f2a43..93f09a8 100644 --- a/internal/counter/counter.go +++ b/internal/counter/counter.go @@ -142,13 +142,35 @@ func New(ctx context.Context, opts Options) (*Counter, bool, error) { return c, true, nil } -// Trigger updates the counter state given what the next trigger time was. -// Returns true if the job will be triggered again. 
-func (c *Counter) Trigger(ctx context.Context) (bool, error) {
- // Increment the counter and update the last trigger time as the next trigger
- // time.
+// ScheduledTime is the time at which the job is scheduled to be triggered
+// next. Implements the kit events queueable item.
+func (c *Counter) ScheduledTime() time.Time {
+ return c.next
+}
+
+// Key returns the name of the job. Implements the kit events queueable item.
+func (c *Counter) Key() string {
+ return c.jobKey
+}
+
+// TriggerRequest is the trigger request representation for the job.
+func (c *Counter) TriggerRequest() *api.TriggerRequest {
+ return c.triggerRequest
+}
+
+// TriggerSuccess updates the counter state given the next trigger time.
+// Returns true if the job will be triggered again.
+func (c *Counter) TriggerSuccess(ctx context.Context) (bool, error) {
+ // Update the last trigger time to this tick's scheduled trigger time, and
+ // increment the counter.
+ // Set attempts to 0 as this trigger was successful.
+ //nolint:protogetter
+ if lt := c.schedule.Next(c.count.GetCount(), c.count.LastTrigger); lt != nil {
+ c.count.LastTrigger = timestamppb.New(*lt)
+ }
 c.count.Count++
- c.count.LastTrigger = timestamppb.New(c.next)
+ c.count.Attempts = 0
+
 if ok, err := c.tickNext(); err != nil || !ok {
 return false, err
 }
@@ -163,43 +185,91 @@
 return true, err
 }
 
-// ScheduledTime is the time at which the job is scheduled to be triggered
-// next. Implements the kit events queueable item.
-func (c *Counter) ScheduledTime() time.Time {
- return c.next
-}
+// TriggerFailed is called when triggering the job has been marked as failed
+// by the consumer. The counter is persisted at every attempt to ensure the
+// number of attempts is durable.
+// Returns true if the job failure policy indicates that the job should be
+// tried again. Returns false if the job should not be attempted again and was
+// deleted.
+func (c *Counter) TriggerFailed(ctx context.Context) (bool, error) {
+ // Increment the attempts counter as this count tick failed.
+ c.count.Attempts++
+
+ // If the failure policy indicates that this tick should not be tried again,
+ // we set the attempts to 0 and move to the next tick.
+ if !c.policyTryAgain() {
+ c.count.Count++
+ c.count.Attempts = 0
+ if ok, err := c.tickNext(); err != nil || !ok {
+ return false, err
+ }
+ }
 
-// Key returns the name of the job. Implements the kit events queueable item.
-func (c *Counter) Key() string {
- return c.jobKey
+ b, err := proto.Marshal(c.count)
+ if err != nil {
+ return true, err
+ }
+
+ // Update the counter in etcd and return the next trigger time.
+ _, err = c.client.Put(ctx, c.counterKey, string(b))
+ return true, err
 }
 
-// TriggerRequest is the trigger request representation for the job.
-func (c *Counter) TriggerRequest() *api.TriggerRequest {
- return c.triggerRequest
+// policyTryAgain returns true if the failure policy indicates this job should
+// be tried again at this tick.
+func (c *Counter) policyTryAgain() bool {
+ fp := c.job.GetJob().GetFailurePolicy()
+ if fp == nil {
+ c.count.LastTrigger = timestamppb.New(c.next)
+ return false
+ }
+
+ //nolint:protogetter
+ switch p := fp.Policy.(type) {
+ case *api.FailurePolicy_Drop:
+ c.count.LastTrigger = timestamppb.New(c.next)
+ return false
+ case *api.FailurePolicy_Constant:
+ // Attempts need to be MaxRetries+1 for this counter tick to be dropped.
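+ // For example, with MaxRetries=2 a failed tick is retried while Attempts
+ // is 1 or 2; the third consecutive failure brings Attempts to 3
+ // (MaxRetries+1), so the tick is dropped and the schedule moves on.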
+ //nolint:protogetter
+ tryAgain := p.Constant.MaxRetries == nil || *p.Constant.MaxRetries >= c.count.Attempts
+ if tryAgain {
+ c.next = c.next.Add(p.Constant.GetInterval().AsDuration())
+ } else {
+ // Set LastTrigger to the originally scheduled trigger time (that of the
+ // first attempt) so the Job keeps its configured cadence, regardless of
+ // the failure policy interval and number of attempts.
+ //nolint:protogetter
+ if lt := c.schedule.Next(c.count.GetCount(), c.count.LastTrigger); lt != nil {
+ c.count.LastTrigger = timestamppb.New(*lt)
+ }
+ }
+ return tryAgain
+ default:
+ c.count.LastTrigger = timestamppb.New(c.next)
+ return false
+ }
 }
 
 // tickNext updates the next trigger time, and deletes the counter record if
 // needed.
-//
-//nonlint:contextcheck
 func (c *Counter) tickNext() (bool, error) {
- if !c.updateNext() {
- if err := c.client.DeleteMulti(c.jobKey); err != nil {
- return false, err
- }
- // Mark the job as just been deleted, and push the counter key for garbage
- // collection.
- c.yard.Deleted(c.jobKey)
- c.collector.Push(c.counterKey)
- return false, nil
+ if c.updateNext() {
+ return true, nil
 }
- return true, nil
+ if err := c.client.DeleteMulti(c.jobKey); err != nil {
+ return false, err
+ }
+ // Mark the job as having just been deleted, and push the counter key for
+ // garbage collection.
+ c.yard.Deleted(c.jobKey)
+ c.collector.Push(c.counterKey)
+ return false, nil
 }
 
 // updateNext updates the counter's next trigger time.
-// returns false if the job and counter should be deleted because it has
+// Returns false if the job and counter should be deleted because it has
 // expired.
 func (c *Counter) updateNext() bool {
 // If job completed repeats, delete the counter.
diff --git a/internal/counter/counter_test.go b/internal/counter/counter_test.go
index b801321..018a99d 100644
--- a/internal/counter/counter_test.go
+++ b/internal/counter/counter_test.go
@@ -7,17 +7,21 @@ package counter
 
 import (
 "context"
+ "sync/atomic"
 "testing"
 "time"
 
 "github.com/dapr/kit/ptr"
 "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
+ clientv3 "go.etcd.io/etcd/client/v3"
 "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/durationpb"
 "google.golang.org/protobuf/types/known/timestamppb"
 
 "github.com/diagridio/go-etcd-cron/api"
 "github.com/diagridio/go-etcd-cron/internal/api/stored"
+ "github.com/diagridio/go-etcd-cron/internal/client/fake"
 "github.com/diagridio/go-etcd-cron/internal/garbage"
 "github.com/diagridio/go-etcd-cron/internal/grave"
 "github.com/diagridio/go-etcd-cron/internal/key"
@@ -50,7 +54,7 @@ func Test_New(t *testing.T) {
 JobPartitionId: 123,
 }
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -122,7 +126,7 @@ func Test_New(t *testing.T) {
 JobPartitionId: 123,
 }
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -206,7 +210,7 @@ func Test_New(t *testing.T) {
 JobPartitionId: 456,
 }
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -297,7 +301,7 @@ func Test_New(t *testing.T) {
 JobPartitionId: 123,
 }
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -373,7 +377,7 @@ func Test_New(t *testing.T) {
 },
 }
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -431,7 +435,7 @@ func Test_New(t *testing.T) {
 })
 }
 
-func Test_Trigger(t *testing.T) {
+func Test_TriggerSuccess(t *testing.T) {
 t.Parallel()
 
 t.Run("if tick next is true, expect job be kept and counter to incremented", func(t *testing.T) {
@@ -442,8 +446,8 @@ func Test_Trigger(t *testing.T) {
 now := time.Now().UTC()
 
 job := &stored.Job{
- Begin: &stored.Job_Start{
- Start: timestamppb.New(now),
+ Begin: &stored.Job_DueTime{
+ DueTime: timestamppb.New(now),
 },
 Job: &api.Job{
 DueTime: ptr.Of(now.Format(time.RFC3339)),
@@ -452,7 +456,7 @@ func Test_Trigger(t *testing.T) {
 }
 counter := &stored.Counter{LastTrigger: nil, JobPartitionId: 123}
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -487,7 +491,7 @@ func Test_Trigger(t *testing.T) {
 next: now,
 }
 
- ok, err := c.Trigger(ctx)
+ ok, err := c.TriggerSuccess(ctx)
 require.NoError(t, err)
 assert.True(t, ok)
 
@@ -540,7 +544,7 @@ func Test_Trigger(t *testing.T) {
 Count: 0,
 }
 
- sched, err := scheduler.NewBuilder().Scheduler(job)
+ sched, err := scheduler.NewBuilder().Schedule(job)
 require.NoError(t, err)
 
 jobBytes, err := proto.Marshal(job)
@@ -575,7 +579,7 @@ func Test_Trigger(t *testing.T) {
 counterKey: "abc/counters/1",
 }
 
- ok, err := c.Trigger(ctx)
+ ok, err := c.TriggerSuccess(ctx)
 require.NoError(t, err)
 assert.False(t, ok)
 
@@ -597,6 +601,92 @@ func Test_Trigger(t *testing.T) {
 require.NoError(t, err)
 require.Empty(t, resp.Kvs)
 })
+
+ t.Run("The number of attempts on the counter should always be reset to 0 when TriggerSuccess is called", func(t *testing.T) {
+ t.Parallel()
+
+ client := tests.EmbeddedETCD(t)
+
+ now := time.Now().UTC()
+
+ job := &stored.Job{
+ Begin: &stored.Job_DueTime{
+ DueTime: timestamppb.New(now),
+ },
+ Job: &api.Job{
+ DueTime: ptr.Of(now.Format(time.RFC3339)),
+ Schedule: ptr.Of("@every 1s"),
+ },
+ }
+ counter := &stored.Counter{LastTrigger: nil, JobPartitionId: 123, Attempts: 456}
+
+ sched, err := scheduler.NewBuilder().Schedule(job)
+ require.NoError(t, err)
+
+ jobBytes, err := proto.Marshal(job)
+ require.NoError(t, err)
+ counterBytes, err := proto.Marshal(counter)
+ require.NoError(t, err)
+
+ _, err = client.Put(context.Background(), "abc/jobs/1", string(jobBytes))
+ require.NoError(t, err)
+ _, err = client.Put(context.Background(), "abc/counters/1", string(counterBytes))
+ require.NoError(t, err)
+
+ collector, err := garbage.New(garbage.Options{Client: client})
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ errCh := make(chan error)
+ go func() {
+ errCh <- collector.Run(ctx)
+ }()
+
+ yard := grave.New()
+ c := &Counter{
+ yard: yard,
+ client: client,
+ collector: collector,
+ job: job,
+ count: counter,
+ schedule: sched,
+ jobKey: "abc/jobs/1",
+ counterKey: "abc/counters/1",
+ next: now,
+ }
+
+ ok, err := c.TriggerSuccess(ctx)
+ require.NoError(t, err)
+ assert.True(t, ok)
+
+ cancel()
+ select {
+ case err := <-errCh:
+ require.NoError(t, err)
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for the collector to finish")
+ }
+
+ assert.False(t, yard.HasJustDeleted("abc/jobs/1"))
+
+ resp, err := client.Get(context.Background(), "abc/jobs/1")
+ require.NoError(t, err)
+ require.Len(t, resp.Kvs, 1)
+ assert.Equal(t, jobBytes, resp.Kvs[0].Value) + + counterBytes, err = proto.Marshal(&stored.Counter{ + LastTrigger: timestamppb.New(now), + JobPartitionId: 123, + Count: 1, + Attempts: 0, + }) + require.NoError(t, err) + + resp, err = client.Get(context.Background(), "abc/counters/1") + require.NoError(t, err) + require.Len(t, resp.Kvs, 1) + assert.Equal(t, counterBytes, resp.Kvs[0].Value) + }) } func Test_tickNext(t *testing.T) { @@ -619,7 +709,7 @@ func Test_tickNext(t *testing.T) { } counter := &stored.Counter{LastTrigger: nil, JobPartitionId: 123} - sched, err := scheduler.NewBuilder().Scheduler(job) + sched, err := scheduler.NewBuilder().Schedule(job) require.NoError(t, err) jobBytes, err := proto.Marshal(job) @@ -699,7 +789,7 @@ func Test_tickNext(t *testing.T) { Count: 1, } - sched, err := scheduler.NewBuilder().Scheduler(job) + sched, err := scheduler.NewBuilder().Schedule(job) require.NoError(t, err) jobBytes, err := proto.Marshal(job) @@ -764,7 +854,7 @@ func Test_updateNext(t *testing.T) { builder := scheduler.NewBuilder() - oneshot, err := builder.Scheduler(&stored.Job{ + oneshot, err := builder.Schedule(&stored.Job{ Begin: &stored.Job_DueTime{ DueTime: timestamppb.New(now), }, @@ -774,7 +864,7 @@ func Test_updateNext(t *testing.T) { }) require.NoError(t, err) - repeats, err := builder.Scheduler(&stored.Job{ + repeats, err := builder.Schedule(&stored.Job{ Begin: &stored.Job_Start{ Start: timestamppb.New(now), }, @@ -785,7 +875,7 @@ func Test_updateNext(t *testing.T) { }) require.NoError(t, err) - expires, err := builder.Scheduler(&stored.Job{ + expires, err := builder.Schedule(&stored.Job{ Begin: &stored.Job_Start{ Start: timestamppb.New(now), }, @@ -890,13 +980,820 @@ func Test_updateNext(t *testing.T) { } for name, test := range tests { - counter := test.counter - exp := test.exp - expNext := test.expNext t.Run(name, func(t *testing.T) { t.Parallel() - assert.Equal(t, exp, counter.updateNext()) - assert.Equal(t, expNext, counter.next) + assert.Equal(t, test.exp, test.counter.updateNext()) + assert.Equal(t, test.expNext, test.counter.next) }) } } + +func Test_TriggerFailed(t *testing.T) { + t.Parallel() + + now := time.Now().UTC().Truncate(time.Second) + + type putExp struct { + key string + counter *stored.Counter + } + + tests := map[string]struct { + job *api.Job + count uint32 + attempts uint32 + next *time.Time + lastTriggerTime *timestamppb.Timestamp + exp bool + expNext *time.Time + expPut *putExp + expDel *string + }{ + "no failure policy defined, just due time, expect job deleted": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: nil, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: false, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "no failure policy defined, schedule, expect counter forward": { + job: &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: nil, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "no failure policy Policy defined, just due time, expect job delete": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: new(api.FailurePolicy), + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: false, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "no failure policy Policy defined, schedule, expect counter 
forward": { + job: &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: new(api.FailurePolicy), + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "failure policy Drop defined, just due time, expect job delete": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: new(api.FailurePolicy_Drop), + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: false, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "failure policy Drop defined, schedule, expect counter forward": { + job: &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: new(api.FailurePolicy_Drop), + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "failure policy Drop defined, schedule and count is not up, expect count forward": { + job: &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(now.Format(time.RFC3339)), + Repeats: ptr.Of(uint32(3)), + FailurePolicy: &api.FailurePolicy{ + Policy: new(api.FailurePolicy_Drop), + }, + }, + count: 1, + attempts: 0, + next: ptr.Of(now.Add(time.Second)), + lastTriggerTime: timestamppb.New(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 2)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 2, + LastTrigger: timestamppb.New(now.Add(time.Second)), + }, + }, + expDel: nil, + }, + "failure policy Drop defined, schedule but count is up, expect job delete": { + job: &api.Job{ + Schedule: ptr.Of("@every 1s"), + DueTime: ptr.Of(now.Format(time.RFC3339)), + Repeats: ptr.Of(uint32(3)), + FailurePolicy: &api.FailurePolicy{ + Policy: new(api.FailurePolicy_Drop), + }, + }, + count: 2, + attempts: 0, + next: ptr.Of(now), + exp: false, + expNext: nil, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "failure policy Constant default, nil interval and nil max retries, one shot, expect true with Put and next now": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: nil, MaxRetries: nil, + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 1, + }, + }, + expDel: nil, + }, + "failure policy Constant default, nil interval and 0 max retries, one shot, expect false with del": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: nil, MaxRetries: ptr.Of(uint32(0)), + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: false, + expNext: nil, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "failure policy Constant default, nil interval and 0 max retries, schedule, expect true with count forward": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + 
FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: nil, MaxRetries: ptr.Of(uint32(0)), + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + Attempts: 0, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "failure policy Constant default, nil interval and 0 max retries, schedule but at expiration, expect false with job delete": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + Repeats: ptr.Of(uint32(3)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: nil, MaxRetries: ptr.Of(uint32(0)), + }, + }, + }, + }, + count: 2, + attempts: 0, + next: ptr.Of(now), + exp: false, + expNext: nil, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "failure policy Constant default, 3s interval and nil max retries, one shot, expect true with Put and next now+3s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 3), MaxRetries: nil, + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 3)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 1, + }, + }, + expDel: nil, + }, + "failure policy Constant default, 3s interval and nil max retries, schedule, expect true with Put and next now+3s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 3), MaxRetries: nil, + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 3)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 1, + }, + }, + expDel: nil, + }, + "failure policy Constant default, 3s interval and nil max retries, schedule but at expiration, expect true with Put and next now+9": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + Repeats: ptr.Of(uint32(3)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 3), MaxRetries: nil, + }, + }, + }, + }, + count: 2, + attempts: 0, + next: ptr.Of(now.Add(time.Second * 6)), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 9)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 2, + Attempts: 1, + }, + }, + expDel: nil, + }, + "failure policy Constant default, 3s interval and nil max retries, schedule attempt 5, expect true with Put and next now+3*5": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 3), MaxRetries: nil, + }, + }, + }, + }, + count: 0, + attempts: 4, + next: ptr.Of(now.Add(time.Second * 3 * 4)), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 3 * 
5)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 5, + }, + }, + expDel: nil, + }, + "failure policy Constant default, 0s interval and nil max retries, schedule attempt 5, expect true with Put and next now": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(0), MaxRetries: nil, + }, + }, + }, + }, + count: 0, + attempts: 4, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 5, + }, + }, + expDel: nil, + }, + "if failure policy is constant with 5s interval and 0 max retries with 0 attempts, schedule, expect true with Put and next now+5s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(0)), + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + Attempts: 0, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "if failure policy is constant with 5s interval and 0 max retries with 0 attempts, one shot, expect false with del job:": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(0)), + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: false, + expNext: nil, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + "if failure policy is constant with 5s interval and 1 max retries with 0 attempts, schedule, expect true with Put and next now+5s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(1)), + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 5)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 1, + }, + }, + expDel: nil, + }, + "if failure policy is constant with 5s interval and 1 max retries with 0 attempts, oneshot, expect true with Put and next now+5s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(1)), + }, + }, + }, + }, + count: 0, + attempts: 0, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 5)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 1, + }, + }, + expDel: nil, + }, + + "if failure policy is constant with 5s interval and 1 max retries with 1 attempts, schedule, expect true with count forward and next now+1s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 
1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(1)), + }, + }, + }, + }, + count: 0, + attempts: 1, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + Attempts: 0, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "if failure policy is constant with 5s interval and 1 max retries with 1 attempts, oneshot, expect false with del job": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(1)), + }, + }, + }, + }, + count: 0, + attempts: 1, + next: ptr.Of(now), + exp: false, + expNext: nil, + expPut: nil, + expDel: ptr.Of("abc/jobs/1"), + }, + + "if failure policy is constant with 5s interval and 2 max retries with 2 attempts, schedule, expect true with count forward and next now+1s": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(2)), + }, + }, + }, + }, + count: 0, + attempts: 2, + next: ptr.Of(now), + exp: true, + expNext: ptr.Of(now.Add(time.Second)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 1, + Attempts: 0, + LastTrigger: timestamppb.New(now), + }, + }, + expDel: nil, + }, + "if failure policy is constant with 5s interval and 3 max retries with 2 attempts, schedule, expect true with retry": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(3)), + }, + }, + }, + }, + count: 0, + attempts: 2, + next: ptr.Of(now.Add(time.Second * 5 * 2)), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 5 * 3)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 0, + Attempts: 3, + }, + }, + expDel: nil, + }, + "if failure policy is constant with 5s interval and 2 max retries with 2 attempts, count 3, schedule, expect true with counter forward": { + job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(2)), + }, + }, + }, + }, + count: 3, + lastTriggerTime: timestamppb.New(now.Add(time.Second * 3)), + attempts: 2, + next: ptr.Of(now.Add((time.Second * 2) + (time.Second * 5 * 2))), + exp: true, + expNext: ptr.Of(now.Add(time.Second * 5)), + expPut: &putExp{ + key: "abc/counters/1", + counter: &stored.Counter{ + Count: 4, + LastTrigger: timestamppb.New(now.Add((time.Second * 4))), + }, + }, + expDel: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + client := fake.New(). 
+ WithPutFn(func(_ context.Context, jobKey string, counter string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) { + if test.expPut == nil { + assert.Fail(t, "unexpected put call") + } else { + var counterAPI stored.Counter + require.NoError(t, proto.Unmarshal([]byte(counter), &counterAPI)) + assert.Equal(t, test.expPut.key, jobKey) + assert.Truef(t, proto.Equal(test.expPut.counter, &counterAPI), "%v != %v", test.expPut.counter, &counterAPI) + } + return nil, nil + }). + WithDeleteMultiFn(func(keys ...string) error { + if test.expDel == nil { + assert.Fail(t, "unexpected delete call") + } else { + assert.Equal(t, []string{*test.expDel}, keys) + } + return nil + }) + + collector, err := garbage.New(garbage.Options{Client: client}) + require.NoError(t, err) + + next := now + if test.next != nil { + next = *test.next + } + + job := &stored.Job{ + Begin: &stored.Job_DueTime{DueTime: timestamppb.New(now)}, + Job: test.job, + } + + sched, err := scheduler.NewBuilder().Schedule(job) + require.NoError(t, err) + + counter := &Counter{ + jobKey: "abc/jobs/1", + counterKey: "abc/counters/1", + client: client, + count: &stored.Counter{ + Attempts: test.attempts, + Count: test.count, + LastTrigger: test.lastTriggerTime, + }, + job: job, + next: next, + schedule: sched, + collector: collector, + yard: grave.New(), + } + + ok, err := counter.TriggerFailed(context.Background()) + require.NoError(t, err) + assert.Equal(t, test.exp, ok) + if test.expDel == nil { + assert.Equal(t, *test.expNext, counter.next, "next") + } + assert.NotEqual(t, test.expDel != nil, test.expPut != nil) + }) + } +} + +func Test_TriggerFailureSuccess(t *testing.T) { + t.Parallel() + + var putCall, delCall atomic.Uint32 + var count, del atomic.Value + client := fake.New(). + WithPutFn(func(_ context.Context, _ string, counter string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) { + putCall.Add(1) + var counterAPI stored.Counter + require.NoError(t, proto.Unmarshal([]byte(counter), &counterAPI)) + count.Store(&counterAPI) + return nil, nil + }). 
+ WithDeleteMultiFn(func(keys ...string) error { + delCall.Add(1) + del.Store(keys) + return nil + }) + + collector, err := garbage.New(garbage.Options{Client: client}) + require.NoError(t, err) + now := time.Now().UTC().Truncate(time.Second) + + job := &stored.Job{ + Begin: &stored.Job_DueTime{DueTime: timestamppb.New(now)}, + Job: &api.Job{ + DueTime: ptr.Of(now.Format(time.RFC3339)), + Schedule: ptr.Of("@every 1s"), + Repeats: ptr.Of(uint32(3)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 5), MaxRetries: ptr.Of(uint32(2)), + }, + }, + }, + }, + } + + sched, err := scheduler.NewBuilder().Schedule(job) + require.NoError(t, err) + + counter := &Counter{ + jobKey: "abc/jobs/1", + counterKey: "abc/counters/1", + client: client, + count: new(stored.Counter), + job: job, + next: now, + schedule: sched, + collector: collector, + yard: grave.New(), + } + + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 0, Attempts: 0, LastTrigger: nil, + })) + assert.Equal(t, uint32(0), putCall.Load()) + + ok, err := counter.TriggerSuccess(context.Background()) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, now.Add(time.Second), counter.next) + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 1, Attempts: 0, LastTrigger: timestamppb.New(now), + })) + assert.Equal(t, uint32(1), putCall.Load()) + assert.True(t, proto.Equal(count.Load().(*stored.Counter), &stored.Counter{ + Count: 1, Attempts: 0, LastTrigger: timestamppb.New(now), + })) + + ok, err = counter.TriggerFailed(context.Background()) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, now.Add(time.Second*6), counter.next) + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 1, Attempts: 1, LastTrigger: timestamppb.New(now), + })) + assert.True(t, proto.Equal(count.Load().(*stored.Counter), &stored.Counter{ + Count: 1, Attempts: 1, LastTrigger: timestamppb.New(now), + })) + ok, err = counter.TriggerFailed(context.Background()) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, now.Add(time.Second*11), counter.next) + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 1, Attempts: 2, LastTrigger: timestamppb.New(now), + })) + assert.True(t, proto.Equal(count.Load().(*stored.Counter), &stored.Counter{ + Count: 1, Attempts: 2, LastTrigger: timestamppb.New(now), + })) + ok, err = counter.TriggerSuccess(context.Background()) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, now.Add(time.Second*2), counter.next) + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 2, Attempts: 0, LastTrigger: timestamppb.New(now.Add(time.Second)), + })) + assert.True(t, proto.Equal(count.Load().(*stored.Counter), &stored.Counter{ + Count: 2, Attempts: 0, LastTrigger: timestamppb.New(now.Add(time.Second)), + })) + + ok, err = counter.TriggerFailed(context.Background()) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, now.Add(time.Second*7), counter.next) + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 2, Attempts: 1, LastTrigger: timestamppb.New(now.Add(time.Second)), + })) + assert.True(t, proto.Equal(count.Load().(*stored.Counter), &stored.Counter{ + Count: 2, Attempts: 1, LastTrigger: timestamppb.New(now.Add(time.Second)), + })) + ok, err = counter.TriggerFailed(context.Background()) + require.NoError(t, err) + assert.True(t, ok) + assert.Equal(t, now.Add(time.Second*12), 
counter.next) + assert.True(t, proto.Equal(counter.count, &stored.Counter{ + Count: 2, Attempts: 2, LastTrigger: timestamppb.New(now.Add(time.Second)), + })) + assert.True(t, proto.Equal(count.Load().(*stored.Counter), &stored.Counter{ + Count: 2, Attempts: 2, LastTrigger: timestamppb.New(now.Add(time.Second)), + })) + assert.Equal(t, uint32(0), delCall.Load()) + assert.Nil(t, del.Load()) + ok, err = counter.TriggerFailed(context.Background()) + require.NoError(t, err) + assert.False(t, ok) + assert.Equal(t, uint32(1), delCall.Load()) + assert.Equal(t, []string{"abc/jobs/1"}, del.Load()) +} diff --git a/internal/garbage/collector_test.go b/internal/garbage/collector_test.go index 6299cbe..c3d0abf 100644 --- a/internal/garbage/collector_test.go +++ b/internal/garbage/collector_test.go @@ -103,7 +103,7 @@ func Test_Run(t *testing.T) { cancel() require.NoError(t, c.Run(ctx)) - for i := 0; i < 500000; i++ { + for i := range 500000 { c.Push(fmt.Sprintf("test-%d", i)) } @@ -128,7 +128,7 @@ func Test_Run(t *testing.T) { errCh <- c.Run(ctx) }() - for i := 0; i < 100; i++ { + for i := range 100 { key := fmt.Sprintf("test-%d", i) _, err := client.Put(context.Background(), key, "value") require.NoError(t, err) @@ -173,7 +173,7 @@ func Test_Run(t *testing.T) { errCh <- c.Run(ctx) }() - for i := 0; i < 100-1; i++ { + for i := range 100 - 1 { key := fmt.Sprintf("test-%d", i) _, err := client.Put(context.Background(), key, "value") require.NoError(t, err) @@ -225,7 +225,7 @@ func Test_Run(t *testing.T) { errCh <- c.Run(ctx) }() - for i := 0; i < 10; i++ { + for i := range 10 { key := fmt.Sprintf("test-%d", i) _, err := client.Put(context.Background(), key, "value") require.NoError(t, err) @@ -274,7 +274,7 @@ func Test_Run(t *testing.T) { errCh <- c.Run(ctx) }() - for i := 0; i < 10; i++ { + for i := range 10 { key := fmt.Sprintf("test-%d", i) _, err := client.Put(context.Background(), key, "value") require.NoError(t, err) @@ -346,7 +346,7 @@ func Test_Push(t *testing.T) { default: } - for i := 0; i < 500000-1; i++ { + for i := range 500000 - 1 { c.Push(fmt.Sprintf("test-%d", i)) } @@ -417,13 +417,13 @@ func Test_collect(t *testing.T) { require.NoError(t, err) c := coll.(*collector) - for i := 0; i < 10; i++ { + for i := range 10 { _, err := client.Put(context.Background(), fmt.Sprintf("/test/%d", i), "value") require.NoError(t, err) c.keys[fmt.Sprintf("/test/%d", i)] = struct{}{} } - for i := 0; i < 10; i++ { + for i := range 10 { resp, err := client.Get(context.Background(), fmt.Sprintf("/test/%d", i)) require.NoError(t, err) require.Len(t, resp.Kvs, 1) @@ -434,7 +434,7 @@ func Test_collect(t *testing.T) { require.NoError(t, c.collect()) assert.Empty(t, c.keys) - for i := 0; i < 10; i++ { + for i := range 10 { resp, err := client.Get(context.Background(), fmt.Sprintf("/test/%d", i)) require.NoError(t, err) require.Empty(t, resp.Kvs) @@ -451,7 +451,7 @@ func Test_collect(t *testing.T) { require.NoError(t, err) c := coll.(*collector) - for i := 0; i < 10; i++ { + for i := range 10 { _, err := client.Put(context.Background(), fmt.Sprintf("/test/%d", i), "value") require.NoError(t, err) c.keys[fmt.Sprintf("/test/%d", i)] = struct{}{} @@ -462,7 +462,7 @@ func Test_collect(t *testing.T) { require.NoError(t, err) } - for i := 0; i < 20; i++ { + for i := range 20 { resp, err := client.Get(context.Background(), fmt.Sprintf("/test/%d", i)) require.NoError(t, err) require.Len(t, resp.Kvs, 1) @@ -473,7 +473,7 @@ func Test_collect(t *testing.T) { require.NoError(t, c.collect()) assert.Empty(t, c.keys) - for i := 0; 
i < 10; i++ { + for i := range 10 { resp, err := client.Get(context.Background(), fmt.Sprintf("/test/%d", i)) require.NoError(t, err) require.Empty(t, resp.Kvs) diff --git a/internal/grave/yard_test.go b/internal/grave/yard_test.go index 4d5d7c5..86df393 100644 --- a/internal/grave/yard_test.go +++ b/internal/grave/yard_test.go @@ -36,7 +36,8 @@ func Test_Deleted(t *testing.T) { yard := New() exp := make(map[string]uint64) - for i := 0; i < 500000-1; i++ { + for i := range 500000 - 1 { + //nolint:gosec exp[strconv.Itoa(i)] = uint64(i) yard.Deleted(strconv.Itoa(i)) } @@ -47,6 +48,7 @@ func Test_Deleted(t *testing.T) { assert.Len(t, yard.deletesMap, (500000 - 10000)) newExp := make(map[string]uint64) for i := 10000; i < 500000; i++ { + //nolint:gosec newExp[strconv.Itoa(i)] = uint64(i) } assert.Equal(t, newExp, yard.deletesMap) diff --git a/internal/informer/informer_test.go b/internal/informer/informer_test.go index 53ec345..d4fd36a 100644 --- a/internal/informer/informer_test.go +++ b/internal/informer/informer_test.go @@ -134,7 +134,7 @@ func Test_Run(t *testing.T) { ch, err := i.Events() require.NoError(t, err) - for i := 0; i < 2; i++ { + for i := range 2 { select { case ev := <-ch: assert.True(t, ev.IsPut) diff --git a/internal/leadership/leadership_test.go b/internal/leadership/leadership_test.go index 857296e..8ec6f02 100644 --- a/internal/leadership/leadership_test.go +++ b/internal/leadership/leadership_test.go @@ -360,7 +360,7 @@ func Test_Run(t *testing.T) { cancel() - for i := 0; i < 3; i++ { + for range 3 { select { case <-errCh: case <-time.After(2 * time.Second): @@ -480,7 +480,7 @@ func Test_checkLeadershipKeys(t *testing.T) { }), }) - for i := 0; i < 10; i++ { + for i := range 10 { _, err := client.Put(context.Background(), "abc/leadership/"+strconv.Itoa(i), "10") require.NoError(t, err) } @@ -638,7 +638,7 @@ func Test_WaitForLeadership(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - leadership := New(Options{Client: client.New(nil)}) + leadership := New(Options{Client: client.New(client.Options{})}) cancel() assert.Equal(t, context.Canceled, leadership.WaitForLeadership(ctx)) }) @@ -648,7 +648,7 @@ func Test_WaitForLeadership(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - leadership := New(Options{Client: client.New(nil)}) + leadership := New(Options{Client: client.New(client.Options{})}) close(leadership.readyCh) require.NoError(t, leadership.WaitForLeadership(ctx)) }) diff --git a/internal/partitioner/zero_test.go b/internal/partitioner/zero_test.go index d0bb60e..3623c47 100644 --- a/internal/partitioner/zero_test.go +++ b/internal/partitioner/zero_test.go @@ -18,7 +18,7 @@ func Test_zero(t *testing.T) { t.Run("always return true", func(t *testing.T) { t.Parallel() z := new(zero) - for i := 0; i < 100; i++ { + for range 100 { //nolint:gosec assert.True(t, z.IsJobManaged(rand.Uint32())) } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 0798d65..ab3bb9b 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -12,12 +12,12 @@ import ( "sync" "sync/atomic" + "github.com/dapr/kit/concurrency" + "github.com/dapr/kit/events/queue" "github.com/go-logr/logr" clientv3 "go.etcd.io/etcd/client/v3" "k8s.io/utils/clock" - "github.com/dapr/kit/concurrency" - "github.com/dapr/kit/events/queue" "github.com/diagridio/go-etcd-cron/api" "github.com/diagridio/go-etcd-cron/internal/api/stored" 
"github.com/diagridio/go-etcd-cron/internal/client" @@ -99,7 +99,7 @@ func New(opts Options) *Queue { clock: cl, lock: concurrency.NewMutexMap[string](), readyCh: make(chan struct{}), - errCh: make(chan error), + errCh: make(chan error, 10), } } @@ -244,21 +244,24 @@ func (q *Queue) cacheDelete(jobKey string) error { // Returns true if the job is being re-enqueued, false otherwise. func (q *Queue) handleTrigger(ctx context.Context, counter *counter.Counter) bool { if !q.triggerFn(ctx, counter.TriggerRequest()) { - // If the trigger function returns false, i.e. failed client side, - // re-enqueue the job immediately. - if err := q.queue.Enqueue(counter); err != nil { - select { - case <-ctx.Done(): - case q.errCh <- err: - } + ok, err := counter.TriggerFailed(ctx) + if err != nil { + q.log.Error(err, "failure failing job for next retry trigger", "name", counter.Key()) } - return true + + return q.enqueueCounter(ctx, counter, ok) } - ok, err := counter.Trigger(ctx) + ok, err := counter.TriggerSuccess(ctx) if err != nil { q.log.Error(err, "failure marking job for next trigger", "name", counter.Key()) } + + return q.enqueueCounter(ctx, counter, ok) +} + +// enqueueCounter enqueues the job to the queue at this count tick. +func (q *Queue) enqueueCounter(ctx context.Context, counter *counter.Counter, ok bool) bool { if ok && ctx.Err() == nil { if err := q.queue.Enqueue(counter); err != nil { select { @@ -275,7 +278,7 @@ func (q *Queue) handleTrigger(ctx context.Context, counter *counter.Counter) boo // schedule schedules a job to it's next scheduled time. func (q *Queue) schedule(ctx context.Context, name string, job *stored.Job) error { - scheduler, err := q.schedBuilder.Scheduler(job) + schedule, err := q.schedBuilder.Schedule(job) if err != nil { return err } @@ -283,7 +286,7 @@ func (q *Queue) schedule(ctx context.Context, name string, job *stored.Job) erro counter, ok, err := counter.New(ctx, counter.Options{ Name: name, Key: q.key, - Schedule: scheduler, + Schedule: schedule, Yard: q.yard, Client: q.client, Job: job, diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 45c536d..a81c819 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -14,6 +14,11 @@ import ( "time" "github.com/dapr/kit/ptr" + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/diagridio/go-etcd-cron/api" "github.com/diagridio/go-etcd-cron/internal/api/stored" "github.com/diagridio/go-etcd-cron/internal/garbage" @@ -22,10 +27,6 @@ import ( "github.com/diagridio/go-etcd-cron/internal/key" "github.com/diagridio/go-etcd-cron/internal/scheduler" "github.com/diagridio/go-etcd-cron/tests" - "github.com/go-logr/logr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/timestamppb" ) func Test_delete_race(t *testing.T) { @@ -33,7 +34,7 @@ func Test_delete_race(t *testing.T) { triggered := make([]atomic.Int64, 20) queue := newQueue(t, func(_ context.Context, req *api.TriggerRequest) bool { - i, err := strconv.Atoi(req.Name) + i, err := strconv.Atoi(req.GetName()) require.NoError(t, err) triggered[i].Add(1) return true diff --git a/internal/scheduler/builder.go b/internal/scheduler/builder.go index 8d49bd1..00b606d 100644 --- a/internal/scheduler/builder.go +++ 
b/internal/scheduler/builder.go @@ -13,10 +13,12 @@ import ( "github.com/dapr/kit/cron" "github.com/dapr/kit/ptr" kittime "github.com/dapr/kit/time" - "github.com/diagridio/go-etcd-cron/api" - "github.com/diagridio/go-etcd-cron/internal/api/stored" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/utils/clock" + + "github.com/diagridio/go-etcd-cron/api" + "github.com/diagridio/go-etcd-cron/internal/api/stored" ) // Builder is a builder for creating a new scheduler. @@ -33,8 +35,8 @@ func NewBuilder() *Builder { } } -// Scheduler returns the scheduler based on the given stored job. -func (b *Builder) Scheduler(job *stored.Job) (Interface, error) { +// Schedule returns the schedule based on the given stored job. +func (b *Builder) Schedule(job *stored.Job) (Interface, error) { if job.GetJob().Schedule == nil { return &oneshot{ dueTime: job.GetDueTime().AsTime(), @@ -88,8 +90,21 @@ func (b *Builder) Parse(job *api.Job) (*stored.Job, error) { } } - //nolint:gosec + //nolint:protogetter + if job.FailurePolicy == nil { + job.FailurePolicy = &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{ + Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), + MaxRetries: ptr.Of(uint32(3)), + }, + }, + } + } + storedJob := &stored.Job{ + // PartionId has no need to be crypto random. + //nolint:gosec PartitionId: rand.Uint32(), Job: job, } diff --git a/internal/scheduler/builder_test.go b/internal/scheduler/builder_test.go index 2b2d7b0..f899fa1 100644 --- a/internal/scheduler/builder_test.go +++ b/internal/scheduler/builder_test.go @@ -13,6 +13,7 @@ import ( "github.com/dapr/kit/ptr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" clocktesting "k8s.io/utils/clock/testing" @@ -20,7 +21,7 @@ import ( "github.com/diagridio/go-etcd-cron/internal/api/stored" ) -func Test_Scheduler(t *testing.T) { +func Test_Schedule(t *testing.T) { t.Parallel() now := time.Now().UTC().Truncate(time.Second) @@ -102,7 +103,7 @@ func Test_Scheduler(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() builder := &Builder{clock: clock} - gotScheduler, gotErr := builder.Scheduler(testInLoop.job) + gotScheduler, gotErr := builder.Schedule(testInLoop.job) assert.Equal(t, testInLoop.expScheduler, gotScheduler) assert.Equal(t, testInLoop.expErr, gotErr != nil, "%v", gotErr) }) @@ -172,6 +173,11 @@ func Test_Parse(t *testing.T) { Schedule: nil, Ttl: nil, Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }}, + }, }, Begin: &stored.Job_DueTime{ DueTime: timestamppb.New(time.Date(2024, 4, 24, 11, 42, 22, 0, time.UTC)), @@ -193,6 +199,11 @@ func Test_Parse(t *testing.T) { Schedule: ptr.Of("@every 1h"), Ttl: nil, Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }}, + }, }, Begin: &stored.Job_DueTime{ DueTime: timestamppb.New(time.Date(2024, 4, 24, 11, 42, 22, 0, time.UTC)), @@ -214,6 +225,11 @@ func Test_Parse(t *testing.T) { Schedule: ptr.Of("@every 1h"), Ttl: ptr.Of("2h"), Repeats: nil, + FailurePolicy: &api.FailurePolicy{ 
+ Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }}, + }, }, Begin: &stored.Job_Start{ Start: timestamppb.New(now), @@ -235,6 +251,11 @@ func Test_Parse(t *testing.T) { Schedule: ptr.Of("@every 1h"), Ttl: ptr.Of("2h"), Repeats: ptr.Of(uint32(100)), + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }}, + }, }, Begin: &stored.Job_DueTime{ DueTime: timestamppb.New(time.Date(2024, 4, 24, 11, 42, 22, 0, time.UTC)), @@ -266,6 +287,11 @@ func Test_Parse(t *testing.T) { Schedule: ptr.Of("@every 1h"), Ttl: ptr.Of("2024-04-24T11:42:22Z"), Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second), MaxRetries: ptr.Of(uint32(3)), + }}, + }, }, Begin: &stored.Job_DueTime{ DueTime: timestamppb.New(time.Date(2024, 4, 24, 11, 42, 22, 0, time.UTC)), @@ -292,6 +318,64 @@ func Test_Parse(t *testing.T) { }, expErr: true, }, + "don't overwrite failure policy constant": { + job: &api.Job{ + DueTime: ptr.Of("2024-04-24T11:42:22Z"), + Schedule: nil, + Ttl: nil, + Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 3), MaxRetries: ptr.Of(uint32(5)), + }}, + }, + }, + expStored: &stored.Job{ + Job: &api.Job{ + DueTime: ptr.Of("2024-04-24T11:42:22Z"), + Schedule: nil, + Ttl: nil, + Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Constant{Constant: &api.FailurePolicyConstant{ + Interval: durationpb.New(time.Second * 3), MaxRetries: ptr.Of(uint32(5)), + }}, + }, + }, + Begin: &stored.Job_DueTime{ + DueTime: timestamppb.New(time.Date(2024, 4, 24, 11, 42, 22, 0, time.UTC)), + }, + Expiration: nil, + }, + expErr: false, + }, + "don't overwrite failure policy drop": { + job: &api.Job{ + DueTime: ptr.Of("2024-04-24T11:42:22Z"), + Schedule: nil, + Ttl: nil, + Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Drop{Drop: new(api.FailurePolicyDrop)}, + }, + }, + expStored: &stored.Job{ + Job: &api.Job{ + DueTime: ptr.Of("2024-04-24T11:42:22Z"), + Schedule: nil, + Ttl: nil, + Repeats: nil, + FailurePolicy: &api.FailurePolicy{ + Policy: &api.FailurePolicy_Drop{Drop: new(api.FailurePolicyDrop)}, + }, + }, + Begin: &stored.Job_DueTime{ + DueTime: timestamppb.New(time.Date(2024, 4, 24, 11, 42, 22, 0, time.UTC)), + }, + Expiration: nil, + }, + expErr: false, + }, } for name, test := range tests { diff --git a/nix/ci.nix b/nix/ci.nix index eb733f8..ee061fa 100644 --- a/nix/ci.nix +++ b/nix/ci.nix @@ -54,7 +54,8 @@ let echo ">> running check-gomod2nix" check-gomod2nix echo ">> running golangci-lint" - golangci-lint run --timeout 20m + golangci-lint config verify --verbose + golangci-lint run --enable-all --max-issues-per-linter=0 --max-same-issues=0 echo ">> running go test --race -v -count 1 ./..." go test --race -v -count 1 ./... ''; diff --git a/proto/api/failurepolicy.proto b/proto/api/failurepolicy.proto new file mode 100644 index 0000000..f2ca4bb --- /dev/null +++ b/proto/api/failurepolicy.proto @@ -0,0 +1,37 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. 
+*/
+
+syntax = "proto3";
+
+package api;
+
+import "google/protobuf/duration.proto";
+
+option go_package = "github.com/diagridio/go-etcd-cron/api";
+
+// FailurePolicy defines the policy to apply when a job fails to trigger.
+message FailurePolicy {
+ // policy is the policy to apply when a job fails to trigger.
+ oneof policy {
+ FailurePolicyDrop drop = 1;
+ FailurePolicyConstant constant = 2;
+ }
+}
+
+// FailurePolicyDrop is a policy which drops the job tick when the job fails to
+// trigger.
+message FailurePolicyDrop {}
+
+// FailurePolicyConstant is a policy which retries the job at a constant
+// interval when the job fails to trigger.
+message FailurePolicyConstant {
+ // interval is the constant delay to wait before retrying the job.
+ google.protobuf.Duration interval = 1;
+
+ // max_retries is the optional maximum number of retries to attempt before
+ // giving up.
+ // If unset, the Job will be retried indefinitely.
+ optional uint32 max_retries = 2;
+}
diff --git a/proto/api/job.proto b/proto/api/job.proto
index d85d413..03865be 100644
--- a/proto/api/job.proto
+++ b/proto/api/job.proto
@@ -8,7 +8,7 @@ syntax = "proto3";
 package api;
 
 import "google/protobuf/any.proto";
-import "google/protobuf/timestamp.proto";
+import "proto/api/failurepolicy.proto";
 
 option go_package = "github.com/diagridio/go-etcd-cron/api";
 
@@ -59,4 +59,11 @@ message Job {
 // payload is the serialized job payload that will be sent to the recipient
 // when the job is triggered.
 google.protobuf.Any payload = 6;
+
+ // failure_policy is the optional policy to apply when a job fails to
+ // trigger.
+ // By default, the failure policy is FailurePolicyConstant with a 1s interval
+ // and 3 maximum retries.
+ // See `failurepolicy.proto` for more information.
+ optional FailurePolicy failure_policy = 7;
 }
diff --git a/proto/stored/counter.proto b/proto/stored/counter.proto
index edbc017..185b772 100644
--- a/proto/stored/counter.proto
+++ b/proto/stored/counter.proto
@@ -25,4 +25,9 @@ message Counter {
 // last_trigger is the timestamp the job was last triggered. Used to
 // determine the next time the job should be triggered.
 google.protobuf.Timestamp last_trigger = 3;
+
+ // attempts is the number of times the job trigger has been attempted at
+ // this count. Used by the failure policy to track how many times the Job
+ // trigger should be retried.
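+ // For example, with a Constant policy of max_retries=2 this counts 1, 2,
+ // then 3 across consecutive failures of the same tick, and is reset to 0
+ // once the trigger succeeds or the tick is given up on.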
+ uint32 attempts = 4; } diff --git a/tests/cron/cron.go b/tests/cron/cron.go index d6ca2b8..aa55554 100644 --- a/tests/cron/cron.go +++ b/tests/cron/cron.go @@ -27,6 +27,8 @@ type Cron struct { } func newCron(t *testing.T, client *clientv3.Client, total, id uint32) *Cron { + t.Helper() + var calls atomic.Int64 cron, err := cron.New(cron.Options{ Log: logr.Discard(), @@ -47,6 +49,8 @@ func newCron(t *testing.T, client *clientv3.Client, total, id uint32) *Cron { } func (c *Cron) run(t *testing.T) *Cron { + t.Helper() + errCh := make(chan error) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(func() { diff --git a/tests/tests.go b/tests/tests.go index 54d88c0..aa817bf 100644 --- a/tests/tests.go +++ b/tests/tests.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" @@ -20,7 +21,10 @@ import ( func EmbeddedETCD(t *testing.T) client.Interface { t.Helper() - return client.New(EmbeddedETCDBareClient(t)) + return client.New(client.Options{ + Log: logr.Discard(), + Client: EmbeddedETCDBareClient(t), + }) } func EmbeddedETCDBareClient(t *testing.T) *clientv3.Client {
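For illustration only, a minimal sketch of how a caller opts into the new field. It is not part of this patch: it assumes the cron instance's existing Add(ctx, name, job) entrypoint (the instance returned by cron.New, not shown in this diff), while the FailurePolicy types and the 1s/3-retry default are the ones introduced above.

    job := &api.Job{
        Schedule: ptr.Of("@every 10s"),
        FailurePolicy: &api.FailurePolicy{
            Policy: &api.FailurePolicy_Constant{
                Constant: &api.FailurePolicyConstant{
                    // Retry a failed trigger every 2s, at most 5 times per tick.
                    Interval:   durationpb.New(2 * time.Second),
                    MaxRetries: ptr.Of(uint32(5)),
                },
            },
        },
    }
    // Omitting FailurePolicy entirely falls back to the default applied by the
    // scheduler builder's Parse: a Constant policy, 1s interval, 3 max retries.
    if err := cron.Add(ctx, "my-job", job); err != nil {
        // handle the error
    }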