Add a unique race id
With this commit we add a new property 'trial-id' in addition to
'trial-timestamp'. The main reason for this change is that in some
environments several races end up with the same effective start
timestamp. If we run the same combination of track/challenge/car and
only vary track-params or team-params, Rally will mix up the results of
those races. With a unique id per race, this cannot happen.

Closes elastic#431
danielmitterdorfer committed Mar 9, 2018
1 parent 10f36bd commit 34e32f1
Showing 10 changed files with 124 additions and 142 deletions.
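
Before the file-by-file diff, a minimal, self-contained sketch of the idea described in the commit message. This is not Rally's actual code; the helper name metrics_doc and the field set are illustrative only. The point is to generate one UUID per invocation and stamp it on every metrics document, so two races that happen to share the same effective start timestamp remain distinguishable:

import uuid
from datetime import datetime, timezone

# One id per invocation; the timestamp alone may collide across environments.
trial_id = str(uuid.uuid4())
trial_timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

def metrics_doc(name, value, unit):
    # Every sample carries the (possibly shared) timestamp and the unique id.
    return {
        "trial-id": trial_id,
        "trial-timestamp": trial_timestamp,
        "name": name,
        "value": value,
        "unit": unit,
    }

print(metrics_doc("throughput", 27385, "docs/s"))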
16 changes: 11 additions & 5 deletions docs/metrics.rst
@@ -11,16 +11,17 @@ Here is a typical metrics record::

{
"environment": "nightly",
"trial-timestamp": "20160421T042749Z",
"trial-id": "6ebc6e53-ee20-4b0c-99b4-09697987e9f4",
"@timestamp": 1461213093093,
"relative-time": 10507328,
"track": "geonames",
"track-params": {
"shard-count": 3
},
"challenge": "append-no-conflicts",
"car": "defaults",
"sample-type": "normal",
"trial-timestamp": "20160421T042749Z",
"@timestamp": 1461213093093,
"relative-time": 10507328,
"name": "throughput",
"value": 27385,
"unit": "docs/s",
@@ -38,7 +39,7 @@ Here is a typical metrics record::
"node_name": "rally-node0",
"source_revision": "a6c0a81",
"distribution_version": "5.0.0-SNAPSHOT",
"tag_reference": "Github ticket 1234",
"tag_reference": "Github ticket 1234"
}
}

@@ -66,7 +67,12 @@ Rally runs warmup trials but records all samples. Normally, we are just interest
trial-timestamp
~~~~~~~~~~~~~~~

A constant timestamp (always in UTC) that is determined when Rally is invoked. It is intended to group all samples of a benchmark trial.
A constant timestamp (always in UTC) that is determined when Rally is invoked.

trial-id
~~~~~~~~

A UUID that changes on every invocation of Rally. It is intended to group all samples of a benchmark trial.
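
Since 'trial-id' now groups all samples of a race, a downstream consumer can fetch one race's samples with a simple term filter. Below is a hedged sketch using the elasticsearch-py client; the endpoint, the index pattern "rally-metrics-*" and the example UUID are assumptions, and the term filters mirror the query change in esrally/metrics.py further below:

from elasticsearch import Elasticsearch  # assumes the elasticsearch-py client is installed

es = Elasticsearch()  # endpoint/credentials of the metrics cluster are assumptions

query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"trial-id": "6ebc6e53-ee20-4b0c-99b4-09697987e9f4"}},
                {"term": {"name": "throughput"}}
            ]
        }
    }
}
response = es.search(index="rally-metrics-*", body=query)
for hit in response["hits"]["hits"]:
    print(hit["_source"]["value"], hit["_source"]["unit"])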

@timestamp
~~~~~~~~~~
19 changes: 9 additions & 10 deletions esrally/driver/driver.py
@@ -326,9 +326,12 @@ def start_benchmark(self, t, lap, metrics_meta_info):
self.challenge = select_challenge(self.config, self.track)
self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
self.throughput_calculator = ThroughputCalculator()
# create - but do not yet open - the metrics store as an internal timer starts when we open it.
cls = metrics.metrics_store_class(self.config)
self.metrics_store = cls(cfg=self.config, meta_info=metrics_meta_info, lap=lap)
self.metrics_store = metrics.metrics_store(cfg=self.config,
track=self.track.name,
challenge=self.challenge.name,
meta_info=metrics_meta_info,
lap=lap,
read_only=False)
for host in self.config.opts("driver", "load_driver_hosts"):
if host != "localhost":
self.load_driver_hosts.append(net.resolve(host))
@@ -339,13 +342,9 @@ def start_benchmark(self, t, lap, metrics_meta_info):
self.target.on_prepare_track(preps, self.config, self.track)

def after_track_prepared(self):
track_name = self.track.name
challenge_name = self.challenge.name
car_name = self.config.opts("mechanic", "car.names")

logger.info("Benchmark for track [%s], challenge [%s] and car %s is about to start." % (track_name, challenge_name, car_name))
trial_timestamp = self.config.opts("system", "time.start")
self.metrics_store.open(trial_timestamp, track_name, challenge_name, car_name)
logger.info("Benchmark is about to start.")
# ensure relative time starts when the benchmark starts.
self.reset_relative_time()

allocator = Allocator(self.challenge.schedule)
self.allocations = allocator.allocations
68 changes: 30 additions & 38 deletions esrally/metrics.py
Expand Up @@ -190,7 +190,7 @@ class MetaInfoScope(Enum):
"""


def metrics_store(cfg, read_only=True, track=None, challenge=None, car=None):
def metrics_store(cfg, read_only=True, track=None, challenge=None, car=None, meta_info=None, lap=None):
"""
Creates a proper metrics store based on the current configuration.
@@ -199,13 +199,14 @@ def metrics_store(cfg, read_only=True, track=None, challenge=None, car=None):
:return: A metrics store implementation.
"""
cls = metrics_store_class(cfg)
store = cls(cfg)
store = cls(cfg=cfg, meta_info=meta_info, lap=lap)
logger.info("Creating %s" % str(store))

trial_id = cfg.opts("system", "trial.id")
trial_timestamp = cfg.opts("system", "time.start")
selected_car = cfg.opts("mechanic", "car.names") if car is None else car

store.open(trial_timestamp, track, challenge, selected_car, create=not read_only)
store.open(trial_id, trial_timestamp, track, challenge, selected_car, create=not read_only)
return store
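
For illustration, a self-contained toy version of the factory pattern above (deliberately not importing esrally; the class and config key names are made up): the caller no longer opens the store itself, the factory pulls the per-invocation trial id and timestamp out of the configuration and calls open():

import uuid
from datetime import datetime, timezone

class ToyStore:
    def open(self, trial_id, trial_timestamp, track, challenge, car, create=False):
        # mirrors MetricsStore.open(): remember what uniquely identifies this race
        self.open_context = {"trial-id": trial_id, "trial-timestamp": trial_timestamp,
                             "track": track, "challenge": challenge, "car": car}

def toy_metrics_store(cfg, track, challenge, car=None, read_only=True):
    store = ToyStore()
    store.open(cfg["trial.id"], cfg["time.start"], track, challenge,
               car if car is not None else cfg["car.names"], create=not read_only)
    return store

cfg = {"trial.id": str(uuid.uuid4()),
       "time.start": datetime.now(timezone.utc),
       "car.names": ["defaults"]}
store = toy_metrics_store(cfg, track="geonames", challenge="append-no-conflicts", read_only=False)
print(store.open_context["trial-id"])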


@@ -267,6 +268,7 @@ def __init__(self, cfg, clock=time.Clock, meta_info=None, lap=None):
:param lap: This parameter is optional and intended for creating a metrics store with a previously serialized lap.
"""
self._config = cfg
self._trial_id = None
self._trial_timestamp = None
self._track = None
self._track_params = cfg.opts("track", "params")
@@ -286,11 +288,12 @@ def __init__(self, cfg, clock=time.Clock, meta_info=None, lap=None):
self._clock = clock
self._stop_watch = self._clock.stop_watch()

def open(self, trial_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
def open(self, trial_id=None, trial_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
"""
Opens a metrics store for a specific trial timestamp, track, challenge and car.
Opens a metrics store for a specific trial, track, challenge and car.
:param trial_timestamp: The trial (timestamp).
:param trial_id: The trial id. This attribute is sufficient to uniquely identify a trial.
:param trial_timestamp: The trial timestamp as a datetime.
:param track_name: Track name.
:param challenge_name: Challenge name.
:param car_name: Car name.
@@ -299,15 +302,18 @@ def open(self, trial_timestamp=None, track_name=None, challenge_name=None, car_n
False when it is just opened for reading (as we can assume all necessary indices exist at this point).
"""
if ctx:
self._trial_id = ctx["trial-id"]
self._trial_timestamp = ctx["trial-timestamp"]
self._track = ctx["track"]
self._challenge = ctx["challenge"]
self._car = ctx["car"]
else:
self._trial_id = trial_id
self._trial_timestamp = time.to_iso8601(trial_timestamp)
self._track = track_name
self._challenge = challenge_name
self._car = car_name
assert self._trial_id is not None, "Attempting to open metrics store without a trial id"
assert self._trial_timestamp is not None, "Attempting to open metrics store without a trial timestamp"
assert self._track is not None, "Attempting to open metrics store without a track"
assert self._challenge is not None, "Attempting to open metrics store without a challenge"
@@ -412,6 +418,7 @@ def meta_info(self, meta_info):
@property
def open_context(self):
return {
"trial-id": self._trial_id,
"trial-timestamp": self._trial_timestamp,
"track": self._track,
"challenge": self._challenge,
@@ -526,6 +533,7 @@ def _put(self, level, level_key, name, value, unit, task, operation, operation_t
doc = {
"@timestamp": time.to_epoch_millis(absolute_time),
"relative-time": int(relative_time * 1000 * 1000),
"trial-id": self._trial_id,
"trial-timestamp": self._trial_timestamp,
"environment": self._environment_name,
"track": self._track,
@@ -728,9 +736,9 @@ def __init__(self,
self._index_template_provider = index_template_provider_class(cfg)
self._docs = None

def open(self, trial_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
def open(self, trial_id=None, trial_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
self._docs = []
MetricsStore.open(self, trial_timestamp, track_name, challenge_name, car_name, ctx, create)
MetricsStore.open(self, trial_id, trial_timestamp, track_name, challenge_name, car_name, ctx, create)
self._index = self.index_name()
# reduce a bit of noise in the metrics cluster log
if create:
@@ -862,27 +870,7 @@ def _query_by_name(self, name, task, operation_type, sample_type, lap):
"filter": [
{
"term": {
"trial-timestamp": self._trial_timestamp
}
},
{
"term": {
"environment": self._environment_name
}
},
{
"term": {
"track": self._track
}
},
{
"term": {
"challenge": self._challenge
}
},
{
"term": {
"car": self._car_name
"trial-id": self._trial_id
}
},
{
@@ -1076,26 +1064,28 @@ def format_dict(d):

def create_race(cfg, track, challenge):
car = cfg.opts("mechanic", "car.names")
environment_name = cfg.opts("system", "env.name")
environment = cfg.opts("system", "env.name")
trial_id = cfg.opts("system", "trial.id")
trial_timestamp = cfg.opts("system", "time.start")
total_laps = cfg.opts("race", "laps")
user_tags = extract_user_tags_from_config(cfg)
pipeline = cfg.opts("race", "pipeline")
track_params = cfg.opts("track", "params")
rally_version = version.version()

return Race(rally_version, environment_name, trial_timestamp, pipeline, user_tags, track, track_params, challenge, car, total_laps)
return Race(rally_version, environment, trial_id, trial_timestamp, pipeline, user_tags, track, track_params, challenge, car, total_laps)


class Race:
def __init__(self, rally_version, environment_name, trial_timestamp, pipeline, user_tags, track, track_params, challenge, car,
def __init__(self, rally_version, environment_name, trial_id, trial_timestamp, pipeline, user_tags, track, track_params, challenge, car,
total_laps, cluster=None, lap_results=None, results=None):
if results is None:
results = {}
if lap_results is None:
lap_results = []
self.rally_version = rally_version
self.environment_name = environment_name
self.trial_id = trial_id
self.trial_timestamp = trial_timestamp
self.pipeline = pipeline
self.user_tags = user_tags
@@ -1104,7 +1094,7 @@ def __init__(self, rally_version, environment_name, trial_timestamp, pipeline, u
self.challenge = challenge
self.car = car
self.total_laps = total_laps
# will be set later - contains hosts, revision, distribution_version, ...s
# will be set later - contains hosts, revision, distribution_version, ...
self.cluster = cluster
self.lap_results = lap_results
self.results = results
@@ -1143,6 +1133,7 @@ def as_dict(self):
d = {
"rally-version": self.rally_version,
"environment": self.environment_name,
"trial-id": self.trial_id,
"trial-timestamp": time.to_iso8601(self.trial_timestamp),
"pipeline": self.pipeline,
"user-tags": self.user_tags,
@@ -1164,6 +1155,7 @@ def to_result_dicts(self):
result_template = {
"rally-version": self.rally_version,
"environment": self.environment_name,
"trial-id": self.trial_id,
"trial-timestamp": time.to_iso8601(self.trial_timestamp),
"distribution-version": self.cluster.distribution_version,
"distribution-major-version": versions.major_version(self.cluster.distribution_version),
@@ -1206,7 +1198,7 @@ def from_dict(cls, d):

# Don't restore a few properties like cluster because they (a) cannot be reconstructed easily without knowledge of other modules
# and (b) it is not necessary for this use case.
return Race(d["rally-version"], d["environment"], time.from_is8601(d["trial-timestamp"]), d["pipeline"], user_tags,
return Race(d["rally-version"], d["environment"], d["trial-id"], time.from_is8601(d["trial-timestamp"]), d["pipeline"], user_tags,
d["track"], d.get("track-params"), d["challenge"], d["car"], d["total-laps"], results=d["results"])


@@ -1359,10 +1351,10 @@ def list(self):

def find_by_timestamp(self, timestamp):
filters = [{
"term": {
"environment": self.environment_name
}
},
"term": {
"environment": self.environment_name
}
},
{
"term": {
"trial-timestamp": timestamp
2 changes: 2 additions & 0 deletions esrally/rally.py
@@ -5,6 +5,7 @@
import os
import sys
import time
import uuid
import faulthandler
import signal

@@ -666,6 +667,7 @@ def main():
cfg.add(config.Scope.application, "system", "time.start.user_provided", False)

cfg.add(config.Scope.applicationOverride, "system", "quiet.mode", args.quiet)
cfg.add(config.Scope.applicationOverride, "system", "trial.id", str(uuid.uuid4()))

# per node?
cfg.add(config.Scope.applicationOverride, "system", "offline.mode", args.offline)
3 changes: 3 additions & 0 deletions esrally/resources/metrics-template.json
@@ -32,6 +32,9 @@
"relative-time": {
"type": "long"
},
"trial-id": {
"type": "keyword"
},
"trial-timestamp": {
"type": "date",
"format": "basic_date_time_no_millis",
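
Because "trial-id" is mapped as a keyword, it can be aggregated on directly. A hedged sketch that lists the races present in a metrics index together with their sample counts; again, the client setup and the index pattern are assumptions:

from elasticsearch import Elasticsearch  # assumption: elasticsearch-py client available

es = Elasticsearch()  # metrics cluster endpoint is an assumption

body = {
    "size": 0,
    "aggs": {
        "races": {
            "terms": {"field": "trial-id", "size": 50}
        }
    }
}
response = es.search(index="rally-metrics-*", body=body)
for bucket in response["aggregations"]["races"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])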
3 changes: 3 additions & 0 deletions esrally/resources/races-template.json
@@ -25,6 +25,9 @@
"enabled": true
},
"properties": {
"trial-id": {
"type": "keyword"
},
"trial-timestamp": {
"type": "date",
"format": "basic_date_time_no_millis",
3 changes: 3 additions & 0 deletions esrally/resources/results-template.json
@@ -25,6 +25,9 @@
"enabled": true
},
"properties": {
"trial-id": {
"type": "keyword"
},
"trial-timestamp": {
"type": "date",
"format": "basic_date_time_no_millis",
1 change: 1 addition & 0 deletions tests/driver/driver_test.py
@@ -57,6 +57,7 @@ def setUp(self):
self.cfg = config.Config()
self.cfg.add(config.Scope.application, "system", "env.name", "unittest")
self.cfg.add(config.Scope.application, "system", "time.start", datetime(year=2017, month=8, day=20, hour=1, minute=0, second=0))
self.cfg.add(config.Scope.application, "system", "trial.id", "6ebc6e53-ee20-4b0c-99b4-09697987e9f4")
self.cfg.add(config.Scope.application, "track", "challenge.name", "default")
self.cfg.add(config.Scope.application, "track", "params", {})
self.cfg.add(config.Scope.application, "track", "test.mode.enabled", True)