Skip to content

Commit

Permalink
Provision and test against remote ES instance (#184)
Browse files Browse the repository at this point in the history
* provision and test against remote ES instance

This commit adds the functionality to allow the testing framework to
provision and run the tests against a remotely running Elasticsearch
instance.
The '-p' command line option has been modified to take an optional
argument: the URL — optionally including credentials — of the ES instance
to use.

* mask the actually used password in the stdout message

Replace password with star character.

(cherry picked from commit eec7dd7)
  • Loading branch information
bpintea committed Sep 30, 2019
1 parent cc0ccc7 commit b53074b
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 56 deletions.
35 changes: 18 additions & 17 deletions test/integration/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

from elasticsearch import Elasticsearch

REQ_AUTH = ("elastic", Elasticsearch.AUTH_PASSWORD)

TABLEAU_DATASET_BASE_URL = "https://raw.githubusercontent.com/elastic/connector-plugin-sdk/120fe213c4bce30d9424c155fbd9b2ad210239e0/tests/datasets/TestV1/"

CALCS_TEMPLATE =\
Expand Down Expand Up @@ -282,14 +280,16 @@ class TestData(object):
_csv_header = None
_csv_lines = None

_es = None
_offline_dir = None
_mode = None

def __init__(self, mode=MODE_INDEX, offline_dir=None):
def __init__(self, es, mode=MODE_INDEX, offline_dir=None):
self._csv_md5 = {}
self._csv_header = {}
self._csv_lines = {}

self._es = es
self._offline_dir = offline_dir
self._mode = mode

Expand Down Expand Up @@ -376,8 +376,8 @@ def _prepare_tableau_load(self, file_name, index_name, index_template):
ndjson = self._get_csv_as_ndjson(TABLEAU_DATASET_BASE_URL, file_name, index_name)

if self.MODE_NOINDEX < self._mode:
with requests.put("http://localhost:%s/_template/%s_template" % (Elasticsearch.ES_PORT, index_name),
json=index_template, auth=REQ_AUTH) as req:
with requests.put("%s/_template/%s_template" % (self._es.base_url(), index_name),
json=index_template, auth=self._es.credentials()) as req:
if req.status_code != 200:
raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
req.status_code, req.text))
Expand All @@ -386,16 +386,17 @@ def _prepare_tableau_load(self, file_name, index_name, index_template):

def _post_ndjson(self, ndjsons, index_name, pipeline_name=None):
print("Indexing data for index '%s'." % index_name)
url = "http://localhost:%s/%s/_doc/_bulk" % (Elasticsearch.ES_PORT, index_name)
url = "%s/%s/_doc/_bulk" % (self._es.base_url(), index_name)
if pipeline_name:
url += "?pipeline=%s" % pipeline_name
if type(ndjsons) is not list:
ndjsons = [ndjsons]
for n in ndjsons:
with requests.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"}, auth=REQ_AUTH) as req:
with requests.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"},
auth=self._es.credentials()) as req:
if req.status_code != 200:
raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name, req.status_code,
req.text))
raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name,
req.status_code, req.text))
reply = json.loads(req.text)
if reply["errors"]:
raise Exception("bulk POST to %s failed with content: %s" % (index_name, req.text))
Expand All @@ -405,8 +406,8 @@ def _wait_for_results(self, index_name):
hits = 0
waiting_since = time.time()
while hits < MIN_INDEXED_DOCS:
url = "http://localhost:%s/%s/_search" % (Elasticsearch.ES_PORT, index_name)
req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=REQ_AUTH)
url = "%s/%s/_search" % (self._es.base_url(), index_name)
req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials())
if req.status_code != 200:
raise Exception("failed to _search %s: code: %s, body: %s" % (index_name, req.status_code, req.text))
answer = json.loads(req.text)
Expand All @@ -420,8 +421,8 @@ def _delete_if_needed(self, index_name):
return
print("Deleting any old index '%s'." % index_name);

url = "http://localhost:%s/%s" % (Elasticsearch.ES_PORT, index_name)
with requests.delete(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=REQ_AUTH) as req:
url = "%s/%s" % (self._es.base_url(), index_name)
with requests.delete(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials()) as req:
if req.status_code != 200 and req.status_code != 404:
raise Exception("Deleting index %s failed; code=%s, body: %s." %
(index_name, req.status_code, req.text))
Expand All @@ -433,8 +434,8 @@ def _load_tableau_sample(self, file_name, index_name, template, pipeline=None):
self._delete_if_needed(index_name)

