Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend support for ctdb cluster meta stored in ceph rados #123

Merged
merged 8 commits into from
Jun 28, 2024
30 changes: 29 additions & 1 deletion sambacc/commands/ctdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

from sambacc import ctdb
from sambacc import jfile
from sambacc.simple_waiter import Waiter
from sambacc import rados_opener
from sambacc.simple_waiter import Sleeper, Waiter

from .cli import best_waiter, commands, Context, Fail

Expand Down Expand Up @@ -84,6 +85,17 @@ def _ctdb_general_node_args(parser: argparse.ArgumentParser) -> None:
" the specified policy."
),
)
parser.add_argument(
"--take-node-number-from-env",
"-E",
const="NODE_NUMBER",
nargs="?",
help=(
"Take the node number from the environment. If specified"
" with a value, use that value as the environment variable"
" name. Otherwise, use environment variable NODE_NUMBER."
),
)
parser.add_argument(
"--persistent-path",
help="Path to a persistent path for storing nodes file",
Expand Down Expand Up @@ -146,6 +158,16 @@ def __init__(self, ctx: Context):
f"invalid hostname for node number: {self.hostname}"
)
self.node_number = int(self.hostname.rsplit("-")[-1])
elif ctx.cli.take_node_number_from_env:
try:
self.node_number = int(
os.environ[ctx.cli.take_node_number_from_env]
)
except (KeyError, ValueError):
raise ValueError(
"failed to get node number from environment var"
f" {ctx.cli.take_node_number_from_env}"
)
else:
self.node_number = None

Expand Down Expand Up @@ -195,6 +217,12 @@ def _cluster_meta_init(self) -> None:
# don't do file modes the way we need for JSON state file or do
# writable file types in the url_opener (urllib wrapper). For now, just
# manually handle the string.
if rados_opener.is_rados_uri(uri):
self._cluster_meta_obj = (
rados_opener.ClusterMetaRADOSObject.create_from_uri(uri)
)
self._waiter_obj = Sleeper()
return
if uri.startswith("file:"):
path = uri.split(":", 1)[-1]
else:
Expand Down
4 changes: 2 additions & 2 deletions sambacc/commands/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,9 @@ def pre_action(cli: typing.Any) -> None:
if cli.samba_command_prefix:
samba_cmds.set_global_prefix([cli.samba_command_prefix])

