#!/usr/bin/env python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2016 ScyllaDB
import random
import time
import re
import os
from functools import wraps
from pkg_resources import parse_version
from sdcm.fill_db_data import FillDatabaseData
from sdcm import wait
from sdcm.utils.version_utils import is_enterprise
from sdcm.sct_events import DbEventsFilter
def truncate_entries(func):
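    """Decorator for node upgrade/rollback steps.

    When truncate-entries validation is enabled, it truncates the test tables and
    validates the truncation records before the wrapped call, then (for versions
    >= 3.1) re-validates them, reads the truncated tables back and re-populates
    them afterwards.
    """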
@wraps(func)
def inner(self, *args, **kwargs):
        # Validate the truncate entries when the new version is 3.1 or later
node = args[0]
if self.truncate_entries_flag:
base_version = self.params.get('scylla_version', default='')
system_truncated = bool(parse_version(base_version) >= parse_version('3.1')
and not is_enterprise(base_version))
with self.cql_connection_patient(node, keyspace='truncate_ks') as session:
self.cql_truncate_simple_tables(session=session, rows=self.insert_rows)
self.validate_truncated_entries_for_table(session=session, system_truncated=system_truncated)
func_result = func(self, *args, **kwargs)
result = node.remoter.run('scylla --version')
new_version = result.stdout
if new_version and parse_version(new_version) >= parse_version('3.1'):
            # re-establish the CQL connection
with self.cql_connection_patient(node, keyspace='truncate_ks') as session:
self.validate_truncated_entries_for_table(session=session, system_truncated=True)
self.read_data_from_truncated_tables(session=session)
self.cql_insert_data_to_simple_tables(session=session, rows=self.insert_rows)
return func_result
return inner
class UpgradeTest(FillDatabaseData):
"""
Test a Scylla cluster upgrade.
"""
orig_ver = None
new_ver = None
# `major_release` (eg: 2.1 <-> 2.2, 2017.1 <-> 2018.1)
# `reinstall` (opensource <-> enterprise, enterprise <-> opensource)
# `minor_release` (eg: 2.2.1 <-> 2.2.5, 2018.1.0 <-> 2018.1.1)
upgrade_rollback_mode = None
    # expected sstable format version after the upgrade and after 'nodetool upgradesstables'
    # is called; recalculated once the whole cluster finishes the upgrade
expected_sstable_format_version = 'mc'
insert_rows = None
truncate_entries_flag = False
def read_data_from_truncated_tables(self, session):
session.execute("USE truncate_ks")
truncate_query = 'SELECT COUNT(*) FROM {}'
tables_name = self.get_tables_name_of_keyspace(session=session, keyspace_name='truncate_ks')
for table_name in tables_name:
count = self.rows_to_list(session.execute(truncate_query.format(table_name)))
self.assertEqual(str(count[0][0]), '0',
msg='Expected that there is no data in the table truncate_ks.{}, but found {} rows'
.format(table_name, count[0][0]))
def validate_truncated_entries_for_table(self, session, system_truncated=False): # pylint: disable=invalid-name
tables_id = self.get_tables_id_of_keyspace(session=session, keyspace_name='truncate_ks')
for table_id in tables_id:
if system_truncated:
# validate truncation entries in the system.truncated table - expected entry
truncated_time = self.get_truncated_time_from_system_truncated(session=session, table_id=table_id)
                self.assertTrue(truncated_time,
                                msg="Expected a truncation entry in the system.truncated table, but none was found")
            # check the truncation entry in the system.local table: when system.truncated
            # is used the entry must not be there, otherwise it must be present
truncated_time = self.get_truncated_time_from_system_local(session=session)
if system_truncated:
                self.assertEqual(truncated_time, [[None]],
                                 msg="Did not expect a truncation entry in the system.local table, but found one")
else:
                self.assertTrue(truncated_time,
                                msg="Expected a truncation entry in the system.local table, but none was found")
@truncate_entries
def upgrade_node(self, node):
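        """Upgrade a single node: back up its configuration, install the new Scylla
        packages (either uploaded packages or packages from new_scylla_repo),
        restart the node and run 'nodetool upgradesstables' if available.
        """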
# pylint: disable=too-many-branches,too-many-statements
new_scylla_repo = self.params.get('new_scylla_repo', default=None)
new_version = self.params.get('new_version', default='')
upgrade_node_packages = self.params.get('upgrade_node_packages', default=None)
self.log.info('Upgrading a Node')
node.upgrade_system()
        # We assume that if upgrade_node_packages is not empty we install the packages
        # from there; in that case the upgrade based on new_scylla_repo is not used
        # (i.e. 'sudo yum update scylla...' is skipped)
result = node.remoter.run('scylla --version')
self.orig_ver = result.stdout
if upgrade_node_packages:
# update_scylla_packages
node.remoter.send_files(upgrade_node_packages, '/tmp/scylla', verbose=True)
# node.remoter.run('sudo yum update -y --skip-broken', connect_timeout=900)
node.remoter.run('sudo yum install python34-PyYAML -y')
# replace the packages
node.remoter.run(r'rpm -qa scylla\*')
# flush all memtables to SSTables
node.run_nodetool("drain")
node.run_nodetool("snapshot")
node.stop_scylla_server()
# update *development* packages
node.remoter.run('sudo rpm -UvhR --oldpackage /tmp/scylla/*development*', ignore_status=True)
# and all the rest
node.remoter.run('sudo rpm -URvh --replacefiles /tmp/scylla/*.rpm | true')
node.remoter.run(r'rpm -qa scylla\*')
elif new_scylla_repo:
# backup the data
node.remoter.run('sudo cp /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml-backup')
if node.is_rhel_like():
node.remoter.run('sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup')
node.remoter.run(
r'for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ); do sudo cp -v $conf $conf.autobackup; done')
else:
node.remoter.run('sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup')
node.remoter.run(
r'for conf in $(cat /var/lib/dpkg/info/scylla-*server.conffiles /var/lib/dpkg/info/scylla-*conf.conffiles /var/lib/dpkg/info/scylla-*jmx.conffiles | grep -v init ); do sudo cp -v $conf $conf.backup; done')
assert new_scylla_repo.startswith('http')
node.download_scylla_repo(new_scylla_repo)
# flush all memtables to SSTables
node.run_nodetool("drain")
node.run_nodetool("snapshot")
node.stop_scylla_server(verify_down=False)
orig_is_enterprise = node.is_enterprise
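            # Detect whether the target repo provides the enterprise package, so we can
            # tell if this upgrade crosses the opensource <-> enterprise boundary
            # (such a cross forces a full reinstall instead of an in-place update).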
if node.is_rhel_like():
result = node.remoter.run("sudo yum search scylla-enterprise", ignore_status=True)
                new_is_enterprise = bool('scylla-enterprise.x86_64' in result.stdout or
                                         'No matches found' not in result.stdout)
else:
result = node.remoter.run("sudo apt-cache search scylla-enterprise", ignore_status=True)
                new_is_enterprise = 'scylla-enterprise' in result.stdout
scylla_pkg = 'scylla-enterprise' if new_is_enterprise else 'scylla'
if orig_is_enterprise != new_is_enterprise:
self.upgrade_rollback_mode = 'reinstall'
ver_suffix = r'\*{}'.format(new_version) if new_version else ''
if self.upgrade_rollback_mode == 'reinstall':
if node.is_rhel_like():
node.remoter.run(r'sudo yum remove scylla\* -y')
node.remoter.run('sudo yum install {}{} -y'.format(scylla_pkg, ver_suffix))
node.remoter.run(
r'for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ); do sudo cp -v $conf.autobackup $conf; done')
else:
node.remoter.run(r'sudo apt-get remove scylla\* -y')
                    # fixme: add public key
node.remoter.run(
r'sudo apt-get install {}{} -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" --force-yes --allow-unauthenticated'.format(scylla_pkg, ver_suffix))
node.remoter.run(r'for conf in $(cat /var/lib/dpkg/info/scylla-*server.conffiles /var/lib/dpkg/info/scylla-*conf.conffiles /var/lib/dpkg/info/scylla-*jmx.conffiles'
r' | grep -v init ); do sudo cp -v $conf $conf.backup-2.1; done')
else:
if node.is_rhel_like():
node.remoter.run(r'sudo yum update {}{}\* -y'.format(scylla_pkg, ver_suffix))
else:
node.remoter.run('sudo apt-get update')
node.remoter.run(
r'sudo apt-get dist-upgrade {} -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" --force-yes --allow-unauthenticated'.format(scylla_pkg))
if self.params.get('test_sst3', default=None):
node.remoter.run("echo 'enable_sstables_mc_format: true' |sudo tee --append /etc/scylla/scylla.yaml")
if self.params.get('test_upgrade_from_installed_3_1_0', default=None):
node.remoter.run("echo 'enable_3_1_0_compatibility_mode: true' |sudo tee --append /etc/scylla/scylla.yaml")
authorization_in_upgrade = self.params.get('authorization_in_upgrade', default=None)
if authorization_in_upgrade:
node.remoter.run("echo 'authorizer: \"%s\"' |sudo tee --append /etc/scylla/scylla.yaml" %
authorization_in_upgrade)
node.start_scylla_server()
node.wait_db_up(verbose=True)
        result = node.remoter.run('scylla --version')
        self.new_ver = result.stdout
        assert self.orig_ver != self.new_ver, "scylla-server version hasn't changed"
self.upgradesstables_if_command_available(node)
@truncate_entries
def rollback_node(self, node):
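        """Roll back a single node: restore the backed-up configuration and repo files,
        downgrade or reinstall the original Scylla packages, optionally recover the
        system tables from the snapshot, and restart the node.
        """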
# pylint: disable=too-many-branches,too-many-statements
        self.log.info('Rolling back a Node')
# fixme: auto identify new_introduced_pkgs, remove this parameter
new_introduced_pkgs = self.params.get('new_introduced_pkgs', default=None)
result = node.remoter.run('scylla --version')
orig_ver = result.stdout
# flush all memtables to SSTables
node.run_nodetool("drain")
# backup the data
node.run_nodetool("snapshot")
node.stop_scylla_server(verify_down=False)
node.remoter.run('sudo cp /etc/scylla/scylla.yaml-backup /etc/scylla/scylla.yaml')
if node.is_rhel_like():
node.remoter.run('sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo')
node.remoter.run('sudo chown root.root /etc/yum.repos.d/scylla.repo')
node.remoter.run('sudo chmod 644 /etc/yum.repos.d/scylla.repo')
else:
node.remoter.run('sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list')
node.remoter.run('sudo chown root.root /etc/apt/sources.list.d/scylla.list')
node.remoter.run('sudo chmod 644 /etc/apt/sources.list.d/scylla.list')
node.update_repo_cache()
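        # if the original and new versions share the same major.minor, this rollback
        # stays within a minor release, so a plain package downgrade is enough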
        if re.findall(r'\d+\.\d+', self.orig_ver)[0] == re.findall(r'\d+\.\d+', self.new_ver)[0]:
self.upgrade_rollback_mode = 'minor_release'
if self.upgrade_rollback_mode == 'reinstall' or not node.is_rhel_like():
if node.is_rhel_like():
node.remoter.run(r'sudo yum remove scylla\* -y')
node.remoter.run(r'sudo yum install %s -y' % node.scylla_pkg())
node.remoter.run(
r'for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ); do sudo cp -v $conf.autobackup $conf; done')
else:
node.remoter.run(r'sudo apt-get remove scylla\* -y')
node.remoter.run(
r'sudo apt-get install %s -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" --force-yes --allow-unauthenticated' % node.scylla_pkg())
node.remoter.run(
r'for conf in $(cat /var/lib/dpkg/info/scylla-*server.conffiles /var/lib/dpkg/info/scylla-*conf.conffiles /var/lib/dpkg/info/scylla-*jmx.conffiles | grep -v init ); do sudo cp -v $conf.backup $conf; done')
if not node.is_ubuntu14():
node.remoter.run('sudo systemctl daemon-reload')
elif self.upgrade_rollback_mode == 'minor_release':
node.remoter.run(r'sudo yum downgrade scylla\*%s -y' % self.orig_ver.split('-')[0])
else:
if new_introduced_pkgs:
node.remoter.run('sudo yum remove %s -y' % new_introduced_pkgs)
node.remoter.run(r'sudo yum downgrade scylla\* -y')
if new_introduced_pkgs:
node.remoter.run('sudo yum install %s -y' % node.scylla_pkg())
if node.is_rhel_like():
node.remoter.run(
r'for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ); do sudo cp -v $conf.autobackup $conf; done')
else:
node.remoter.run(
r'for conf in $(cat /var/lib/dpkg/info/scylla-*server.conffiles /var/lib/dpkg/info/scylla-*conf.conffiles /var/lib/dpkg/info/scylla-*jmx.conffiles | grep -v init ); do sudo cp -v $conf.backup $conf; done')
if not node.is_ubuntu14():
node.remoter.run('sudo systemctl daemon-reload')
result = node.remoter.run('sudo find /var/lib/scylla/data/system')
snapshot_name = re.findall(r"system/peers-[a-z0-9]+/snapshots/(\d+)\n", result.stdout)
# cmd = r"DIR='/var/lib/scylla/data/system'; for i in `sudo ls $DIR`;do sudo test -e $DIR/$i/snapshots/%s && sudo find $DIR/$i/snapshots/%s -type f -exec sudo /bin/cp {} $DIR/$i/ \;; done" % (snapshot_name[0], snapshot_name[0])
# recover the system tables
if self.params.get('recover_system_tables', default=None):
node.remoter.send_files('./data_dir/recover_system_tables.sh', '/tmp/')
node.remoter.run('bash /tmp/recover_system_tables.sh %s' % snapshot_name[0], verbose=True)
if self.params.get('test_sst3', default=None):
node.remoter.run(
r'sudo sed -i -e "s/enable_sstables_mc_format:/#enable_sstables_mc_format:/g" /etc/scylla/scylla.yaml')
if self.params.get('test_upgrade_from_installed_3_1_0', default=None):
node.remoter.run(
r'sudo sed -i -e "s/enable_3_1_0_compatibility_mode:/#enable_3_1_0_compatibility_mode:/g" /etc/scylla/scylla.yaml')
if self.params.get('remove_authorization_in_rollback', default=None):
node.remoter.run('sudo sed -i -e "s/authorizer:/#authorizer:/g" /etc/scylla/scylla.yaml')
node.start_scylla_server()
result = node.remoter.run('scylla --version')
new_ver = result.stdout
self.log.debug('original scylla-server version is %s, latest: %s', orig_ver, new_ver)
        assert orig_ver != new_ver, "scylla-server version hasn't changed"
self.upgradesstables_if_command_available(node)
def upgradesstables_if_command_available(self, node, queue=None): # pylint: disable=invalid-name
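        """Run 'nodetool upgradesstables -a' on the node when the sub-command exists.

        When a queue is passed (parallel execution), report via the queue whether
        the command was available on this node.
        """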
upgradesstables_available = False
upgradesstables_supported = node.remoter.run(
'nodetool help | grep -q upgradesstables && echo "yes" || echo "no"')
if "yes" in upgradesstables_supported.stdout:
upgradesstables_available = True
self.log.debug("calling upgradesstables")
node.run_nodetool(sub_cmd="upgradesstables", args="-a")
if queue:
queue.put(upgradesstables_available)
queue.task_done()
def get_highest_supported_sstable_version(self): # pylint: disable=invalid-name
"""
find the highest sstable format version supported in the cluster
:return:
"""
sstable_format_regex = re.compile(r'Feature (.*)_SSTABLE_FORMAT is enabled')
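        # e.g. a log line containing "Feature MC_SSTABLE_FORMAT is enabled" yields "mc"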
versions_set = set()
for node in self.db_cluster.nodes:
            if os.path.exists(node.database_log):
                with open(node.database_log) as database_log:
                    for line in database_log:
                        match = sstable_format_regex.search(line)
                        if match:
                            versions_set.add(match.group(1).lower())
return max(versions_set)
def wait_for_sstable_upgrade(self, node, queue=None):
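        """Wait until all system sstables on the node are rewritten in the expected format version."""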
all_tables_upgraded = True
def wait_for_node_to_finish():
try:
result = node.remoter.run(
"sudo find /var/lib/scylla/data/system -type f ! -path '*snapshots*' | xargs -I{} basename {}")
all_sstable_files = result.stdout.splitlines()
sstable_version_regex = re.compile(r'(\w+)-\d+-(.+)\.(db|txt|sha1|crc32)')
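                # e.g. "mc-5-big-Data.db" -> format version "mc"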
sstable_versions = list(
set([sstable_version_regex.search(f).group(1) for f in all_sstable_files if sstable_version_regex.search(f)]))
                assert len(sstable_versions) == 1, \
                    "expected all sstables to have the same format version, found {}".format(sstable_versions)
                assert sstable_versions[0] == self.expected_sstable_format_version, \
                    "expected the format version to be '{}', found '{}'".format(
                        self.expected_sstable_format_version, sstable_versions[0])
except Exception as ex: # pylint: disable=broad-except
self.log.warning(ex)
return False
else:
return True
try:
            self.log.debug("Starting to wait for upgradesstables to finish")
            wait.wait_for(func=wait_for_node_to_finish, step=30, timeout=900, throw_exc=True,
                          text="Waiting until upgradesstables is finished")
except Exception: # pylint: disable=broad-except
all_tables_upgraded = False
finally:
if queue:
queue.put(all_tables_upgraded)
queue.task_done()
default_params = {'timeout': 650000}
def test_upgrade_cql_queries(self):
"""
Run a set of different cql queries against various types/tables before
and after upgrade of every node to check the consistency of data
"""
        self.truncate_entries_flag = False  # do not run the truncate entries test
self.log.info('Populate DB with many types of tables and data')
self.fill_db_data()
self.log.info('Run some Queries to verify data BEFORE UPGRADE')
self.verify_db_data()
        self.log.info('Starting c-s write workload to populate 10M partitions')
# YAML: stress_cmd: cassandra-stress write cl=QUORUM n=10000000 -schema 'replication(factor=3)' -port jmx=6868
# -mode cql3 native -rate threads=1000 -pop seq=1..10000000
stress_cmd = self._cs_add_node_flag(self.params.get('stress_cmd'))
self.run_stress_thread(stress_cmd=stress_cmd)
        self.log.info('Sleeping for 600s to let cassandra-stress populate some data before the mixed workload')
        time.sleep(600)
self.log.info('Starting c-s read workload for 60m')
# YAML: stress_cmd_1: cassandra-stress read cl=QUORUM duration=60m -schema 'replication(factor=3)'
# -port jmx=6868 -mode cql3 native -rate threads=100 -pop seq=1..10000000
stress_cmd_1 = self._cs_add_node_flag(self.params.get('stress_cmd_1'))
stress_queue = self.run_stress_thread(stress_cmd=stress_cmd_1)
self.log.info('Sleeping for 300s to let cassandra-stress start before the upgrade...')
time.sleep(300)
nodes_num = len(self.db_cluster.nodes)
        # prepare a list containing the node indexes
        indexes = list(range(nodes_num))
# shuffle it so we will upgrade the nodes in a random order
random.shuffle(indexes)
# upgrade all the nodes in random order
for i in indexes:
self.db_cluster.node_to_upgrade = self.db_cluster.nodes[i]
self.log.info('Upgrade Node %s begin', self.db_cluster.node_to_upgrade.name)
self.upgrade_node(self.db_cluster.node_to_upgrade)
time.sleep(300)
self.log.info('Upgrade Node %s ended', self.db_cluster.node_to_upgrade.name)
self.log.info('Run some Queries to verify data AFTER UPGRADE')
self.verify_db_data()
self.verify_stress_thread(stress_queue)
def fill_and_verify_db_data(self, note, pre_fill=False, rewrite_data=True):
if pre_fill:
self.log.info('Populate DB with many types of tables and data')
self.fill_db_data()
self.log.info('Run some Queries to verify data %s', note)
self.verify_db_data()
if rewrite_data:
self.clean_db_data()
self.log.info('Re-Populate DB with many types of tables and data')
self.fill_db_data()
def test_rolling_upgrade(self): # pylint: disable=too-many-locals,too-many-statements
"""
Upgrade half of nodes in the cluster, and start special read workload
during the stage. Checksum method is changed to xxhash from Scylla 2.2,
we want to use this case to verify the read (cl=ALL) workload works
well, upgrade all nodes to new version in the end.
"""
filter_errors = [{'line': 'Failed to load schema', 'type': 'DATABASE_ERROR'},
{'line': 'Failed to load schema', 'type': 'SCHEMA_FAILURE'},
{'line': 'Failed to pull schema', 'type': 'DATABASE_ERROR'},
{'line': 'Backtrace:', 'type': 'BACKTRACE'}]
for error in filter_errors:
DbEventsFilter(type=error['type'], line=error['line'])
        # If the target version is >= 3.1 we also need to test truncate entries
target_upgrade_version = self.params.get('target_upgrade_version', default='')
self.truncate_entries_flag = False
if target_upgrade_version and parse_version(target_upgrade_version) >= parse_version('3.1') and \
not is_enterprise(target_upgrade_version):
self.truncate_entries_flag = True
self.fill_and_verify_db_data('BEFORE UPGRADE', pre_fill=True)
# write workload during entire test
self.log.info('Starting c-s write workload during entire test')
write_stress_during_entire_test = self.params.get('write_stress_during_entire_test')
entire_write_cs_thread_pool = self.run_stress_thread(stress_cmd=write_stress_during_entire_test)
# complex workload: prepare write
self.log.info('Starting c-s complex workload (5M) to prepare data')
stress_cmd_complex_prepare = self.params.get('stress_cmd_complex_prepare')
complex_cs_thread_pool = self.run_stress_thread(
stress_cmd=stress_cmd_complex_prepare, profile='data_dir/complex_schema.yaml')
# wait for the complex workload to finish
self.verify_stress_thread(complex_cs_thread_pool)
# generate random order to upgrade
nodes_num = len(self.db_cluster.nodes)
        # prepare a list containing the node indexes
        indexes = list(range(nodes_num))
        # shuffle it so we will upgrade the nodes in a random order
random.shuffle(indexes)
# prepare write workload
self.log.info('Starting c-s prepare write workload (n=10000000)')
prepare_write_stress = self.params.get('prepare_write_stress')
prepare_write_cs_thread_pool = self.run_stress_thread(stress_cmd=prepare_write_stress)
self.log.info('Sleeping for 60s to let cassandra-stress start before the upgrade...')
time.sleep(60)
# Prepare keyspace and tables for truncate test
if self.truncate_entries_flag:
self.insert_rows = 10
self.fill_db_data_for_truncate_test(insert_rows=self.insert_rows)
# upgrade first node
self.db_cluster.node_to_upgrade = self.db_cluster.nodes[indexes[0]]
self.log.info('Upgrade Node %s begin', self.db_cluster.node_to_upgrade.name)
self.upgrade_node(self.db_cluster.node_to_upgrade)
self.log.info('Upgrade Node %s ended', self.db_cluster.node_to_upgrade.name)
self.db_cluster.node_to_upgrade.check_node_health()
# wait for the prepare write workload to finish
self.verify_stress_thread(prepare_write_cs_thread_pool)
# read workload (cl=QUORUM)
self.log.info('Starting c-s read workload (cl=QUORUM n=10000000)')
stress_cmd_read_cl_quorum = self.params.get('stress_cmd_read_cl_quorum')
read_stress_queue = self.run_stress_thread(stress_cmd=stress_cmd_read_cl_quorum)
# wait for the read workload to finish
self.verify_stress_thread(read_stress_queue)
self.fill_and_verify_db_data('after upgraded one node')
# read workload
self.log.info('Starting c-s read workload for 10m')
stress_cmd_read_10m = self.params.get('stress_cmd_read_10m')
read_10m_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_read_10m)
self.log.info('Sleeping for 60s to let cassandra-stress start before the upgrade...')
time.sleep(60)
# upgrade second node
self.db_cluster.node_to_upgrade = self.db_cluster.nodes[indexes[1]]
self.log.info('Upgrade Node %s begin', self.db_cluster.node_to_upgrade.name)
self.upgrade_node(self.db_cluster.node_to_upgrade)
self.log.info('Upgrade Node %s ended', self.db_cluster.node_to_upgrade.name)
self.db_cluster.node_to_upgrade.check_node_health()
# wait for the 10m read workload to finish
self.verify_stress_thread(read_10m_cs_thread_pool)
self.fill_and_verify_db_data('after upgraded two nodes')
# read workload (80m)
self.log.info('Starting c-s read workload for 80m')
stress_cmd_read_80m = self.params.get('stress_cmd_read_80m')
read_80m_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_read_80m)
self.log.info('Sleeping for 60s to let cassandra-stress start before the rollback...')
time.sleep(60)
# rollback second node
self.log.info('Rollback Node %s begin', self.db_cluster.nodes[indexes[1]].name)
self.rollback_node(self.db_cluster.nodes[indexes[1]])
self.log.info('Rollback Node %s ended', self.db_cluster.nodes[indexes[1]].name)
self.db_cluster.nodes[indexes[1]].check_node_health()
self.fill_and_verify_db_data('after rollback the second node')
for i in indexes[1:]:
self.db_cluster.node_to_upgrade = self.db_cluster.nodes[i]
self.log.info('Upgrade Node %s begin', self.db_cluster.node_to_upgrade.name)
self.upgrade_node(self.db_cluster.node_to_upgrade)
self.log.info('Upgrade Node %s ended', self.db_cluster.node_to_upgrade.name)
self.db_cluster.node_to_upgrade.check_node_health()
self.fill_and_verify_db_data('after upgraded %s' % self.db_cluster.node_to_upgrade.name)
# wait for the 80m read workload to finish
self.verify_stress_thread(read_80m_cs_thread_pool)
self.verify_stress_thread(entire_write_cs_thread_pool)
# figure out what is the last supported sstable version
self.expected_sstable_format_version = self.get_highest_supported_sstable_version()
# run 'nodetool upgradesstables' on all nodes and check/wait for all file to be upgraded
upgradesstables_available = self.db_cluster.run_func_parallel(func=self.upgradesstables_if_command_available)
# only check sstable format version if all nodes had 'nodetool upgradesstables' available
if all(upgradesstables_available):
tables_upgraded = self.db_cluster.run_func_parallel(func=self.wait_for_sstable_upgrade)
assert all(tables_upgraded), "Some nodes failed to upgrade the sstable format {}".format(tables_upgraded)
verify_stress_after_cluster_upgrade = self.params.get( # pylint: disable=invalid-name
'verify_stress_after_cluster_upgrade')
verify_stress_cs_thread_pool = self.run_stress_thread(stress_cmd=verify_stress_after_cluster_upgrade)
self.verify_stress_thread(verify_stress_cs_thread_pool)
# complex workload: verify data by simple read cl=ALL
self.log.info('Starting c-s complex workload to verify data by simple read')
stress_cmd_complex_verify_read = self.params.get('stress_cmd_complex_verify_read')
complex_cs_thread_pool = self.run_stress_thread(
stress_cmd=stress_cmd_complex_verify_read, profile='data_dir/complex_schema.yaml')
# wait for the read complex workload to finish
self.verify_stress_thread(complex_cs_thread_pool)
        # After the workloads were adjusted there is a write workload that runs for the
        # entire test with a fixed duration to catch data loss. Since workload execution
        # times are not exact, we only use the basic prepare-write and read-verify steps
        # for the complex workloads and keep the two extra complex workloads commented out.
#
# TODO: retest commented workloads and decide to enable or delete them.
#
# complex workload: verify data by multiple ops
#self.log.info('Starting c-s complex workload to verify data by multiple ops')
#stress_cmd_complex_verify_more = self.params.get('stress_cmd_complex_verify_more')
#complex_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_complex_verify_more, profile='data_dir/complex_schema.yaml')
# wait for the complex workload to finish
# self.verify_stress_thread(complex_cs_thread_pool)
# complex workload: verify data by delete 1/10 data
#self.log.info('Starting c-s complex workload to verify data by delete')
#stress_cmd_complex_verify_delete = self.params.get('stress_cmd_complex_verify_delete')
#complex_cs_thread_pool = self.run_stress_thread(stress_cmd=stress_cmd_complex_verify_delete, profile='data_dir/complex_schema.yaml')
# wait for the complex workload to finish
# self.verify_stress_thread(complex_cs_thread_pool)
error_factor = 3
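        # A few 'Failed to load schema version' messages are expected while schema
        # versions converge during the rolling upgrade; allow up to error_factor
        # errors per shard (the hard-coded 8 below) per node.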
schema_load_error_num = 0
for node in self.db_cluster.nodes:
errors = node.search_database_log(search_pattern='Failed to load schema version',
start_from_beginning=True,
publish_events=False)
schema_load_error_num += len(errors)
self.log.debug('schema_load_error_num: %d', schema_load_error_num)
assert schema_load_error_num <= error_factor * 8 * \
len(self.db_cluster.nodes), 'Only allowing shards_num * %d schema load errors per host during the entire test, actual: %d' % (
error_factor, schema_load_error_num)
self.log.debug('start sstabledump verify')
self.db_cluster.nodes[0].remoter.run('for i in `sudo find /var/lib/scylla/data/keyspace_complex/ -type f |grep -v manifest.json |grep -v snapshots'
' |head -n 1`; do echo $i; sudo sstabledump $i 1>/tmp/sstabledump.output || exit 1; done', verbose=True)
        self.log.info('all nodes were upgraded, and the last workload is verified.')