-
Notifications
You must be signed in to change notification settings - Fork 284
/
Copy paththreadedfile.d
199 lines (170 loc) · 4.71 KB
/
threadedfile.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/**
Thread based asynchronous file I/O fallback implementation
Copyright: © 2012 RejectedSoftware e.K.
Authors: Sönke Ludwig
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.drivers.threadedfile;
import vibe.core.log;
import vibe.core.driver;
import vibe.inet.url;
import vibe.utils.string;
import std.algorithm;
import std.conv;
import std.exception;
import std.string;
import core.stdc.errno;
version(Posix){
import core.sys.posix.fcntl;
import core.sys.posix.sys.stat;
import core.sys.posix.unistd;
}
version(Windows){
import std.c.windows.stat;
private {
extern(C){
alias long off_t;
int open(in char* name, int mode, ...);
int chmod(in char* name, int mode);
int close(int fd);
int read(int fd, void *buffer, uint count);
int write(int fd, in void *buffer, uint count);
off_t lseek(int fd, off_t offset, int whence);
}
enum O_RDONLY = 0;
enum O_WRONLY = 1;
enum O_APPEND = 8;
enum O_CREAT = 0x0100;
enum O_TRUNC = 0x0200;
enum O_BINARY = 0x8000;
enum _S_IREAD = 0x0100; /* read permission, owner */
enum _S_IWRITE = 0x0080; /* write permission, owner */
alias struct_stat stat_t;
}
}
else
{
enum O_BINARY = 0;
}
private {
enum SEEK_SET = 0;
enum SEEK_CUR = 1;
enum SEEK_END = 2;
}
class ThreadedFileStream : FileStream {
private {
int m_fileDescriptor;
Path m_path;
ulong m_size;
ulong m_ptr = 0;
FileMode m_mode;
bool m_ownFD = true;
}
this(Path path, FileMode mode)
{
auto pathstr = path.toNativeString();
final switch(mode){
case FileMode.Read:
m_fileDescriptor = open(pathstr.toStringz(), O_RDONLY|O_BINARY);
break;
case FileMode.ReadWrite:
m_fileDescriptor = open(pathstr.toStringz(), O_BINARY);
break;
case FileMode.CreateTrunc:
m_fileDescriptor = open(pathstr.toStringz(), O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, octal!644);
break;
case FileMode.Append:
m_fileDescriptor = open(pathstr.toStringz(), O_WRONLY|O_CREAT|O_APPEND|O_BINARY, octal!644);
break;
}
if( m_fileDescriptor < 0 )
//throw new Exception(format("Failed to open '%s' with %s: %d", pathstr, cast(int)mode, errno));
throw new Exception("Failed to open file '"~pathstr~"'.");
this(m_fileDescriptor, path, mode);
}
this(int fd, Path path, FileMode mode)
{
assert(fd >= 0);
m_fileDescriptor = fd;
m_path = path;
m_mode = mode;
version(linux){
// stat_t seems to be defined wrong on linux/64
m_size = .lseek(m_fileDescriptor, 0, SEEK_END);
} else {
stat_t st;
fstat(m_fileDescriptor, &st);
m_size = st.st_size;
// (at least) on windows, the created file is write protected
version(Windows){
if( mode == FileMode.CreateTrunc )
chmod(path.toNativeString().toStringz(), S_IREAD|S_IWRITE);
}
}
lseek(m_fileDescriptor, 0, SEEK_SET);
logDebug("opened file %s with %d bytes as %d", path.toNativeString(), m_size, m_fileDescriptor);
}
~this()
{
close();
}
@property int fd() { return m_fileDescriptor; }
@property Path path() const { return m_path; }
@property ulong size() const { return m_size; }
@property bool readable() const { return m_mode != FileMode.Append; }
@property bool writable() const { return m_mode != FileMode.Read; }
void takeOwnershipOfFD()
{
enforce(m_ownFD);
m_ownFD = false;
}
void seek(ulong offset)
{
enforce(.lseek(m_fileDescriptor, offset, SEEK_SET) == offset, "Failed to seek in file.");
m_ptr = offset;
}
ulong tell() { return m_ptr; }
void close()
{
if( m_fileDescriptor != -1 && m_ownFD ){
.close(m_fileDescriptor);
m_fileDescriptor = -1;
}
}
@property bool empty() const { assert(this.readable); return m_ptr >= m_size; }
@property ulong leastSize() const { assert(this.readable); return m_size - m_ptr; }
@property bool dataAvailableForRead() { return true; }
const(ubyte)[] peek()
{
return null;
}
void read(ubyte[] dst)
{
assert(this.readable);
assert(dst.length <= int.max);
enforce(dst.length <= leastSize);
enforce(.read(m_fileDescriptor, dst.ptr, cast(int)dst.length) == dst.length, "Failed to read data from disk.");
m_ptr += dst.length;
}
alias Stream.write write;
void write(in ubyte[] bytes, bool do_flush = true)
{
assert(this.writable);
assert(bytes.length <= int.max);
auto ret = .write(m_fileDescriptor, bytes.ptr, cast(int)bytes.length);
enforce(ret == bytes.length, "Failed to write data to disk."~to!string(bytes.length)~" "~to!string(errno)~" "~to!string(ret)~" "~to!string(m_fileDescriptor));
m_ptr += bytes.length;
}
void write(InputStream stream, ulong nbytes = 0, bool do_flush = true)
{
writeDefault(stream, nbytes, do_flush);
}
void flush()
{
assert(this.writable);
}
void finalize()
{
flush();
}
}