if pipeline:
with requests.put("http://localhost:%s/_ingest/pipeline/parse_%s" % (Elasticsearch.ES_PORT,
index_name), json=pipeline, auth=REQ_AUTH) as req:
with requests.put("%s/_ingest/pipeline/parse_%s" % (self._es.base_url(), index_name),
json=pipeline, auth=self._es.credentials()) as req:
if req.status_code != 200:
raise Exception("PUT %s pipeline failed with code: %s (content: %s) " % (index_name,
req.status_code, req.text))
Expand Down Expand Up @@ -483,8 +484,8 @@ def _put_sample_template(self, sample_name, index_name):
# turn it to JSON (to deal with trailing commas past last member on a level
mapping = eval(mapping)
# PUT the built template
url = "http://localhost:%s/_template/%s_template" % (Elasticsearch.ES_PORT, index_name)
with requests.put(url, json=mapping, auth=REQ_AUTH, timeout=Elasticsearch.REQ_TIMEOUT) as req:
url = "%s/_template/%s_template" % (self._es.base_url(), index_name)
with requests.put(url, json=mapping, auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
if req.status_code != 200:
raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
req.status_code, req.text))
Expand Down
55 changes: 36 additions & 19 deletions test/integration/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import signal
import subprocess
from subprocess import PIPE
from urllib.parse import urlparse

ARTIF_URL = "https://artifacts-api.elastic.co/v1/versions"
ES_PROJECT = "elasticsearch"
Expand All @@ -33,19 +34,39 @@ class Elasticsearch(object):
TERM_TIMEOUT = 5 # how long to wait for processes to die (before KILLing)
REQ_TIMEOUT = 20 # default GET request timeout
ES_PORT = 9200
ES_CREDENTIALS = ("elastic", "elastic") # user, pwd
ES_BASE_URL = "http://localhost:%s" % ES_PORT
ES_START_TIMEOUT = 60 # how long to wait for Elasticsearch to come online
ES_401_RETRIES = 8 # how many "starting" 401 answers to accept before giving up (.5s waiting inbetween)
AUTH_PASSWORD = "elastic"

_offline_dir = None
_credentials = None
_base_url = None

def __init__(self, offline_dir=None):
def __init__(self, offline_dir=None, url=None):
self._offline_dir = offline_dir
if not url:
self._port = self.ES_PORT
self._base_url = self.ES_BASE_URL
self._credentials = self.ES_CREDENTIALS
else:
u = urlparse(url)
self._port = u.port if u.port else self.ES_PORT
self._base_url = "%s://%s:%s" % (u.scheme, u.hostname, self._port)
self._credentials = (u.username, u.password) if u.username else self.ES_CREDENTIALS
print("Using Elasticsearch instance at %s, credentials (%s, %s)" % (self._base_url, self._credentials[0],
"*" * len(self._credentials[1])))

@staticmethod
def elasticsearch_distro_filename(version):
    """Build the file name of the Elasticsearch distribution archive for *version*.

    The name follows the pattern <project>-<version><arch>.<packaging>, with the
    project, architecture and packaging taken from the module-level constants.
    """
    name_parts = (ES_PROJECT, version, ES_ARCH, PACKAGING)
    return "%s-%s%s.%s" % name_parts

def credentials(self):
    """Return the (user, password) tuple used to authenticate against the ES instance."""
    creds = self._credentials
    return creds

def base_url(self):
    """Return the scheme://host:port base URL of the targeted ES instance."""
    url = self._base_url
    return url

def _latest_build(self, version):
req = requests.get(ARTIF_URL, timeout=self.REQ_TIMEOUT)
vers = req.json()["versions"]
Expand Down Expand Up @@ -124,7 +145,7 @@ def _update_es_yaml(self, es_dir):
with open(yaml, mode="a", newline="\n") as f:
f.write("#\n# ODBC Integration Test\n#\n")
f.write("xpack.security.enabled: True\n")
f.write("http.port: %s\n" % self.ES_PORT) # don't bind on next avail port
f.write("http.port: %s\n" % self._port) # don't bind on next avail port
f.write("cluster.routing.allocation.disk.threshold_enabled: False\n")

