diff --git a/platforms/kubernetes/postgres-operator/postgres/constants.py b/platforms/kubernetes/postgres-operator/postgres/constants.py index 960ff75..cab0371 100644 --- a/platforms/kubernetes/postgres-operator/postgres/constants.py +++ b/platforms/kubernetes/postgres-operator/postgres/constants.py @@ -166,6 +166,13 @@ # storage STORAGE_CLASS_NAME = "storageClassName" +# conditions +CONDITIONS = "conditions" +CONDITIONS_LIMIT = 10 +CONDITION_TRUE = "True" +CONDITION_FALSE = "False" +CONDITION_UNKNOWN = "Unknown" + # pod PriorityClass SPEC_POD_PRIORITY_CLASS = "priorityClassName" SPEC_POD_PRIORITY_CLASS_SCOPE_NODE = "system-node-critical" diff --git a/platforms/kubernetes/postgres-operator/postgres/handle.py b/platforms/kubernetes/postgres-operator/postgres/handle.py index 8ece891..2edf0ab 100644 --- a/platforms/kubernetes/postgres-operator/postgres/handle.py +++ b/platforms/kubernetes/postgres-operator/postgres/handle.py @@ -13,182 +13,10 @@ from kubernetes import client, config from kubernetes.stream import stream from config import operator_config -from typed import LabelType, InstanceConnection, InstanceConnections, TypedDict, InstanceConnectionMachine, InstanceConnectionK8S, Tuple, Any, List +from typed import LabelType, InstanceConnection, InstanceConnections, TypedDict, InstanceConnectionMachine, InstanceConnectionK8S, Tuple, Any, List, Conditions from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger - -from constants import ( - VIP, - RADONDB_POSTGRES, - POSTGRES_OPERATOR, - AUTOFAILOVER, - POSTGRESQL, - READWRITEINSTANCE, - READONLYINSTANCE, - MACHINES, - ACTION, - ACTION_START, - ACTION_STOP, - IMAGE, - PODSPEC, - SPEC, - CONTAINERS, - CONTAINER_NAME, - PODSPEC_CONTAINERS_POSTGRESQL_CONTAINER, - PODSPEC_CONTAINERS_EXPORTER_CONTAINER, - SPEC_POSTGRESQL_READWRITE_RESOURCES_LIMITS_MEMORY, - SPEC_POSTGRESQL_READWRITE_RESOURCES_LIMITS_CPU, - PRIME_SERVICE_PORT_NAME, - EXPORTER_SERVICE_PORT_NAME, - HBAS, - CONFIGS, - REPLICAS, - VOLUMECLAIMTEMPLATES, - AUTOCTL_NODE, - PGAUTOFAILOVER_REPLICATOR, - STREAMING, - STREAMING_SYNC, - STREAMING_ASYNC, - DELETE_PVC, - POSTGRESQL_PVC_NAME, - SUCCESS, - FAILED, - SERVICES, - SELECTOR, - SERVICE_AUTOFAILOVER, - SERVICE_PRIMARY, - SERVICE_STANDBY, - SERVICE_READONLY, - SERVICE_STANDBY_READONLY, - SPEC_POSTGRESQL_USERS, - SPEC_POSTGRESQL_USERS_ADMIN, - SPEC_POSTGRESQL_USERS_MAINTENANCE, - SPEC_POSTGRESQL_USERS_NORMAL, - SPEC_POSTGRESQL_USERS_USER_NAME, - SPEC_POSTGRESQL_USERS_USER_PASSWORD, - API_GROUP, - API_VERSION_V1, - RESOURCE_POSTGRESQL, - RESOURCE_KIND_POSTGRESQL, - CLUSTER_STATE, - CLUSTER_CREATE_BEGIN, - CLUSTER_CREATE_ADD_FAILOVER, - CLUSTER_CREATE_ADD_READWRITE, - CLUSTER_CREATE_ADD_READONLY, - CLUSTER_CREATE_FINISH, - BASE_LABEL_PART_OF, - BASE_LABEL_MANAGED_BY, - BASE_LABEL_NAME, - BASE_LABEL_NAMESPACE, - LABEL_NODE, - LABEL_NODE_AUTOFAILOVER, - LABEL_NODE_POSTGRESQL, - LABEL_NODE_USER_SERVICES, - LABEL_NODE_STATEFULSET_SERVICES, - LABEL_SUBNODE, - LABEL_SUBNODE_READWRITE, - LABEL_SUBNODE_AUTOFAILOVER, - LABEL_SUBNODE_READONLY, - LABEL_ROLE, - LABEL_ROLE_PRIMARY, - LABEL_ROLE_STANDBY, - LABEL_STATEFULSET_NAME, - MACHINE_MODE, - K8S_MODE, - PGHOME, - DOCKER_COMPOSE_FILE, - DOCKER_COMPOSE_FILE_DATA, - DOCKER_COMPOSE_ENV, - DOCKER_COMPOSE_ENV_DATA, - DOCKER_COMPOSE_ENVFILE, - DOCKER_COMPOSE_EXPORTER_ENVFILE, - DOCKER_COMPOSE_DIR, - PGDATA_DIR, - ASSIST_DIR, - DATA_DIR, - INIT_FINISH, - PG_CONFIG_PREFIX, - PG_HBA_PREFIX, - RESTORE, - RESTORE_FROMSSH, - RESTORE_FROMSSH_PATH, - RESTORE_FROMSSH_ADDRESS, - RESTORE_FROMSSH_LOCAL, - PG_DATABASE_DIR, - PG_DATABASE_RESTORING_DIR, - LVS_BODY, - LVS_REAL_MAIN_SERVER, - LVS_REAL_READ_SERVER, - LVS_REAL_EMPTY_SERVER, - LVS_SET_NET, - LVS_UNSET_NET, - CLUSTER_STATUS_CREATE, - CLUSTER_STATUS_UPDATE, - CLUSTER_STATUS_RUN, - CLUSTER_STATUS_STOP, - CLUSTER_STATUS_CREATE_FAILED, - CLUSTER_STATUS_UPDATE_FAILED, - CLUSTER_STATUS_TERMINATE, - CLUSTER_STATUS, - SPEC_ANTIAFFINITY, - SPEC_ANTIAFFINITY_POLICY, - SPEC_ANTIAFFINITY_REQUIRED, - SPEC_ANTIAFFINITY_PREFERRED, - SPEC_ANTIAFFINITY_POLICY_REQUIRED, - SPEC_ANTIAFFINITY_POLICY_PREFERRED, - SPEC_ANTIAFFINITY_PODANTIAFFINITYTERM, - SPEC_ANTIAFFINITY_TOPOLOGYKEY, - SPEC_VOLUME_TYPE, - SPEC_VOLUME_LOCAL, - SPEC_VOLUME_CLOUD, - SECONDS, - MINUTES, - HOURS, - DAYS, - UPDATE_TOLERATION, - SPEC_POD_PRIORITY_CLASS, - SPEC_POD_PRIORITY_CLASS_SCOPE_NODE, - SPEC_POD_PRIORITY_CLASS_SCOPE_CLUSTER, - SPEC_S3, - SPEC_S3_ACCESS_KEY, - SPEC_S3_SECRET_KEY, - SPEC_S3_ENDPOINT, - SPEC_S3_BUCKET, - SPEC_S3_PATH, - SPEC_BACKUPCLUSTER, - SPEC_BACKUPTOS3, - SPEC_BACKUPTOS3_NAME, - SPEC_BACKUPTOS3_MANUAL, - SPEC_BACKUPTOS3_MANUAL_TRIGGER_ID, - SPEC_BACKUPTOS3_CRON, - SPEC_BACKUPTOS3_CRON_ENABLE, - SPEC_BACKUPTOS3_CRON_SCHEDULE, - SPEC_BACKUPTOS3_POLICY, - SPEC_BACKUPTOS3_POLICY_ARCHIVE, - SPEC_BACKUPTOS3_POLICY_ARCHIVE_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_COMPRESSION, - SPEC_BACKUPTOS3_POLICY_COMPRESSION_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_ENCRYPTION, - SPEC_BACKUPTOS3_POLICY_ENCRYPTION_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_RETENTION, - SPEC_BACKUPTOS3_POLICY_RETENTION_DEFAULT_VALUE, - SPEC_BACKUPTOS3_POLICY_RETENTION_DELETE_ALL_VALUE, - RESTORE_FROMS3, - RESTORE_FROMS3_NAME, - RESTORE_FROMS3_RECOVERY, - CLUSTER_STATUS_BACKUP, - CLUSTER_STATUS_ARCHIVE, - CLUSTER_STATUS_CRON_NEXT_RUN, - RESTORE_FROMS3_RECOVERY_LATEST, - RESTORE_FROMS3_RECOVERY_LATEST_FULL, - RESTORE_FROMS3_RECOVERY_OLDEST_FULL, - RECOVERY_FINISH, - PG_LOG_FILENAME, - SPEC_REBUILD, - SPCE_REBUILD_NODENAMES, - SPEC_DELETE_S3, - STORAGE_CLASS_NAME, -) +from constants import * PGLOG_DIR = "log" PRIMARY_FORMATION = " --formation primary " @@ -400,6 +228,53 @@ def set_password(patch: kopf.Patch, status: kopf.Status) -> None: password_length)) +def patch_cluster_conditions( + patch: kopf.Patch, + status: kopf.Status, + logger: logging.Logger, + type: str, + condition_status: str, + message: str, + override: bool = False, +) -> None: + condition = Conditions( + type, condition_status, + time.strftime(DEFAULT_TIME_FORMAT, time.localtime()), + str(message)).to_dict() + conditions = status.get(CONDITIONS, []) + + if not isinstance(conditions, list): + logger.warning( + f"patch_cluster_conditions failed, conditions is not list.") + return + + if len(conditions) > 0 and override: + conditions.pop() + conditions.append(condition) + # check conditions limit + if len(conditions) > CONDITIONS_LIMIT: + conditions = conditions[-1 * CONDITIONS_LIMIT:] + + patch.status[CONDITIONS] = conditions + + +def set_cluster_status_and_patch_conditions( + meta: kopf.Meta, + spec: kopf.Spec, + patch: kopf.Patch, + status: kopf.Status, + logger: logging.Logger, + statefield: str, + state: str, + condition_status: str, + condition_message: str, + override: bool = False, +) -> None: + set_cluster_status(meta, statefield, state, logger) + patch_cluster_conditions(patch, status, logger, state, condition_status, + condition_message, override) + + def create_statefulset_service( name: str, external_name: str, @@ -3444,7 +3319,10 @@ def create_cluster( logger: logging.Logger, ) -> None: try: - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_CREATE, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_CREATE, + CONDITION_UNKNOWN, '') logging.info("check create_cluster params") check_param(spec, logger, create=True) @@ -3464,13 +3342,30 @@ def create_cluster( time.sleep(5) # cluster running update_number_sync_standbys(meta, spec, patch, status, logger) - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_RUN, logger) + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, + CLUSTER_STATUS_RUN, + CONDITION_TRUE, + '', + override=True) except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() - traceback.format_exc() - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_CREATE_FAILED, - logger) + err = traceback.format_exc() + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, + CLUSTER_STATUS_CREATE_FAILED, + CONDITION_FALSE, + err, + override=True) def delete_cluster( @@ -3480,7 +3375,10 @@ def delete_cluster( status: kopf.Status, logger: logging.Logger, ) -> None: - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_TERMINATE, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger, + CLUSTER_STATE, + CLUSTER_STATUS_TERMINATE, + CONDITION_UNKNOWN, '') delete_postgresql_cluster(meta, spec, patch, status, logger) @@ -5544,7 +5442,10 @@ def update_cluster( diffs: kopf.Diff, ) -> None: try: - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_UPDATE, logger) + set_cluster_status_and_patch_conditions(meta, spec, patch, status, + logger, CLUSTER_STATE, + CLUSTER_STATUS_UPDATE, + CONDITION_UNKNOWN, '') logger.info("check update_cluster params") check_param(spec, logger, create=False) need_roll_update = False @@ -5665,13 +5566,30 @@ def update_cluster( else: cluster_status = CLUSTER_STATUS_RUN # set Running - set_cluster_status(meta, CLUSTER_STATE, cluster_status, logger) + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, + cluster_status, + CONDITION_TRUE, + '', + override=True) except Exception as e: logger.error(f"error occurs, {e}") traceback.print_exc() - traceback.format_exc() - set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_UPDATE_FAILED, - logger) + err = traceback.format_exc() + set_cluster_status_and_patch_conditions(meta, + spec, + patch, + status, + logger, + CLUSTER_STATE, + CLUSTER_STATUS_UPDATE_FAILED, + CONDITION_FALSE, + err, + override=True) def cron_backup( diff --git a/platforms/kubernetes/postgres-operator/postgres/typed.py b/platforms/kubernetes/postgres-operator/postgres/typed.py index 8703ca8..ebe4e94 100644 --- a/platforms/kubernetes/postgres-operator/postgres/typed.py +++ b/platforms/kubernetes/postgres-operator/postgres/typed.py @@ -1,5 +1,6 @@ import paramiko from typing import Dict, TypedDict, TypeVar, Optional, List, Optional, Callable, Tuple, Any +import six from constants import ( AUTOFAILOVER, POSTGRESQL, @@ -114,3 +115,52 @@ def get_number(self): def free_conns(self): for conn in self.conns: conn.free_conn() + + +class Conditions: + + def __init__(self, type: str, status: str, lastTransitionTime: str, + message: str): + # Type of cluster condition, same as status.state. Running/CreateFailed/UpdateFailed ... + self.type = type + + # Status of the condition, one of True/False/Unknown. + self.status = status + + # The last time this Condition type changed. + self.lastTransitionTime = lastTransitionTime + + # message is a human readable message indicating details about the transition. + self.message = message + + condition_type = { + 'type': 'str', + 'status': 'str', + 'lastTransitionTime': 'str', + 'message': 'str' + } + + def __str__(self): + return str(self.to_dict()) + + def to_dict(self): + result = {} + + for attr, _ in six.iteritems(self.condition_type): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list( + map(lambda x: x.to_dict() + if hasattr(x, "to_dict") else x, value)) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict( + map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items())) + else: + result[attr] = value + + return result