Skip to content

Commit

Permalink
Do not send connections to a non-ready shard
Browse files Browse the repository at this point in the history
Co-authored-by: VyacheslavSemin <[email protected]>
Co-committed-by: VyacheslavSemin <[email protected]>
  • Loading branch information
VyacheslavSemin committed Oct 4, 2024
1 parent 9b888f2 commit 0adaabd
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 39 deletions.
29 changes: 12 additions & 17 deletions sources/scripts/add_shardkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
redisUser = os.environ.get('REDIS_SERVER_USER')
redisPassword = os.environ.get('REDIS_SERVER_PWD')
redisDBNum = os.environ.get('REDIS_SERVER_DB_KEYS_NUM')
redisDBNumDSVersion = os.environ.get('REDIS_SERVER_DB_DS_VERSION')
redisConnectTimeout = 15
if os.environ.get('REDIS_CLUSTER_NODES'):
redisClusterNodes = list(os.environ.get('REDIS_CLUSTER_NODES').split(" "))
Expand All @@ -20,9 +19,7 @@
shardKey = os.environ.get('DEFAULT_SHARD_KEY')
epIP = os.environ.get('SHARD_IP')
epPort = os.environ.get('SHARD_PORT')
dsVersion = os.environ.get('APP_VERSION') + '-' + os.environ.get('DS_VERSION_HASH')
ipShard = epIP + ':' + epPort
shardDSVersion = ipShard + '-' + dsVersion

total_result = {}

Expand Down Expand Up @@ -53,10 +50,10 @@ def get_redis_status():
)
rc.ping()
except Exception as msg_redis:
logger_test_ds.error(f'Failed to check the availability of the Redis Standalone... {msg_redis}\n')
logger_endpoints_ds.error(f'Failed to check the availability of the Redis Standalone... {msg_redis}\n')
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Successful connection to Redis Standalone')
logger_endpoints_ds.info('Successful connection to Redis Standalone')
return rc.ping()


Expand All @@ -75,10 +72,10 @@ def get_redis_cluster_status():
)
rc.ping()
except Exception as msg_redis:
logger_test_ds.error(f'Failed to check the availability of the Redis Cluster... {msg_redis}\n')
logger_endpoints_ds.error(f'Failed to check the availability of the Redis Cluster... {msg_redis}\n')
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Successful connection to Redis Cluster')
logger_endpoints_ds.info('Successful connection to Redis Cluster')
return rc.ping()


Expand All @@ -100,10 +97,10 @@ def get_redis_sentinel_status():
)
rc.ping()
except Exception as msg_redis:
logger_test_ds.error(f'Failed to check the availability of the Redis Sentinel... {msg_redis}\n')
logger_endpoints_ds.error(f'Failed to check the availability of the Redis Sentinel... {msg_redis}\n')
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Successful connection to Redis Sentinel')
logger_endpoints_ds.info('Successful connection to Redis Sentinel')
return rc.ping()


Expand All @@ -112,18 +109,16 @@ def add_redis_key():
rc.set(shardKey, ipShard)
rc.append(ipShard, f' {shardKey}')
test_key = rc.get(shardKey).decode('utf-8')
rc.select(redisDBNumDSVersion)
rc.set(shardDSVersion, '0')
except Exception as msg_check_redis:
logger_test_ds.error(f'Error when trying to write a ShardKey to Redis... {msg_check_redis}\n')
logger_endpoints_ds.error(f'Error when trying to write a ShardKey to Redis... {msg_check_redis}\n')
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info(f'ShardKey {shardKey} = {test_key} was successfully recorded to Redis\n')
logger_endpoints_ds.info(f'ShardKey {shardKey} = {test_key} was successfully recorded to Redis\n')
rc.close()


def init_redis():
logger_test_ds.info('Checking Redis availability...')
logger_endpoints_ds.info('Checking Redis availability...')
if redisConnectorName == 'redis' and not os.environ.get('REDIS_CLUSTER_NODES'):
if get_redis_status() is True:
add_redis_key()
Expand All @@ -137,11 +132,11 @@ def init_redis():

def total_status():
if 'Failed' in total_result.values():
logger_test_ds.error('Recording of "ShardKey" in Redis failed')
logger_endpoints_ds.error('Recording of "ShardKey" in Redis failed')
sys.exit(1)


init_logger('test')
logger_test_ds = logging.getLogger('test.ds')
init_logger('endpoints')
logger_endpoints_ds = logging.getLogger('endpoints.ds')
init_redis()
total_status()
154 changes: 154 additions & 0 deletions sources/scripts/add_shardkey_dsversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import os
import sys
import logging
import time
import subprocess

redisConnectorName = os.environ.get('REDIS_CONNECTOR_NAME')
redisHost = os.environ.get('REDIS_SERVER_HOST')
redisPort = os.environ.get('REDIS_SERVER_PORT')
redisUser = os.environ.get('REDIS_SERVER_USER')
redisPassword = os.environ.get('REDIS_SERVER_PWD')
redisDBNumDSVersion = os.environ.get('REDIS_SERVER_DB_DS_VERSION')
redisConnectTimeout = 15
if os.environ.get('REDIS_CLUSTER_NODES'):
redisClusterNodes = list(os.environ.get('REDIS_CLUSTER_NODES').split(" "))
redisClusterNode = redisClusterNodes[0].split(":")[0]
redisClusterPort = redisClusterNodes[0].split(":")[1]
if redisConnectorName == 'ioredis':
redisSentinelGroupName = os.environ.get('REDIS_SENTINEL_GROUP_NAME')

