add status.conditions field, provide more detailed status #32

Open · wants to merge 4 commits into main
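In short, every cluster state transition now also records an entry under status.conditions, and the list is capped at CONDITIONS_LIMIT entries. As a minimal sketch, here is what one such entry might look like, assuming the Conditions helper imported from typed serializes roughly type/status/lastTransitionTime/message keys (the exact key names and time format are not visible in this diff and are an assumption):

# Hypothetical sketch, not part of the diff: a single status.conditions entry
# as Conditions(...).to_dict() might produce it.
condition = {
    "type": "Running",          # the cluster state being reported
    "status": "True",           # CONDITION_TRUE / CONDITION_FALSE / CONDITION_UNKNOWN
    "lastTransitionTime": "2023-01-01 00:00:00",  # time.strftime(DEFAULT_TIME_FORMAT, ...)
    "message": "",              # error details (e.g. a traceback) on failure
}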
7 changes: 7 additions & 0 deletions platforms/kubernetes/postgres-operator/postgres/constants.py
@@ -166,6 +166,13 @@
# storage
STORAGE_CLASS_NAME = "storageClassName"

# conditions
CONDITIONS = "conditions"
CONDITIONS_LIMIT = 10
CONDITION_TRUE = "True"
CONDITION_FALSE = "False"
CONDITION_UNKNOWN = "Unknown"

# pod PriorityClass
SPEC_POD_PRIORITY_CLASS = "priorityClassName"
SPEC_POD_PRIORITY_CLASS_SCOPE_NODE = "system-node-critical"
288 changes: 103 additions & 185 deletions platforms/kubernetes/postgres-operator/postgres/handle.py
@@ -13,182 +13,10 @@
from kubernetes import client, config
from kubernetes.stream import stream
from config import operator_config
from typed import LabelType, InstanceConnection, InstanceConnections, TypedDict, InstanceConnectionMachine, InstanceConnectionK8S, Tuple, Any, List
from typed import LabelType, InstanceConnection, InstanceConnections, TypedDict, InstanceConnectionMachine, InstanceConnectionK8S, Tuple, Any, List, Conditions
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from constants import (
VIP,
RADONDB_POSTGRES,
POSTGRES_OPERATOR,
AUTOFAILOVER,
POSTGRESQL,
READWRITEINSTANCE,
READONLYINSTANCE,
MACHINES,
ACTION,
ACTION_START,
ACTION_STOP,
IMAGE,
PODSPEC,
SPEC,
CONTAINERS,
CONTAINER_NAME,
PODSPEC_CONTAINERS_POSTGRESQL_CONTAINER,
PODSPEC_CONTAINERS_EXPORTER_CONTAINER,
SPEC_POSTGRESQL_READWRITE_RESOURCES_LIMITS_MEMORY,
SPEC_POSTGRESQL_READWRITE_RESOURCES_LIMITS_CPU,
PRIME_SERVICE_PORT_NAME,
EXPORTER_SERVICE_PORT_NAME,
HBAS,
CONFIGS,
REPLICAS,
VOLUMECLAIMTEMPLATES,
AUTOCTL_NODE,
PGAUTOFAILOVER_REPLICATOR,
STREAMING,
STREAMING_SYNC,
STREAMING_ASYNC,
DELETE_PVC,
POSTGRESQL_PVC_NAME,
SUCCESS,
FAILED,
SERVICES,
SELECTOR,
SERVICE_AUTOFAILOVER,
SERVICE_PRIMARY,
SERVICE_STANDBY,
SERVICE_READONLY,
SERVICE_STANDBY_READONLY,
SPEC_POSTGRESQL_USERS,
SPEC_POSTGRESQL_USERS_ADMIN,
SPEC_POSTGRESQL_USERS_MAINTENANCE,
SPEC_POSTGRESQL_USERS_NORMAL,
SPEC_POSTGRESQL_USERS_USER_NAME,
SPEC_POSTGRESQL_USERS_USER_PASSWORD,
API_GROUP,
API_VERSION_V1,
RESOURCE_POSTGRESQL,
RESOURCE_KIND_POSTGRESQL,
CLUSTER_STATE,
CLUSTER_CREATE_BEGIN,
CLUSTER_CREATE_ADD_FAILOVER,
CLUSTER_CREATE_ADD_READWRITE,
CLUSTER_CREATE_ADD_READONLY,
CLUSTER_CREATE_FINISH,
BASE_LABEL_PART_OF,
BASE_LABEL_MANAGED_BY,
BASE_LABEL_NAME,
BASE_LABEL_NAMESPACE,
LABEL_NODE,
LABEL_NODE_AUTOFAILOVER,
LABEL_NODE_POSTGRESQL,
LABEL_NODE_USER_SERVICES,
LABEL_NODE_STATEFULSET_SERVICES,
LABEL_SUBNODE,
LABEL_SUBNODE_READWRITE,
LABEL_SUBNODE_AUTOFAILOVER,
LABEL_SUBNODE_READONLY,
LABEL_ROLE,
LABEL_ROLE_PRIMARY,
LABEL_ROLE_STANDBY,
LABEL_STATEFULSET_NAME,
MACHINE_MODE,
K8S_MODE,
PGHOME,
DOCKER_COMPOSE_FILE,
DOCKER_COMPOSE_FILE_DATA,
DOCKER_COMPOSE_ENV,
DOCKER_COMPOSE_ENV_DATA,
DOCKER_COMPOSE_ENVFILE,
DOCKER_COMPOSE_EXPORTER_ENVFILE,
DOCKER_COMPOSE_DIR,
PGDATA_DIR,
ASSIST_DIR,
DATA_DIR,
INIT_FINISH,
PG_CONFIG_PREFIX,
PG_HBA_PREFIX,
RESTORE,
RESTORE_FROMSSH,
RESTORE_FROMSSH_PATH,
RESTORE_FROMSSH_ADDRESS,
RESTORE_FROMSSH_LOCAL,
PG_DATABASE_DIR,
PG_DATABASE_RESTORING_DIR,
LVS_BODY,
LVS_REAL_MAIN_SERVER,
LVS_REAL_READ_SERVER,
LVS_REAL_EMPTY_SERVER,
LVS_SET_NET,
LVS_UNSET_NET,
CLUSTER_STATUS_CREATE,
CLUSTER_STATUS_UPDATE,
CLUSTER_STATUS_RUN,
CLUSTER_STATUS_STOP,
CLUSTER_STATUS_CREATE_FAILED,
CLUSTER_STATUS_UPDATE_FAILED,
CLUSTER_STATUS_TERMINATE,
CLUSTER_STATUS,
SPEC_ANTIAFFINITY,
SPEC_ANTIAFFINITY_POLICY,
SPEC_ANTIAFFINITY_REQUIRED,
SPEC_ANTIAFFINITY_PREFERRED,
SPEC_ANTIAFFINITY_POLICY_REQUIRED,
SPEC_ANTIAFFINITY_POLICY_PREFERRED,
SPEC_ANTIAFFINITY_PODANTIAFFINITYTERM,
SPEC_ANTIAFFINITY_TOPOLOGYKEY,
SPEC_VOLUME_TYPE,
SPEC_VOLUME_LOCAL,
SPEC_VOLUME_CLOUD,
SECONDS,
MINUTES,
HOURS,
DAYS,
UPDATE_TOLERATION,
SPEC_POD_PRIORITY_CLASS,
SPEC_POD_PRIORITY_CLASS_SCOPE_NODE,
SPEC_POD_PRIORITY_CLASS_SCOPE_CLUSTER,
SPEC_S3,
SPEC_S3_ACCESS_KEY,
SPEC_S3_SECRET_KEY,
SPEC_S3_ENDPOINT,
SPEC_S3_BUCKET,
SPEC_S3_PATH,
SPEC_BACKUPCLUSTER,
SPEC_BACKUPTOS3,
SPEC_BACKUPTOS3_NAME,
SPEC_BACKUPTOS3_MANUAL,
SPEC_BACKUPTOS3_MANUAL_TRIGGER_ID,
SPEC_BACKUPTOS3_CRON,
SPEC_BACKUPTOS3_CRON_ENABLE,
SPEC_BACKUPTOS3_CRON_SCHEDULE,
SPEC_BACKUPTOS3_POLICY,
SPEC_BACKUPTOS3_POLICY_ARCHIVE,
SPEC_BACKUPTOS3_POLICY_ARCHIVE_DEFAULT_VALUE,
SPEC_BACKUPTOS3_POLICY_COMPRESSION,
SPEC_BACKUPTOS3_POLICY_COMPRESSION_DEFAULT_VALUE,
SPEC_BACKUPTOS3_POLICY_ENCRYPTION,
SPEC_BACKUPTOS3_POLICY_ENCRYPTION_DEFAULT_VALUE,
SPEC_BACKUPTOS3_POLICY_RETENTION,
SPEC_BACKUPTOS3_POLICY_RETENTION_DEFAULT_VALUE,
SPEC_BACKUPTOS3_POLICY_RETENTION_DELETE_ALL_VALUE,
RESTORE_FROMS3,
RESTORE_FROMS3_NAME,
RESTORE_FROMS3_RECOVERY,
CLUSTER_STATUS_BACKUP,
CLUSTER_STATUS_ARCHIVE,
CLUSTER_STATUS_CRON_NEXT_RUN,
RESTORE_FROMS3_RECOVERY_LATEST,
RESTORE_FROMS3_RECOVERY_LATEST_FULL,
RESTORE_FROMS3_RECOVERY_OLDEST_FULL,
RECOVERY_FINISH,
PG_LOG_FILENAME,
SPEC_REBUILD,
SPCE_REBUILD_NODENAMES,
SPEC_DELETE_S3,
STORAGE_CLASS_NAME,
)
from constants import *

PGLOG_DIR = "log"
PRIMARY_FORMATION = " --formation primary "
@@ -400,6 +228,53 @@ def set_password(patch: kopf.Patch, status: kopf.Status) -> None:
password_length))


def patch_cluster_conditions(
    patch: kopf.Patch,
    status: kopf.Status,
    logger: logging.Logger,
    type: str,
    condition_status: str,
    message: str,
    override: bool = False,
) -> None:
    # Build a new condition entry and append it to the list already in status.
    condition = Conditions(
        type, condition_status,
        time.strftime(DEFAULT_TIME_FORMAT, time.localtime()),
        str(message)).to_dict()
    conditions = status.get(CONDITIONS, [])

    if not isinstance(conditions, list):
        logger.warning(
            "patch_cluster_conditions failed, conditions is not a list.")
        return

    # With override, replace the most recent condition instead of appending
    # a new one.
    if len(conditions) > 0 and override:
        conditions.pop()
    conditions.append(condition)
    # Keep only the newest CONDITIONS_LIMIT entries.
    if len(conditions) > CONDITIONS_LIMIT:
        conditions = conditions[-1 * CONDITIONS_LIMIT:]

    patch.status[CONDITIONS] = conditions


def set_cluster_status_and_patch_conditions(
    meta: kopf.Meta,
    spec: kopf.Spec,
    patch: kopf.Patch,
    status: kopf.Status,
    logger: logging.Logger,
    statefield: str,
    state: str,
    condition_status: str,
    condition_message: str,
    override: bool = False,
) -> None:
    # Update the cluster state field and record the transition as a condition.
    set_cluster_status(meta, statefield, state, logger)
    patch_cluster_conditions(patch, status, logger, state, condition_status,
                             condition_message, override)
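A brief walk-through (illustrative only, not part of the diff) of how the override and limit logic above behaves, using a plain list in place of kopf's status object:

# Illustrative only: override replaces the latest (transient) condition.
conditions = [{"type": "Creating", "status": "Unknown"}]  # recorded at the start of create_cluster
new = {"type": "Running", "status": "True"}

# override=True drops the latest entry before appending, so the transient
# "Creating"/Unknown condition is replaced by the final one.
conditions.pop()
conditions.append(new)

# The list is then trimmed to the newest CONDITIONS_LIMIT (10) entries.
conditions = conditions[-10:]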


def create_statefulset_service(
name: str,
external_name: str,
@@ -3444,7 +3319,10 @@ def create_cluster(
logger: logging.Logger,
) -> None:
try:
set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_CREATE, logger)
set_cluster_status_and_patch_conditions(meta, spec, patch, status,
logger, CLUSTER_STATE,
CLUSTER_STATUS_CREATE,
CONDITION_UNKNOWN, '')

logging.info("check create_cluster params")
check_param(spec, logger, create=True)
@@ -3464,13 +3342,30 @@
time.sleep(5)
# cluster running
update_number_sync_standbys(meta, spec, patch, status, logger)
set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_RUN, logger)
set_cluster_status_and_patch_conditions(meta,
spec,
patch,
status,
logger,
CLUSTER_STATE,
CLUSTER_STATUS_RUN,
CONDITION_TRUE,
'',
override=True)
except Exception as e:
logger.error(f"error occurs, {e}")
traceback.print_exc()
traceback.format_exc()
set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_CREATE_FAILED,
logger)
err = traceback.format_exc()
set_cluster_status_and_patch_conditions(meta,
spec,
patch,
status,
logger,
CLUSTER_STATE,
CLUSTER_STATUS_CREATE_FAILED,
CONDITION_FALSE,
err,
override=True)


def delete_cluster(
Expand All @@ -3480,7 +3375,10 @@ def delete_cluster(
status: kopf.Status,
logger: logging.Logger,
) -> None:
set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_TERMINATE, logger)
set_cluster_status_and_patch_conditions(meta, spec, patch, status, logger,
CLUSTER_STATE,
CLUSTER_STATUS_TERMINATE,
CONDITION_UNKNOWN, '')
delete_postgresql_cluster(meta, spec, patch, status, logger)


@@ -5544,7 +5442,10 @@ def update_cluster(
diffs: kopf.Diff,
) -> None:
try:
set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_UPDATE, logger)
set_cluster_status_and_patch_conditions(meta, spec, patch, status,
logger, CLUSTER_STATE,
CLUSTER_STATUS_UPDATE,
CONDITION_UNKNOWN, '')
logger.info("check update_cluster params")
check_param(spec, logger, create=False)
need_roll_update = False
@@ -5665,13 +5566,30 @@
else:
cluster_status = CLUSTER_STATUS_RUN
# set Running
set_cluster_status(meta, CLUSTER_STATE, cluster_status, logger)
set_cluster_status_and_patch_conditions(meta,
spec,
patch,
status,
logger,
CLUSTER_STATE,
cluster_status,
CONDITION_TRUE,
'',
override=True)
except Exception as e:
logger.error(f"error occurs, {e}")
traceback.print_exc()
traceback.format_exc()
set_cluster_status(meta, CLUSTER_STATE, CLUSTER_STATUS_UPDATE_FAILED,
logger)
err = traceback.format_exc()
set_cluster_status_and_patch_conditions(meta,
spec,
patch,
status,
logger,
CLUSTER_STATE,
CLUSTER_STATUS_UPDATE_FAILED,
CONDITION_FALSE,
err,
override=True)


def cron_backup(