From b574e2f755fa0e83bea3e9dbad4a4b2c080b6a08 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 18 Jun 2015 09:07:51 -0600 Subject: [PATCH] Add write ahead log This commit adds a write ahead log to the shard. Entries are cached in memory and periodically flushed back into the index. The WAL and the cache are both partitioned into buckets so that flushing doesn't stop the world as long. --- CHANGELOG.md | 1 + cmd/influxd/run/server.go | 4 + tsdb/config.go | 11 + tsdb/query_executor_test.go | 88 +++++-- tsdb/shard.go | 512 ++++++++++++++++++++++++++++++++---- tsdb/shard_test.go | 76 ++++++ tsdb/store.go | 34 ++- tsdb/tx.go | 31 ++- 8 files changed, 681 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 329fe8e4470..e4715b439f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [2650](https://github.com/influxdb/influxdb/pull/2650): Add SHOW GRANTS FOR USER statement. Thanks @n1tr0g. - [3125](https://github.com/influxdb/influxdb/pull/3125): Graphite Input Protocol Parsing - [2746](https://github.com/influxdb/influxdb/pull/2746): New Admin UI/interface +- [3036](https://github.com/influxdb/influxdb/pull/3036): Write Ahead Log (WAL) ### Bugfixes diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 421439d8b2c..ea0e967ee86 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -80,6 +80,10 @@ func NewServer(c *Config, version string) (*Server, error) { reportingDisabled: c.ReportingDisabled, } + // Copy TSDB configuration. + s.TSDBStore.MaxWALSize = c.Data.MaxWALSize + s.TSDBStore.WALFlushInterval = time.Duration(c.Data.WALFlushInterval) + // Initialize query executor. s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore) s.QueryExecutor.MetaStore = s.MetaStore diff --git a/tsdb/config.go b/tsdb/config.go index 8648698c38a..11185db431b 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -19,10 +19,19 @@ const ( // DefaultRetentionCheckPeriod is the period of time between retention policy checks are run DefaultRetentionCheckPeriod = 10 * time.Minute + + // DefaultMaxWALSize is the default size of the WAL before it is flushed. + DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB + + // DefaultWALFlushInterval is the frequency the WAL will get flushed if + // it doesn't reach its size threshold. + DefaultWALFlushInterval = 10 * time.Minute ) type Config struct { Dir string `toml:"dir"` + MaxWALSize int `toml:"max-wal-size"` + WALFlushInterval toml.Duration `toml:"wal-flush-interval"` RetentionAutoCreate bool `toml:"retention-auto-create"` RetentionCheckEnabled bool `toml:"retention-check-enabled"` RetentionCheckPeriod toml.Duration `toml:"retention-check-period"` @@ -31,6 +40,8 @@ type Config struct { func NewConfig() Config { return Config{ + MaxWALSize: DefaultMaxWALSize, + WALFlushInterval: toml.Duration(DefaultWALFlushInterval), RetentionAutoCreate: DefaultRetentionAutoCreate, RetentionCheckEnabled: DefaultRetentionCheckEnabled, RetentionCheckPeriod: toml.Duration(DefaultRetentionCheckPeriod), diff --git a/tsdb/query_executor_test.go b/tsdb/query_executor_test.go index f908fa27d84..76e1f929f37 100644 --- a/tsdb/query_executor_test.go +++ b/tsdb/query_executor_test.go @@ -18,23 +18,81 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { store, executor := testStoreAndExecutor() defer os.RemoveAll(store.path) - pt := NewPoint( + // Write first point. + if err := store.WriteToShard(shardID, []Point{NewPoint( "cpu", map[string]string{"host": "server"}, map[string]interface{}{"value": 1.0}, time.Unix(1, 2), - ) + )}); err != nil { + t.Fatalf(err.Error()) + } - err := store.WriteToShard(shardID, []Point{pt}) - if err != nil { + // Write second point. + if err := store.WriteToShard(shardID, []Point{NewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(2, 3), + )}); err != nil { t.Fatalf(err.Error()) } - pt.SetTime(time.Unix(2, 3)) - err = store.WriteToShard(shardID, []Point{pt}) - if err != nil { + got := executeAndGetJSON("select * from cpu", executor) + exepected := `[{"series":[{"name":"cpu","tags":{"host":"server"},"columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]` + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } + + store.Close() + store = NewStore(store.path) + if err := store.Open(); err != nil { t.Fatalf(err.Error()) } + executor.store = store + + got = executeAndGetJSON("select * from cpu", executor) + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } +} + +// Ensure that points can be written and flushed even after a restart. +func TestWritePointsAndExecuteQuery_FlushRestart(t *testing.T) { + store, executor := testStoreAndExecutor() + defer os.RemoveAll(store.path) + + // Write first point. + if err := store.WriteToShard(shardID, []Point{NewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + )}); err != nil { + t.Fatalf(err.Error()) + } + + // Write second point. + if err := store.WriteToShard(shardID, []Point{NewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(2, 3), + )}); err != nil { + t.Fatalf(err.Error()) + } + + // Restart the store. + if err := store.Close(); err != nil { + t.Fatal(err) + } else if err = store.Open(); err != nil { + t.Fatal(err) + } + + // Flush WAL data to the index. + if err := store.Flush(); err != nil { + t.Fatal(err) + } got := executeAndGetJSON("select * from cpu", executor) exepected := `[{"series":[{"name":"cpu","tags":{"host":"server"},"columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]` @@ -44,8 +102,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { store.Close() store = NewStore(store.path) - err = store.Open() - if err != nil { + if err := store.Open(); err != nil { t.Fatalf(err.Error()) } executor.store = store @@ -127,9 +184,8 @@ func TestDropMeasurementStatement(t *testing.T) { time.Unix(1, 2), ) - err := store.WriteToShard(shardID, []Point{pt, pt2}) - if err != nil { - t.Fatalf(err.Error()) + if err := store.WriteToShard(shardID, []Point{pt, pt2}); err != nil { + t.Fatal(err) } got := executeAndGetJSON("show series", executor) @@ -190,9 +246,8 @@ func TestDropDatabase(t *testing.T) { time.Unix(1, 2), ) - err := store.WriteToShard(shardID, []Point{pt}) - if err != nil { - t.Fatalf(err.Error()) + if err := store.WriteToShard(shardID, []Point{pt}); err != nil { + t.Fatal(err) } got := executeAndGetJSON("select * from cpu", executor) @@ -233,8 +288,7 @@ func TestDropDatabase(t *testing.T) { store.Open() executor.store = store - err = store.WriteToShard(shardID, []Point{pt}) - if err == nil || err.Error() != "shard not found" { + if err := store.WriteToShard(shardID, []Point{pt}); err == nil || err.Error() != "shard not found" { t.Fatalf("expected shard to not be found") } } diff --git a/tsdb/shard.go b/tsdb/shard.go index 6b7870dcdc1..e9c1625ecfc 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1,11 +1,17 @@ package tsdb import ( + "bytes" "encoding/binary" "encoding/json" "errors" "fmt" + "hash/fnv" + "io" + "log" "math" + "os" + "sort" "sync" "time" @@ -16,25 +22,81 @@ import ( "github.com/gogo/protobuf/proto" ) -// Shard represents a self-contained time series database. An inverted index of the measurement and tag data is -// kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB -// is responsible for combining the output of many shards into a single query result. +var ( + // ErrFieldOverflow is returned when too many fields are created on a measurement. + ErrFieldOverflow = errors.New("field overflow") + + // ErrFieldTypeConflict is returned when a new field already exists with a different type. + ErrFieldTypeConflict = errors.New("field type conflict") + + // ErrFieldNotFound is returned when a field cannot be found. + ErrFieldNotFound = errors.New("field not found") + + // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID + // there is no mapping for. + ErrFieldUnmappedID = errors.New("field ID not mapped") + + // ErrWALPartitionNotFound is returns when flushing a WAL partition that + // does not exist. + ErrWALPartitionNotFound = errors.New("wal partition not found") +) + +// topLevelBucketN is the number of non-series buckets in the bolt db. +const topLevelBucketN = 3 + +// Shard represents a self-contained time series database. An inverted index of +// the measurement and tag data is kept along with the raw time series data. +// Data can be split across many shards. The query engine in TSDB is responsible +// for combining the output of many shards into a single query result. type Shard struct { db *bolt.DB // underlying data store index *DatabaseIndex path string + cache map[uint8]map[string][][]byte // values by + + walSize int // approximate size of the WAL, in bytes + flush chan struct{} // signals background flush + flushTimer *time.Timer // signals time-based flush mu sync.RWMutex measurementFields map[string]*measurementFields // measurement name to their fields + + // These coordinate closing and waiting for running goroutines. + wg sync.WaitGroup + closing chan struct{} + + // Used for out-of-band error messages. + logger *log.Logger + + // The maximum size and time thresholds for flushing the WAL. + MaxWALSize int + WALFlushInterval time.Duration + + // The writer used by the logger. + LogOutput io.Writer } // NewShard returns a new initialized Shard func NewShard(index *DatabaseIndex, path string) *Shard { - return &Shard{ + s := &Shard{ index: index, path: path, + flush: make(chan struct{}, 1), measurementFields: make(map[string]*measurementFields), + + MaxWALSize: DefaultMaxWALSize, + WALFlushInterval: DefaultWALFlushInterval, + + LogOutput: os.Stderr, } + + // Initialize all partitions of the cache. + s.cache = make(map[uint8]map[string][][]byte) + for i := uint8(0); i < WALPartitionN; i++ { + s.cache[i] = make(map[string][][]byte) + } + + return s } // Path returns the path set on the shard when it was created. @@ -42,42 +104,81 @@ func (s *Shard) Path() string { return s.path } // open initializes and opens the shard's store. func (s *Shard) Open() error { - s.mu.Lock() - defer s.mu.Unlock() + if err := func() error { + s.mu.Lock() + defer s.mu.Unlock() - // Return if the shard is already open - if s.db != nil { - return nil - } + // Return if the shard is already open + if s.db != nil { + return nil + } - // Open store on shard. - store, err := bolt.Open(s.path, 0666, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - return err - } - s.db = store + // Open store on shard. + store, err := bolt.Open(s.path, 0666, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return err + } + s.db = store - // Initialize store. - if err := s.db.Update(func(tx *bolt.Tx) error { - _, _ = tx.CreateBucketIfNotExists([]byte("series")) - _, _ = tx.CreateBucketIfNotExists([]byte("fields")) + // Initialize store. + if err := s.db.Update(func(tx *bolt.Tx) error { + _, _ = tx.CreateBucketIfNotExists([]byte("series")) + _, _ = tx.CreateBucketIfNotExists([]byte("fields")) + _, _ = tx.CreateBucketIfNotExists([]byte("wal")) + + return nil + }); err != nil { + return fmt.Errorf("init: %s", err) + } + + if err := s.loadMetadataIndex(); err != nil { + return fmt.Errorf("load metadata index: %s", err) + } + + // Initialize logger. + s.logger = log.New(s.LogOutput, "[shard] ", log.LstdFlags) + + // Start flush interval timer. + s.flushTimer = time.NewTimer(s.WALFlushInterval) + + // Start background goroutines. + s.wg.Add(1) + s.closing = make(chan struct{}) + go s.autoflusher(s.closing) return nil - }); err != nil { - _ = s.Close() - return fmt.Errorf("init: %s", err) + }(); err != nil { + s.close() + return err + } + + // Flush on-disk WAL before we return to the caller. + if err := s.Flush(); err != nil { + return fmt.Errorf("flush: %s", err) } - return s.loadMetadataIndex() + return nil } -// close shuts down the shard's store. +// Close shuts down the shard's store. func (s *Shard) Close() error { s.mu.Lock() - defer s.mu.Unlock() + err := s.close() + s.mu.Unlock() + + // Wait for open goroutines to finish. + s.wg.Wait() + return err +} + +func (s *Shard) close() error { if s.db != nil { - _ = s.db.Close() + s.db.Close() + } + if s.closing != nil { + close(s.closing) + s.closing = nil } return nil } @@ -178,26 +279,199 @@ func (s *Shard) WritePoints(points []Point) error { } } - // save the raw point data + // Write points to WAL bucket. + wal := tx.Bucket([]byte("wal")) for _, p := range points { - bp, err := tx.CreateBucketIfNotExists(p.Key()) + // Retrieve partition bucket. + key := p.Key() + b, err := wal.CreateBucketIfNotExists([]byte{WALPartition(key)}) if err != nil { - return err + return fmt.Errorf("create WAL partition bucket: %s", err) } - if err := bp.Put(u64tob(uint64(p.UnixNano())), p.Data()); err != nil { - return err + + // Generate an autoincrementing index for the WAL partition. + id, _ := b.NextSequence() + + // Append points sequentially to the WAL bucket. + v := marshalWALEntry(key, p.UnixNano(), p.Data()) + if err := b.Put(u64tob(id), v); err != nil { + return fmt.Errorf("put wal: %s", err) + } + } + + return nil + }); err != nil { + return err + } + + // If successful then save points to in-memory cache. + if err := func() error { + s.mu.Lock() + defer s.mu.Unlock() + + for _, p := range points { + // Generate in-memory cache entry of . + key, data := p.Key(), p.Data() + v := make([]byte, 8+len(data)) + binary.BigEndian.PutUint64(v[0:8], uint64(p.UnixNano())) + copy(v[8:], data) + + // Determine if we are appending. + partitionID := WALPartition(key) + a := s.cache[partitionID][string(key)] + appending := (len(a) == 0 || bytes.Compare(a[len(a)-1], v) == -1) + + // Append to cache list. + a = append(a, v) + + // Sort by timestamp if not appending. + if !appending { + sort.Sort(byteSlices(a)) + } + + s.cache[partitionID][string(key)] = a + + // Calculate estimated WAL size. + s.walSize += len(key) + len(v) + } + + // Check for flush threshold. + s.triggerAutoFlush() + + return nil + }(); err != nil { + return err + } + + return nil +} + +// Flush writes all points from the write ahead log to the index. +func (s *Shard) Flush() error { + // Retrieve a list of WAL buckets. + var partitionIDs []uint8 + if err := s.db.View(func(tx *bolt.Tx) error { + return tx.Bucket([]byte("wal")).ForEach(func(key, _ []byte) error { + partitionIDs = append(partitionIDs, uint8(key[0])) + return nil + }) + }); err != nil { + return err + } + + // Continue flushing until there are no more partition buckets. + for _, partitionID := range partitionIDs { + if err := s.FlushPartition(partitionID); err != nil { + return fmt.Errorf("flush partition: id=%d, err=%s", partitionID, err) + } + + // Wait momentarily so other threads can process. + time.Sleep(100 * time.Millisecond) + } + + s.mu.Lock() + defer s.mu.Unlock() + + // Reset WAL size. + s.walSize = 0 + + // Reset the timer. + s.flushTimer.Reset(s.WALFlushInterval) + + return nil +} + +// FlushPartition flushes a single WAL partition. +func (s *Shard) FlushPartition(partitionID uint8) error { + s.mu.Lock() + defer s.mu.Unlock() + + startTime := time.Now() + + var pointN int + if err := s.db.Update(func(tx *bolt.Tx) error { + // Retrieve partition bucket. Exit if it doesn't exist. + pb := tx.Bucket([]byte("wal")).Bucket([]byte{byte(partitionID)}) + if pb == nil { + return ErrWALPartitionNotFound + } + + // Iterate over keys in the WAL partition bucket. + c := pb.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + key, timestamp, data := unmarshalWALEntry(v) + + // Create bucket for entry. + b, err := tx.CreateBucketIfNotExists(key) + if err != nil { + return fmt.Errorf("create bucket: %s", err) + } + + // Write point to bucket. + if err := b.Put(u64tob(uint64(timestamp)), data); err != nil { + return fmt.Errorf("put: %s", err) } + + // Remove entry in the WAL. + if err := c.Delete(); err != nil { + return fmt.Errorf("delete: %s", err) + } + + pointN++ } return nil }); err != nil { - _ = s.Close() return err } + // Reset cache. + s.cache[partitionID] = make(map[string][][]byte) + + if pointN > 0 { + s.logger.Printf("flush %d points in %.3fs", pointN, time.Since(startTime).Seconds()) + } + return nil } +// autoflusher waits for notification of a flush and kicks it off in the background. +// This method runs in a separate goroutine. +func (s *Shard) autoflusher(closing chan struct{}) { + defer s.wg.Done() + + for { + // Wait for close or flush signal. + select { + case <-closing: + return + case <-s.flushTimer.C: + if err := s.Flush(); err != nil { + s.logger.Printf("flush error: %s", err) + } + case <-s.flush: + if err := s.Flush(); err != nil { + s.logger.Printf("flush error: %s", err) + } + } + } +} + +// triggerAutoFlush signals that a flush should occur if the size is above the threshold. +// This function must be called within the context of a lock. +func (s *Shard) triggerAutoFlush() { + // Ignore if we haven't reached the threshold. + if s.walSize < s.MaxWALSize { + return + } + + // Otherwise send a non-blocking signal. + select { + case s.flush <- struct{}{}: + default: + } +} + func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error { s.mu.RLock() defer s.mu.RUnlock() @@ -252,19 +526,22 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt // deleteSeries deletes the buckets and the metadata for the given series keys func (s *Shard) deleteSeries(keys []string) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte("series")) for _, k := range keys { if err := b.Delete([]byte(k)); err != nil { return err } - if err := tx.DeleteBucket([]byte(k)); err != nil { + if err := tx.DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound { return err } + delete(s.cache[WALPartition([]byte(k))], k) } return nil }); err != nil { - _ = s.Close() return err } @@ -273,6 +550,9 @@ func (s *Shard) deleteSeries(keys []string) error { // deleteMeasurement deletes the measurement field encoding information and all underlying series from the shard func (s *Shard) deleteMeasurement(name string, seriesKeys []string) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.db.Update(func(tx *bolt.Tx) error { bm := tx.Bucket([]byte("fields")) if err := bm.Delete([]byte(name)); err != nil { @@ -283,20 +563,18 @@ func (s *Shard) deleteMeasurement(name string, seriesKeys []string) error { if err := b.Delete([]byte(k)); err != nil { return err } - if err := tx.DeleteBucket([]byte(k)); err != nil { + if err := tx.DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound { return err } + delete(s.cache[WALPartition([]byte(k))], k) } return nil }); err != nil { - _ = s.Close() return err } // Remove entry from shard index. - s.mu.Lock() - defer s.mu.Unlock() delete(s.measurementFields, name) return nil } @@ -422,6 +700,22 @@ func (s *Shard) loadMetadataIndex() error { }) } +// SeriesCount returns the number of series buckets on the shard. +// This does not include a count from the WAL. +func (s *Shard) SeriesCount() (n int, err error) { + err = s.db.View(func(tx *bolt.Tx) error { + return tx.ForEach(func(_ []byte, _ *bolt.Bucket) error { + n++ + return nil + }) + }) + + // Remove top-level buckets. + n -= topLevelBucketN + + return +} + type measurementFields struct { Fields map[string]*field `json:"fields"` codec *FieldCodec @@ -813,17 +1107,135 @@ func u64tob(v uint64) []byte { return b } -var ( - // ErrFieldOverflow is returned when too many fields are created on a measurement. - ErrFieldOverflow = errors.New("field overflow") +// marshalWALEntry encodes point data into a single byte slice. +// +// The format of the byte slice is: +// +// uint64 timestamp +// uint32 key length +// []byte key +// []byte data +// +func marshalWALEntry(key []byte, timestamp int64, data []byte) []byte { + v := make([]byte, 8+4, 8+4+len(key)+len(data)) + binary.BigEndian.PutUint64(v[0:8], uint64(timestamp)) + binary.BigEndian.PutUint32(v[8:12], uint32(len(key))) + v = append(v, key...) + v = append(v, data...) + return v +} - // ErrFieldTypeConflict is returned when a new field already exists with a different type. - ErrFieldTypeConflict = errors.New("field type conflict") +// unmarshalWALEntry decodes a WAL entry into it's separate parts. +// Returned byte slices point to the original slice. +func unmarshalWALEntry(v []byte) (key []byte, timestamp int64, data []byte) { + keyLen := binary.BigEndian.Uint32(v[8:12]) + key = v[12 : 12+keyLen] + timestamp = int64(binary.BigEndian.Uint64(v[0:8])) + data = v[12+keyLen:] + return +} - // ErrFieldNotFound is returned when a field cannot be found. - ErrFieldNotFound = errors.New("field not found") +// marshalCacheEntry encodes the timestamp and data to a single byte slice. +// +// The format of the byte slice is: +// +// uint64 timestamp +// []byte data +// +func marshalCacheEntry(timestamp int64, data []byte) []byte { + buf := make([]byte, 8, 8+len(data)) + binary.BigEndian.PutUint64(buf[0:8], uint64(timestamp)) + return append(buf, data...) +} - // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID - // there is no mapping for. - ErrFieldUnmappedID = errors.New("field ID not mapped") -) +// unmarshalCacheEntry returns the timestamp and data from an encoded byte slice. +func unmarshalCacheEntry(buf []byte) (timestamp int64, data []byte) { + timestamp = int64(binary.BigEndian.Uint64(buf[0:8])) + data = buf[8:] + return +} + +// byteSlices represents a sortable slice of byte slices. +type byteSlices [][]byte + +func (a byteSlices) Len() int { return len(a) } +func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } +func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// shardCursor provides ordered iteration across a Bolt bucket and shard cache. +type shardCursor struct { + // Bolt cursor and readahead buffer. + cursor *bolt.Cursor + buf struct { + key, value []byte + } + + // Cache and current cache index. + cache [][]byte + index int +} + +// Seek moves the cursor to a position and returns the closest key/value pair. +func (sc *shardCursor) Seek(seek []byte) (key, value []byte) { + // Seek bolt cursor. + if sc.cursor != nil { + sc.buf.key, sc.buf.value = sc.cursor.Seek(seek) + } + + // Seek cache index. + sc.index = sort.Search(len(sc.cache), func(i int) bool { + return bytes.Compare(sc.cache[i][0:8], seek) != -1 + }) + + return sc.read() +} + +// Next returns the next key/value pair from the cursor. +func (sc *shardCursor) Next() (key, value []byte) { + // Read next bolt key/value if not bufferred. + if sc.buf.key == nil && sc.cursor != nil { + sc.buf.key, sc.buf.value = sc.cursor.Next() + } + + return sc.read() +} + +// read returns the next key/value in the cursor buffer or cache. +func (sc *shardCursor) read() (key, value []byte) { + // If neither a buffer or cache exists then return nil. + if sc.buf.key == nil && sc.index >= len(sc.cache) { + return nil, nil + } + + // Use the buffer if it exists and there's no cache or if it is lower than the cache. + if sc.buf.key != nil && (sc.index >= len(sc.cache) || bytes.Compare(sc.buf.key, sc.cache[sc.index][0:8]) == -1) { + key, value = sc.buf.key, sc.buf.value + sc.buf.key, sc.buf.value = nil, nil + return + } + + // Otherwise read from the cache. + // Continue skipping ahead through duplicate keys in the cache list. + for { + // Read the current cache key/value pair. + key, value = sc.cache[sc.index][0:8], sc.cache[sc.index][8:] + sc.index++ + + // Exit loop if we're at the end of the cache or the next key is different. + if sc.index >= len(sc.cache) || !bytes.Equal(key, sc.cache[sc.index][0:8]) { + break + } + } + + return +} + +// WALPartitionN is the number of partitions in the write ahead log. +const WALPartitionN = 8 + +// WALPartition returns the partition number that key belongs to. +func WALPartition(key []byte) uint8 { + h := fnv.New64a() + h.Write(key) + return uint8(h.Sum64() % WALPartitionN) +} diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 9d750c9768d..3455204b724 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1,9 +1,11 @@ package tsdb import ( + "fmt" "io/ioutil" "os" "path" + "path/filepath" "reflect" "testing" "time" @@ -131,6 +133,80 @@ func TestShardWriteAddNewField(t *testing.T) { } +// Ensure the shard will automatically flush the WAL after a threshold has been reached. +func TestShard_Autoflush(t *testing.T) { + path, _ := ioutil.TempDir("", "shard_test") + defer os.RemoveAll(path) + + // Open shard with a really low size threshold, high flush interval. + sh := NewShard(NewDatabaseIndex(), filepath.Join(path, "shard")) + sh.MaxWALSize = 1024 // 1KB + sh.WALFlushInterval = 1 * time.Hour + if err := sh.Open(); err != nil { + t.Fatal(err) + } + defer sh.Close() + + // Write a bunch of points. + for i := 0; i < 100; i++ { + if err := sh.WritePoints([]Point{NewPoint( + fmt.Sprintf("cpu%d", i), + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + )}); err != nil { + t.Fatal(err) + } + } + + // Wait for autoflush. + time.Sleep(1 * time.Second) + + // Make sure we have series buckets created outside the WAL. + if n, err := sh.SeriesCount(); err != nil { + t.Fatal(err) + } else if n < 10 { + t.Fatalf("not enough series, expected at least 10, got %d", n) + } +} + +// Ensure the shard will automatically flush the WAL after a threshold has been reached. +func TestShard_Autoflush_FlushInterval(t *testing.T) { + path, _ := ioutil.TempDir("", "shard_test") + defer os.RemoveAll(path) + + // Open shard with a high size threshold, small time threshold. + sh := NewShard(NewDatabaseIndex(), filepath.Join(path, "shard")) + sh.MaxWALSize = 10 * 1024 * 1024 // 10MB + sh.WALFlushInterval = 100 * time.Millisecond + if err := sh.Open(); err != nil { + t.Fatal(err) + } + defer sh.Close() + + // Write some points. + for i := 0; i < 100; i++ { + if err := sh.WritePoints([]Point{NewPoint( + fmt.Sprintf("cpu%d", i), + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + )}); err != nil { + t.Fatal(err) + } + } + + // Wait for time-based flush. + time.Sleep(1 * time.Second) + + // Make sure we have series buckets created outside the WAL. + if n, err := sh.SeriesCount(); err != nil { + t.Fatal(err) + } else if n < 10 { + t.Fatalf("not enough series, expected at least 10, got %d", n) + } +} + func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) } func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) } func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) } diff --git a/tsdb/store.go b/tsdb/store.go index 17a45444df3..c1aed6e1bc3 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -9,14 +9,17 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdb/influxdb/influxql" ) func NewStore(path string) *Store { return &Store{ - path: path, - Logger: log.New(os.Stderr, "[store] ", log.LstdFlags), + path: path, + MaxWALSize: DefaultMaxWALSize, + WALFlushInterval: DefaultWALFlushInterval, + Logger: log.New(os.Stderr, "[store] ", log.LstdFlags), } } @@ -31,6 +34,9 @@ type Store struct { databaseIndexes map[string]*DatabaseIndex shards map[uint64]*Shard + MaxWALSize int + WALFlushInterval time.Duration + Logger *log.Logger } @@ -59,7 +65,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er } shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) - shard := NewShard(db, shardPath) + shard := s.newShard(db, shardPath) if err := shard.Open(); err != nil { return err } @@ -93,6 +99,14 @@ func (s *Store) DeleteShard(shardID uint64) error { return nil } +// newShard returns a shard and copies configuration settings from the store. +func (s *Store) newShard(index *DatabaseIndex, path string) *Shard { + sh := NewShard(index, path) + sh.MaxWALSize = s.MaxWALSize + sh.WALFlushInterval = s.WALFlushInterval + return sh +} + // DeleteDatabase will close all shards associated with a database and remove the directory and files from disk. func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error { s.mu.Lock() @@ -219,7 +233,7 @@ func (s *Store) loadShards() error { continue } - shard := NewShard(s.databaseIndexes[db], path) + shard := s.newShard(s.databaseIndexes[db], path) shard.Open() s.shards[shardID] = shard } @@ -264,6 +278,18 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error { return sh.WritePoints(points) } +// Flush forces all shards to write their WAL data to the index. +func (s *Store) Flush() error { + s.mu.RLock() + defer s.mu.RUnlock() + for shardID, sh := range s.shards { + if err := sh.Flush(); err != nil { + return fmt.Errorf("flush: shard=%d, err=%s", shardID, err) + } + } + return nil +} + func (s *Store) Close() error { s.mu.Lock() defer s.mu.Unlock() diff --git a/tsdb/tx.go b/tsdb/tx.go index 3c855b502d9..0f2e1a28a44 100644 --- a/tsdb/tx.go +++ b/tsdb/tx.go @@ -180,6 +180,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri mapper = &LocalMapper{ seriesKeys: t.SeriesKeys, + shard: shard, db: shard.DB(), job: job, decoder: codec, @@ -215,8 +216,9 @@ type LocalMapper struct { cursorsEmpty bool // boolean that lets us know if the cursors are empty decoder *FieldCodec // decoder for the raw data bytes filters []influxql.Expr // filters for each series - cursors []*bolt.Cursor // bolt cursors for each series id + cursors []*shardCursor // bolt cursors for each series id seriesKeys []string // seriesKeys to be read from this shard + shard *Shard // original shard db *bolt.DB // bolt store for the shard accessed by this mapper txn *bolt.Tx // read transactions by shard id job *influxql.MapReduceJob // the MRJob this mapper belongs to @@ -240,6 +242,10 @@ type LocalMapper struct { // Open opens the LocalMapper. func (l *LocalMapper) Open() error { + // Obtain shard lock to copy in-cache points. + l.shard.mu.Lock() + defer l.shard.mu.Unlock() + // Open the data store txn, err := l.db.Begin(false) if err != nil { @@ -248,15 +254,28 @@ func (l *LocalMapper) Open() error { l.txn = txn // create a bolt cursor for each unique series id - l.cursors = make([]*bolt.Cursor, len(l.seriesKeys)) + l.cursors = make([]*shardCursor, len(l.seriesKeys)) for i, key := range l.seriesKeys { + // Retrieve key bucket. b := l.txn.Bucket([]byte(key)) - if b == nil { + + // Ignore if there is no bucket or points in the cache. + partitionID := WALPartition([]byte(key)) + if b == nil && len(l.shard.cache[partitionID][key]) == 0 { continue } - l.cursors[i] = b.Cursor() + // Retrieve a copy of the in-cache points for the key. + cache := make([][]byte, len(l.shard.cache[partitionID][key])) + copy(cache, l.shard.cache[partitionID][key]) + + // Build a cursor that merges the bucket and cache together. + cur := &shardCursor{cache: cache} + if b != nil { + cur.cursor = b.Cursor() + } + l.cursors[i] = cur } return nil @@ -264,7 +283,9 @@ func (l *LocalMapper) Open() error { // Close closes the LocalMapper. func (l *LocalMapper) Close() { - _ = l.txn.Rollback() + if l.txn != nil { + _ = l.txn.Rollback() + } } // Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time