Merge branch 5.x into 7.x
AymericDu committed Nov 25, 2020
2 parents 8c22dfd + b76a22b commit 8ff3ae4
Showing 17 changed files with 650 additions and 71 deletions.
268 changes: 228 additions & 40 deletions cluster/module/server.c

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions meta2v2/meta2_utils.c
@@ -2837,21 +2837,22 @@ _purge_deleted_aliases(struct sqlx_sqlite3_s *sq3, gint64 delay,
GError *err = NULL;
gchar *sql;
GVariant *params[] = {NULL, NULL, NULL};
gint64 now = oio_ext_real_time () / G_TIME_SPAN_SECOND;
gint64 now = oio_ext_real_time();
gint64 time_limit = 0;

// All aliases which have one version deleted (the last) older than time_limit
if (alias) {
sql = (" alias IN "
"(SELECT alias FROM "
" (SELECT alias,ctime,deleted FROM aliases WHERE alias = ? "
" GROUP BY alias) "
" WHERE deleted AND ctime < ?) ");
" (SELECT alias, MAX(version) as version, deleted "
" FROM aliases WHERE alias = ? GROUP BY alias) "
" WHERE deleted AND version < ?) ");
} else {
sql = (" alias IN "
"(SELECT alias FROM "
" (SELECT alias,ctime,deleted FROM aliases GROUP BY alias) "
" WHERE deleted AND ctime < ?) ");
" (SELECT alias, MAX(version) as version, deleted "
" FROM aliases GROUP BY alias) "
" WHERE deleted AND version < ?) ");
}

if (now < 0) {
@@ -2860,6 +2861,7 @@ _purge_deleted_aliases(struct sqlx_sqlite3_s *sq3, gint64 delay,
return err;
}

delay = delay * G_TIME_SPAN_SECOND;
if (delay >= 0 && delay < now) {
time_limit = now - delay;
}
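
A note on the hunks above (an observation on this diff, not text from the commit): the purge condition now keys on the latest version of each alias rather than on its ctime, and because oio_ext_real_time() returns wall-clock microseconds while the retention delay is expressed in seconds, the delay is scaled by G_TIME_SPAN_SECOND before being subtracted. A minimal standalone sketch of the resulting deadline computation, with GLib's g_get_real_time() standing in for oio_ext_real_time():

/* Sketch only: derive the purge deadline in the same microsecond unit as
 * alias versions. g_get_real_time() stands in for oio_ext_real_time();
 * both return wall-clock microseconds. */
#include <glib.h>
#include <stdio.h>

int main(void)
{
    gint64 delay_s = 86400;                      /* retention delay, in seconds */
    gint64 now = g_get_real_time();              /* microseconds since the epoch */
    gint64 delay = delay_s * G_TIME_SPAN_SECOND; /* seconds -> microseconds */
    gint64 time_limit = 0;
    if (delay >= 0 && delay < now)
        time_limit = now - delay;
    printf("purge aliases whose latest (deleted) version is < %"
            G_GINT64_FORMAT "\n", time_limit);
    return 0;
}
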
12 changes: 12 additions & 0 deletions metautils/lib/metatype_srvinfo.h
@@ -120,5 +120,17 @@ gchar ** metautils_service_list_to_urlv(GSList *l);
/* Build a serialized representation of meta1_url that correspond
* to the given service. */
gchar * metautils_service_to_m1url(const struct service_info_s *si, gint64 seq);
/* -------------------------------------------------------------------------- */

struct service_info_dated_s *service_info_dated_new(
struct service_info_s *si, time_t lock_mtime);

void service_info_dated_free(struct service_info_dated_s *sid);

void service_info_dated_encode_json(GString *gstr,
const struct service_info_dated_s *sid, gboolean full);

GError* service_info_dated_load_json(const gchar *encoded,
struct service_info_dated_s **out, gboolean permissive);

#endif /*OIO_SDS__metautils__lib__metatype_srvinfo_h*/
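
A minimal usage sketch of the API declared above (not part of the commit; it assumes the OpenIO SDS metautils headers and an already populated struct service_info_s). service_info_dated_new() duplicates the service info, so the caller keeps ownership of si:

/* Sketch: build a dated view of a service and serialize it to JSON. */
#include <glib.h>
#include <metautils/lib/metautils.h>  /* assumed umbrella header; exact path may differ */

static gchar *
describe_service(struct service_info_s *si, time_t lock_mtime)
{
    struct service_info_dated_s *sid = service_info_dated_new(si, lock_mtime);
    GString *json = g_string_new("");
    /* full=TRUE also emits the namespace and the service type */
    service_info_dated_encode_json(json, sid, TRUE);
    service_info_dated_free(sid);
    return g_string_free(json, FALSE);  /* caller releases with g_free() */
}
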
11 changes: 11 additions & 0 deletions metautils/lib/metatypes.h
@@ -149,6 +149,7 @@ typedef struct namespace_info_s
typedef struct score_s
{
gint32 value; /**< The score value */
// Watch out for 19 Jan 2038 03:14:07
gint32 timestamp; /**< The timestamp this score was created */
} score_t;

@@ -221,4 +222,14 @@ typedef struct addr_rule_s
gboolean authorize; /**< Allow (TRUE) or deny (FALSE) */
} addr_rule_t;

/**
* Type to store a service info
*/
typedef struct service_info_dated_s
{
service_info_t *si; /**< The service info */
time_t lock_mtime; /**< The modification time of the lock */
time_t tags_mtime; /**< The modification time of the tags */
} service_info_dated_t;

#endif /*OIO_SDS__metautils__lib__metatypes_h*/
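
The comment added to score_t above points at the 32-bit time overflow: a signed 32-bit count of seconds since the Unix epoch tops out at 2^31 - 1 = 2,147,483,647, which corresponds to 2038-01-19 03:14:07 UTC. A tiny standalone illustration (not from the commit):

/* Sketch: print the date at which a 32-bit signed seconds counter wraps. */
#include <stdio.h>
#include <time.h>

int main(void)
{
    time_t max32 = 2147483647;  /* INT32_MAX seconds after the Unix epoch */
    char buf[64];
    strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S UTC", gmtime(&max32));
    printf("32-bit timestamps overflow after %s\n", buf);  /* 2038-01-19 03:14:07 UTC */
    return 0;
}
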
111 changes: 111 additions & 0 deletions metautils/lib/utils_service_info.c
@@ -246,6 +246,7 @@ void service_info_clean_tags(struct service_info_s *si)
service_tag_destroy(tag);
}
g_ptr_array_free(pa, TRUE);
si->tags = NULL;
}
}

@@ -608,3 +609,113 @@
return (gchar**) g_ptr_array_free(tmp, FALSE);
}

/* -------------------------------------------------------------------------- */

struct service_info_dated_s *
service_info_dated_new(struct service_info_s *si, time_t lock_mtime)
{
struct service_info_dated_s *sid = g_malloc0(sizeof(
struct service_info_dated_s));
sid->si = service_info_dup(si);
sid->lock_mtime = lock_mtime;
sid->tags_mtime = si->score.timestamp * G_TIME_SPAN_SECOND;
return sid;
}

void
service_info_dated_free(struct service_info_dated_s *sid)
{
if (!sid)
return;

service_info_clean(sid->si);
g_free(sid);
}

void
service_info_dated_encode_json(GString *gstr,
const struct service_info_dated_s *sid, gboolean full)
{
if (!sid)
return;
gchar straddr[STRLEN_ADDRINFO];
grid_addrinfo_to_string(&(sid->si->addr), straddr, sizeof(straddr));
g_string_append_c(gstr, '{');
OIO_JSON_append_str(gstr, "addr", straddr);
g_string_append_c(gstr, ',');
OIO_JSON_append_int(gstr, "score", sid->si->score.value);
if (full) {
g_string_append_c(gstr, ',');
OIO_JSON_append_str(gstr, "ns", sid->si->ns_name);
g_string_append_c(gstr, ',');
OIO_JSON_append_str(gstr, "type", sid->si->type);
}
g_string_append_static(gstr, ",\"tags\":{");
_append_all_tags(gstr, sid->si->tags);
g_string_append_static(gstr, "},\"mtime\":{");
OIO_JSON_append_int(gstr, "lock", sid->lock_mtime);
g_string_append_c(gstr, ',');
OIO_JSON_append_int(gstr, "tags", sid->tags_mtime);
g_string_append_static(gstr, "}}");
}

GError*
service_info_dated_load_json(const gchar *encoded,
struct service_info_dated_s **out, gboolean permissive)
{
struct json_tokener *tok = json_tokener_new();
struct json_object *obj = json_tokener_parse_ex(tok,
encoded, strlen(encoded));
json_tokener_free(tok);

struct service_info_dated_s *sid = NULL;
struct service_info_s *si = NULL;

GError *err = service_info_load_json_object(obj, &si, permissive);
if (err)
goto end;

struct json_object *mtime = NULL, *lock_mtime = NULL, *tags_mtime = NULL;
struct oio_ext_json_mapping_s mapping[] = {
{"mtime", &mtime, json_type_object, 0},
{NULL, NULL, 0, 0}
};
err = oio_ext_extract_json(obj, mapping);
if (err)
goto end;
if (mtime) {
struct oio_ext_json_mapping_s mapping_mtime[] = {
{"lock", &lock_mtime, json_type_int, 0},
{"tags", &tags_mtime, json_type_int, 0},
{NULL, NULL, 0, 0}
};
err = oio_ext_extract_json(mtime, mapping_mtime);
if (err)
goto end;
}
sid = g_malloc0(sizeof(struct service_info_dated_s));
sid->si = si;
if (lock_mtime) {
sid->lock_mtime = json_object_get_int64(lock_mtime);
} else {
/* Conscience is not up to date.
* Consider the service up to date. */
sid->lock_mtime = oio_ext_real_time();
}
if (tags_mtime) {
sid->tags_mtime = json_object_get_int64(tags_mtime);
} else {
/* Conscience is not up to date.
* Consider the service up to date. */
sid->tags_mtime = oio_ext_real_time();
}

end:
json_object_put(obj);
if (!err) {
*out = sid;
} else {
service_info_clean(si);
}
return err;
}
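
Judging from the encoder and decoder above, the JSON exchanged for a dated service should look roughly like the string in the sketch below; the shape is inferred from this diff, and the address, score and mtime values are made up. When either field of the "mtime" object is missing, service_info_dated_load_json() falls back to the current time:

/* Sketch: decode a dated service description shaped like the output of
 * service_info_dated_encode_json(). Values are illustrative only. */
#include <glib.h>
#include <metautils/lib/metautils.h>  /* assumed umbrella header; exact path may differ */

static const gchar *sample =
    "{\"addr\":\"127.0.0.1:6200\",\"score\":80,"
    "\"ns\":\"OPENIO\",\"type\":\"rawx\","
    "\"tags\":{\"tag.up\":true},"
    "\"mtime\":{\"lock\":1606300000000000,\"tags\":1606300000000000}}";

static void
load_sample(void)
{
    struct service_info_dated_s *sid = NULL;
    GError *err = service_info_dated_load_json(sample, &sid, TRUE);
    if (err) {
        g_printerr("decode failed: %s\n", err->message);
        g_clear_error(&err);
        return;
    }
    g_print("lock mtime: %" G_GINT64_FORMAT "\n", (gint64)sid->lock_mtime);
    service_info_dated_free(sid);
}
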
68 changes: 65 additions & 3 deletions oio/cli/rdir/rdir.py
@@ -1,4 +1,4 @@
# Copyright (C) 2018-2019 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2018-2020 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -39,7 +39,7 @@ def _format_assignments(all_services, svc_col_title='Rawx'):


class RdirBootstrap(Lister):
"""Assign an rdir services"""
"""Assign rdir services"""

log = getLogger(__name__ + '.RdirBootstrap')

@@ -59,14 +59,24 @@ def get_parser(self, prog_name):
type=int,
help=("Minimum required distance between any service and "
"its assigned rdir service."))
parser.add_argument(
'--service-id',
metavar='<service-id>',
help="Assign an rdir only for this service ID.")
parser.add_argument(
'--dry-run', action='store_true',
help='Display actions but do nothing.')
return parser

def take_action(self, parsed_args):
dispatcher = self.app.client_manager.rdir_dispatcher
try:
all_services = dispatcher.assign_services(
parsed_args.service_type, parsed_args.max_per_rdir,
parsed_args.service_type,
max_per_rdir=parsed_args.max_per_rdir,
min_dist=parsed_args.min_dist,
service_id=parsed_args.service_id,
dry_run=parsed_args.dry_run,
connection_timeout=30.0, read_timeout=90.0)
except OioException as exc:
self.success = False
@@ -130,3 +140,55 @@ def take_action(self, parsed_args):
results.sort()
columns = ('Rdir', 'Number of bases', 'Bases')
return columns, results


class RdirReassign(Lister):
"""Reassign rdir services"""

log = getLogger(__name__ + '.RdirReassign')

def get_parser(self, prog_name):
parser = super(RdirReassign, self).get_parser(prog_name)
parser.add_argument(
'service_type',
help="Which service type to assign rdir to.")
parser.add_argument(
'--max-per-rdir',
metavar='<N>',
type=int,
help="Maximum number of databases per rdir service.")
parser.add_argument(
'--min-dist',
metavar='<N>',
type=int,
help=("Minimum required distance between any service and "
"its assigned rdir service."))
parser.add_argument(
'--service-id',
metavar='<service-id>',
help="Assign an rdir only for this service ID.")
parser.add_argument(
'--dry-run', action='store_true',
help='Display actions but do nothing.')
return parser

def take_action(self, parsed_args):
dispatcher = self.app.client_manager.rdir_dispatcher
try:
all_services = dispatcher.assign_services(
parsed_args.service_type,
reassign=True,
service_id=parsed_args.service_id,
max_per_rdir=parsed_args.max_per_rdir,
min_dist=parsed_args.min_dist,
dry_run=parsed_args.dry_run,
connection_timeout=30.0, read_timeout=90.0)
except OioException as exc:
self.success = False
self.log.warn('Failed to assign all %s services: %s',
parsed_args.service_type, exc)
all_services, _ = dispatcher.get_assignments(
parsed_args.service_type, connection_timeout=30.0,
read_timeout=90.0)
return _format_assignments(all_services,
parsed_args.service_type.capitalize())
42 changes: 36 additions & 6 deletions oio/rdir/client.py
@@ -110,20 +110,36 @@ def get_assignments(self, service_type, **kwargs):
return all_services, all_rdir

def assign_services(self, service_type,
max_per_rdir=None, min_dist=None, **kwargs):
max_per_rdir=None, min_dist=None, service_id=None,
reassign=False, **kwargs):
"""
Assign an rdir service to all `service_type` servers that aren't
already assigned one.
:param max_per_rdir: Maximum number of services an rdir can handle.
:type max_per_rdir: `int`
:param min_dist: minimum required distance between any service and
:param min_dist: Minimum required distance between any service and
its assigned rdir service.
:type min_dist: `int`
:param service_id: Assign only this service ID.
:type service_id: `str`
:param reassign: Reassign an rdir service.
:type reassign: `bool`
:param dry_run: Display actions but do nothing.
:type dry_run: `bool`
:returns: The list of `service_type` services that were assigned
rdir services.
"""
all_services = self.cs.all_services(service_type, **kwargs)
if service_id:
for provider in all_services:
provider_id = provider['tags'].get('tag.service_id',
provider['addr'])
if service_id == provider_id:
break
else:
raise ValueError('%s isn\'t a %s' % (service_id, service_type))
all_services = [provider]
all_rdir = self.cs.all_services('rdir', True, **kwargs)
if len(all_rdir) <= 0:
raise ServiceUnavailable("No rdir service found in %s" % self.ns)
@@ -141,18 +157,26 @@ def assign_services(self, service_type,
service_type='rdir', **kwargs)
rdir_host = _filter_rdir_host(resp)
try:
provider['rdir'] = by_id[_make_id(self.ns, 'rdir',
rdir_host)]
rdir = by_id[_make_id(self.ns, 'rdir', rdir_host)]
if reassign:
rdir['tags']['stat.opened_db_count'] = \
rdir['tags'].get('stat.opened_db_count', 0) - 1
# TODO(adu) Delete database
raise NotFound('Reassign an rdir services')
provider['rdir'] = rdir
except KeyError:
self.logger.warn("rdir %s linked to %s %s seems down",
rdir_host, service_type,
provider_id)
if reassign:
raise NotFound('Reassign an rdir services')
except NotFound:
try:
rdir = self._smart_link_rdir(provider_id, all_rdir,
service_type=service_type,
max_per_rdir=max_per_rdir,
min_dist=min_dist,
reassign=reassign,
**kwargs)
except OioException as exc:
self.logger.warn("Failed to link an rdir to %s %s: %s",
@@ -201,7 +225,7 @@ def assign_all_rawx(self, max_per_rdir=None, **kwargs):

def _smart_link_rdir(self, volume_id, all_rdir, max_per_rdir=None,
max_attempts=7, service_type='rawx', min_dist=None,
**kwargs):
reassign=False, dry_run=False, **kwargs):
"""
Force the load balancer to avoid services that already host more
bases than the average (or more than `max_per_rdir`)
@@ -230,13 +254,19 @@ def _smart_link_rdir(self, volume_id, all_rdir, max_per_rdir=None,
# Retry without `avoids`, hoping the next iteration will rebalance
polled = self._poll_rdir(known=known, min_dist=min_dist, **kwargs)

if dry_run:
# No association of the rdir to the rawx
# No creation in the rdir
return polled['id']

# Associate the rdir to the rawx
forced = {'host': polled['addr'], 'type': 'rdir',
'seq': 1, 'args': "", 'id': polled['id']}
for i in range(max_attempts):
try:
self.directory.force(RDIR_ACCT, volume_id, 'rdir',
forced, autocreate=True, **kwargs)
forced, autocreate=True,
replace=reassign, **kwargs)
break
except ClientException as ex:
# Already done