Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datarate dependent compressor #358

Merged
merged 8 commits into from
Feb 8, 2021
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 61 additions & 18 deletions bin/bootstrax
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ what bootstrax is thinking of at the moment.
- **disk_used**: used part of the disk whereto this bootstrax instance
is writing to (in percent).
"""
__version__ = '1.0.3'
__version__ = '1.0.4'

import argparse
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -161,8 +161,8 @@ print(f'---\n bootstrax version {__version__}\n---')

# The folder that can be used for testing bootstrax (i.e. non production
# mode). It will be written to:
test_data_folder = ('/nfs/scratch/bootstrax/' if
os.path.exists('/nfs/scratch/bootstrax/')
test_data_folder = ('/data/test_processed/' if
os.path.exists('/data/test_processed/')
else './bootstrax/')

# Timeouts in seconds
Expand Down Expand Up @@ -202,7 +202,9 @@ timeouts = {
# Bootstrax writes it's state to the daq-database. To have a backlog we store this
# state using a TTL collection. To prevent too many entries in this backlog, only
# create new entries if the previous entry is at least this old (in seconds).
'min_status_interval': 60
'min_status_interval': 60,
# Minimum time we can take to can infer the datarate (s).
'max_data_rate_infer_time': 30,
}

# The disk that the eb is writing to may fill up at some point. The data should
Expand Down Expand Up @@ -709,19 +711,24 @@ def infer_mode(rd):
uncompressed redax rate. Estimating save parameters for running
bootstrax from:
https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:eb_speed_tests_2021update
:returns: dictionary of how many cores and max_messages should be used based on an
estimated data rate.
:returns: dictionary of how many cores, max_messages and compressor
should be used based on an estimated data rate.
"""
# Get data rate from dispatcher
try:
docs = ag_stat_coll.aggregate([
{'$match': {'number': rd['number']}},
{'$group': {'_id': '$detector', 'rate': {'$max': '$rate'}}}
])
data_rate = int(sum([d['rate'] for d in docs]))
data_rate = None
started_looking = time.time()
while data_rate is None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darrylmasson I think this is why we might have seen more than usual failures at the eb lately. Before this nice aggregation we had a check to see if the data_rate actually returned something. Now we should again be in the good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternately, if we make sure the run hasn't started within the last 10 or 15 seconds, we can be sure that the dispatcher has been through a few update cycles.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this is what I had first but then decided against it because it would lead to unnecessary waiting time: 96364e9

This was also because I set the time to 1 minute rather than 10 s :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solved in
b9c6220

docs = ag_stat_coll.aggregate([
{'$match': {'number': rd['number']}},
{'$group': {'_id': '$detector', 'rate': {'$max': '$rate'}}}
])
data_rate = int(sum([d['rate'] for d in docs]))
if time.time() - started_looking > timeouts['max_data_rate_infer_time']:
raise RuntimeError
except Exception as e:
log_warning(f'infer_mode ran into {e}. Cannot infer mode, using default mode.',
run_id=f'{rd["number"]:06}', priority='info')
log_warning(f'infer_mode ran into {e}. Cannot infer datarate, using default mode.',
run_id=f'{rd["number"]:06}', priority='warning')
data_rate = None

# Find out if eb is new (eb3-eb5):
Expand Down Expand Up @@ -754,16 +761,48 @@ def infer_mode(rd):
if n_fails:
# Exponentially lower resources & increase timeout
result = dict(
cores=np.clip(result['cores']/(1.1**n_fails), 4, 40),
max_messages=np.clip(result['max_messages']/(1.1**n_fails), 4, 100),
timeout=np.clip(result['timeout']*(1.1**n_fails), 500, 3600)
cores=np.clip(result['cores']/(1.1**n_fails), 4, 40).astype(int),
max_messages=np.clip(result['max_messages']/(1.1**n_fails), 4, 100).astype(int),
timeout=np.clip(result['timeout']*(1.1**n_fails), 500, 3600).astype(int),
)
log_warning(f'infer_mode::\tRepeated failures on {rd["number"]}@{hostname}. '
f'Lowering to {result}',
priority='info',
run_id=f'{rd["number"]:06}')
else:
result = {k: int(v) for k, v in result.items()}
result['records_compressor'] = infer_records_compressor(rd, data_rate, n_fails)
log.info(f'infer_mode::\tInferred mode for {rd["number"]}\t{result}')
return {k: int(v) for k, v in result.items()}
return result


def infer_records_compressor(rd, datarate, n_fails):
"""
Get a compressor for the (raw)records. This takes two things in consideration:
1. Do we store the data fast enough (high write speed)
2. Does the data fit into the buffer

Used compressors:
bz2: slow but very good compression -> use for low datarate
zstd: fast & decent compression, max chunk size of ??? GB
lz4: fast & not no chunk size limit, use if all ese fails
"""
if n_fails or datarate is None:
# Cannot infer datarate or failed before, go for fast & safe
return 'lz4' if n_fails > 1 else 'zstd'

chunk_length = (rd['daq_config']['strax_chunk_overlap'] +
rd['daq_config']['strax_chunk_length'])
chunk_size_mb = datarate*chunk_length
if datarate < 50:
# Low datarate, we can do very large compression
return 'bz2'
if chunk_size_mb > 1000:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very conservative value. We should be able to go to 1.8G and still have 15% overhead between us and the 31-bit issue. Given that zstd is squeezier (and has higher throughput), I think we should try to use that as much as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, you are right:

import strax
a = np.zeros(int(2e8), dtype=np.int64)
print(f'Buffer of {a.nbytes/(1e9)} GB')
strax.save_file('test.test', a, compressor='zstd')

However, we need to keep in mind that we don't want to be running into issues where just one chunk is more chunky than the others, thereby disallowing us to save the file.

Nevertheless, I agree, let's set it to 1.8 GB

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solved in
ee5277a

# Extremely large chunks, let's use LZ4 because we know that it
# can handle this.
return 'lz4'
# High datarate and reasonable chunk size.
return 'zstd'


##
Expand Down Expand Up @@ -1169,7 +1208,7 @@ def manual_fail(*, mongo_id=None, number=None, reason=''):
def run_strax(run_id, input_dir, targets, readout_threads, compressor,
run_start_time, samples_per_record, cores, max_messages, timeout,
daq_chunk_duration, daq_overlap_chunk_duration, post_processing,
debug=False):
records_compressor, debug=False):
# Check mongo connection
ping_dbs()
# Clear the swap memory used by npshmmex
Expand All @@ -1193,6 +1232,10 @@ def run_strax(run_id, input_dir, targets, readout_threads, compressor,
timeout=timeout,
targets=targets)

for t in ('raw_records', 'records'):
# Set the (raw)records processor to the inferred one
st._plugin_class_registry[t].compressor = records_compressor

# Make a function for running strax, call the function to process the run
# This way, it can also be run inside a wrapper to profile strax
def st_make():
Expand Down