@staticmethod
Expand Down Expand Up @@ -196,22 +217,21 @@ def _enable_xpack(self, es_dir):
# setup passwords to random generated ones first...
pwd = self._gen_passwords(es_dir)
# ...then change passwords, easier to restart with failed tests
req = requests.post("http://localhost:%s/_security/user/_password" % self.ES_PORT, auth=("elastic", pwd),
json={"password": self.AUTH_PASSWORD})
req = requests.post("%s/_security/user/_password" % self._base_url, auth=(self._credentials[0], pwd),
json={"password": self._credentials[1]})
if req.status_code != 200:
raise Exception("attempt to change elastic's password failed with code %s" % req.status_code)
# kibana too (debug convenience)
req = requests.post("http://localhost:%s/_security/user/kibana/_password" % self.ES_PORT,
auth=("elastic", self.AUTH_PASSWORD), json={"password": self.AUTH_PASSWORD})
req = requests.post("%s/_security/user/kibana/_password" % self._base_url, auth=self._credentials,
json={"password": self._credentials[1]})
if req.status_code != 200:
print("ERROR: kibana user password change failed with code: %s" % req.status_code)

# start trial mode
auth = ("elastic", self.AUTH_PASSWORD)
url = "http://localhost:%s/_license/start_trial?acknowledge=true" % self.ES_PORT
url = "%s/_license/start_trial?acknowledge=true" % self._base_url
failures = 0
while True:
req = requests.post(url, auth=auth, timeout=self.REQ_TIMEOUT)
req = requests.post(url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
if req.status_code == 200:
# TODO: check content?
break
Expand Down Expand Up @@ -246,7 +266,7 @@ def reset(self, es_dir):
raise Exception()
except:
raise Exception("port %s is active; if Elasticsearch is running it needs to be shut down first" %
self.ES_PORT)
self._port)

data_path = os.path.join(es_dir, "data")
if os.path.isdir(data_path):
Expand All @@ -260,24 +280,21 @@ def reset(self, es_dir):
self._start_elasticsearch(es_dir)
self._enable_xpack(es_dir)

@staticmethod
def cluster_name(password=None):
auth = ("elastic", password) if password else None
def cluster_name(self, fail_on_non200=True):
try:
resp = requests.get("http://localhost:%s" % Elasticsearch.ES_PORT, auth=auth, timeout=.5)
resp = requests.get(self._base_url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
except (requests.Timeout, requests.ConnectionError):
return None
if resp.status_code != 200:
if password:
if fail_on_non200:
raise Exception("unexpected ES response code received: %s" % resp.status_code)
else:
return ""
if "cluster_name" not in resp.json():
raise Exception("unexpected ES answer received: %s" % resp.text)
return resp.json().get("cluster_name")

@staticmethod
def is_listening(password=None):
return Elasticsearch.cluster_name(password) is not None
def is_listening(self):
return self.cluster_name(False) is not None

# vim: set noet fenc=utf-8 ff=dos sts=0 sw=4 ts=4 tw=118 :
26 changes: 13 additions & 13 deletions test/integration/ites.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
import argparse
import os, sys, re, time

from elasticsearch import Elasticsearch #spawn_elasticsearch, reset_elasticsearch, es_is_listening, AUTH_PASSWORD
from elasticsearch import Elasticsearch
from data import TestData
from install import Installer
from testing import Testing


def ites(args):
es = Elasticsearch(args.offline_dir)
es = Elasticsearch(args.offline_dir, args.url)

# create a running instance of Elasticsearch if needed
if not args.pre_staged:
if args.url is None:
if args.es_reset:
es_dir = os.path.abspath(args.es_reset)
es.reset(es_dir)
Expand All @@ -39,7 +39,7 @@ def ites(args):
"version: %s)" % (args.driver, args.version))

es.spawn(version, root_dir, args.ephemeral)
elif not es.is_listening(Elasticsearch.AUTH_PASSWORD):
elif not es.is_listening():
raise Exception("no running prestaged Elasticsearch instance found.")
else:
print("Using pre-staged Elasticsearch.")
Expand All @@ -53,7 +53,7 @@ def ites(args):
else:
test_mode = TestData.MODE_INDEX

