Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tpch support to develop before query processing changes #145

Merged
merged 19 commits into from
Aug 4, 2024
Merged
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d287f56
Added sourceOrdinal to load 'csv' like files (tpc-h data).
gamolina Jun 12, 2024
95c9708
Added tpc-h data producer to generate kinesis data, fix bugs so that …
gamolina Jun 15, 2024
2109e57
Added tpc-h data producer to generate kinesis data, fix bugs so that …
gamolina Jun 16, 2024
c887cd9
Added ability to use directly mapped surrogate keys to column IDs and…
gamolina Jun 20, 2024
f14c153
Eliminate the 'commit interval' parameter from kinesis consumer as it…
gamolina Jun 28, 2024
ab35e53
Refactor the consumer to properly batch incoming data for improved bu…
gamolina Jul 8, 2024
4c9f4c9
Add TPC-H config files.
gamolina Jul 8, 2024
e49756e
Add TPC-H create and drop scripts.
gamolina Jul 8, 2024
66bbd65
More burst capacity fixes post load test.
gamolina Jul 14, 2024
5e343f1
Fix incorrect error counter increment for flushes.
gamolina Jul 14, 2024
30d893d
Fix tests,,,, maybe
gamolina Jul 15, 2024
2ab0151
Disable part of the test suite for now.
gamolina Jul 15, 2024
a3fdd7e
Don't trap SIGQUIT, because we can't get thread dumps.
gamolina Jul 17, 2024
171d931
Fix deadlock in node Sync() code. Admin tool verbosity changes.
gamolina Jul 18, 2024
b768144
Partition info report feature added to admin tool
gamolina Jul 26, 2024
1e1474d
Added partition offlining/purge feature to admin tool.
gamolina Jul 27, 2024
1793bfd
Allow for batch updates and properly implement 'exclusive' bitmap pro…
gamolina Aug 1, 2024
0d2915d
Null out non-exclusive fields bug.
gamolina Aug 1, 2024
3c9b1c5
Fixed issues with DELETE statement, added update/delete mutation supp…
gamolina Aug 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added ability to use directly mapped surrogate keys to column IDs and…
… for foreign key relationship to leverage this to significantly improve load performance.
gamolina committed Jun 20, 2024

Verified

This commit was signed with the committer’s verified signature.
Kikobeats Kiko Beats
commit c887cd910282d38f192bffc5dea91e4c5b5ea4fc
18 changes: 0 additions & 18 deletions core/mapper.go
Original file line number Diff line number Diff line change
@@ -302,19 +302,6 @@ func ResolveMapper(attr *Attribute) (mapper Mapper, err error) {
return nil, fmt.Errorf("MappingStrategy is nil for '%s'", attr.FieldName)
}

/*
if attr.MappingStrategy == "Delegated" && attr.DelegationTarget == "" {
return nil, fmt.Errorf("DelegationTarget is nil for '%s'", attr.FieldName)
} else if attr.MappingStrategy == "Delegated" && attr.DelegationTarget != "" {
target, err2 := attr.Parent.GetAttribute(attr.DelegationTarget)
if err2 != nil {
return nil, fmt.Errorf("DelegationTarget not found/specified for '%s' - %v",
attr.FieldName, err2)
}
return ResolveMapper(target)
}
*/