# should there be an option to force {en,dis}able openers?
# should there be an option to force {en,dis}able rados?
# Right now we just always try to enable rados when possible.
rados_opener.enable_rados_url_opener(
rados_opener.enable_rados(
url_opener.URLOpener,
client_name=cli.ceph_id.get("client_name", ""),
full_name=cli.ceph_id.get("full_name", False),
Expand Down
202 changes: 187 additions & 15 deletions sambacc/rados_opener.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import io
import json
import logging
import time
import typing
import urllib.request
import uuid

from . import url_opener
from .typelets import ExcType, ExcValue, ExcTraceback
from .typelets import ExcType, ExcValue, ExcTraceback, Self

_RADOSModule = typing.Any
_RADOSObject = typing.Any
Expand Down Expand Up @@ -62,37 +64,88 @@ class _RADOSHandler(urllib.request.BaseHandler):
_interface: typing.Optional[_RADOSInterface] = None

def rados_open(self, req: urllib.request.Request) -> typing.IO:
"""Open a rados-style url. Called from urllib."""
if self._interface is None:
raise RADOSUnsupported()
if req.selector.startswith("mon-config-key:"):
return _get_mon_config_key(
self._interface, req.selector.split(":", 1)[1]
)
rinfo = self._parse_req(req)
if rinfo.get("subtype") == "mon-config-key":
return _get_mon_config_key(self._interface, rinfo["path"])
return RADOSObjectRef(
self._interface, rinfo["pool"], rinfo["ns"], rinfo["key"]
)

def get_object(
self, uri: str, *, must_exist: bool = False
) -> RADOSObjectRef:
"""Return a rados object reference for the given rados uri. The uri
must refer to a rados object only as the RADOSObjectRef can do various
rados-y things, more than an IO requires.
"""
if self._interface is None:
raise RADOSUnsupported()
rinfo = self._parse_req(urllib.request.Request(uri))
if rinfo.get("type") != "rados":
raise ValueError("only rados URI values supported")
if rinfo.get("subtype") == "mon-config-key":
raise ValueError("only rados object URI values supported")
return RADOSObjectRef(
self._interface,
rinfo["pool"],
rinfo["ns"],
rinfo["key"],
must_exist=must_exist,
)

def _parse_req(self, req: urllib.request.Request) -> dict[str, str]:
"""Parse a urlib request into useful components."""
subtype = "mon-config-key"
if req.selector.startswith(subtype + ":"):
return {
"type": req.type,
"subtype": subtype,
"path": req.selector.split(":", 1)[1],
}
sel = req.selector.lstrip("/")
if req.host:
pool = req.host
ns, key = sel.split("/", 1)
else:
pool, ns, key = sel.split("/", 2)
return _RADOSResponse(self._interface, pool, ns, key)
return {
"type": req.type,
"subtype": "object",
"pool": pool,
"ns": ns,
"key": key,
}


# it's quite annoying to have a read-only typing.IO we're forced to
# have so many stub methods. Go's much more granular io interfaces for
# readers/writers is much nicer for this.
class _RADOSResponse(typing.IO):
class RADOSObjectRef(typing.IO):
def __init__(
self, interface: _RADOSInterface, pool: str, ns: str, key: str
self,
interface: _RADOSInterface,
pool: str,
ns: str,
key: str,
*,
must_exist: bool = True,
) -> None:
self._pool = pool
self._ns = ns
self._key = key
self._lock_description = "sambacc RADOS library"
self._lock_duration = None

self._open(interface)
self._test()
if must_exist:
self._test()

def _open(self, interface: _RADOSInterface) -> None:
# TODO: connection caching
self._api = interface.api
self._conn = interface.Rados()
self._conn.connect()
self._connected = True
Expand Down Expand Up @@ -142,15 +195,15 @@ def mode(self) -> str:
def name(self) -> str:
return self._key

def __enter__(self) -> _RADOSResponse:
def __enter__(self) -> Self:
return self

def __exit__(
self, exc_type: ExcType, exc_val: ExcValue, exc_tb: ExcTraceback
) -> None:
self.close()

def __iter__(self) -> _RADOSResponse:
def __iter__(self) -> Self:
return self

def __next__(self) -> bytes:
Expand Down Expand Up @@ -198,6 +251,38 @@ def write(self, s: typing.Any) -> int:
def writelines(self, ls: typing.Iterable[typing.Any]) -> None:
raise NotImplementedError()

def write_full(self, data: bytes) -> None:
"""Write the object such that its contents are exactly `data`."""
self._ioctx.write_full(self._key, data)

def _lock_exclusive(self, name: str, cookie: str) -> None:
self._ioctx.lock_exclusive(
self._key,
name,
cookie,
desc=self._lock_description,
duration=self._lock_duration,
)

def _acquire_lock_exclusive(
self, name: str, cookie: str, *, delay: int = 1
anoopcs9 marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
while True:
try:
self._lock_exclusive(name, cookie)
return
except self._api.ObjectBusy:
_logger.debug(
"lock failed: %r, %r, %r: object busy",
self._key,
name,
cookie,
)
time.sleep(delay)

def _unlock(self, name: str, cookie: str) -> None:
self._ioctx.unlock(self._key, name, cookie)


def _get_mon_config_key(interface: _RADOSInterface, key: str) -> io.BytesIO:
mcmd = json.dumps(
Expand All @@ -218,15 +303,102 @@ def _get_mon_config_key(interface: _RADOSInterface, key: str) -> io.BytesIO:
raise OSError(ret, msg)


def enable_rados_url_opener(
class ClusterMetaRADOSHandle:
"A Cluster Meta Object can load or dump persistent cluster descriptions."

def __init__(
self,
rados_obj: RADOSObjectRef,
uri: str,
*,
read: bool,
write: bool,
locked: bool,
):
self._rados_obj = rados_obj
self._uri = uri
self._read = read
self._write = write
self._locked = locked
if self._locked:
self._lock_name = "cluster_meta"
self._cookie = f"sambacc:{uuid.uuid4()}"

def load(self) -> typing.Any:
if not self._read:
raise ValueError("not readable")
buf = self._rados_obj.read()
if not buf:
return {}
return json.loads(buf)

def dump(self, data: typing.Any) -> None:
if not self._read:
raise ValueError("not writable")
buf = json.dumps(data).encode("utf8")
self._rados_obj.write_full(buf)

def __enter__(self) -> Self:
if self._locked:
self._rados_obj._acquire_lock_exclusive(
self._lock_name, self._cookie
)
return self

def __exit__(
self, exc_type: ExcType, exc_val: ExcValue, exc_tb: ExcTraceback
) -> None:
if self._locked:
self._rados_obj._unlock(self._lock_name, self._cookie)
return


class ClusterMetaRADOSObject:
def __init__(self, rados_handler: _RADOSHandler, uri: str) -> None:
self._handler = rados_handler
self._uri = uri

def open(
self, *, read: bool = True, write: bool = False, locked: bool = False
) -> ClusterMetaRADOSHandle:
return ClusterMetaRADOSHandle(
self._handler.get_object(self._uri),
self._uri,
read=read,
write=write,
locked=locked,
)

@classmethod
def create_from_uri(cls, uri: str) -> Self:
"""Return a new ClusterMetaRADOSObject given a rados uri string.
If rados module is unavailable RADOSUnsupported will be raised.
"""
handler = _RADOSHandler()
if not handler._interface:
raise RADOSUnsupported()
return cls(handler, uri)


def is_rados_uri(uri: str) -> bool:
"""Return true if the string can be used as a rados (pseudo) URI.
This function does not require the rados libraries to be available.
NB: It does not validate the structure of the URI.
"""
return uri.startswith("rados:")


def enable_rados(
cls: typing.Type[url_opener.URLOpener],
*,
client_name: str = "",
full_name: bool = False,
) -> None:
"""Extend the URLOpener type to support pseudo-URLs for rados
object storage. If rados libraries are not found the function
does nothing.
"""Enable Ceph RADOS support in sambacc.
As as side-effect it will extend the URLOpener type to support pseudo-URLs
for rados object storage. It will also enable the
ClusterMetaRADOSObject.create_from_uri constructor. If rados libraries are
not found the function does nothing.

If rados libraries are found than URLOpener can be used like:
>>> uo = url_opener.URLOpener()
Expand Down
Loading
Loading