Parallelize stat calls in Linux filesystem collector. #1772

Merged 1 commit on Jun 9, 2023

144 changes: 88 additions & 56 deletions collector/filesystem_linux.go
@@ -40,6 +40,9 @@ const (
var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
	"how long to wait for a mount to respond before marking it as stale").
	Hidden().Default("5s").Duration()
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
	"how many stat calls to process simultaneously").
	Hidden().Default("4").Int()
var stuckMounts = make(map[string]struct{})
var stuckMountsMtx = &sync.Mutex{}

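The flag added above is hidden, so it does not appear in --help, and it defaults to 4 workers; GetStats in the next hunk clamps any value below 1 up to 1 before sizing the pool. Below is a minimal sketch of that sizing logic, using the standard library flag package instead of kingpin purely to keep the example self-contained; the stat-workers flag name and the printout here are illustrative only, not collector code:

```go
package main

import (
	"flag"
	"fmt"
)

// Illustrative stand-in for the hidden --collector.filesystem.stat-workers flag;
// the real collector reads it through kingpin.
var statWorkers = flag.Int("stat-workers", 4, "how many stat calls to process simultaneously")

func main() {
	flag.Parse()

	// Mirror the guard in GetStats: never size the pool below one worker.
	workerCount := *statWorkers
	if workerCount < 1 {
		workerCount = 1
	}
	fmt.Println("starting", workerCount, "stat workers")
}
```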
@@ -50,72 +53,101 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
return nil, err
}
stats := []filesystemStats{}
for _, labels := range mps {
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
continue
}
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
continue
}
stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
labelChan := make(chan filesystemLabels)
statChan := make(chan filesystemStats)
wg := sync.WaitGroup{}

workerCount := *statWorkerCount
if workerCount < 1 {
workerCount = 1
}

for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for labels := range labelChan {
statChan <- c.processStat(labels)
}
}()
}

go func() {
for _, labels := range mps {
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
continue
}
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
continue
}

stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
stuckMountsMtx.Unlock()
continue
}

stuckMountsMtx.Unlock()
continue
labelChan <- labels
}
stuckMountsMtx.Unlock()
close(labelChan)
wg.Wait()
close(statChan)
}()

// The success channel is used to tell the "watcher" that the stat
// finished successfully. The channel is closed on success.
success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)
for stat := range statChan {
stats = append(stats, stat)
}
return stats, nil
}
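The new GetStats above hands each mount point to the worker pool and keeps reading from statChan until it is closed. The ordering at the end of the producer goroutine is what makes that safe: close(labelChan) lets the workers drain their input and exit, wg.Wait() guarantees no worker can still send, and only then is statChan closed so the consuming range loop terminates. A stripped-down, self-contained sketch of that fan-out/fan-in shape follows; result, statOne, and mountPoints are invented names for the example, not collector identifiers:

```go
package main

import (
	"fmt"
	"sync"
)

type result struct {
	mountPoint string
	err        error
}

// statOne is a stand-in for the per-mount statfs work done by processStat.
func statOne(mountPoint string) result {
	return result{mountPoint: mountPoint}
}

func main() {
	mountPoints := []string{"/", "/boot", "/home"}

	work := make(chan string)
	results := make(chan result)
	wg := sync.WaitGroup{}

	// Fan out: a fixed pool of workers drains the work channel.
	const workerCount = 4
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for mp := range work {
				results <- statOne(mp)
			}
		}()
	}

	// Producer: queue the work, then close the channels in order. Closing
	// work lets the workers exit, wg.Wait ensures no worker can still send,
	// and only then is results closed so the range below terminates.
	go func() {
		for _, mp := range mountPoints {
			work <- mp
		}
		close(work)
		wg.Wait()
		close(results)
	}()

	// Fan in: collect everything on the main goroutine.
	for r := range results {
		fmt.Println(r.mountPoint, r.err)
	}
}
```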

buf := new(unix.Statfs_t)
err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)
// If the mount has been marked as stuck, unmark it and log its recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
}
stuckMountsMtx.Unlock()
func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)

if err != nil {
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
buf := new(unix.Statfs_t)
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)

level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
continue
}
// If the mount has been marked as stuck, unmark it and log its recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
}
stuckMountsMtx.Unlock()

var ro float64
for _, option := range strings.Split(labels.options, ",") {
if option == "ro" {
ro = 1
break
}
if err != nil {
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
return filesystemStats{
labels: labels,
deviceError: 1,
}
}

stats = append(stats, filesystemStats{
labels: labels,
size: float64(buf.Blocks) * float64(buf.Bsize),
free: float64(buf.Bfree) * float64(buf.Bsize),
avail: float64(buf.Bavail) * float64(buf.Bsize),
files: float64(buf.Files),
filesFree: float64(buf.Ffree),
ro: ro,
})
var ro float64
for _, option := range strings.Split(labels.options, ",") {
if option == "ro" {
ro = 1
break
}
}
return filesystemStats{
labels: labels,
size: float64(buf.Blocks) * float64(buf.Bsize),
free: float64(buf.Bfree) * float64(buf.Bsize),
avail: float64(buf.Bavail) * float64(buf.Bsize),
files: float64(buf.Files),
filesFree: float64(buf.Ffree),
ro: ro,
}
return stats, nil
}
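processStat keeps the existing stuck-mount protection, now applied per worker: a watcher goroutine is started before the statfs call, the success channel is closed once the call returns, and the watcher marks the mount as stuck if it times out first. A rough sketch of that watchdog pattern, independent of node_exporter, is below; watch, statWithWatchdog, statFn, and the hard-coded 5 second timeout (matching the collector.filesystem.mount-timeout default) are assumptions for the example:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	stuck    = make(map[string]struct{})
	stuckMtx = &sync.Mutex{}
)

// watch marks mountPoint as stuck if success is not closed within the timeout.
func watch(mountPoint string, success chan struct{}, timeout time.Duration) {
	select {
	case <-success:
		// The stat call finished in time; nothing to do.
	case <-time.After(timeout):
		stuckMtx.Lock()
		stuck[mountPoint] = struct{}{}
		stuckMtx.Unlock()
	}
}

// statWithWatchdog runs statFn under a watchdog and, once the call returns,
// clears any stale stuck marker for the mount.
func statWithWatchdog(mountPoint string, statFn func(string) error) error {
	success := make(chan struct{})
	go watch(mountPoint, success, 5*time.Second)

	err := statFn(mountPoint)

	stuckMtx.Lock()
	close(success)
	if _, ok := stuck[mountPoint]; ok {
		delete(stuck, mountPoint) // the mount recovered
	}
	stuckMtx.Unlock()
	return err
}

func main() {
	err := statWithWatchdog("/", func(string) error {
		time.Sleep(10 * time.Millisecond) // stand-in for unix.Statfs
		return nil
	})
	fmt.Println("stat error:", err)
}
```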

// stuckMountWatcher listens on the given success channel and if the channel closes