Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into subshard
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Dec 20, 2024
2 parents 94cd039 + 06ae3ca commit 2962832
Show file tree
Hide file tree
Showing 46 changed files with 253 additions and 142 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# W504 - Line break occurred after a binary operator
ignore = E265,E402,E999,W293,W504
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/*
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/*,*/build/*

# F401 - Unused imports -- this is the only way to have a file-wide rule exception
per-file-ignores =
21 changes: 21 additions & 0 deletions .github/ISSUE_TEMPLATE/BUG_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
name: 🐛 Bug Report
about: Create a report to help us improve
title: '[BUG]'
labels: 'bug, untriaged'
assignees: ''
---
### What is the bug?
_A clear and concise description of the bug._

### What are your migration environments?
_What versions of Elasticsearch / Opensearch, what version of the Migration Assistant._

### How can one reproduce the bug?
_Steps to reproduce the behavior._

### What is the expected behavior?
_A clear and concise description of what you expected to happen._

### Do you have any additional context?
_Add any other context about the problem, such as log files our console output._
15 changes: 4 additions & 11 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
### Description
[Describe what this change achieves]
* Category (Enhancement, New feature, Bug fix, Test fix, Refactoring, Maintenance, Documentation)
* Why these changes are required?
* What is the old behavior before changes and new behavior after changes?
<!-- Describe what this change achieves -->

### Issues Resolved
[List any issues this PR will resolve]

Is this a backport? If so, please add backport PR # and/or commits #
<!-- List any GitHub or Jira issues this PR will resolve -->

### Testing
[Please provide details of testing done: unit testing, integration testing and manual testing]
<!-- Please provide details of testing done: unit testing, integration testing and manual testing -->

### Check List
- [ ] New functionality includes testing
- [ ] All tests pass, including unit test, integration test and doctest
- [ ] New functionality has been documented
- [ ] Commits are signed per the DCO using --signoff
- [ ] Public documentation issue/PR [created](https://github.com/opensearch-project/documentation-website/issues/new/choose), if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).
18 changes: 18 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ jobs:
env:
OS_MIGRATIONS_GRADLE_SCAN_TOS_AGREE_AND_ENABLED: ''

- name: Detect Memory Dumps
if: failure()
run: |
if find . -type f -name "*.hprof" | grep -q '.'; then
echo "::group::Memory Dumps Detected"
echo "::warning::Memory dumps were found and uploaded as artifacts. Review these files to diagnose OOM issues."
echo "To download and inspect these files, navigate to 'Actions' -> 'Artifacts'."
echo "::endgroup::"
fi
- name: Upload memory dump
if: failure()
uses: actions/upload-artifact@v4
with:
if-no-files-found: ignore
name: memory-dumps
path: ./**/*.hprof

- uses: actions/upload-artifact@v4
if: always()
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSec
}
}

