From 4b10b07a985e896b94b0c4eb116f08990c92d330 Mon Sep 17 00:00:00 2001 From: "GUY.MOLINARI" Date: Fri, 8 Nov 2024 15:17:18 +0000 Subject: [PATCH] Code cleanup --- Makefile | 2 +- core/session.go | 52 +--- go.mod | 2 + .../quanta-kafka-consumer.go | 241 ------------------ server/bitmap.go | 22 +- 5 files changed, 19 insertions(+), 300 deletions(-) delete mode 100644 quanta-kafka-consumer/quanta-kafka-consumer.go diff --git a/Makefile b/Makefile index f8b637e..18b9552 100644 --- a/Makefile +++ b/Makefile @@ -147,7 +147,7 @@ test: build_all # mkdir -p $(COVERAGE_DIR) # export TZ=UTC; go test ${GOLIST} -short -v ${LDFLAGS_TEST} -coverprofile ${COV_PROFILE} # go tool cover -html=${COV_PROFILE} -o ${COV_HTML} - ./test/run-go-tests.sh. + ./test/run-go-tests.sh ifeq ($(UNAME), Darwin) # open ${COV_HTML} endif diff --git a/core/session.go b/core/session.go index 7322135..12f5625 100644 --- a/core/session.go +++ b/core/session.go @@ -699,7 +699,6 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP cval = vals[0] tbuf.CurrentPKValue[i] = cval -// NEW IMPLEMENTATION STARTS var strVal string mval, err := pk.MapValue(cval, nil, false) if err != nil { @@ -708,7 +707,10 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP } switch shared.TypeFromString(pk.Type) { case shared.String: - strVal = cval.(string) + var ok bool + if strVal, ok = cval.(string); !ok { + strVal = pk.Render(mval) + } case shared.Date, shared.DateTime: strVal = pk.Render(mval) if i == 0 { // First field in PK is TQ (if TQ != "") @@ -732,52 +734,6 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP strVal = pk.Render(mval) } -/* REFACTOR THIS - switch reflect.ValueOf(cval).Kind() { - case reflect.String: - // Do nothing already a string - if i == 0 { // First field in PK is TQ (if TQ != "") - if pk.MappingStrategy == "SysMillisBSI" || pk.MappingStrategy == "SysMicroBSI" { - strVal := cval.(string) - tbuf.CurrentTimestamp, _, _ = shared.ToTQTimestamp(tbuf.Table.TimeQuantumType, strVal) - } - } - if pk.ColumnID { - if cID, err := strconv.ParseInt(cval.(string), 10, 64); err == nil { - tbuf.CurrentColumnID = uint64(cID) - directColumnID = true - } - } - case reflect.Int64: - orig := cval.(int64) - cval = fmt.Sprintf("%d", orig) - - if i == 0 { - tFormat := shared.YMDTimeFmt - if tbuf.Table.TimeQuantumType == "YMDH" { - tFormat = shared.YMDHTimeFmt - } - if pk.MappingStrategy == "SysMillisBSI" || pk.MappingStrategy == "SysMicroBSI" { - ts := time.Unix(0, orig*1000000) - if pk.MappingStrategy == "SysMicroBSI" { - ts = time.Unix(0, orig*1000) - } - tbuf.CurrentTimestamp, _, _ = shared.ToTQTimestamp(tbuf.Table.TimeQuantumType, ts.Format(tFormat)) - } - } - case reflect.Float64: - orig := cval.(float64) - f := fmt.Sprintf("%%10.%df", pk.Scale) - cval = fmt.Sprintf(f, orig) - case reflect.Float32: - orig := cval.(float32) - f := fmt.Sprintf("%%10.%df", pk.Scale) - cval = fmt.Sprintf(f, orig) - default: - return false, fmt.Errorf("PK Lookup value [%v] unknown type, it is [%v]", cval, - reflect.ValueOf(cval).Kind()) - } -*/ if pkLookupVal.Len() == 0 { pkLookupVal.WriteString(strVal) } else { diff --git a/go.mod b/go.mod index 9d4386f..77ff4e2 100644 --- a/go.mod +++ b/go.mod @@ -144,3 +144,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/RoaringBitmap/roaring/v2 => /home/ec2-user/roaring diff --git a/quanta-kafka-consumer/quanta-kafka-consumer.go b/quanta-kafka-consumer/quanta-kafka-consumer.go deleted file mode 100644 index 177d77c..0000000 --- a/quanta-kafka-consumer/quanta-kafka-consumer.go +++ /dev/null @@ -1,241 +0,0 @@ -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - "runtime" - "strings" - "sync" - "time" - - "github.com/disney/quanta/core" - "github.com/disney/quanta/shared" - "github.com/hashicorp/consul/api" - "gopkg.in/alecthomas/kingpin.v2" - "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" -) - -// Variables to identify the build -var ( - Version string - Build string - EPOCH, _ = time.ParseInLocation(time.RFC3339, "2000-01-01T00:00:00+00:00", time.UTC) -) - -// Exit Codes -const ( - Success = 0 -) - -// Main strct defines command line arguments variables and various global meta-data associated with record loads. -type Main struct { - SchemaDir string - Index string - BufferSize uint - totalBytes int64 - bytesLock sync.RWMutex - totalRecs *Counter - Port int - ConsulAddr string - ConsulClient *api.Client - lock *api.Lock - conns []*core.Session - KafkaBroker string - KafkaGroup string - KafkaTopics []string - consumer *kafka.Consumer -} - -// NewMain allocates a new pointer to Main struct with empty record counter -func NewMain() *Main { - m := &Main{ - totalRecs: &Counter{}, - } - return m -} - -func main() { - - app := kingpin.New(os.Args[0], "Quanta kafka data loader consumer").DefaultEnvars() - app.Version("Version: " + Version + "\nBuild: " + Build) - - schemaDir := app.Arg("schema-dir-name", "Directory path for config/schema files.").Required().String() - index := app.Arg("index", "Table name (root name if nested schema)").Required().String() - broker := app.Arg("broker", "Kafka broker host").Required().String() - group := app.Arg("group", "Kafka group").Required().String() - topics := app.Arg("topics", "CSV list of Kafka topics").Required().String() - port := app.Arg("port", "Port number for service").Default("4000").Int32() - bufSize := app.Flag("buf-size", "Buffer size").Default("1000000").Int32() - environment := app.Flag("env", "Environment [DEV, QA, STG, VAL, PROD]").Default("DEV").String() - consul := app.Flag("consul-endpoint", "Consul agent address/port").Default("127.0.0.1:8500").String() - - shared.InitLogging("WARN", *environment, "Kafka-Consumer", Version, "Quanta") - - kingpin.MustParse(app.Parse(os.Args[1:])) - - main := NewMain() - main.Index = *index - main.BufferSize = uint(*bufSize) - main.SchemaDir = *schemaDir - main.Port = int(*port) - main.ConsulAddr = *consul - main.KafkaBroker = *broker - main.KafkaGroup = *group - main.KafkaTopics = strings.Split(*topics, ",") - - log.Printf("Index name %v.", main.Index) - log.Printf("Buffer size %d.", main.BufferSize) - log.Printf("Base path for schema [%s].", main.SchemaDir) - log.Printf("Service port %d.", main.Port) - log.Printf("Consul agent at [%s]\n", main.ConsulAddr) - log.Printf("Kafka broker host %s.", main.KafkaBroker) - log.Printf("Kafka group %s.", main.KafkaGroup) - log.Printf("Kafka topics %v.", main.KafkaTopics) - - if err := main.Init(); err != nil { - log.Fatal(err) - } - - msgChan := make(chan []byte, main.BufferSize) - main.conns = make([]*core.Session, runtime.NumCPU()) - - var ticker *time.Ticker - ticker = main.printStats() - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { - for range c { - log.Printf("Interrupted, Bytes processed: %s, Records: %v", core.Bytes(main.BytesProcessed()), - main.totalRecs.Get()) - close(msgChan) - main.consumer.Close() - ticker.Stop() - os.Exit(0) - } - }() - - tcs := core.NewTableCacheStruct() - - // Spin up workers - for n := 0; n < runtime.NumCPU(); n++ { - go func(i int) { - var err error - main.conns[i], err = core.OpenSession(tcs, main.SchemaDir, main.Index, true, nil) - if err != nil { - log.Fatalf("Error opening connection %v", err) - } - for msg := range msgChan { - err = main.conns[i].PutRow(main.Index, nil, 0, false, false) - if err != nil { - log.Printf("ERROR %v", err) - } - main.totalRecs.Add(1) - main.AddBytes(len(msg)) - } - main.conns[i].CloseSession() - }(n) - } - - // Main processing loop - go func() { - for { - msg, err := main.consumer.ReadMessage(-1) - if err == nil { - msgChan <- msg.Value - //log.Printf("Message on %s: %s", msg.TopicPartition, string(msg.Value)) - } else { - // The client will automatically try to recover from all errors. - log.Printf("Consumer error: %v (%v)", err, msg) - } - } - }() - <-c -} - -func exitErrorf(msg string, args ...interface{}) { - fmt.Fprintf(os.Stderr, msg+"\n", args...) - os.Exit(1) -} - -// Init function initilizations loader. -// Establishes session with bitmap server and Kafka -func (m *Main) Init() error { - - var err error - - m.ConsulClient, err = api.NewClient(&api.Config{Address: m.ConsulAddr}) - if err != nil { - return err - } - - m.consumer, err = kafka.NewConsumer(&kafka.ConfigMap{ - "bootstrap.servers": m.KafkaBroker, - "group.id": m.KafkaGroup, - "auto.offset.reset": "earliest", - }) - if err != nil { - return err - } - log.Printf("Created consumer %v", m.consumer) - - m.consumer.SubscribeTopics(m.KafkaTopics, nil) - return nil -} - -// printStats outputs to Log current status of Kafka consumer -// Includes data on processed: bytes, records, time duration in seconds, and rate of bytes per sec" -func (m *Main) printStats() *time.Ticker { - t := time.NewTicker(time.Second * 10) - start := time.Now() - go func() { - for range t.C { - duration := time.Since(start) - bytes := m.BytesProcessed() - log.Printf("Bytes: %s, Records: %v, Duration: %v, Rate: %v/s", core.Bytes(bytes), m.totalRecs.Get(), duration, core.Bytes(float64(bytes)/duration.Seconds())) - for i := 0; i < len(m.conns); i++ { - m.conns[i].Flush() - } - } - }() - return t -} - -// AddBytes provides thread safe processing to set the total bytes processed. -// Adds the bytes parameter to total bytes processed. -func (m *Main) AddBytes(n int) { - m.bytesLock.Lock() - m.totalBytes += int64(n) - m.bytesLock.Unlock() -} - -// BytesProcessed provides thread safe read of total bytes processed. -func (m *Main) BytesProcessed() (num int64) { - m.bytesLock.Lock() - num = m.totalBytes - m.bytesLock.Unlock() - return -} - -// Counter - Generic counter with mutex (threading) support -type Counter struct { - num int64 - lock sync.Mutex -} - -// Add function provides thread safe addition of counter value based on input parameter. -func (c *Counter) Add(n int) { - c.lock.Lock() - c.num += int64(n) - c.lock.Unlock() -} - -// Get function provides thread safe read of counter value. -func (c *Counter) Get() (ret int64) { - c.lock.Lock() - ret = c.num - c.lock.Unlock() - return -} diff --git a/server/bitmap.go b/server/bitmap.go index b1906fa..1d9058f 100644 --- a/server/bitmap.go +++ b/server/bitmap.go @@ -11,7 +11,7 @@ import ( "io" "unsafe" - "math" + //"math" "os" "path/filepath" "runtime" @@ -330,12 +330,12 @@ type BSIBitmap struct { func (m *BitmapIndex) newBSIBitmap(index, field string) *BSIBitmap { attr, err := m.getFieldConfig(index, field) - var minValue, maxValue int64 +// var minValue, maxValue int64 var timeQuantumType string if err == nil { timeQuantumType = attr.TimeQuantumType - minValue = int64(attr.MinValue) - maxValue = int64(attr.MaxValue) +// minValue = int64(attr.MinValue) +// maxValue = int64(attr.MaxValue) } var seq *SequencerQueue if attr.Parent.PrimaryKey != "" || attr.Parent.TimeQuantumField != "" { @@ -343,13 +343,15 @@ func (m *BitmapIndex) newBSIBitmap(index, field string) *BSIBitmap { if attr.FieldName == pkInfo[0].FieldName { // If compound key, sequencer installed on first key attr seq = NewSequencerQueue() - if maxValue == 0 { - maxValue = math.MaxInt64 - } +// if maxValue == 0 { +// maxValue = math.MaxInt64 +// } } } ts := time.Now() - return &BSIBitmap{BSI: roaring64.NewBSI(maxValue, minValue), +// return &BSIBitmap{BSI: roaring64.NewBSI(maxValue, minValue), +// TQType: timeQuantumType, ModTime: ts, AccessTime: ts, sequencerQueue: seq} + return &BSIBitmap{BSI: roaring64.NewDefaultBSI(), TQType: timeQuantumType, ModTime: ts, AccessTime: ts, sequencerQueue: seq} } @@ -1438,9 +1440,9 @@ func (m *BitmapIndex) OfflinePartitions(ctx context.Context, req *pb.PartitionIn ts := time.Unix(0, req.Time) if req.Index != "" { - u.Info("Offline partition request for %v, table = %s", ts.Format(timeFmt), req.Index) + u.Infof("Offline partition request for %v, table = %s", ts.Format(timeFmt), req.Index) } else { - u.Info("Offline partition request for %v, all partitioned tables", ts.Format(timeFmt)) + u.Infof("Offline partition request for %v, all partitioned tables", ts.Format(timeFmt)) } // Iterate over shard cache insert into partition operation queue