From 0b8da8d647f92f27c280a43280c0309e62092eee Mon Sep 17 00:00:00 2001 From: dashjay Date: Mon, 1 Apr 2024 17:10:27 +0800 Subject: [PATCH] init Signed-off-by: dashjay --- .gitignore | 2 + Makefile | 8 + api/hlcv1/hlcv1.pb.go | 316 +++++++++++++++++++++++++++++++++++ api/hlcv1/hlcv1.proto | 32 ++++ api/hlcv1/hlcv1_grpc.pb.go | 142 ++++++++++++++++ go.mod | 20 +++ go.sum | 31 ++++ pkg/persistence/disk.go | 81 +++++++++ pkg/persistence/disk_test.go | 34 ++++ pkg/persistence/interface.go | 13 ++ pkg/service/hlc.go | 173 +++++++++++++++++++ pkg/service/hlc_test.go | 78 +++++++++ 12 files changed, 930 insertions(+) create mode 100644 Makefile create mode 100644 api/hlcv1/hlcv1.pb.go create mode 100644 api/hlcv1/hlcv1.proto create mode 100644 api/hlcv1/hlcv1_grpc.pb.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/persistence/disk.go create mode 100644 pkg/persistence/disk_test.go create mode 100644 pkg/persistence/interface.go create mode 100644 pkg/service/hlc.go create mode 100644 pkg/service/hlc_test.go diff --git a/.gitignore b/.gitignore index 3b735ec..5118da7 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ # Go workspace file go.work + +.idea/ \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..62c5bd9 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +.PHONY: proto + +proto: + protoc \ + --go-grpc_out="${PWD}"/api \ + --go_out="${PWD}"/api \ + -I "${PWD}/api/hlcv1" \ + hlcv1.proto diff --git a/api/hlcv1/hlcv1.pb.go b/api/hlcv1/hlcv1.pb.go new file mode 100644 index 0000000..541ab56 --- /dev/null +++ b/api/hlcv1/hlcv1.pb.go @@ -0,0 +1,316 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.25.3 +// source: hlcv1.proto + +package hlcv1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Clock int64 `protobuf:"varint,1,opt,name=clock,proto3" json:"clock,omitempty"` +} + +func (x *GetResp) Reset() { + *x = GetResp{} + if protoimpl.UnsafeEnabled { + mi := &file_hlcv1_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetResp) ProtoMessage() {} + +func (x *GetResp) ProtoReflect() protoreflect.Message { + mi := &file_hlcv1_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetResp.ProtoReflect.Descriptor instead. +func (*GetResp) Descriptor() ([]byte, []int) { + return file_hlcv1_proto_rawDescGZIP(), []int{0} +} + +func (x *GetResp) GetClock() int64 { + if x != nil { + return x.Clock + } + return 0 +} + +type BatchGetReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // default count is 1, means that only get one. + Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` + // if count is more than two, return_first can be set to reduce the size of the response + ReturnFirst bool `protobuf:"varint,2,opt,name=return_first,json=returnFirst,proto3" json:"return_first,omitempty"` +} + +func (x *BatchGetReq) Reset() { + *x = BatchGetReq{} + if protoimpl.UnsafeEnabled { + mi := &file_hlcv1_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchGetReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchGetReq) ProtoMessage() {} + +func (x *BatchGetReq) ProtoReflect() protoreflect.Message { + mi := &file_hlcv1_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchGetReq.ProtoReflect.Descriptor instead. +func (*BatchGetReq) Descriptor() ([]byte, []int) { + return file_hlcv1_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchGetReq) GetCount() uint32 { + if x != nil { + return x.Count + } + return 0 +} + +func (x *BatchGetReq) GetReturnFirst() bool { + if x != nil { + return x.ReturnFirst + } + return false +} + +type BatchGetResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Req *BatchGetReq `protobuf:"bytes,1,opt,name=Req,proto3" json:"Req,omitempty"` + // if return_first specified true, first clock will be set + First int64 `protobuf:"varint,2,opt,name=first,proto3" json:"first,omitempty"` + // if request count more than one and return_first = false, all clock allocated will be set here + Clocks []int64 `protobuf:"varint,3,rep,packed,name=clocks,proto3" json:"clocks,omitempty"` +} + +func (x *BatchGetResp) Reset() { + *x = BatchGetResp{} + if protoimpl.UnsafeEnabled { + mi := &file_hlcv1_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchGetResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchGetResp) ProtoMessage() {} + +func (x *BatchGetResp) ProtoReflect() protoreflect.Message { + mi := &file_hlcv1_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchGetResp.ProtoReflect.Descriptor instead. +func (*BatchGetResp) Descriptor() ([]byte, []int) { + return file_hlcv1_proto_rawDescGZIP(), []int{2} +} + +func (x *BatchGetResp) GetReq() *BatchGetReq { + if x != nil { + return x.Req + } + return nil +} + +func (x *BatchGetResp) GetFirst() int64 { + if x != nil { + return x.First + } + return 0 +} + +func (x *BatchGetResp) GetClocks() []int64 { + if x != nil { + return x.Clocks + } + return nil +} + +var File_hlcv1_proto protoreflect.FileDescriptor + +var file_hlcv1_proto_rawDesc = []byte{ + 0x0a, 0x0b, 0x68, 0x6c, 0x63, 0x76, 0x31, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x68, + 0x6c, 0x63, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x1f, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x14, 0x0a, 0x05, + 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x6c, 0x6f, + 0x63, 0x6b, 0x22, 0x46, 0x0a, 0x0b, 0x42, 0x61, 0x74, 0x63, 0x68, 0x47, 0x65, 0x74, 0x52, 0x65, + 0x71, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x65, 0x74, 0x75, 0x72, + 0x6e, 0x5f, 0x66, 0x69, 0x72, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x72, + 0x65, 0x74, 0x75, 0x72, 0x6e, 0x46, 0x69, 0x72, 0x73, 0x74, 0x22, 0x62, 0x0a, 0x0c, 0x42, 0x61, + 0x74, 0x63, 0x68, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x24, 0x0a, 0x03, 0x52, 0x65, + 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x68, 0x6c, 0x63, 0x76, 0x31, 0x2e, + 0x42, 0x61, 0x74, 0x63, 0x68, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x52, 0x03, 0x52, 0x65, 0x71, + 0x12, 0x14, 0x0a, 0x05, 0x66, 0x69, 0x72, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x05, 0x66, 0x69, 0x72, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x73, + 0x18, 0x03, 0x20, 0x03, 0x28, 0x03, 0x52, 0x06, 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x32, 0x70, + 0x0a, 0x0a, 0x48, 0x43, 0x4c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x2d, 0x0a, 0x03, + 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0e, 0x2e, 0x68, 0x6c, + 0x63, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x33, 0x0a, 0x08, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x47, 0x65, 0x74, 0x12, 0x12, 0x2e, 0x68, 0x6c, 0x63, 0x76, 0x31, 0x2e, + 0x42, 0x61, 0x74, 0x63, 0x68, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x1a, 0x13, 0x2e, 0x68, 0x6c, + 0x63, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x68, 0x6c, 0x63, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_hlcv1_proto_rawDescOnce sync.Once + file_hlcv1_proto_rawDescData = file_hlcv1_proto_rawDesc +) + +func file_hlcv1_proto_rawDescGZIP() []byte { + file_hlcv1_proto_rawDescOnce.Do(func() { + file_hlcv1_proto_rawDescData = protoimpl.X.CompressGZIP(file_hlcv1_proto_rawDescData) + }) + return file_hlcv1_proto_rawDescData +} + +var file_hlcv1_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_hlcv1_proto_goTypes = []interface{}{ + (*GetResp)(nil), // 0: hlcv1.GetResp + (*BatchGetReq)(nil), // 1: hlcv1.BatchGetReq + (*BatchGetResp)(nil), // 2: hlcv1.BatchGetResp + (*emptypb.Empty)(nil), // 3: google.protobuf.Empty +} +var file_hlcv1_proto_depIdxs = []int32{ + 1, // 0: hlcv1.BatchGetResp.Req:type_name -> hlcv1.BatchGetReq + 3, // 1: hlcv1.HCLService.Get:input_type -> google.protobuf.Empty + 1, // 2: hlcv1.HCLService.BatchGet:input_type -> hlcv1.BatchGetReq + 0, // 3: hlcv1.HCLService.Get:output_type -> hlcv1.GetResp + 2, // 4: hlcv1.HCLService.BatchGet:output_type -> hlcv1.BatchGetResp + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_hlcv1_proto_init() } +func file_hlcv1_proto_init() { + if File_hlcv1_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_hlcv1_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_hlcv1_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchGetReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_hlcv1_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchGetResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_hlcv1_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_hlcv1_proto_goTypes, + DependencyIndexes: file_hlcv1_proto_depIdxs, + MessageInfos: file_hlcv1_proto_msgTypes, + }.Build() + File_hlcv1_proto = out.File + file_hlcv1_proto_rawDesc = nil + file_hlcv1_proto_goTypes = nil + file_hlcv1_proto_depIdxs = nil +} diff --git a/api/hlcv1/hlcv1.proto b/api/hlcv1/hlcv1.proto new file mode 100644 index 0000000..7fbecea --- /dev/null +++ b/api/hlcv1/hlcv1.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package hlcv1; + +import "google/protobuf/empty.proto"; + +option go_package = "/hlcv1"; + + +message GetResp { + int64 clock = 1; +} + +message BatchGetReq { + // default count is 1, means that only get one. + uint32 count = 1; + // if count is more than two, return_first can be set to reduce the size of the response + bool return_first = 2; +} + +message BatchGetResp { + BatchGetReq Req = 1; + // if return_first specified true, first clock will be set + int64 first = 2; + // if request count more than one and return_first = false, all clock allocated will be set here + repeated int64 clocks = 3; +} + +service HCLService{ + rpc Get(google.protobuf.Empty) returns(GetResp); + rpc BatchGet(BatchGetReq) returns(BatchGetResp); +} \ No newline at end of file diff --git a/api/hlcv1/hlcv1_grpc.pb.go b/api/hlcv1/hlcv1_grpc.pb.go new file mode 100644 index 0000000..2e488a5 --- /dev/null +++ b/api/hlcv1/hlcv1_grpc.pb.go @@ -0,0 +1,142 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v4.25.3 +// source: hlcv1.proto + +package hlcv1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// HCLServiceClient is the client API for HCLService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HCLServiceClient interface { + Get(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetResp, error) + BatchGet(ctx context.Context, in *BatchGetReq, opts ...grpc.CallOption) (*BatchGetResp, error) +} + +type hCLServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewHCLServiceClient(cc grpc.ClientConnInterface) HCLServiceClient { + return &hCLServiceClient{cc} +} + +func (c *hCLServiceClient) Get(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetResp, error) { + out := new(GetResp) + err := c.cc.Invoke(ctx, "/hlcv1.HCLService/Get", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *hCLServiceClient) BatchGet(ctx context.Context, in *BatchGetReq, opts ...grpc.CallOption) (*BatchGetResp, error) { + out := new(BatchGetResp) + err := c.cc.Invoke(ctx, "/hlcv1.HCLService/BatchGet", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HCLServiceServer is the server API for HCLService service. +// All implementations must embed UnimplementedHCLServiceServer +// for forward compatibility +type HCLServiceServer interface { + Get(context.Context, *emptypb.Empty) (*GetResp, error) + BatchGet(context.Context, *BatchGetReq) (*BatchGetResp, error) + mustEmbedUnimplementedHCLServiceServer() +} + +// UnimplementedHCLServiceServer must be embedded to have forward compatible implementations. +type UnimplementedHCLServiceServer struct { +} + +func (UnimplementedHCLServiceServer) Get(context.Context, *emptypb.Empty) (*GetResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method Get not implemented") +} +func (UnimplementedHCLServiceServer) BatchGet(context.Context, *BatchGetReq) (*BatchGetResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method BatchGet not implemented") +} +func (UnimplementedHCLServiceServer) mustEmbedUnimplementedHCLServiceServer() {} + +// UnsafeHCLServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HCLServiceServer will +// result in compilation errors. +type UnsafeHCLServiceServer interface { + mustEmbedUnimplementedHCLServiceServer() +} + +func RegisterHCLServiceServer(s grpc.ServiceRegistrar, srv HCLServiceServer) { + s.RegisterService(&HCLService_ServiceDesc, srv) +} + +func _HCLService_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HCLServiceServer).Get(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hlcv1.HCLService/Get", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HCLServiceServer).Get(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _HCLService_BatchGet_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BatchGetReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HCLServiceServer).BatchGet(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hlcv1.HCLService/BatchGet", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HCLServiceServer).BatchGet(ctx, req.(*BatchGetReq)) + } + return interceptor(ctx, in, info, handler) +} + +// HCLService_ServiceDesc is the grpc.ServiceDesc for HCLService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var HCLService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "hlcv1.HCLService", + HandlerType: (*HCLServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Get", + Handler: _HCLService_Get_Handler, + }, + { + MethodName: "BatchGet", + Handler: _HCLService_BatchGet_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "hlcv1.proto", +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..78c2a6e --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module github.com/dashjay/gohlc + +go 1.21.4 + +require ( + github.com/stretchr/testify v1.9.0 + google.golang.org/grpc v1.62.1 + google.golang.org/protobuf v1.33.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.20.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a41ef6b --- /dev/null +++ b/go.sum @@ -0,0 +1,31 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= +google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/persistence/disk.go b/pkg/persistence/disk.go new file mode 100644 index 0000000..56e8d9f --- /dev/null +++ b/pkg/persistence/disk.go @@ -0,0 +1,81 @@ +package persistence + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "sync" +) + +type Disk struct { + path string + fd *os.File + mu sync.Mutex +} + +func (d *Disk) Persist(ctx context.Context, in int64) error { + d.mu.Lock() + defer d.mu.Unlock() + deadline, ok := ctx.Deadline() + if ok { + err := d.fd.SetWriteDeadline(deadline) + if err != nil { + fmt.Fprintf(os.Stderr, "set write deadline failed: %s\n", err) + // ignore the deadline maybe not support by this os + } + } + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], uint64(in)) + n, err := d.fd.WriteAt(buf[:8], 0) + if err != nil { + return err + } + if n != 8 { + return errors.New("incomplete clock persistent") + } + err = d.fd.Sync() + if err != nil { + return err + } + return nil +} + +func (d *Disk) Load(ctx context.Context) (int64, error) { + d.mu.Lock() + defer d.mu.Unlock() + + deadline, ok := ctx.Deadline() + if ok { + err := d.fd.SetReadDeadline(deadline) + if err != nil { + fmt.Fprintf(os.Stderr, "set write deadline failed: %s\n", err) + // ignore the deadline maybe not support by this os + } + } + var buf [8]byte + n, err := d.fd.ReadAt(buf[:8], 0) + if n != 8 { + // if read error but not eof, means other problem happened on read + if !errors.Is(err, io.EOF) { + return 0, err + } + return 0, ClockNotFound + } + return int64(binary.BigEndian.Uint64(buf[:8])), nil +} + +var _ Interface = (*Disk)(nil) + +func NewDisk(path string) (Interface, error) { + fd, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644) + if err != nil { + return nil, err + } + return &Disk{ + path: path, + fd: fd, + }, nil +} diff --git a/pkg/persistence/disk_test.go b/pkg/persistence/disk_test.go new file mode 100644 index 0000000..5ed2d13 --- /dev/null +++ b/pkg/persistence/disk_test.go @@ -0,0 +1,34 @@ +package persistence_test + +import ( + "context" + "github.com/dashjay/gohlc/pkg/persistence" + "github.com/stretchr/testify/assert" + "path/filepath" + "testing" + "time" +) + +func TestDisk(t *testing.T) { + fPath := filepath.Join(t.TempDir(), "temp-file") + d, err := persistence.NewDisk(fPath) + assert.Nil(t, err) + + t.Run("rw", func(t *testing.T) { + _, err = d.Load(context.TODO()) + assert.ErrorIs(t, err, persistence.ClockNotFound) + clock := time.Now().UnixNano() + err = d.Persist(context.TODO(), clock) + assert.Nil(t, err) + getClock, err := d.Load(context.TODO()) + assert.Nil(t, err) + assert.Equal(t, clock, getClock) + }) + + t.Run("r-with-deadline", func(t *testing.T) { + ctx, cancel := context.WithDeadline(context.Background(), time.Now()) + defer cancel() + _, err = d.Load(ctx) + assert.Nil(t, err) + }) +} diff --git a/pkg/persistence/interface.go b/pkg/persistence/interface.go new file mode 100644 index 0000000..3a2c44f --- /dev/null +++ b/pkg/persistence/interface.go @@ -0,0 +1,13 @@ +package persistence + +import ( + "context" + "errors" +) + +var ClockNotFound = errors.New("persistence clock not found") + +type Interface interface { + Persist(ctx context.Context, in int64) error + Load(ctx context.Context) (int64, error) +} diff --git a/pkg/service/hlc.go b/pkg/service/hlc.go new file mode 100644 index 0000000..e0d3b66 --- /dev/null +++ b/pkg/service/hlc.go @@ -0,0 +1,173 @@ +package service + +import ( + "context" + "errors" + "fmt" + "math" + "os" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/dashjay/gohlc/api/hlcv1" + "github.com/dashjay/gohlc/pkg/persistence" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" +) + +const InvalidClock = math.MaxInt64 + +type HLCService struct { + persistenceHandler persistence.Interface + persistentIntervalSecond int64 + + allocatedTimestamp atomic.Int64 + + lastSavedSuccess atomic.Bool + lastSavedTimestamp atomic.Int64 + + LockOSThread bool + + mu sync.Mutex + cond *sync.Cond + hlcv1.UnimplementedHCLServiceServer +} + +func (h *HLCService) loadClockFromPersistence(ctx context.Context) error { + clock, err := h.persistenceHandler.Load(ctx) + if err != nil { + if errors.Is(err, persistence.ClockNotFound) { + return h.persistenceHandler.Persist(ctx, time.Now().UnixNano()) + } + return err + } + h.lastSavedTimestamp.Store(clock) + return nil +} + +func (h *HLCService) doPersistentAndUpdateLastSaved(ctx context.Context) error { + h.mu.Lock() + base := _max(h.allocatedTimestamp.Load(), time.Now().UnixNano()) + save := base + h.persistentIntervalSecond*int64(time.Second) + h.mu.Unlock() + + err := h.persistenceHandler.Persist(ctx, save) + if err != nil { + h.lastSavedSuccess.Store(false) + return err + } + + h.lastSavedSuccess.Store(true) + h.lastSavedTimestamp.Store(save) + h.cond.Broadcast() + return nil +} + +func (h *HLCService) doPersistentAndUpdateLastSavedLoop(ctx context.Context) { + // lock this goroutine + if h.LockOSThread { + runtime.LockOSThread() + } + + for { + startTime := time.Now() + timeout := time.Second * time.Duration(h.persistentIntervalSecond) / 3 + subCtx, cancel := context.WithTimeout(ctx, timeout) + err := h.doPersistentAndUpdateLastSaved(subCtx) + cancel() + if err != nil { + fmt.Fprintf(os.Stderr, "do persistent and update lastSaved clock error: %s", err) + } + intervalLeft := timeout - time.Since(startTime) + if intervalLeft > 0 { + time.Sleep(intervalLeft) + } + } +} + +func NewHLCService(path string, intervalSec int64) *HLCService { + ph, err := persistence.NewDisk(path) + if err != nil { + panic(err) + } + hlc := &HLCService{ + persistenceHandler: ph, + persistentIntervalSecond: intervalSec, + allocatedTimestamp: atomic.Int64{}, + lastSavedTimestamp: atomic.Int64{}, + cond: sync.NewCond(&sync.Mutex{}), + } + return hlc +} + +func (h *HLCService) Start() error { + err := h.loadClockFromPersistence(context.Background()) + if err != nil { + return err + } + err = h.doPersistentAndUpdateLastSaved(context.Background()) + if err != nil { + return err + } + go h.doPersistentAndUpdateLastSavedLoop(context.Background()) + return nil +} + +func (h *HLCService) allocateClock(count uint32) int64 { + c64 := int64(count) + loopCount := 0 + for { + h.cond.L.Lock() + expected := h.allocatedTimestamp.Load() + desired := _max(expected+c64, time.Now().UnixNano()) + if desired >= h.lastSavedTimestamp.Load() { + h.cond.Wait() + } + h.cond.L.Unlock() + if h.allocatedTimestamp.CompareAndSwap(expected, desired) { + return desired - c64 + 1 + } + loopCount++ + } +} + +func (h *HLCService) Get(_ context.Context, _ *emptypb.Empty) (*hlcv1.GetResp, error) { + clk := h.allocateClock(1) + if clk == InvalidClock { + return nil, status.Error(codes.Internal, "get clock error") + } + return &hlcv1.GetResp{Clock: clk}, nil +} + +func (h *HLCService) BatchGet(_ context.Context, req *hlcv1.BatchGetReq) (*hlcv1.BatchGetResp, error) { + first := h.allocateClock(req.Count) + if first == InvalidClock { + return nil, status.Errorf(codes.Internal, "get clocks error") + } + resp := &hlcv1.BatchGetResp{ + Req: req, + First: first, + Clocks: nil, + } + if req.ReturnFirst { + return resp, nil + } + clocks := make([]int64, req.Count) + for i := uint32(0); i < req.Count; i++ { + clocks[i] = first + int64(i) + } + resp.Clocks = clocks + return resp, nil +} + +func _max(a, b int64) int64 { + if a >= b { + return a + } + return b +} + +var _ hlcv1.HCLServiceServer = (*HLCService)(nil) diff --git a/pkg/service/hlc_test.go b/pkg/service/hlc_test.go new file mode 100644 index 0000000..4bfefe0 --- /dev/null +++ b/pkg/service/hlc_test.go @@ -0,0 +1,78 @@ +package service_test + +import ( + "context" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/dashjay/gohlc/api/hlcv1" + "github.com/dashjay/gohlc/pkg/service" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/emptypb" +) + +func TestHLC(t *testing.T) { + tempFile := filepath.Join(t.TempDir(), "temp-file") + hlc := service.NewHLCService(tempFile, 1) + hlc.Start() + var query int64 + var count int64 + + allocateOne := func() int64 { + resp, err := hlc.Get(context.TODO(), &emptypb.Empty{}) + assert.Nil(t, err) + assert.NotEqual(t, service.InvalidClock, resp.Clock) + return resp.Clock + } + + allocateBatch := func(in uint32) int64 { + resp, err := hlc.BatchGet(context.TODO(), &hlcv1.BatchGetReq{ + Count: in, + ReturnFirst: true, + }) + assert.Nil(t, err) + assert.NotEqual(t, service.InvalidClock, resp.First) + return resp.First + } + + start := time.Now() + wg := sync.WaitGroup{} + for i := 0; i < 200; i++ { + wg.Add(1) + times := 100000 + go func() { + defer wg.Done() + for j := 0; j < times; j++ { + _ = allocateOne() + } + atomic.AddInt64(&query, int64(times)) + atomic.AddInt64(&count, int64(times)) + }() + } + wg.Wait() + t.Logf("finished %d query & get %d clocks in %.2f sec", query, count, time.Since(start).Seconds()) + + query = 0 + count = 0 + + start = time.Now() + wg = sync.WaitGroup{} + for i := 0; i < 200; i++ { + wg.Add(1) + times := 100000 + per := 50 + go func() { + defer wg.Done() + for j := 0; j < times; j++ { + _ = allocateBatch(uint32(per)) + } + atomic.AddInt64(&query, int64(times)) + atomic.AddInt64(&count, int64(times*per)) + }() + } + wg.Wait() + t.Logf("finished %d query & get %d clocks in %.2f sec", query, count, time.Since(start).Seconds()) +}