Skip to content

Commit

Permalink
Add write ahead log
Browse files Browse the repository at this point in the history
This commit adds a write ahead log to the shard. Entries are cached
in memory and periodically flushed back into the index. The WAL and
the cache are both partitioned into buckets so that flushing doesn't
stop the world as long.
  • Loading branch information
benbjohnson committed Jun 25, 2015
1 parent 9eb5840 commit b574e2f
Show file tree
Hide file tree
Showing 8 changed files with 681 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [2650](https://github.com/influxdb/influxdb/pull/2650): Add SHOW GRANTS FOR USER statement. Thanks @n1tr0g.
- [3125](https://github.com/influxdb/influxdb/pull/3125): Graphite Input Protocol Parsing
- [2746](https://github.com/influxdb/influxdb/pull/2746): New Admin UI/interface
- [3036](https://github.com/influxdb/influxdb/pull/3036): Write Ahead Log (WAL)

### Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func NewServer(c *Config, version string) (*Server, error) {
reportingDisabled: c.ReportingDisabled,
}

// Copy TSDB configuration.
s.TSDBStore.MaxWALSize = c.Data.MaxWALSize
s.TSDBStore.WALFlushInterval = time.Duration(c.Data.WALFlushInterval)

// Initialize query executor.
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
s.QueryExecutor.MetaStore = s.MetaStore
Expand Down
11 changes: 11 additions & 0 deletions tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@ const (

// DefaultRetentionCheckPeriod is the period of time between retention policy checks are run
DefaultRetentionCheckPeriod = 10 * time.Minute

// DefaultMaxWALSize is the default size of the WAL before it is flushed.
DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB

// DefaultWALFlushInterval is the frequency the WAL will get flushed if
// it doesn't reach its size threshold.
DefaultWALFlushInterval = 10 * time.Minute
)

type Config struct {
Dir string `toml:"dir"`
MaxWALSize int `toml:"max-wal-size"`
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
RetentionAutoCreate bool `toml:"retention-auto-create"`
RetentionCheckEnabled bool `toml:"retention-check-enabled"`
RetentionCheckPeriod toml.Duration `toml:"retention-check-period"`
Expand All @@ -31,6 +40,8 @@ type Config struct {

func NewConfig() Config {
return Config{
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
RetentionAutoCreate: DefaultRetentionAutoCreate,
RetentionCheckEnabled: DefaultRetentionCheckEnabled,
RetentionCheckPeriod: toml.Duration(DefaultRetentionCheckPeriod),
Expand Down
88 changes: 71 additions & 17 deletions tsdb/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,81 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
store, executor := testStoreAndExecutor()
defer os.RemoveAll(store.path)

pt := NewPoint(
// Write first point.
if err := store.WriteToShard(shardID, []Point{NewPoint(
"cpu",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
)}); err != nil {
t.Fatalf(err.Error())
}

err := store.WriteToShard(shardID, []Point{pt})
if err != nil {
// Write second point.
if err := store.WriteToShard(shardID, []Point{NewPoint(
"cpu",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(2, 3),
)}); err != nil {
t.Fatalf(err.Error())
}

pt.SetTime(time.Unix(2, 3))
err = store.WriteToShard(shardID, []Point{pt})
if err != nil {
got := executeAndGetJSON("select * from cpu", executor)
exepected := `[{"series":[{"name":"cpu","tags":{"host":"server"},"columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}

store.Close()
store = NewStore(store.path)
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
executor.store = store

got = executeAndGetJSON("select * from cpu", executor)
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
}

// Ensure that points can be written and flushed even after a restart.
func TestWritePointsAndExecuteQuery_FlushRestart(t *testing.T) {
store, executor := testStoreAndExecutor()
defer os.RemoveAll(store.path)

// Write first point.
if err := store.WriteToShard(shardID, []Point{NewPoint(
"cpu",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)}); err != nil {
t.Fatalf(err.Error())
}

// Write second point.
if err := store.WriteToShard(shardID, []Point{NewPoint(
"cpu",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(2, 3),
)}); err != nil {
t.Fatalf(err.Error())
}

// Restart the store.
if err := store.Close(); err != nil {
t.Fatal(err)
} else if err = store.Open(); err != nil {
t.Fatal(err)
}

// Flush WAL data to the index.
if err := store.Flush(); err != nil {
t.Fatal(err)
}

got := executeAndGetJSON("select * from cpu", executor)
exepected := `[{"series":[{"name":"cpu","tags":{"host":"server"},"columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]`
Expand All @@ -44,8 +102,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {

store.Close()
store = NewStore(store.path)
err = store.Open()
if err != nil {
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
executor.store = store
Expand Down Expand Up @@ -127,9 +184,8 @@ func TestDropMeasurementStatement(t *testing.T) {
time.Unix(1, 2),
)

err := store.WriteToShard(shardID, []Point{pt, pt2})
if err != nil {
t.Fatalf(err.Error())
if err := store.WriteToShard(shardID, []Point{pt, pt2}); err != nil {
t.Fatal(err)
}

got := executeAndGetJSON("show series", executor)
Expand Down Expand Up @@ -190,9 +246,8 @@ func TestDropDatabase(t *testing.T) {
time.Unix(1, 2),
)

err := store.WriteToShard(shardID, []Point{pt})
if err != nil {
t.Fatalf(err.Error())
if err := store.WriteToShard(shardID, []Point{pt}); err != nil {
t.Fatal(err)
}

got := executeAndGetJSON("select * from cpu", executor)
Expand Down Expand Up @@ -233,8 +288,7 @@ func TestDropDatabase(t *testing.T) {
store.Open()
executor.store = store

err = store.WriteToShard(shardID, []Point{pt})
if err == nil || err.Error() != "shard not found" {
if err := store.WriteToShard(shardID, []Point{pt}); err == nil || err.Error() != "shard not found" {
t.Fatalf("expected shard to not be found")
}
}
Expand Down
Loading

0 comments on commit b574e2f

Please sign in to comment.