Skip to content

Commit

Permalink
Comments
Browse files — browse the repository at this point in the history
  • Loading branch information
nikunjgit committed Sep 28, 2018
1 parent 7a83b62 commit 350cdc1
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 27 deletions.
10 changes: 6 additions & 4 deletions src/query/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,13 @@ type Result struct {
// ConsolidationFunc consolidates a bunch of datapoints into a single float value
type ConsolidationFunc func(datapoints ts.Datapoints) float64

// TakeLast is a consolidation function which takes the last datapoint
// TakeLast is a consolidation function which takes the last datapoint which has non nan value
func TakeLast(values ts.Datapoints) float64 {
if len(values) == 0 {
return math.NaN()
for i := len(values) - 1; i >= 0; i-- {
if !math.IsNaN(values[i].Value) {
return values[i].Value
}
}

return values[len(values)-1].Value
return math.NaN()
}
17 changes: 15 additions & 2 deletions src/query/executor/transform/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,23 @@ func (s *seriesIter) Next() bool {
}

type lazyBlock struct {
mu sync.Mutex

mu sync.Mutex
rawBlock block.Block
lazyNode *lazyNode
ID parser.NodeID
processedBlock block.Block
processError error
}

// Unconsolidated returns the unconsolidated version for the block
func (f *lazyBlock) Unconsolidated() (block.UnconsolidatedBlock, error) {
f.mu.Lock()
defer f.mu.Unlock()

if f.processError != nil {
return nil, f.processError
}

if f.processedBlock != nil {
return f.processedBlock.Unconsolidated()
}
Expand All @@ -165,6 +169,10 @@ func (f *lazyBlock) StepIter() (block.StepIter, error) {
f.mu.Lock()
defer f.mu.Unlock()

if f.processError != nil {
return nil, f.processError
}

if f.processedBlock != nil {
return f.processedBlock.StepIter()
}
Expand Down Expand Up @@ -194,6 +202,10 @@ func (f *lazyBlock) SeriesIter() (block.SeriesIter, error) {
f.mu.Lock()
defer f.mu.Unlock()

if f.processError != nil {
return nil, f.processError
}

if f.processedBlock != nil {
return f.processedBlock.SeriesIter()
}
Expand Down Expand Up @@ -226,6 +238,7 @@ func (f *lazyBlock) Close() error {
func (f *lazyBlock) process() error {
err := f.lazyNode.fNode.Process(f.ID, f.rawBlock)
if err != nil {
f.processError = err
return err
}

Expand Down
19 changes: 12 additions & 7 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *baseNode) Process(ID parser.NodeID, b block.Block) error {
}

if unconsolidatedBlock == nil {
return fmt.Errorf("block needs to be unconsolidated for the op: %s", c.op)
return fmt.Errorf("block needs to be unconsolidated for temporal operations: %s", c.op)
}

iter, err := unconsolidatedBlock.StepIter()
Expand Down Expand Up @@ -304,15 +304,20 @@ func (c *baseNode) processSingleRequest(request processRequest) error {
// TODO: Consider using a rotating slice since this is inefficient
if desiredLength <= len(values) {
values = values[len(values)-desiredLength:]
flattenedValues := make(ts.Datapoints, 0)
flattenedValues := make(ts.Datapoints, 0, len(values))
for _, dps := range values {
for _, dp := range dps {
if dp.Timestamp.Before(oldestDatapointTimestamp) {
continue
var (
idx int
dp ts.Datapoint
)
for idx, dp = range dps {
// Keep going until we find datapoints at the oldest timestamp for window
if !dp.Timestamp.Before(oldestDatapointTimestamp) {
break
}

flattenedValues = append(flattenedValues, dp)
}

flattenedValues = append(flattenedValues, dps[idx:]...)
}

newVal = c.processor.Process(flattenedValues)
Expand Down
26 changes: 13 additions & 13 deletions src/query/storage/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,41 +49,41 @@ func NewMultiBlockWrapper(unconsolidatedBlock block.UnconsolidatedBlock) block.B
}

// multiBlockWrapper lazily consolidates an unconsolidated block, performing
// the consolidation at most once and caching the result (or error) for all
// subsequent iterator calls.
type multiBlockWrapper struct {
	unconsolidated   block.UnconsolidatedBlock
	consolidated     block.Block // set by consolidate() on success
	consolidateError error       // set by consolidate() on failure
	once             sync.Once   // guarantees a single consolidation attempt
}

// Unconsolidated returns the wrapped unconsolidated block directly; no
// consolidation step is needed on this path, so it never fails.
func (m *multiBlockWrapper) Unconsolidated() (block.UnconsolidatedBlock, error) {
	return m.unconsolidated, nil
}

func (m *multiBlockWrapper) StepIter() (block.StepIter, error) {
if err := m.tryConsolidate(); err != nil {
if err := m.consolidate(); err != nil {
return nil, err
}

return m.consolidated.StepIter()
}

func (m *multiBlockWrapper) tryConsolidate() error {
m.mu.Lock()
defer m.mu.Unlock()

if m.consolidated == nil {
func (m *multiBlockWrapper) consolidate() error {
m.once.Do(func() {
consolidated, err := m.unconsolidated.Consolidate()
if err != nil {
return err
m.consolidateError = err
return
}

m.consolidated = consolidated
}

return nil
})

return m.consolidateError
}

func (m *multiBlockWrapper) SeriesIter() (block.SeriesIter, error) {
if err := m.tryConsolidate(); err != nil {
if err := m.consolidate(); err != nil {
return nil, err
}

Expand Down
4 changes: 3 additions & 1 deletion src/query/ts/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func (d Datapoints) AlignToBounds(bounds models.Bounds) []Datapoints {
steps := bounds.Steps()
stepValues := make([]Datapoints, steps)
dpIdx := 0
stepSize := bounds.StepSize
t := bounds.Start
for i := 0; i < steps; i++ {
startIdx := dpIdx
t, _ := bounds.TimeForIndex(i)
for dpIdx < numDatapoints && d[dpIdx].Timestamp.Before(t) {
dpIdx++
}
Expand All @@ -97,6 +98,7 @@ func (d Datapoints) AlignToBounds(bounds models.Bounds) []Datapoints {
}

stepValues[i] = singleStepValues
t = t.Add(stepSize)
}

return stepValues
Expand Down

0 comments on commit 350cdc1

Please sign in to comment.