-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ import ( | |
// only occurs when the reader is at the end of all the data. | ||
const DefaultPollInterval = 100 * time.Millisecond | ||
|
||
const DefaultTruncationInterval = 10 * time.Minute | ||
|
||
// DefaultMaxTopicSize is the largest a topic can get before truncation. | ||
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 10MB | ||
|
||
|
@@ -40,8 +42,13 @@ type Broker struct { | |
meta *bolt.DB // metadata | ||
topics map[uint64]*Topic // topics by id | ||
|
||
MaxTopicSize int64 // Maximum size of a topic in bytes | ||
MaxSegmentSize int64 // Maximum size of a segment in bytes | ||
TruncationInterval time.Duration | ||
MaxTopicSize int64 // Maximum size of a topic in bytes | ||
MaxSegmentSize int64 // Maximum size of a segment in bytes | ||
|
||
// Goroutinte shutdown | ||
done chan struct{} | ||
wg sync.WaitGroup | ||
|
||
// Log is the distributed raft log that commands are applied to. | ||
Log interface { | ||
|
@@ -59,11 +66,13 @@ type Broker struct { | |
// NewBroker returns a new instance of a Broker with default values. | ||
func NewBroker() *Broker { | ||
b := &Broker{ | ||
topics: make(map[uint64]*Topic), | ||
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags), | ||
topics: make(map[uint64]*Topic), | ||
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags), | ||
TruncationInterval: DefaultTruncationInterval, | ||
MaxTopicSize: DefaultMaxTopicSize, | ||
MaxSegmentSize: DefaultMaxSegmentSize, | ||
done: make(chan struct{}), | ||
} | ||
b.MaxTopicSize = DefaultMaxTopicSize | ||
b.MaxSegmentSize = DefaultMaxSegmentSize | ||
return b | ||
} | ||
|
||
|
@@ -176,6 +185,16 @@ func (b *Broker) Open(path string) error { | |
return fmt.Errorf("open topics: %s", err) | ||
} | ||
|
||
// Start topic truncation. | ||
go func() { | ||
b.wg.Add(1) | ||
tick := time.NewTicker(b.TruncationInterval) | ||
This comment has been minimized.
Sorry, something went wrong. |
||
for { | ||
<-tick.C | ||
This comment has been minimized.
Sorry, something went wrong.
jwilder
Contributor
|
||
b.TruncateTopics() | ||
} | ||
}() | ||
|
||
return nil | ||
}(); err != nil { | ||
_ = b.close() | ||
|
@@ -236,6 +255,11 @@ func (b *Broker) close() error { | |
// Close all topics. | ||
b.closeTopics() | ||
|
||
// Shutdown all goroutines. | ||
close(b.done) | ||
b.wg.Wait() | ||
b.done = nil | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -247,6 +271,19 @@ func (b *Broker) closeTopics() { | |
b.topics = make(map[uint64]*Topic) | ||
} | ||
|
||
// truncateTopics forces topics to truncate such that they are equal to | ||
// or less than the requested size, if possible. | ||
func (b *Broker) TruncateTopics() error { | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
otoolep
Author
Contributor
|
||
for _, t := range b.topics { | ||
if n, err := t.Truncate(b.MaxTopicSize); err != nil { | ||
b.Logger.Printf("error truncating topic %s: %s", t.Path(), err.Error()) | ||
} else if n > 0 { | ||
b.Logger.Printf("topic %s, %d bytes deleted", t.Path(), n) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// SetMaxIndex sets the highest index applied by the broker. | ||
// This is only used for internal log messages. Topics may have a higher index. | ||
func (b *Broker) SetMaxIndex(index uint64) error { | ||
|
@@ -829,6 +866,12 @@ func (t *Topic) WriteMessage(m *Message) error { | |
return nil | ||
} | ||
|
||
// Truncate attempts to delete topic segments such that the total size of the topic on-disk | ||
// is equal to or less-than maxSize. Returns the number of bytes deleted, and error if any. | ||
func (t *Topic) Truncate(maxSize int64) (int64, error) { | ||
return 0, nil | ||
} | ||
|
||
// Topics represents a list of topics sorted by id. | ||
type Topics []*Topic | ||
|
||
|
Needs a
defer tick.Stop()