Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change filestore to be indexed by unique ID #720

Merged
merged 11 commits into from
Jul 23, 2019
8 changes: 6 additions & 2 deletions esrally/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,11 +1365,15 @@ def __init__(self, cfg):
self.cfg = cfg
self.environment_name = cfg.opts("system", "env.name")
self.trial_timestamp = cfg.opts("system", "time.start")
self.trial_id = cfg.opts("system", "trial.id")
self.current_race = None

def find_by_timestamp(self, timestamp):
raise NotImplementedError("abstract method")

def find_by_trial_id(self, uid):
raise NotImplementedError("abstract method")

def list(self):
raise NotImplementedError("abstract method")

Expand Down Expand Up @@ -1434,8 +1438,8 @@ def list(self):
all_races = self._to_races(results)
return all_races[:self._max_results()]

def find_by_timestamp(self, timestamp):
race_file = "%s/race.json" % paths.race_root(cfg=self.cfg, start=time.from_is8601(timestamp))
def find_by_trial_id(self, trial_id):
race_file = "%s/race.json" % paths.race_root(cfg=self.cfg, trial_id=trial_id)
if io.exists(race_file):
races = self._to_races([race_file])
if races:
Expand Down
10 changes: 4 additions & 6 deletions esrally/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os


Expand All @@ -26,9 +25,8 @@ def races_root(cfg):
return "%s/races" % cfg.opts("node", "root.dir")


def race_root(cfg=None, start=None):
if not start:
start = cfg.opts("system", "time.start")
ts = "%04d-%02d-%02d-%02d-%02d-%02d" % (start.year, start.month, start.day, start.hour, start.minute, start.second)
return "%s/%s" % (races_root(cfg), ts)
def race_root(cfg=None, trial_id=None):
if not trial_id:
trial_id = cfg.opts("system", "trial.id")
return "%s/%s" % (races_root(cfg), trial_id)

13 changes: 7 additions & 6 deletions tests/metrics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def __init__(self, hosts):
def test_config_opts_parsing(self, client_esclientfactory):
cfg = config.Config()

_datastore_host = ".".join([str(random.randint(1,254)) for _ in range(4)])
_datastore_port = random.randint(1024,65535)
_datastore_host = ".".join([str(random.randint(1, 254)) for _ in range(4)])
_datastore_port = random.randint(1024, 65535)
_datastore_secure = random.choice(["True", "true"])
_datastore_user = "".join([random.choice(string.ascii_letters) for _ in range(8)])
_datastore_password = "".join([random.choice(string.ascii_letters + string.digits + "_-@#$/") for _ in range(12)])
Expand Down Expand Up @@ -250,15 +250,15 @@ def test_transport_error_retries(side_effect, expected_logging_calls, expected_s

# The sec to sleep for 10 transport errors is
# [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] ~> 17.05min in total
sleep_slots = [float(2**i) for i in range(0, max_retry)]
sleep_slots = [float(2 ** i) for i in range(0, max_retry)]
mocked_sleep_calls = [mock.call(sleep_slots[i]) for i in range(0, max_retry)]

for rnd_err_idx, rnd_err_code in enumerate(rnd_err_codes):
# List of logger.debug calls to expect
rnd_mocked_logger_calls.append(
mock.call("%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.",
all_err_codes[rnd_err_code], rnd_err_code,
rnd_err_idx+1, max_retry+1, sleep_slots[rnd_err_idx])
rnd_err_idx + 1, max_retry + 1, sleep_slots[rnd_err_idx])
)

test_transport_error_retries(rnd_side_effects,
Expand Down Expand Up @@ -809,6 +809,7 @@ def setUp(self):
self.cfg = config.Config()
self.cfg.add(config.Scope.application, "system", "env.name", "unittest-env")
self.cfg.add(config.Scope.application, "system", "time.start", EsRaceStoreTests.TRIAL_TIMESTAMP)
self.cfg.add(config.Scope.application, "system", "trial.id", FileRaceStoreTests.TRIAL_ID)
self.race_store = metrics.EsRaceStore(self.cfg,
client_factory_class=MockClientFactory,
index_template_provider_class=DummyIndexTemplateProvider,
Expand Down Expand Up @@ -1286,10 +1287,10 @@ def setUp(self):
self.cfg.add(config.Scope.application, "system", "env.name", "unittest-env")
self.cfg.add(config.Scope.application, "system", "list.races.max_results", 100)
self.cfg.add(config.Scope.application, "system", "time.start", FileRaceStoreTests.TRIAL_TIMESTAMP)
self.cfg.add(config.Scope.application, "system", "trial.id", FileRaceStoreTests.TRIAL_ID)
self.race_store = metrics.FileRaceStore(self.cfg)

def test_store_race(self):
from esrally import time
schedule = [
track.Task("index #1", track.Operation("index", track.OperationType.Bulk))
]
Expand Down Expand Up @@ -1332,6 +1333,6 @@ def test_store_race(self):

self.race_store.store_race(race)

retrieved_race = self.race_store.find_by_timestamp(timestamp=time.to_iso8601(FileRaceStoreTests.TRIAL_TIMESTAMP))
retrieved_race = self.race_store.find_by_trial_id(trial_id=FileRaceStoreTests.TRIAL_ID)
self.assertEqual(race.trial_timestamp, retrieved_race.trial_timestamp)
self.assertEqual(1, len(self.race_store.list()))