shardKey = os.environ.get('DEFAULT_SHARD_KEY')
epIP = os.environ.get('SHARD_IP')
epPort = os.environ.get('SHARD_PORT')
dsVersion = os.environ.get('APP_VERSION') + '-' + os.environ.get('DS_VERSION_HASH')
ipShard = epIP + ':' + epPort
shardDSVersion = ipShard + '-' + dsVersion


def init_logger(name):
logger = logging.getLogger(name)
formatter = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logger.setLevel(logging.DEBUG)
stdout = logging.StreamHandler()
stdout.setFormatter(logging.Formatter(formatter))
stdout.setLevel(logging.DEBUG)
logger.addHandler(stdout)
logger.info('Running the Redis initialization script with the "shardKey" value for the replica\n')


def get_redis_status():
import redis
global rc
try:
rc = redis.Redis(
host=redisHost,
port=redisPort,
db=redisDBNumDSVersion,
password=redisPassword,
username=redisUser,
socket_connect_timeout=redisConnectTimeout,
retry_on_timeout=True
)
rc.ping()
except Exception as msg_redis:
logger_endpoints_ds.error('Failed to check the availability of the Redis Standalone... {}\n'.format(msg_redis))
else:
logger_endpoints_ds.info('Successful connection to Redis Standalone')
return rc.ping()


def get_redis_cluster_status():
from redis.cluster import RedisCluster as Redis
from redis.cluster import ClusterNode
global rc
try:
nodes = [ClusterNode(redisClusterNode, redisClusterPort)]
rc = Redis(
startup_nodes=nodes,
username=redisUser,
password=redisPassword,
socket_connect_timeout=redisConnectTimeout,
retry_on_timeout=True
)
rc.ping()
except Exception as msg_redis:
logger_endpoints_ds.error('Failed to check the availability of the Redis Cluster... {}\n'.format(msg_redis))
else:
logger_endpoints_ds.info('Successful connection to Redis Cluster')
return rc.ping()


def get_redis_sentinel_status():
import redis
from redis import Sentinel
global rc
try:
sentinel = Sentinel([(redisHost, redisPort)], socket_timeout=redisConnectTimeout)
master_host, master_port = sentinel.discover_master(redisSentinelGroupName)
rc = redis.Redis(
host=master_host,
port=master_port,
db=redisDBNumDSVersion,
password=redisPassword,
username=redisUser,
socket_connect_timeout=redisConnectTimeout,
retry_on_timeout=True
)
rc.ping()
except Exception as msg_redis:
logger_endpoints_ds.error('Failed to check the availability of the Redis Sentinel... {}\n'.format(msg_redis))
else:
logger_endpoints_ds.info('Successful connection to Redis Sentinel')
return rc.ping()


def add_redis_key():
if not rc.exists(shardDSVersion):
try:
rc.set(shardDSVersion, '0')
except Exception as msg_check_redis:
logger_endpoints_ds.error('Error when trying to write ShardKey with DS version to Redis... {}\n'.format(msg_check_redis))
else:
rc.close()
else:
rc.close()


def init_redis():
logger_endpoints_ds.info('Checking Redis availability...')
if redisConnectorName == 'redis' and not os.environ.get('REDIS_CLUSTER_NODES'):
if get_redis_status() is True:
add_redis_key()
elif redisConnectorName == 'redis' and os.environ.get('REDIS_CLUSTER_NODES'):
if get_redis_cluster_status() is True:
add_redis_key()
elif redisConnectorName == 'ioredis':
if get_redis_sentinel_status() is True:
add_redis_key()


def check_ds():
while True:
if not os.path.exists('/scripts/checkds.txt'):
try:
ds_status = ["/bin/bash", "-c", "curl -I -s http://localhost:8888/index.html | awk '/^HTTP/{print $2}'"]
ds_status_process = subprocess.Popen(ds_status, stdout=subprocess.PIPE)
ds_status_result = int(ds_status_process.communicate()[0])
if ds_status_result == 200:
init_redis()
build_status = open('/scripts/checkds.txt', 'w')
build_status.write('Completed')
build_status.close()
else:
time.sleep(1)
except Exception as msg_ds_status:
logger_endpoints_ds.error('Error when trying to get DS status... {}'. format(msg_ds_status))
time.sleep(1)
else:
time.sleep(5)


init_logger('endpoints')
logger_endpoints_ds = logging.getLogger('endpoints.ds')
check_ds()
32 changes: 16 additions & 16 deletions sources/scripts/remove_shardkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ def get_redis_status(redis_db):
)
rc.ping()
except Exception as msg_redis:
logger_test_ds.error('Failed to check the availability of the Redis Standalone... {}\n'.format(msg_redis))
logger_endpoints_ds.error('Failed to check the availability of the Redis Standalone... {}\n'.format(msg_redis))
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Successful connection to Redis Standalone')
logger_endpoints_ds.info('Successful connection to Redis Standalone')
return rc.ping()


