Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client.WriteSeries returns: Server returned (400): IO error: /opt/influxdb/shared/data/db/shard_db_v2/00190/MANIFEST-000006: No such file or directory #985

Merged
merged 1 commit into from
Oct 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,8 @@ func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32)
// now actually remove it from disk if it lives here
for _, serverId := range serverIds {
if serverId == self.LocalServer.Id {
return self.shardStore.DeleteShard(shardId)
self.shardStore.DeleteShard(shardId)
return nil
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cluster/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type LocalShardStore interface {
BufferWrite(request *p.Request)
GetOrCreateShard(id uint32) (LocalShardDb, error)
ReturnShard(id uint32)
DeleteShard(shardId uint32) error
DeleteShard(shardId uint32)
}

func (self *ShardData) Id() uint32 {
Expand Down
47 changes: 35 additions & 12 deletions datastore/shard_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ShardDatastore struct {
lastAccess map[uint32]time.Time
shardRefCounts map[uint32]int
shardsToClose map[uint32]bool
shardsToDelete map[uint32]struct{}
shardsLock sync.RWMutex
writeBuffer *cluster.WriteBuffer
maxOpenShards int
Expand Down Expand Up @@ -74,6 +75,7 @@ func NewShardDatastore(config *configuration.Configuration, metaStore *metastore
lastAccess: make(map[uint32]time.Time),
shardRefCounts: make(map[uint32]int),
shardsToClose: make(map[uint32]bool),
shardsToDelete: make(map[uint32]struct{}),
pointBatchSize: config.StoragePointBatchSize,
writeBatchSize: config.StorageWriteBatchSize,
metaStore: metaStore,
Expand Down Expand Up @@ -211,7 +213,16 @@ func (self *ShardDatastore) ReturnShard(id uint32) {
self.shardsLock.Lock()
defer self.shardsLock.Unlock()
self.shardRefCounts[id] -= 1
if self.shardsToClose[id] && self.shardRefCounts[id] == 0 {
if self.shardRefCounts[id] != 0 {
return
}

if _, ok := self.shardsToDelete[id]; ok {
self.deleteShard(id)
return
}

if self.shardsToClose[id] {
self.closeShard(id)
}
}
Expand All @@ -233,20 +244,20 @@ func (self *ShardDatastore) SetWriteBuffer(writeBuffer *cluster.WriteBuffer) {
self.writeBuffer = writeBuffer
}

func (self *ShardDatastore) DeleteShard(shardId uint32) error {
func (self *ShardDatastore) DeleteShard(shardId uint32) {
self.shardsLock.Lock()
shardDb := self.shards[shardId]
delete(self.shards, shardId)
delete(self.lastAccess, shardId)
self.shardsLock.Unlock()

if shardDb != nil {
shardDb.close()
defer self.shardsLock.Unlock()
// If someone has a reference to the shard we can't delete it
// now. We have to wait until it's returned and delete
// it. ReturnShard will take care of that as soon as the reference
// count becomes 0.
if self.shardRefCounts[shardId] > 0 {
self.shardsToDelete[shardId] = struct{}{}
return
}

dir := self.shardDir(shardId)
log.Info("DATASTORE: dropping shard %s", dir)
return os.RemoveAll(dir)
// otherwise, close the shard and delete it now
self.deleteShard(shardId)
}

func (self *ShardDatastore) shardDir(id uint32) string {
Expand All @@ -269,6 +280,17 @@ func (self *ShardDatastore) closeOldestShard() {
}
}

// deleteShard closes the shard identified by id and removes its data
// directory from disk. Filesystem errors are logged rather than
// returned to the caller.
func (self *ShardDatastore) deleteShard(id uint32) {
	self.closeShard(id)

	path := self.shardDir(id)
	log.Info("DATASTORE: dropping shard %s", path)
	err := os.RemoveAll(path)
	if err == nil {
		return
	}
	// TODO: we should do some cleanup to make sure any shards left
	// behind are deleted properly
	log.Error("Cannot delete %s: %s", path, err)
}

func (self *ShardDatastore) closeShard(id uint32) {
shard := self.shards[id]
if shard != nil {
Expand All @@ -278,6 +300,7 @@ func (self *ShardDatastore) closeShard(id uint32) {
delete(self.shards, id)
delete(self.lastAccess, id)
delete(self.shardsToClose, id)
delete(self.shardsToDelete, id)
log.Debug("DATASTORE: closing shard %s", self.shardDir(id))
}

Expand Down
27 changes: 27 additions & 0 deletions integration/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,33 @@ func (self *DataTestSuite) TestInfiniteValues(c *C) {
c.Assert(maps[0]["derivative"], IsNil)
}

// TestWritingToExpiredShards tries to write a large batch of points to a
// shard that is supposed to be dropped. This demonstrates issue #985:
// while the data is being written, InfluxDB would close the underlying
// storage engine, which caused random errors to be thrown and could
// possibly corrupt the db.
func (self *DataTestSuite) TestWritingToExpiredShards(c *C) {
	client := self.server.GetClient(self.dbname, c)
	// A short retention policy combined with a long shard duration makes
	// the shard eligible for dropping while writes are still in flight.
	err := client.CreateShardSpace(self.dbname, &influxdb.ShardSpace{
		Name:            "default",
		Regex:           ".*",
		RetentionPolicy: "7d",
		ShardDuration:   "1y",
	})
	c.Assert(err, IsNil)

	data := CreatePoints("test_using_deleted_shard", 1, 1000000)
	data[0].Columns = append(data[0].Columns, "time")
	// Pin every point to time 0 so the entire batch targets the same
	// (soon-to-be-expired) shard.
	for i := range data[0].Points {
		data[0].Points[i] = append(data[0].Points[i], 0)
	}
	// This test will fail randomly without the fix submitted in the
	// same commit. 10 times is sufficient to trigger the bug.
	for i := 0; i < 10; i++ {
		self.client.WriteData(data, c, influxdb.Second)
	}
}

// test large integer values
func (self *DataTestSuite) TestLargeIntegerValues(c *C) {
// make sure we exceed the pointBatchSize, so we force a yield to
Expand Down