if attr.MappingStrategy == "Custom" || attr.MappingStrategy == "CustomBSI" {
if attr.MapperConfig != nil {
if name, ok := attr.MapperConfig["name"]; ok {
@@ -331,11 +318,6 @@ func ResolveMapper(attr *Attribute) (mapper Mapper, err error) {

func lookupMapper(mapperName string, conf map[string]string) (Mapper, error) {

/*
if mapperName == "Undefined" || mapperName == "Delegated" {
return nil, nil
}
*/
mapperFactory, ok := mapperFactories[mapperName]
if !ok {
// Factory has not been registered.
26 changes: 20 additions & 6 deletions core/projector.go
Original file line number Diff line number Diff line change
@@ -502,15 +502,22 @@ func (p *Projector) fetchStrings(columnIDs []uint64, bsiResults map[string]map[s
//continue
}
trxColumnIDs = p.transposeFKColumnIDs(bsiResults[p.childTable][key], columnIDs)
//if v.MappingStrategy == "ParentRelation" {
nochild: if v.MappingStrategy == "ParentRelation" {
if strings.HasSuffix(v.ForeignKey, "@rownum") {
continue
}
if len(pka) > 1 {
return nil, fmt.Errorf("Projector error - Can only support single PK with link [%s]", key)
var pv *Attribute
/*
if len(pka) > 1 && pka.Parent.TimeQuantumType != "" {
return nil, fmt.Errorf("fetchStrings error - Can only support single PK with link [%s]", key)
}
pv := pka[0]
*/
if pka[0].Parent.TimeQuantumType == "" {
pv = pka[0]
} else {
pv = pka[1]
}

if pv.MappingStrategy != "StringHashBSI" {
continue
}
@@ -606,11 +613,18 @@ func (p *Projector) getRow(colID uint64, strMap map[string]map[interface{}]inter
return
}
if pka := p.getPKAttributes(relation); pka != nil {
/*
if len(pka) > 1 && !strings.HasSuffix(v.ForeignKey, "@rownum") {
err = fmt.Errorf("Projector error - Can only support single PK with link [%s]", v.FieldName)
err = fmt.Errorf("getRow error - Can only support single PK with link [%s]", v.FieldName)
return
}
pv := pka[0]
*/
var pv *Attribute
if pka[0].Parent.TimeQuantumType == "" {
pv = pka[0]
} else {
pv = pka[1]
}
if pv.MappingStrategy == "StringHashBSI" {
cid, err2 := p.checkColumnID(v, colID, child, bsiResults)
if err2 != nil {
116 changes: 39 additions & 77 deletions core/session.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"
@@ -106,10 +107,17 @@ func (t *TableBuffer) NextColumnID(bi *shared.BitmapIndex) error {
return nil
}

// HasPrimaryKey - Does this table have a primary key
func (t *TableBuffer) HasPrimaryKey() bool {
// ShouldLookupPrimaryKey - Does this table have a primary key
func (t *TableBuffer) ShouldLookupPrimaryKey() bool {

if t.PKAttributes[0].ColumnID {
return false
}

if t.Table.TimeQuantumType != "" && len(t.PKAttributes) > 1 {
if t.PKAttributes[1].ColumnID {
return false
}
return true
}
if t.Table.TimeQuantumType == "" && len(t.PKAttributes) > 0 {
@@ -340,7 +348,7 @@ func (s *Session) recursivePutRow(name string, row interface{}, pqTablePath stri
//if !s.Nested {
if !isChild {
// Directly provided parent columnID
if v.Type == "Integer" && (!relBuf.HasPrimaryKey() || fkFieldSpec == "@rownum") {
if v.Type == "Integer" && (!relBuf.ShouldLookupPrimaryKey() || fkFieldSpec == "@rownum") {
vals, _, err := s.readColumn(row, pqTablePath, &v, false, ignoreSourcePath, useNerdCapitalization)
//vals, _, err := s.readColumn(row, pqTablePath, &v, false, true, false)
if err != nil {
@@ -352,11 +360,19 @@ func (s *Session) recursivePutRow(name string, row interface{}, pqTablePath stri
if vals[0] == nil {
continue
}
if colId, ok := vals[0].(int64); !ok {
switch reflect.ValueOf(vals[0]).Kind() {
case reflect.String:
if colId, err := strconv.ParseInt(vals[0].(string), 10, 64); err == nil {
relColumnID = uint64(colId)
} else {
return fmt.Errorf("cannot parse string %v for parent relation %v type is %T",
vals[0], v.FieldName, vals[0])
}
case reflect.Int64:
relColumnID = uint64(vals[0].(int64))
default:
return fmt.Errorf("cannot cast %v to uint64 for parent relation %v type is %T",
vals[0], v.FieldName, vals[0])
} else {
relColumnID = uint64(colId)
}
} else { // Lookup based
//if v.SourceName == "" {
@@ -656,6 +672,7 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP
tbuf.CurrentTimestamp = time.Unix(0, 0)
}

directColumnID := false
tbuf.CurrentPKValue = make([]interface{}, len(tbuf.PKAttributes))
pqColPaths := make([]string, len(tbuf.PKAttributes))
var pkLookupVal strings.Builder
@@ -689,6 +706,12 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP
tbuf.CurrentTimestamp, _, _ = shared.ToTQTimestamp(tbuf.Table.TimeQuantumType, strVal)
}
}
if pk.ColumnID {
if cID, err := strconv.ParseInt(cval.(string), 10, 64); err == nil {
tbuf.CurrentColumnID = uint64(cID)
directColumnID = true
}
}
case reflect.Int64:
orig := cval.(int64)
cval = fmt.Sprintf("%d", orig)
@@ -725,7 +748,7 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP
}
}

if tbuf.HasPrimaryKey() {
if tbuf.ShouldLookupPrimaryKey() {
// Can't use batch operation here unfortunately, but at least we have local batch cache
localKey := indexPath(tbuf, tbuf.PKAttributes[0].FieldName, tbuf.Table.PrimaryKey+".PK")
if lColID, ok := s.BatchBuffer.LookupLocalCIDForString(localKey, pkLookupVal.String()); !ok {
@@ -754,14 +777,16 @@ func (s *Session) processPrimaryKey(tbuf *TableBuffer, row interface{}, pqTableP
u.Warnf("PK %s found in cache. PK mapping error?", pkLookupVal.String())
}
} else {
if providedColID == 0 {
// Generate new ColumnID. Lookup the sequencer from the local cache by TQ
errx := tbuf.NextColumnID(s.BitIndex)
if errx != nil {
return false, errx
if !directColumnID {
if providedColID == 0 {
// Generate new ColumnID. Lookup the sequencer from the local cache by TQ
errx := tbuf.NextColumnID(s.BitIndex)
if errx != nil {
return false, errx
}
} else {
tbuf.CurrentColumnID = providedColID
}
} else {
tbuf.CurrentColumnID = providedColID
}
}

@@ -1006,69 +1031,6 @@ func (s *Session) MapValue(tableName, fieldName string, value interface{}, updat
return attr.MapValue(value, nil) // Non load use case pass nil connection context
}

/*
func resolveJSColumnPathForField(jsTablePath string, v *Attribute, isChild bool) (jsColPath string) {
// BEGIN CUSTOM CODE FOR VISION
//if strings.HasSuffix(jsTablePath, "media.0") {
if v.Parent.Name == "media" {
jsColPath = fmt.Sprintf("events.0.event_tracktype_properties.%s", v.SourceName)
return
}
if v.Parent.Name == "events" {
s := strings.Split(v.SourceName, ".")
if len(s) > 1 {
switch s[0] {
case "media", "pzncon", "ad", "prompt", "api":
jsColPath = fmt.Sprintf("events.0.event_tracktype_properties.%s", s[1])
return
}
}
}
// END CUSTOM CODE FOR VISION
jsColPath = fmt.Sprintf("%s.%s", jsTablePath, v.SourceName)
if strings.HasPrefix(v.SourceName, "/") {
jsColPath = v.SourceName[1:]
} else if strings.HasPrefix(v.SourceName, "^") {
jsColPath = fmt.Sprintf("%s.%s", v.Parent.Name, v.SourceName[1:])
}
return
}
func readColumnByPath(path string, line []byte) []interface{} {
s := strings.Split(path, ".")
p := make([]interface{}, len(s))
var returnArray bool
for i, v := range s {
if val, err := strconv.ParseInt(v, 10, 32); err == nil {
p[i] = int(val)
returnArray = (i == len(s)-1)
} else {
p[i] = v
}
}
val := jsoniter.Get(line, p...)
if returnArray {
//return val.GetInterface()
return []interface{}{val.GetInterface()}
}
return []interface{}{val.GetInterface()}
}
func toJulianDay(t time.Time) (int32, int64) {
utc := t.UTC()
nanos := utc.UnixNano()
micros := nanos / time.Microsecond.Nanoseconds()
julianUs := micros + julianDayOfEpoch*microsPerDay
days := int32(julianUs / microsPerDay)
us := (julianUs % microsPerDay) * 1000
return days, us
}
*/

func fromJulianDay(days int32, nanos int64) time.Time {
nanos = ((int64(days)-julianDayOfEpoch)*microsPerDay + nanos/1000) * 1000
sec, nsec := nanos/time.Second.Nanoseconds(), nanos%time.Second.Nanoseconds()
35 changes: 31 additions & 4 deletions quanta-kinesis-consumer-lib/q-kinesis-lib.go
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ type Main struct {
HashTable *rendezvous.Table
shardChannels map[string]chan DataRecord
shardSessionCache map[string]*core.Session
shardSessionLock sync.Mutex
shardSessionLock sync.RWMutex
eg errgroup.Group
CancelFunc context.CancelFunc
CommitIntervalMs int
@@ -76,6 +76,8 @@ type Main struct {
ScanInterval int
metrics *cloudwatch.CloudWatch
tableCache *core.TableCacheStruct
metricsTicker *time.Ticker
commitTicker *time.Ticker
}

// NewMain allocates a new pointer to Main struct with empty record counter
@@ -226,7 +228,6 @@ func (m *Main) Init(customEndpoint string) (int, error) {
shardId := k
theChan := m.shardChannels[k]
m.eg.Go(func() error {
nextCommit := time.Now().Add(time.Millisecond * time.Duration(m.CommitIntervalMs))
for rec := range theChan {
shardTableKey := fmt.Sprintf("%v+%v", shardId, rec.TableName)
m.shardSessionLock.Lock()
@@ -248,9 +249,8 @@ func (m *Main) Init(customEndpoint string) (int, error) {
return err
}
m.processedRecs.Add(1)
if time.Now().After(nextCommit) {
if m.CommitIntervalMs == 0 {
conn.Flush()
nextCommit = time.Now().Add(time.Millisecond * time.Duration(m.CommitIntervalMs))
}
}
u.Errorf("shard channel closed. %v", shardId)
@@ -265,6 +265,10 @@ func (m *Main) Init(customEndpoint string) (int, error) {
})
}
m.HashTable = rendezvous.New(shardIds)
m.metricsTicker = m.PrintStats()
if m.CommitIntervalMs > 0 {
m.commitTicker = m.CommitTicker()
}
u.Infof("Created consumer. ")

return shardCount, nil
@@ -307,6 +311,12 @@ func (m *Main) MainProcessingLoop() error {

func (m *Main) Destroy() {

if m.CommitTicker != nil {
m.commitTicker.Stop()
}
if m.PrintStats != nil {
m.metricsTicker.Stop()
}
m.CancelFunc = nil
for _, v := range m.shardChannels {
close(v)
@@ -452,6 +462,23 @@ func (m *Main) PrintStats() *time.Ticker {
return t
}

// CommitTicker - Flush all sessions at a defined interval
func (m *Main) CommitTicker() *time.Ticker {

t := time.NewTicker(time.Millisecond * time.Duration(m.CommitIntervalMs))

go func() {
for range t.C {
m.shardSessionLock.RLock()
for _, v := range m.shardSessionCache {
v.Flush()
}
m.shardSessionLock.RUnlock()
}
}()
return t
}

func (m *Main) publishMetrics(upTime time.Duration, lastPublishedAt time.Time) time.Time {

interval := time.Since(lastPublishedAt).Seconds()
3 changes: 2 additions & 1 deletion shared/batch.go
Original file line number Diff line number Diff line change
@@ -285,7 +285,8 @@ func (c *BatchBuffer) SetValue(index, field string, columnID uint64, value int64
c.batchValues[index][field] = make(map[int64]*roaring64.BSI)
}
if bmap, ok := c.batchValues[index][field][ts.UnixNano()]; !ok {
b := roaring64.NewDefaultBSI()
//b := roaring64.NewDefaultBSI() // FIXME - possible bug in BSI libraries with zero values
b := roaring64.NewBSI(roaring64.Min64BitSigned, roaring64.Max64BitSigned)
b.SetValue(columnID, value)
c.batchValues[index][field][ts.UnixNano()] = b
bsize = b.BitCount()
1 change: 0 additions & 1 deletion shared/table.go
Original file line number Diff line number Diff line change
@@ -61,7 +61,6 @@ type BasicAttribute struct {
Searchable bool `yaml:"searchable,omitempty"`
DefaultValue string `yaml:"defaultValue,omitempty"`
ColumnID bool `yaml:"columnID,omitempty"`
ColumnIDMSV bool `yaml:"columnIDMSV,omitempty"`
IsTimeSeries bool `yaml:"isTimeSeries,omitempty"`
TimeQuantumType string `yaml:"timeQuantumType,omitempty"`
Exclusive bool `yaml:"exclusive,omitempty"`