Expand All @@ -80,10 +80,10 @@ def get_redis_cluster_status():
)
rc.ping()
except Exception as msg_redis:
logger_test_ds.error('Failed to check the availability of the Redis Cluster... {}\n'.format(msg_redis))
logger_endpoints_ds.error('Failed to check the availability of the Redis Cluster... {}\n'.format(msg_redis))
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Successful connection to Redis Cluster')
logger_endpoints_ds.info('Successful connection to Redis Cluster')
return rc.ping()


Expand All @@ -105,10 +105,10 @@ def get_redis_sentinel_status(redis_db):
)
rc.ping()
except Exception as msg_redis:
logger_test_ds.error('Failed to check the availability of the Redis Sentinel... {}\n'.format(msg_redis))
logger_endpoints_ds.error('Failed to check the availability of the Redis Sentinel... {}\n'.format(msg_redis))
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Successful connection to Redis Sentinel')
logger_endpoints_ds.info('Successful connection to Redis Sentinel')
return rc.ping()


Expand All @@ -127,17 +127,17 @@ def clear_shard_key():
rc.delete(shardDSVersion)
rc.close()
except Exception as msg_check_redis:
logger_test_ds.error('Error when trying to delete keys belonging to the {sk} shard from Redis... {em}\n'.format(sk=shardKey, em=msg_check_redis))
logger_endpoints_ds.error('Error when trying to delete keys belonging to the {sk} shard from Redis... {em}\n'.format(sk=shardKey, em=msg_check_redis))
total_result['CheckRedis'] = 'Failed'
else:
logger_test_ds.info('Keys belonging to {} have been successfully deleted from Redis\n'.format(shardKey))
logger_endpoints_ds.info('Keys belonging to {} have been successfully deleted from Redis\n'.format(shardKey))
else:
logger_test_ds.info('Endpoint shard {} was not found in Redis\n'.format(shardKey))
logger_endpoints_ds.info('Endpoint shard {} was not found in Redis\n'.format(shardKey))
rc.close()


def clear_redis():
logger_test_ds.info('Checking Redis availability...')
logger_endpoints_ds.info('Checking Redis availability...')
if redisConnectorName == 'redis' and not os.environ.get('REDIS_CLUSTER_NODES'):
if get_redis_status(redisDBNum) is True:
clear_shard_key()
Expand All @@ -160,7 +160,7 @@ def get_connect_count():
else:
return False
except Exception as msg_get_connect_count:
logger_test_ds.error('Failed when trying to get the number of connections... {}\n'.format(msg_get_connect_count))
logger_endpoints_ds.error('Failed when trying to get the number of connections... {}\n'.format(msg_get_connect_count))
total_result['GetConnectCount'] = 'Failed'
return False

Expand All @@ -171,7 +171,7 @@ def shutdown_shard():
process = subprocess.Popen(shutdown_cmd, stdout=subprocess.PIPE)
shutdown_result = process.communicate()[0].decode('utf-8')
except Exception as msg_url:
logger_test_ds.error('Failed to check the availability of the DocumentServer... {}\n'.format(msg_url))
logger_endpoints_ds.error('Failed to check the availability of the DocumentServer... {}\n'.format(msg_url))
total_result['ShutdownDS'] = 'Failed'
else:
if shutdown_result == "true":
Expand All @@ -180,7 +180,7 @@ def shutdown_shard():
build_status.write('Completed')
build_status.close()
else:
logger_test_ds.error('The {} shard could not be disabled'.format(shardKey))
logger_endpoints_ds.error('The {} shard could not be disabled'.format(shardKey))
sys.exit(1)


Expand All @@ -202,11 +202,11 @@ def prepare_for_shutdown_shard():

def total_status():
if 'Failed' in total_result.values():
logger_test_ds.error('Could not clear Redis of keys belonging to {}'.format(shardKey))
logger_endpoints_ds.error('Could not clear Redis of keys belonging to {}'.format(shardKey))
sys.exit(1)


init_logger('test')
logger_test_ds = logging.getLogger('test.ds')
init_logger('endpoints')
logger_endpoints_ds = logging.getLogger('endpoints.ds')
prepare_for_shutdown_shard()
total_status()
14 changes: 14 additions & 0 deletions templates/configmaps/add-shardkey-dsversion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: add-shardkey-dsversion
namespace: {{ include "ds.namespace" . | quote }}
{{- if .Values.commonLabels }}
labels:
{{- include "ds.labels.commonLabels" . | trim | nindent 4 }}
{{- end }}
{{- if .Values.commonAnnotations }}
annotations: {{- include "ds.annotations.commonAnnotations" ( dict "keyName" .Values.commonAnnotations "context" $ ) | nindent 4 }}
{{- end }}
data:
{{ (.Files.Glob "sources/scripts/add_shardkey_dsversion.py").AsConfig | indent 2 }}
Loading

0 comments on commit 0adaabd

Please sign in to comment.