Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Isolation #306

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ type BlockReader interface {
Tombstones() (TombstoneReader, error)
}

// Appendable defines an entity to which data can be appended.
type Appendable interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there an incompatibility issue with removal of public interface?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we don't use this interface in tsdb itself. I am not sure we want to provide any guarantees about stability yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but TSDB is a library. Anyone can depends on that including Prometheus itself. But I don't know, maybe the policy for TSDB is just to always vendor and pin to some version, so this is fine.

// Appender returns a new Appender against an underlying store.
Appender() Appender
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
Expand Down
3 changes: 3 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ func (a dbAppender) Commit() error {
default:
}
}

return err
}

Expand Down Expand Up @@ -683,7 +684,9 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {

sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
db: db,
}

for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err == nil {
Expand Down
131 changes: 131 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"math/rand"
"os"
"sort"
"strconv"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/stretchr/testify/require"
)

func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
Expand Down Expand Up @@ -55,6 +57,10 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
}
testutil.Ok(t, it.Err())

if len(samples) == 0 {
continue
}

name := series.Labels().String()
result[name] = samples
}
Expand Down Expand Up @@ -892,3 +898,128 @@ func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {

return result, ss.Err()
}

func TestDBCannotSeePartialCommits(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)

db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
defer db.Close()

stop := make(chan struct{})
firstInsert := make(chan struct{})

// Insert data in batches.
go func() {
iter := 0
for {
app := db.Appender()

for j := 0; j < 100; j++ {
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
require.NoError(t, err)
}
err = app.Commit()
require.NoError(t, err)

if iter == 0 {
close(firstInsert)
}
iter++

select {
case <-stop:
return
default:
}
}
}()

<-firstInsert

// This is a race condition, so do a few tests to tickle it.
// Usually most will fail.
inconsistencies := 0
for i := 0; i < 10; i++ {
func() {
querier, err := db.Querier(0, 1000000)
testutil.Ok(t, err)
defer querier.Close()

ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
testutil.Ok(t, err)

seriesSet := readSeriesSet(t, ss)

require.NoError(t, err)
values := map[float64]struct{}{}
for _, series := range seriesSet {
values[series[len(series)-1].v] = struct{}{}
}
if len(values) != 1 {
inconsistencies++
}
}()
}
stop <- struct{}{}

require.Equal(t, 0, inconsistencies, "Some queries saw inconsistent results.")
}

func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)

db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
defer db.Close()

querier, err := db.Querier(0, 1000000)
testutil.Ok(t, err)
defer querier.Close()

app := db.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
require.NoError(t, err)
// This commit is after the querier is created, so should not be returned.
err = app.Commit()
require.NoError(t, err)

ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
testutil.Ok(t, err)

seriesSet := readSeriesSet(t, ss)
require.Equal(t, map[string][]sample{}, seriesSet)

querier, err = db.Querier(0, 1000000)
testutil.Ok(t, err)
defer querier.Close()

ss, err = querier.Select(labels.NewEqualMatcher("foo", "bar"))
seriesSet = readSeriesSet(t, ss)
require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})
}

func readSeriesSet(t *testing.T, ss SeriesSet) map[string][]sample {
seriesSet := make(map[string][]sample)
for ss.Next() {
series := ss.At()

samples := []sample{}
it := series.Iterator()
for it.Next() {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
}
if len(samples) == 0 {
continue
}

name := series.Labels().String()
seriesSet[name] = samples
}
testutil.Ok(t, ss.Err())

return seriesSet
}
Loading