forked from sashka/atomicfile-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
atomicfile.py
82 lines (65 loc) · 2.21 KB
/
atomicfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# encoding: utf-8
import errno
import os
import tempfile
# Capture the process umask once, at import time, for later use by
# ``_maketemp``. ``os.umask()`` can only report the current mask by installing
# a new one, so set it to 0 and immediately restore the value it returned.
# NOTE: the process umask is briefly 0 between these two calls.
umask = os.umask(0)
os.umask(umask)
def _maketemp(name, createmode=None):
    """
    Create an empty temporary file in the same directory as ``name``.

    The temporary file is named ``.<basename>-<random>`` so that it sits next
    to ``name`` and can later be renamed over it.

    The permission bits are chosen as follows:

    * if ``name`` already exists, its permission bits are copied;
    * otherwise ``createmode`` is used, or the inverted module-level
      ``umask`` when ``createmode`` is None, limited to the read/write
      bits (``0o666``).

    If anything fails after the temporary file has been created, the
    temporary file is removed before the exception is re-raised, so a failed
    call leaves nothing behind.

    Returns: the name of the temporary file.
    Raises: OSError if the file cannot be created, inspected or chmod'ed.
    """
    directory, basename = os.path.split(name)
    fd, tempname = tempfile.mkstemp(prefix=".%s-" % basename, dir=directory)
    os.close(fd)

    try:
        # Temporary files are created with mode 0600, which is usually not
        # what we want. If the original file already exists, just copy its
        # mode. Otherwise, manually obey umask.
        try:
            st_mode = os.lstat(name).st_mode & 0o777
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            st_mode = createmode
            if st_mode is None:
                st_mode = ~umask
            st_mode &= 0o666
        os.chmod(tempname, st_mode)
    except BaseException:
        # Do not leak the just-created temp file when we cannot finish
        # setting it up; the original error is what the caller must see.
        try:
            os.unlink(tempname)
        except OSError:
            pass
        raise

    return tempname
class AtomicFile(object):
    """
    Writeable file object that atomically writes a file.

    All writes will go to a temporary file created next to ``name``.
    Call ``close()`` when you are done writing, and AtomicFile will rename
    the temporary copy to the original name, making the changes visible.
    Call ``discard()`` to throw the writes away instead.
    If the object is destroyed without being closed, all your writes are
    discarded.

    Used as a context manager, the file is committed with ``close()`` when
    the ``with`` block finishes normally and discarded when it raises; the
    exception is never suppressed.
    """

    def __init__(self, name, mode="w+b", createmode=None):
        """
        Open a temporary file that will replace ``name`` on ``close()``.

        name: path of the file to (eventually) create or replace.
        mode: mode string passed to ``open()`` for the temporary file.
        createmode: forwarded to ``_maketemp`` as the permission bits to
            use for the temporary file.
        """
        self.__name = name  # permanent name
        self._tempname = _maketemp(name, createmode=createmode)
        try:
            self._fp = open(self._tempname, mode)
        except BaseException:
            # E.g. an invalid ``mode``: ``_fp`` was never set, so __del__
            # will not clean up; remove the empty temp file here.
            self._remove_temp()
            raise

        # delegated methods
        self.write = self._fp.write
        self.fileno = self._fp.fileno

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if exc_type is not None:
            # The block failed: drop the partial output right away instead
            # of leaving the temp file and open handle around until the
            # object happens to be garbage collected.
            self.discard()
            return
        self.close()

    def close(self):
        """
        Commit the writes: close the temp file and move it over ``name``.

        Does nothing if the file was already closed or discarded.
        """
        if self._fp.closed:
            return
        self._fp.close()
        # os.replace (Python 3.3+) also overwrites an existing target on
        # Windows; fall back to os.rename where it is not available.
        replace = getattr(os, "replace", os.rename)
        replace(self._tempname, self.__name)

    def discard(self):
        """
        Throw the writes away: close and delete the temporary file.

        Does nothing if the file was already closed or discarded.
        """
        if self._fp.closed:
            return
        try:
            # Close before unlinking: some platforms refuse to delete a
            # file that is still open.
            self._fp.close()
        finally:
            self._remove_temp()

    def _remove_temp(self):
        """Best-effort removal of the temporary file."""
        try:
            os.unlink(self._tempname)
        except OSError:
            pass

    def __del__(self):
        if getattr(self, "_fp", None) is not None:  # constructor actually did something
            self.discard()