-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhttp_name_node.py
159 lines (137 loc) · 4.22 KB
/
http_name_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from typing import Optional, List, Tuple
from urllib.request import urlopen, HTTPError, URLError
from urllib.parse import urlparse, urljoin
from util import *
class HttpNameNode:
"""
Python API for client of remote DFS cluster. It implements
same API as NameNode, but delegates actual execution to remote
name node server instance.
"""
def __init__(self, url: str):
"""
Set name node server url.
Parameters
----------
url : str
URL to data node server.
"""
if not urlparse(url).netloc:
raise CommandError(f'Invalid node url {url}')
self._url = url
def add_node(self, public_url: str, url: str, node_id: str):
data = url + ' ' + node_id
urlopen(
urljoin(self._url, '/add_node'),
data=data.encode('utf-8'),
).close()
def status(self) -> List[Tuple[str, int]]:
with urlopen(urljoin(self._url, '/status')) as resp:
return deserialize_matrix(
resp,
resp.length,
urlparse(resp.url).hostname,
)
def mkfs(self):
urlopen(urljoin(self._url, '/mkfs')).close()
def df(self) -> List[Tuple[str, int, int, int]]:
with urlopen(urljoin(self._url, '/df')) as resp:
return deserialize_matrix(
resp,
resp.length,
urlparse(resp.url).hostname,
)
def cd(self, path: str):
urlopen(
urljoin(self._url, '/cd'),
data=path.encode('utf-8'),
).close()
def ls(self, path: Optional[str] = None) -> list:
data = path.encode('utf-8') if path else b''
with urlopen(
urljoin(self._url, '/ls'),
data=data,
) as resp:
url = resp.read().decode('utf-8')
with urlopen(
url,
data=data,
) as resp:
return deserialize_list(
resp,
resp.length,
urlparse(resp.url).hostname,
)
def mkdir(self, path: str):
urlopen(
urljoin(self._url, '/mkdir'),
data=path.encode('utf-8'),
).close()
def rmdir(self, path: str, force: Optional[bool] = False):
data = path + ('!' if force else '')
urlopen(
urljoin(self._url, '/rmdir'),
data=data.encode('utf-8'),
).close()
def touch(self, path: str):
urlopen(
urljoin(self._url, '/touch'),
data=path.encode('utf-8'),
).close()
def cat(self, path: str) -> bytes:
data = path.encode('utf-8')
with urlopen(
urljoin(self._url, '/cat'),
data=data,
) as resp:
url = resp.read().decode('utf-8')
with urlopen(
url,
data=data,
) as resp:
return resp.read()
def tee(self, path: str, data: bytes):
data = path.encode('utf-8') + b'\0' + data
urlopen(
urljoin(self._url, '/tee'),
data=data,
).close()
def rm(self, path: str):
urlopen(
urljoin(self._url, '/rm'),
data=path.encode('utf-8'),
).close()
def stat(self, path: str) -> tuple:
data = path.encode('utf-8')
with urlopen(
urljoin(self._url, '/stat'),
data=data,
) as resp:
url = resp.read().decode('utf-8')
with urlopen(
url,
data=data,
) as resp:
return deserialize_stat(
resp,
resp.length,
urlparse(resp.url).hostname,
)
def cp(self, src: str, dst: str):
data = src + ' ' + dst
urlopen(
urljoin(self._url, '/cp'),
data=data.encode('utf-8'),
).close()
def mv(self, src: str, dst: str):
data = src + ' ' + dst
urlopen(
urljoin(self._url, '/mv'),
data=data.encode('utf-8'),
).close()
def ping_alive(self) -> bool:
try:
urlopen(urljoin(self._url, '/ping_alive')).close()
return True
except (HTTPError, URLError):
return False