Skip to content

Commit

Permalink
Address build errors
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew DeVenny <[email protected]>
  • Loading branch information
matthewdevenny committed Mar 3, 2022
1 parent 8f13661 commit 150f7bb
Showing 1 changed file with 69 additions and 73 deletions.
142 changes: 69 additions & 73 deletions pkg/drivers/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
)

const (
kineBucket = "kine"
revHistory = 12
kSlowMethodMilliseconds = 500
kineBucket = "kine"
revHistory = 12
slowMethodMilliseconds = 500
)

var (
Expand Down Expand Up @@ -127,7 +127,7 @@ func (j *JetStream) Get(ctx context.Context, key string, revision int64) (revRet
size = len(kvRet.Value)
}
fStr := "GET %s, rev=%d => revRet=%d, kv=%v, size=%d, err=%v, duration=%d"
if duration.Milliseconds() > kSlowMethodMilliseconds {
if duration.Milliseconds() > slowMethodMilliseconds {
logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.Milliseconds())
} else {
logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, size, errRet, duration.Milliseconds())
Expand Down Expand Up @@ -214,7 +214,7 @@ func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease
defer func() {
duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond())
fStr := "CREATE %s, size=%d, lease=%d => rev=%d, err=%v, duration=%d"
if duration.Milliseconds() > kSlowMethodMilliseconds {
if duration.Milliseconds() > slowMethodMilliseconds {
logrus.Warnf(fStr, key, len(value), lease, revRet, errRet, duration.Milliseconds())
} else {
logrus.Tracef(fStr, key, len(value), lease, revRet, errRet, duration.Milliseconds())
Expand Down Expand Up @@ -269,13 +269,12 @@ func (j *JetStream) Create(ctx context.Context, key string, value []byte, lease
return 0, err
}
return int64(seq), nil
} else {
seq, err := j.kvBucket.Create(key, event)
if err != nil {
return 0, err
}
return int64(seq), nil
}
seq, err := j.kvBucket.Create(key, event)
if err != nil {
return 0, err
}
return int64(seq), nil
}

func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) {
Expand All @@ -284,7 +283,7 @@ func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (rev
defer func() {
duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond())
fStr := "DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v, duration=%d"
if duration.Milliseconds() > kSlowMethodMilliseconds {
if duration.Milliseconds() > slowMethodMilliseconds {
logrus.Warnf(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.Milliseconds())
} else {
logrus.Tracef(fStr, key, revision, revRet, kvRet != nil, deletedRet, errRet, duration.Milliseconds())
Expand All @@ -305,9 +304,8 @@ func (j *JetStream) Delete(ctx context.Context, key string, revision int64) (rev
if err != nil {
if err == nats.ErrKeyNotFound {
return rev, nil, true, nil
} else {
return rev, nil, false, err
}
return rev, nil, false, err
}

if value == nil {
Expand Down Expand Up @@ -357,7 +355,7 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re
defer func() {
duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond())
fStr := "LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v, duration=%d"
if duration.Milliseconds() > kSlowMethodMilliseconds {
if duration.Milliseconds() > slowMethodMilliseconds {
logrus.Warnf(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.Milliseconds())
} else {
logrus.Tracef(fStr, prefix, startKey, limit, revision, revRet, len(kvRet), errRet, duration.Milliseconds())
Expand Down Expand Up @@ -413,17 +411,17 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re
}
}
}
var nextRevId = minRev
var nextRevID = minRev
var nextRevision nats.KeyValueEntry
for k, v := range histories {
logrus.Debugf("Checking %s history", k)
for i := len(v) - 1; i >= 0; i-- {
if int64(v[i].Revision()) > nextRevId && int64(v[i].Revision()) <= revision {
nextRevId = int64(v[i].Revision())
if int64(v[i].Revision()) > nextRevID && int64(v[i].Revision()) <= revision {
nextRevID = int64(v[i].Revision())
nextRevision = v[i]
logrus.Debugf("found next rev=%d", nextRevId)
logrus.Debugf("found next rev=%d", nextRevID)
break
} else if int64(v[i].Revision()) <= nextRevId {
} else if int64(v[i].Revision()) <= nextRevID {
break
}
}
Expand All @@ -437,67 +435,65 @@ func (j *JetStream) List(ctx context.Context, prefix, startKey string, limit, re
}

return rev, kvs, nil
}

} else {

current := true
current := true

if revision != 0 {
rev = revision
current = false
}
if revision != 0 {
rev = revision
current = false
}

if current {
if current {

entries, err := j.getKeyValues(ctx, prefix, true)
if err != nil {
return 0, nil, err
}
for _, e := range entries {
if count < limit || limit == 0 {
kv, err := decode(e)
if !j.isKeyExpired(ctx, e.Created(), &kv) && err == nil {
kvs = append(kvs, kv.KV)
count++
}
} else {
break
entries, err := j.getKeyValues(ctx, prefix, true)
if err != nil {
return 0, nil, err
}
for _, e := range entries {
if count < limit || limit == 0 {
kv, err := decode(e)
if !j.isKeyExpired(ctx, e.Created(), &kv) && err == nil {
kvs = append(kvs, kv.KV)
count++
}
} else {
break
}
}

} else {
keys, err := j.getKeys(ctx, prefix, true)
if err != nil {
return 0, nil, err
}
if revision == 0 && len(keys) == 0 {
return rev, nil, nil
}
} else {
keys, err := j.getKeys(ctx, prefix, true)
if err != nil {
return 0, nil, err
}
if revision == 0 && len(keys) == 0 {
return rev, nil, nil
}

for _, key := range keys {
if count < limit || limit == 0 {
if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil {
for i := len(history) - 1; i >= 0; i-- {
if int64(history[i].Revision()) <= revision {
if entry, err := decode(history[i]); err == nil {
kvs = append(kvs, entry.KV)
count++
} else {
logrus.Warnf("Could not decode %s rev=> %d", key, history[i].Revision())
}
break
for _, key := range keys {
if count < limit || limit == 0 {
if history, err := j.kvBucket.History(key, nats.Context(ctx)); err == nil {
for i := len(history) - 1; i >= 0; i-- {
if int64(history[i].Revision()) <= revision {
if entry, err := decode(history[i]); err == nil {
kvs = append(kvs, entry.KV)
count++
} else {
logrus.Warnf("Could not decode %s rev=> %d", key, history[i].Revision())
}
break
}
} else {
// should not happen
logrus.Warnf("no history for %s", key)
}
} else {
// should not happen
logrus.Warnf("no history for %s", key)
}
}

}
return rev, kvs, nil

}
return rev, kvs, nil
}

func (j *JetStream) listAfter(ctx context.Context, prefix string, revision int64) (revRet int64, eventRet []*server.Event, errRet error) {
Expand Down Expand Up @@ -543,7 +539,7 @@ func (j *JetStream) Count(ctx context.Context, prefix string) (revRet int64, cou
defer func() {
duration := time.Duration(time.Now().Nanosecond() - start.Nanosecond())
fStr := "COUNT %s => rev=%d, count=%d, err=%v, duration=%d"
if duration.Milliseconds() > kSlowMethodMilliseconds {
if duration.Milliseconds() > slowMethodMilliseconds {
logrus.Warnf(fStr, prefix, revRet, count, err, duration.Milliseconds())
} else {
logrus.Tracef(fStr, prefix, revRet, count, err, duration.Milliseconds())
Expand Down Expand Up @@ -579,7 +575,7 @@ func (j *JetStream) Update(ctx context.Context, key string, value []byte, revisi
kvRev = kvRet.ModRevision
}
fStr := "UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v, duration=%d"
if duration.Milliseconds() > kSlowMethodMilliseconds {
if duration.Milliseconds() > slowMethodMilliseconds {
logrus.Warnf(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.Milliseconds())
} else {
logrus.Tracef(fStr, key, len(value), revision, lease, revRet, kvRev, updateRet, errRet, duration.Milliseconds())
Expand Down Expand Up @@ -648,16 +644,16 @@ func (j *JetStream) Update(ctx context.Context, key string, value []byte, revisi

func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {

watchCtx, _ := context.WithCancel(ctx)
//watchCtx, _ := context.WithCancel(ctx)

//logrus.Tracef("WATCH %s, rev=%d", prefix, revision)

watcher, err := j.kvBucket.(*kv.EncodedKV).WatchWithCtx(watchCtx, prefix, nats.IgnoreDeletes())
watcher, err := j.kvBucket.(*kv.EncodedKV).WatchWithCtx(ctx, prefix, nats.IgnoreDeletes())

if revision > 0 {
revision--
}
_, events, err := j.listAfter(watchCtx, prefix, revision)
_, events, err := j.listAfter(ctx, prefix, revision)

if err != nil {
logrus.Errorf("failed to create watcher %s for revision %d", prefix, revision)
Expand Down Expand Up @@ -702,7 +698,7 @@ func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <-
if err != nil {
logrus.Warnf("watch event: could not decode %s seq %d", i.Key(), i.Revision())
}
if _, prevEntry, prevErr := j.get(watchCtx, i.Key(), value.PrevRevision, false); prevErr == nil {
if _, prevEntry, prevErr := j.get(ctx, i.Key(), value.PrevRevision, false); prevErr == nil {
if prevEntry != nil {
prevValue = *prevEntry
}
Expand Down Expand Up @@ -737,7 +733,7 @@ func (j *JetStream) Watch(ctx context.Context, prefix string, revision int64) <-
}
// }
}
case <-watchCtx.Done():
case <-ctx.Done():
logrus.Infof("watcher: %s context cancelled", prefix)
if err := watcher.Stop(); err != nil && err != nats.ErrBadSubscription {
logrus.Warnf("error stopping %s watcher: %v", prefix, err)
Expand Down

0 comments on commit 150f7bb

Please sign in to comment.