public static BulkDocSection fromMap(Map<String, Object> map) {
public static BulkDocSection fromMap(Object map) {
BulkIndex bulkIndex = OBJECT_MAPPER.convertValue(map, BulkIndex.class);
return new BulkDocSection(bulkIndex);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.opensearch.migrations.bulkload.common;

import lombok.ToString;

@ToString
public class S3Uri {
public final String bucketName;
public final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public TransformerToIJsonTransformerAdapter(IJsonTransformer transformer) {
this(transformer, LoggerFactory.getLogger(OUTPUT_TRANSFORMATION_JSON_LOGGER));
}

private void logTransformation(Map<String, Object> before, Map<String, Object> after) {
private void logTransformation(Map<String, Object> before, Object after) {
if (transformerLogger.isInfoEnabled()) {
try {
var transformationTuple = toTransformationMap(before, after);
Expand All @@ -55,7 +55,7 @@ private void logTransformation(Map<String, Object> before, Map<String, Object> a
}
}

private Map<String, Object> toTransformationMap(Map<String, Object> before, Map<String, Object> after) {
private Map<String, Object> toTransformationMap(Map<String, Object> before, Object after) {
var transformationMap = new LinkedHashMap<String, Object>();
transformationMap.put("before", before);
transformationMap.put("after", after);
Expand All @@ -69,15 +69,15 @@ private static Map<String, Object> objectNodeToMap(Object node) {
}

@SneakyThrows
private static String printMap(Map<String, Object> map) {
private static String printMap(Object map) {
return MAPPER.writeValueAsString(map);
}

@SuppressWarnings("unchecked")
private MigrationItem transformMigrationItem(MigrationItem migrationItem) {
// Keep untouched original for logging
final Map<String, Object> originalMap = MAPPER.convertValue(migrationItem, Map.class);
var transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
Object transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedMigrationItem);
return MAPPER.convertValue(transformedMigrationItem, MigrationItem.class);
}
Expand All @@ -100,7 +100,7 @@ void updateTemplates(Collection<? extends MigrationItem> transformedItems, Objec
public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
var inputJson = objectNodeToMap(globalData.toObjectNode());
log.atInfo().setMessage("BeforeJsonGlobal: {}").addArgument(() -> printMap(inputJson)).log();
var afterJson = transformer.transformJson(inputJson);
Object afterJson = transformer.transformJson(inputJson);
log.atInfo().setMessage("AfterJsonGlobal: {}").addArgument(() -> printMap(afterJson)).log();


Expand Down Expand Up @@ -154,7 +154,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
final Map<String, Object> originalInput = MAPPER.convertValue(indexData, Map.class);
final Map<String, Object> inputJson = MAPPER.convertValue(indexData, Map.class);
var afterJson = transformer.transformJson(inputJson);
Object afterJson = transformer.transformJson(inputJson);
logTransformation(originalInput, afterJson);
return MAPPER.convertValue(inputJson, IndexMetadata.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM amazonlinux:2023
ENV PIP_ROOT_USER_ACTION ignore
ENV LANG C.UTF-8

# procps-ng used for enabling 'watch' command on console
RUN dnf install -y --setopt=install_weak_deps=False \
curl-minimal \
diffutils \
Expand All @@ -26,6 +27,7 @@ RUN dnf install -y --setopt=install_weak_deps=False \
vim \
wget \
zlib-devel \
procps-ng \
&& \
dnf clean all && \
rm -rf /var/cache/dnf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,15 @@ client_options=$(IFS=,; echo "${options[*]}")

set -o xtrace

# Fix OSB commit on latest tested version
workload_revision="fc64258a9b2ed2451423d7758ca1c5880626c520"

echo "Running opensearch-benchmark workloads against ${endpoint}"
echo "Running opensearch-benchmark w/ 'geonames' workload..." &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=geonames --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --exclude-tasks=check-cluster-health --workload-revision=$workload_revision --target-host=$endpoint --workload=geonames --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'http_logs' workload..." &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=http_logs --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --exclude-tasks=check-cluster-health --workload-revision=$workload_revision --target-host=$endpoint --workload=http_logs --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nested' workload..." &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nested --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --exclude-tasks=check-cluster-health --workload-revision=$workload_revision --target-host=$endpoint --workload=nested --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nyc_taxis' workload..." &&
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nyc_taxis --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options
pipenv run opensearch-benchmark execute-test --distribution-version=1.0.0 --exclude-tasks=check-cluster-health --workload-revision=$workload_revision --target-host=$endpoint --workload=nyc_taxis --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,29 @@ def send_request(session, index_suffix, url_base, auth, headers, no_refresh):
return None, str(e), None


def send_multi_type_request(session, index_name, type_name, payload, url_base, auth, headers, no_refresh):
"""Send a request to the specified URL with the given payload."""
timestamp = datetime.now().isoformat()
refresh_param = "false" if no_refresh else "true"
url = f"{url_base}/{index_name}/{type_name}/{timestamp}?refresh={refresh_param}"
try:
response = session.put(url, json=payload, auth=auth, headers=headers, verify=False, timeout=0.5)
return response.status_code, timestamp, response.json()
except requests.RequestException as e:
return None, str(e), None


def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", help="Cluster endpoint e.g. http://test.elb.us-west-2.amazonaws.com:9200.")
parser.add_argument("--username", help="Cluster username.")
parser.add_argument("--password", help="Cluster password.")
parser.add_argument("--enable_multi_type", action='store_true',
help="Flag to enable sending documents to a multi-type index.")
parser.add_argument("--no-clear-output", action='store_true',
help="Flag to not clear the output before each run. " +
"Helpful for piping to a file or other utility.")
"Helpful for piping to a file or other utility.")
parser.add_argument("--requests-per-sec", type=float, default=10.0, help="Target requests per second to be sent.")
parser.add_argument("--no-refresh", action='store_true', help="Flag to disable refresh after each request.")
return parser.parse_args()
Expand Down Expand Up @@ -97,13 +111,13 @@ def calculate_sleep_time(request_timestamps, target_requests_per_sec):
"""
if not request_timestamps:
return 0

target_time_per_iteration = 1.0 / target_requests_per_sec
average_time_per_iteration = (datetime.now() -
request_timestamps[0]).total_seconds() / (len(request_timestamps) + 1)

sleep_time = (target_time_per_iteration - average_time_per_iteration) * len(request_timestamps)

return max(0, sleep_time)


Expand All @@ -119,14 +133,29 @@ def main():
total_counts = {'2xx': 0, '4xx': 0, '5xx': 0, 'error': 0}
start_time = time.time()
request_timestamps = deque()
total_requests = 0

while True:
total_requests = total_requests + 1
request_timestamps.append(datetime.now())
current_index = get_current_date_index()

response_code, request_timestamp_or_error, response_json = send_request(
session, current_index, url_base, auth, keep_alive_headers, args.no_refresh
)
# Alternate between sending multi-type requests of 'type1' and 'type2'
if args.enable_multi_type:
if total_requests % 2 != 0:
type_name = "type1"
payload = {"title": "This is title of type 1"}
else:
type_name = "type2"
payload = {"content": "This is content of type 2", "contents": "This is contents of type 2"}
response_code, request_timestamp_or_error, response_json = send_multi_type_request(
session, "multi_type_index", type_name, payload, url_base, auth, keep_alive_headers, args.no_refresh
)
# Send simple document request
else:
response_code, request_timestamp_or_error, response_json = send_request(
session, current_index, url_base, auth, keep_alive_headers, args.no_refresh
)
update_counts(response_code, total_counts)

if response_code is not None:
Expand All @@ -137,7 +166,7 @@ def main():
response_pretty = "Response: N/A"

throughput = calculate_throughput(request_timestamps)

summary_message = (
f"Summary: 2xx responses = {total_counts['2xx']}, 4xx responses = {total_counts['4xx']}, "
f"5xx responses = {total_counts['5xx']}, Error requests = {total_counts['error']}"
Expand All @@ -151,20 +180,20 @@ def main():
f"{response_pretty}\n" +
f"{summary_message}\n" +
f"{throughput_message}")

sleep_time = calculate_sleep_time(request_timestamps, args.requests_per_sec)

# Flush the stdout buffer to ensure the log messages are displayed immediately and in sync
sys.stdout.flush()

if (sleep_time > 0):
time.sleep(sleep_time)

if time.time() - start_time >= 5:
session.close()
session = requests.Session()
start_time = time.time()

# Remove timestamps older than 5 seconds
while request_timestamps and (datetime.now() - request_timestamps[0]).total_seconds() > 5:
request_timestamps.popleft()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ RUN dnf install -y bash-completion --setopt=install_weak_deps=False && \
RUN echo '. /etc/profile.d/bash_completion.sh' >> ~/.bashrc && \
echo '. /etc/profile.d/venv.sh' >> ~/.bashrc && \
echo 'echo Welcome to the Migration Assistant Console' >> ~/.bashrc && \
echo 'eval "$(register-python-argcomplete cluster_tools)"' >> ~/.bashrc
echo 'eval "$(register-python-argcomplete cluster_tools)"' >> ~/.bashrc && \
echo 'PS1="(\t) \[\e[92m\]migration-console \[\e[0m\](\w) -> "' >> ~/.bashrc

# Set ENV to control startup script in /bin/sh mode
ENV ENV=/root/.bashrc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,19 @@ def execute_benchmark_workload(self, workload: str,
raise NotImplementedError(f"Auth type {self.auth_type} is not currently support for executing "
f"benchmark workloads")
# Note -- we should censor the password when logging this command
logger.info(f"Running opensearch-benchmark with '{workload}' workload")
command = (f"opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host={self.endpoint} "
f"--workload={workload} --pipeline=benchmark-only --test-mode --kill-running-processes "
f"--workload-params={workload_params} --client-options={client_options}")
# While a little wordier, this apprach prevents us from censoring the password if it appears in other contexts,
# Fix commit used for OSB on latest verified working commit
workload_revision = "fc64258a9b2ed2451423d7758ca1c5880626c520"
logger.info(f"Running opensearch-benchmark with '{workload}' workload and revision '{workload_revision}'")
command = (f"opensearch-benchmark execute-test --distribution-version=1.0.0 "
f"--exclude-tasks=check-cluster-health "
f"--workload-revision={workload_revision} "
f"--target-host={self.endpoint} "
f"--workload={workload} "
f"--pipeline=benchmark-only "
"--test-mode --kill-running-processes "
f"--workload-params={workload_params} "
f"--client-options={client_options}")
# While a little wordier, this approach prevents us from censoring the password if it appears in other contexts,
# e.g. username:admin,password:admin.
display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
logger.info(f"Executing command: {display_command}")
Expand Down
Loading

0 comments on commit 2962832

Please sign in to comment.