data = TestData(test_mode, args.offline_dir)
data = TestData(es, test_mode, args.offline_dir)
data.load()

# install the driver
Expand All @@ -65,13 +65,13 @@ def ites(args):
# run the tests
if not args.skip_tests:
assert(data is not None)
cluster_name = es.cluster_name(Elasticsearch.AUTH_PASSWORD)
cluster_name = es.cluster_name()
assert(len(cluster_name))
if args.dsn:
Testing(data, cluster_name, args.dsn).perform()
Testing(es, data, cluster_name, args.dsn).perform()
else:
Testing(data, cluster_name, "Packing=JSON;").perform()
Testing(data, cluster_name, "Packing=CBOR;").perform()
Testing(es, data, cluster_name, "Packing=JSON;").perform()
Testing(es, data, cluster_name, "Packing=CBOR;").perform()

def main():
parser = argparse.ArgumentParser(description='Integration Testing with Elasticsearch.')
Expand All @@ -80,8 +80,8 @@ def main():
stage_grp.add_argument("-r", "--root-dir", help="Root directory to [temporarily] stage Elasticsearch into.")
stage_grp.add_argument("-s", "--es-reset", help="Path to an already configured Elasticsearch folder to "
"use; data directory content will be removed; 'ephemeral' will be ignored.")
stage_grp.add_argument("-p", "--pre-staged", help="Use a pre-staged and running Elasticsearch instance",
action="store_true", default=False)
stage_grp.add_argument("-p", "--url", help="Use a pre-staged and running Elasticsearch instance. If no URL is "
"provided, %s is assumed." % Elasticsearch.ES_BASE_URL, nargs="?", const="")

parser.add_argument("-d", "--driver", help="The path to the driver file to test; if not provided, the driver "
"is assumed to have been installed.")
Expand All @@ -103,10 +103,10 @@ def main():
"default.")

args = parser.parse_args()
if not (args.root_dir or args.es_reset or args.pre_staged):
if not (args.root_dir or args.es_reset or args.url is not None):
parser.error("no Elasticsearch instance or root/staged directory provided.")

if not (args.driver or args.version or args.es_reset or args.pre_staged):
if not (args.driver or args.version or args.es_reset or args.url is not None):
parser.error("don't know what Elasticsearch version to test against.")

if args.driver and args.dsn and "Driver=" in args.dsn:
Expand Down
18 changes: 11 additions & 7 deletions test/integration/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,32 @@
from elasticsearch import Elasticsearch
from data import TestData, BATTERS_TEMPLATE

UID = "elastic"
CONNECT_STRING = 'Driver={Elasticsearch Driver};UID=%s;PWD=%s;Secure=0;' % (UID, Elasticsearch.AUTH_PASSWORD)
CATALOG = "distribution_run" # source built, "elasticsearch": nightly builds
DRIVER_NAME = "Elasticsearch Driver"

class Testing(unittest.TestCase):

_uid = None
_data = None
_dsn = None
_pyodbc = None
_catalog = None

def __init__(self, test_data, catalog=CATALOG, dsn=None):
def __init__(self, es, test_data, catalog, dsn=None):
super().__init__()
uid, pwd = es.credentials()

self._uid = uid
self._data = test_data
self._catalog = catalog

conn_str = "Driver={%s};UID=%s;PWD=%s;Secure=0;" % (DRIVER_NAME, uid, pwd)
if dsn:
if "Driver=" not in dsn:
self._dsn = CONNECT_STRING + dsn
self._dsn = conn_str + dsn
else:
self._dsn = dsn
else:
self._dsn = CONNECT_STRING
self._dsn = conn_str
print("Using DSN: '%s'." % self._dsn)

# only import pyODBC if running tests (vs. for instance only loading test data in ES)
Expand Down Expand Up @@ -324,7 +328,7 @@ def _proto_tests(self):
cnxn.clear_output_converters()

def perform(self):
self._check_info(self._pyodbc.SQL_USER_NAME, UID)
self._check_info(self._pyodbc.SQL_USER_NAME, self._uid)
self._check_info(self._pyodbc.SQL_DATABASE_NAME, self._catalog)

# simulate catalog querying as apps do in ES/GH#40775 do
Expand Down

0 comments on commit b53074b

Please sign in to comment.