-
Notifications
You must be signed in to change notification settings - Fork 101
/
Copy pathcontainers.py
551 lines (451 loc) · 19.5 KB
/
containers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
"""Model and Manager for Container resources."""
import json
import logging
import shlex
from contextlib import suppress
from typing import Any, Optional, Union
from collections.abc import Iterable, Iterator, Mapping
import requests
from podman import api
from podman.api.output_utils import demux_output
from podman.domain.images import Image
from podman.domain.images_manager import ImagesManager
from podman.domain.manager import PodmanResource
from podman.errors import APIError
# Module-level logger, registered under the "podman.containers" name.
logger = logging.getLogger("podman.containers")
class Container(PodmanResource):
    """Details and configuration for a container managed by the Podman service."""

    @property
    def name(self):
        """Optional[str]: Returns container's name, None when it cannot be determined."""
        # Prefer the scalar "Name" attribute; otherwise use the first entry of "Names".
        # An absent key or an empty "Names" list yields None rather than an exception.
        with suppress(KeyError, IndexError):
            if "Name" in self.attrs:
                return self.attrs["Name"].lstrip("/")
            return self.attrs["Names"][0].lstrip("/")
        return None

    @property
    def image(self):
        """podman.domain.images.Image: Returns Image object used to create Container."""
        if "Image" in self.attrs:
            image_id = self.attrs["Image"]
            return ImagesManager(client=self.client).get(image_id)
        # No image recorded in the attributes: hand back an empty Image.
        return Image()

    @property
    def labels(self):
        """dict[str, str]: Returns labels associated with container."""
        labels = None
        with suppress(KeyError):
            # Container created from ``list()`` operation
            if "Labels" in self.attrs:
                labels = self.attrs["Labels"]
            # Container created from ``get()`` operation
            else:
                labels = self.attrs["Config"].get("Labels", {})
        # A missing or null label set is normalised to an empty dict.
        return labels or {}

    @property
    def status(self):
        """Literal["created", "initialized", "running", "stopped", "exited", "unknown"]:
        Returns status of container.

        "unknown" is returned when the attributes carry no usable state.
        """
        try:
            state = self.attrs["State"]
        except KeyError:
            return "unknown"

        if isinstance(state, str):
            # NOTE(review): presumably summary payloads (e.g. from a list operation)
            # report State as a plain string instead of a mapping -- confirm against
            # the service API. Returning it avoids a TypeError on ``state["Status"]``.
            return state or "unknown"

        try:
            return state["Status"]
        except (KeyError, TypeError):
            return "unknown"

    @property
    def ports(self):
        """dict[str, int]: Return ports exposed by container."""
        with suppress(KeyError):
            return self.attrs["NetworkSettings"]["Ports"]
        return {}

    def attach(self, **kwargs) -> Union[str, Iterator[str]]:
        """Attach to container's tty.

        Keyword Args:
            stdout (bool): Include stdout. Default: True
            stderr (bool): Include stderr. Default: True
            stream (bool): Return iterator of string(s) vs single string. Default: False
            logs (bool): Include previous container output. Default: False

        Raises:
            NotImplementedError: method not implemented.
        """
        raise NotImplementedError()

    def attach_socket(self, **kwargs):
        """Not Implemented.

        Raises:
            NotImplementedError: method not implemented.
        """
        raise NotImplementedError()

    def commit(
        self, repository: Optional[str] = None, tag: Optional[str] = None, **kwargs
    ) -> Image:
        """Save container to given repository.

        Args:
            repository: Where to save Image
            tag: Tag to push with Image

        Keyword Args:
            author (str): Name of commit author
            changes (list[str]): Instructions to apply during commit
            comment (str): Commit message to include with Image, overrides keyword message
            conf (dict[str, Any]): Ignored.
            format (str): Format of the image manifest and metadata
            message (str): Commit message to include with Image
            pause (bool): Pause the container before committing it

        Returns:
            The Image looked up by the "Id" reported in the commit response.
        """
        params = {
            "author": kwargs.get("author"),
            "changes": kwargs.get("changes"),
            # "comment" wins over "message" when both are supplied.
            "comment": kwargs.get("comment", kwargs.get("message")),
            "container": self.id,
            "format": kwargs.get("format"),
            "pause": kwargs.get("pause"),
            "repo": repository,
            "tag": tag,
        }
        response = self.client.post("/commit", params=params)
        response.raise_for_status()

        body = response.json()
        return ImagesManager(client=self.client).get(body["Id"])

    def diff(self) -> list[dict[str, int]]:
        """Report changes of a container's filesystem.

        Returns:
            The JSON-decoded body of the "changes" endpoint.

        Raises:
            APIError: when service reports an error
        """
        response = self.client.get(f"/containers/{self.id}/changes")
        response.raise_for_status()
        return response.json()

    # pylint: disable=too-many-arguments
    def exec_run(
        self,
        cmd: Union[str, list[str]],
        *,
        stdout: bool = True,
        stderr: bool = True,
        stdin: bool = False,
        tty: bool = False,
        privileged: bool = False,
        user=None,
        detach: bool = False,
        stream: bool = False,
        socket: bool = False,  # pylint: disable=unused-argument
        environment: Optional[Union[Mapping[str, str], list[str]]] = None,
        workdir: Optional[str] = None,
        demux: bool = False,
    ) -> tuple[
        Optional[int],
        Union[Iterator[Union[bytes, tuple[bytes, bytes]]], Any, tuple[bytes, bytes]],
    ]:
        """Run given command inside container and return results.

        Args:
            cmd: Command to be executed. A string is split with ``shlex.split``;
                any other sequence is sent as a list of arguments.
            stdout: Attach to stdout. Default: True
            stderr: Attach to stderr. Default: True
            stdin: Attach to stdin. Default: False
            tty: Allocate a pseudo-TTY. Default: False
            privileged: Run as privileged.
            user: User to execute command as.
            detach: If true, detach from the exec command.
                Default: False
            stream: Stream response data. Ignored if ``detach`` is ``True``. Default: False
            socket: Accepted for interface compatibility; currently unused.
            environment: A dictionary or a list[str] in
                the following format ["PASSWORD=xxx"] or
                {"PASSWORD": "xxx"}.
            workdir: Path to working directory for this exec session
            demux: Return stdout and stderr separately

        Returns:
            A tuple of (``response_code``, ``output``).
            ``response_code``:
                The exit code of the provided command. ``None`` if ``stream``.
            ``output``:
                If ``stream``, then a generator yielding response chunks.
                If ``demux``, then a tuple of (``stdout``, ``stderr``).
                Else the response content.

        Raises:
            APIError: when service reports error
        """
        # pylint: disable=too-many-locals
        # Any mapping (not only dict) is flattened into "KEY=value" entries.
        if isinstance(environment, Mapping):
            environment = [f"{k}={v}" for k, v in environment.items()]

        data = {
            "AttachStderr": stderr,
            "AttachStdin": stdin,
            "AttachStdout": stdout,
            # Only strings are tokenised; tuples and other sequences are copied as-is.
            "Cmd": shlex.split(cmd) if isinstance(cmd, str) else list(cmd),
            # "DetachKeys": detach,  # This is something else
            "Env": environment,
            "Privileged": privileged,
            "Tty": tty,
            "WorkingDir": workdir,
        }
        if user:
            data["User"] = user

        # Streaming makes no sense for a detached session.
        stream = stream and not detach

        # create the exec instance; addressed by id like every other container call
        response = self.client.post(f"/containers/{self.id}/exec", data=json.dumps(data))
        response.raise_for_status()
        exec_id = response.json()["Id"]

        # start the exec instance, this will store command output
        start_resp = self.client.post(
            f"/exec/{exec_id}/start",
            data=json.dumps({"Detach": detach, "Tty": tty}),
            stream=stream,
        )
        start_resp.raise_for_status()

        if stream:
            return None, api.stream_frames(start_resp, demux=demux)

        # get and return exec information
        response = self.client.get(f"/exec/{exec_id}/json")
        response.raise_for_status()
        exit_code = response.json().get("ExitCode")

        if demux:
            stdout_data, stderr_data = demux_output(start_resp.content)
            return exit_code, (stdout_data, stderr_data)
        return exit_code, start_resp.content

    def export(self, chunk_size: int = api.DEFAULT_CHUNK_SIZE) -> Iterator[bytes]:
        """Download container's filesystem contents as a tar archive.

        This is a generator: the request is only issued, and errors only raised,
        once iteration starts.

        Args:
            chunk_size: <= number of bytes to return for each iteration of the generator.

        Yields:
            tarball in size/chunk_size chunks

        Raises:
            NotFound: when container has been removed from service
            APIError: when service reports an error
        """
        response = self.client.get(f"/containers/{self.id}/export", stream=True)
        response.raise_for_status()

        yield from response.iter_content(chunk_size=chunk_size)

    def get_archive(
        self, path: str, chunk_size: int = api.DEFAULT_CHUNK_SIZE
    ) -> tuple[Iterable, dict[str, Any]]:
        """Download a file or folder from the container's filesystem.

        Args:
            path: Path to file or folder.
            chunk_size: <= number of bytes to return for each iteration of the generator.

        Returns:
            First item is a raw tar data stream.
            Second item is a dict containing os.stat() information on the specified path.
        """
        response = self.client.get(f"/containers/{self.id}/archive", params={"path": [path]})
        response.raise_for_status()

        # The stat information travels in a response header and is decoded by the api helper.
        stat = response.headers.get("x-docker-container-path-stat", None)
        stat = api.decode_header(stat)
        return response.iter_content(chunk_size=chunk_size), stat

    def init(self) -> None:
        """Initialize the container."""
        response = self.client.post(f"/containers/{self.id}/init")
        response.raise_for_status()

    def inspect(self) -> dict:
        """Inspect a container.

        Returns:
            The JSON-decoded body of the container's "json" endpoint.

        Raises:
            APIError: when service reports an error
        """
        response = self.client.get(f"/containers/{self.id}/json")
        response.raise_for_status()
        return response.json()

    def kill(self, signal: Union[str, int, None] = None) -> None:
        """Send signal to container.

        Args:
            signal: Signal to send, forwarded as the "signal" query parameter.

        Raises:
            APIError: when service reports an error
        """
        response = self.client.post(f"/containers/{self.id}/kill", params={"signal": signal})
        response.raise_for_status()

    def logs(self, **kwargs) -> Union[bytes, Iterator[bytes]]:
        """Get logs from the container.

        Keyword Args:
            stdout (bool): Include stdout. Default: True
            stderr (bool): Include stderr. Default: True
            stream (bool): Return generator of strings as the response. Default: False
            timestamps (bool): Show timestamps in output. Default: False
            tail (Union[str, int]): Output specified number of lines at the end of
                logs.  Integer representing the number of lines to display, or the string all.
                Default: all
            since (Union[datetime, int]): Show logs since a given datetime or
                integer epoch (in seconds)
            follow (bool): Follow log output. Default: False
            until (Union[datetime, int]): Show logs that occurred before the given
                datetime or integer epoch (in seconds)
        """
        stream = bool(kwargs.get("stream", False))
        params = {
            # When "follow" is not given it defaults to the value of "stream".
            "follow": kwargs.get("follow", kwargs.get("stream", None)),
            "since": api.prepare_timestamp(kwargs.get("since")),
            "stderr": kwargs.get("stderr", True),
            "stdout": kwargs.get("stdout", True),
            "tail": kwargs.get("tail"),
            "timestamps": kwargs.get("timestamps"),
            "until": api.prepare_timestamp(kwargs.get("until")),
        }

        response = self.client.get(f"/containers/{self.id}/logs", stream=stream, params=params)
        response.raise_for_status()

        if stream:
            return api.stream_frames(response)
        return api.frames(response)

    def pause(self) -> None:
        """Pause processes within the container."""
        response = self.client.post(f"/containers/{self.id}/pause")
        response.raise_for_status()

    def put_archive(self, path: str, data: Optional[bytes] = None) -> bool:
        """Upload tar archive containing a file or folder to be written into container.

        Args:
            path: File to write data into
            data: Contents to write to file, when None path will be read on client to
                build tarfile.

        Returns:
            ``response.ok`` of the upload request: True when successful.

        Raises:
            ValueError: when ``path`` is None
        """
        if path is None:
            raise ValueError("'path' is a required argument.")

        if data is None:
            data = api.create_tar("/", path)

        # NOTE: the response is not passed through raise_for_status(); failure is
        # reported to the caller through the boolean return value.
        response = self.client.put(
            f"/containers/{self.id}/archive", params={"path": path}, data=data
        )
        return response.ok

    def remove(self, **kwargs) -> None:
        """Delete container.

        Delegates to the manager's ``remove()`` with this container's id.

        Keyword Args:
            v (bool): Delete associated volumes as well.
            link (bool): Ignored.
            force (bool): Kill a running container before deleting.
        """
        self.manager.remove(self.id, **kwargs)

    def rename(self, name: str) -> None:
        """Rename container.

        Container updated in-situ to avoid reload().

        Args:
            name: New name for container.

        Raises:
            ValueError: when ``name`` is empty or None
        """
        if not name:
            raise ValueError("'name' is a required argument.")

        response = self.client.post(f"/containers/{self.id}/rename", params={"name": name})
        response.raise_for_status()

        self.attrs["Name"] = name  # shortcut to avoid needing reload()

    def resize(self, height: Optional[int] = None, width: Optional[int] = None) -> None:
        """Resize the tty session.

        Args:
            height: New height of tty session.
            width: New width of tty session.
        """
        params = {
            "h": height,
            "w": width,
        }
        response = self.client.post(f"/containers/{self.id}/resize", params=params)
        response.raise_for_status()

    def restart(self, **kwargs) -> None:
        """Restart processes in container.

        Keyword Args:
            timeout (int): Seconds to wait for container to stop before killing container.
        """
        params = {"timeout": kwargs.get("timeout")}
        post_kwargs = {}
        if kwargs.get("timeout"):
            # Give the HTTP request 1.5x the service-side timeout so the client
            # does not give up before the service has finished waiting.
            post_kwargs["timeout"] = float(params["timeout"]) * 1.5

        response = self.client.post(f"/containers/{self.id}/restart", params=params, **post_kwargs)
        response.raise_for_status()

    def start(self, **kwargs) -> None:
        """Start processes in container.

        Keyword Args:
            detach_keys: Override the key sequence for detaching a container (Podman only)
        """
        response = self.client.post(
            f"/containers/{self.id}/start", params={"detachKeys": kwargs.get("detach_keys")}
        )
        response.raise_for_status()

    def stats(
        self, **kwargs
    ) -> Union[bytes, dict[str, Any], Iterator[bytes], Iterator[dict[str, Any]]]:
        """Return statistics for container.

        Keyword Args:
            decode (bool): If True and stream is True, stream will be decoded into dict's.
                Default: False.
            stream (bool): Stream statistics until cancelled. Default: True.

        Raises:
            APIError: when service reports an error
        """
        # FIXME Errors in stream are not handled, need content and json to read Errors.
        stream = kwargs.get("stream", True)
        decode = kwargs.get("decode", False)

        params = {
            "containers": self.id,
            "stream": stream,
        }

        response = self.client.get("/containers/stats", params=params, stream=stream)
        response.raise_for_status()

        if stream:
            return api.stream_helper(response, decode_to_json=decode)

        return json.loads(response.content) if decode else response.content

    def stop(self, **kwargs) -> None:
        """Stop container.

        Keyword Args:
            all (bool): When True, stop all containers. Default: False (Podman only)
            ignore (bool): When True, ignore error if container already stopped (Podman only)
            timeout (int): Number of seconds to wait on container to stop before killing it.

        Raises:
            APIError: when service reports an error, or answers with anything other
                than "no content" (or "not modified" while ``ignore`` is set)
        """
        params = {"all": kwargs.get("all"), "timeout": kwargs.get("timeout")}

        post_kwargs = {}
        if kwargs.get("timeout"):
            # Give the HTTP request 1.5x the service-side timeout so the client
            # does not give up before the service has finished waiting.
            post_kwargs["timeout"] = float(params["timeout"]) * 1.5

        response = self.client.post(f"/containers/{self.id}/stop", params=params, **post_kwargs)
        response.raise_for_status()

        if response.status_code == requests.codes.no_content:
            return

        if response.status_code == requests.codes.not_modified and kwargs.get("ignore", False):
            return

        # A "not modified" answer normally has no body, and other bodies may lack the
        # expected keys; neither may replace the APIError with a decode/lookup error.
        try:
            body = response.json()
        except ValueError:
            body = None
        if not isinstance(body, dict):
            body = {}

        raise APIError(
            body.get("cause", f"unexpected status code {response.status_code}"),
            response=response,
            explanation=body.get("message"),
        )

    def top(self, **kwargs) -> Union[Iterator[dict[str, Any]], dict[str, Any]]:
        """Report on running processes in the container.

        Keyword Args:
            ps_args (str): When given, arguments will be passed to ps
            stream (bool): When True, repeatedly return results. Default: False

        Raises:
            NotFound: when the container no longer exists
            APIError: when the service reports an error
        """
        stream = kwargs.get("stream", False)

        params = {
            "stream": stream,
            "ps_args": kwargs.get("ps_args"),
        }
        response = self.client.get(f"/containers/{self.id}/top", params=params, stream=stream)
        response.raise_for_status()

        if stream:
            return api.stream_helper(response, decode_to_json=True)

        return response.json()

    def unpause(self) -> None:
        """Unpause processes in container."""
        response = self.client.post(f"/containers/{self.id}/unpause")
        response.raise_for_status()

    def update(self, **kwargs):
        """Update resource configuration of the containers.

        Raises:
            NotImplementedError: Podman service unsupported operation.
        """
        raise NotImplementedError("Container.update() is not supported by Podman service.")

    def wait(self, **kwargs) -> int:
        """Block until the container enters given state.

        Keyword Args:
            condition (Union[str, list[str]]): Container state on which to release.
                One or more of: "configured", "created", "running", "stopped",
                "paused", "exited", "removing", "stopping".
            interval (int): Time interval to wait before polling for completion.

        Returns:
            The JSON-decoded response body; the endpoint is documented as responding
            with a JSON encoded integer.

        Raises:
            NotFound: when Container not found
            ReadTimeoutError: when timeout is exceeded
            APIError: when service returns an error
        """
        condition = kwargs.get("condition")
        if isinstance(condition, str):
            condition = [condition]

        interval = kwargs.get("interval")

        # Only forward query parameters the caller actually supplied.
        params = {}
        if condition:
            params["condition"] = condition
        if interval not in (None, ""):
            params["interval"] = interval

        # This API endpoint responds with a JSON encoded integer.
        # See:
        # https://docs.podman.io/en/latest/_static/api.html#tag/containers/operation/ContainerWaitLibpod
        response = self.client.post(f"/containers/{self.id}/wait", params=params)
        response.raise_for_status()
        return response.json()