-
Notifications
You must be signed in to change notification settings - Fork 1
/
sizewatch
executable file
·114 lines (102 loc) · 3.3 KB
/
sizewatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#! /usr/bin/env python
"watch file(s) grow in size"
import argparse
import errno
import os
import stat
import subprocess
import sys
import time
def timeout_arg(string):
    """Convert the text given to -t/--timeout into a polling interval.

    Written to be used as an argparse ``type=`` callable.

    string: the raw command-line text.

    Returns the value as a float that is finite and >= 0.0.

    Raises argparse.ArgumentTypeError when the number is negative, NaN
    or infinite.  Text that is not a number at all makes float() raise
    ValueError, which is passed on unchanged.
    """
    value = float(string)
    if value < 0:
        raise argparse.ArgumentTypeError('timeout must be at least 0.0')
    # NaN compares false against everything, so it slips past the check
    # above (value != value is true only for NaN); neither NaN nor
    # infinity is a meaningful interval to sleep between refreshes.
    if value != value or value == float('inf'):
        raise argparse.ArgumentTypeError('timeout must be a finite number')
    return value
def main():
    """Parse the command line, open every named file, and start watching.

    Each FILES argument is opened read-only with os.open().  Every open
    failure is reported on stderr as "name: reason"; if any file failed,
    the process exits with status 1 without watching anything.
    Otherwise the names, descriptors and timeout are handed to watch().

    All descriptors that were opened are closed again before returning
    or exiting.
    """
    parser = argparse.ArgumentParser(description='watch file sizes')
    # argparse.FileType('r') is deliberately not used for FILES: it gives
    # poor control over how open failures are reported, so the files are
    # opened by hand below.
    parser.add_argument('files', metavar='FILES',
                        nargs='+',
                        help='files to watch')
    parser.add_argument('-t', '--timeout', type=timeout_arg, default=1)
    args = parser.parse_args()

    fdlist = []
    try:
        failed = False
        for name in args.files:
            try:
                fdlist.append(os.open(name, os.O_RDONLY))
            except OSError as err:
                # Keep going so that every bad name is reported, not
                # just the first one.
                sys.stderr.write('%s: %s\n' % (name, err.strerror))
                failed = True
        if failed:
            sys.exit(1)
        watch(args.files, fdlist, args.timeout)
    finally:
        # Runs on normal return and on sys.exit() alike.
        for fdesc in fdlist:
            os.close(fdesc)
def totalsize(name, statresult):
    """
    Produce the size of a file or directory.

    If the argument is a directory, the size is the
    result of sizing all the files in it (recursively), NOT
    the size of the directory itself.

    name: path of the file or directory.
    statresult: a stat result for that path; its st_mode decides whether
    the path is treated as a directory and its st_size is the answer
    for anything else.

    Entries inside a directory are examined with os.lstat().  An entry
    (or the directory itself) that has disappeared by the time it is
    examined contributes 0 instead of raising, since files being
    watched may come and go.  Any other OSError is passed on.
    """
    if not stat.S_ISDIR(statresult.st_mode):
        return statresult.st_size

    vanished = (errno.ENOENT, errno.ENOTDIR)
    try:
        entries = os.listdir(name)
    except OSError as err:
        if err.errno not in vanished:
            raise
        # The directory was removed after it was stat'ed.
        return 0

    size = 0
    for fname in entries:
        path = os.path.join(name, fname)
        try:
            entry_stat = os.lstat(path)
        except OSError as err:
            if err.errno not in vanished:
                raise
            # Removed between listdir() and lstat(): skip it.
            continue
        size += totalsize(path, entry_stat)
    return size
def watch(names, fdlist, timeout):
    """Print the size of each watched file, then refresh until interrupted.

    names: the paths to display, one output line each.
    fdlist: open file descriptors, parallel to names; each is fstat'ed
    on every pass.
    timeout: seconds to sleep between passes; it is also the interval
    handed to delta_rate() when computing the rate column.

    The first pass prints "name size" lines.  Every later pass moves the
    cursor back up over those lines and rewrites them with an extra
    rate column.  Returns (after printing a newline) when interrupted
    with Ctrl-C.
    """
    namesize = max(len(name) for name in names)
    sizes = []
    for (i, name) in enumerate(names):
        size = totalsize(name, os.fstat(fdlist[i]))
        sys.stdout.write('%*s %s\n' % (namesize, name, size_string(size)))
        sizes.append(size)
    try:
        while True:
            time.sleep(timeout)
            # ANSI "cursor up", once per line, so the table is redrawn
            # in place rather than scrolling.
            sys.stdout.write('\033[A' * len(names))
            for (i, name) in enumerate(names):
                newsize = totalsize(name, os.fstat(fdlist[i]))
                sys.stdout.write('%*s %s %s\n' % (
                    namesize, name, size_string(newsize),
                    delta_rate(sizes[i], newsize, timeout)))
                sizes[i] = newsize
    except KeyboardInterrupt:
        # normal exit
        sys.stdout.write('\n')
        return
def size_string(size):
    """Return a fixed-width "human-readable" string for a byte count.

    Values up to and including 1024 are shown as a right-aligned
    integer ('%9d').  Larger values are divided by 1024 until they are
    at most 1024 and shown with three decimals followed by a one-letter
    suffix (k, M, G, T, P, E, Z, Y), e.g. '   1.500k'.  For non-negative
    input the result is 9 characters wide either way.

    Raises ValueError if the value is still above 1024 after the
    largest suffix.
    """
    if size <= 1024:
        return '%9d' % size
    val = float(size)
    for suffix in 'kMGTPEZY':
        val /= 1024.0
        if val <= 1024:
            return '%8.3f%c' % (val, suffix)
    raise ValueError("can't represent %s" % (size,))
def delta_rate(old, new, timeout):
    """Describe the change from old to new as a rate over timeout seconds.

    Returns '' when timeout is below 0.1, where no sensible rate can be
    shown.  Otherwise returns a sign character ('-' for shrinkage, ' '
    otherwise), the integer part of size_string(|new - old| / timeout),
    its unit letter (or a blank when there is none), and 'B/s'.
    """
    if timeout < 0.1:
        return ''  # not really representable, so we do not even try

    change = new - old
    sign = ' '
    if change < 0:
        sign, change = '-', -change

    whole, dot, rest = size_string(change / timeout).partition('.')
    if dot:
        # scaled form: keep the integer part, take the trailing suffix
        unit = rest[-1]
    else:
        # it's all in decimal, drop 5 chars from the front
        whole = whole[5:]
        unit = ' '
    return sign + whole + unit + 'B/s'
# Start the watcher only when this file is run as a script.
if __name__ == '__main__':
    main()