Modify tsdb track to allow ingesting into a data stream #275

Merged: 22 commits, merged on Jun 29, 2022

22 changes: 22 additions & 0 deletions tsdb/README.md
@@ -120,12 +120,33 @@ rm -rf tmp
head -n 1000 documents-sorted.json > documents-sorted-1k.json
```

Finally, you'll also need a deduped version of the data to support the `ingest_mode` value `data_stream`, which
benchmarks ingesting into a tsdb data stream. Use the `dedupe.py` tool in the `_tools` directory.
It reads `documents-sorted.json` from standard input and writes a deduped variant to standard output.

```
cat documents-sorted.json | dedupe.py > documents-sorted-deduped.json
```

The `dedupe.py` tool also generates files with a `dupes-` prefix. These files contain the
duplicates that were filtered out of the lines written to standard output, and they can
optionally be inspected manually to verify that the filtered lines are truly duplicates.
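
For example, one quick way to eyeball a dupes file is to print a few timestamps and identifying fields. This is only a sketch; `dupes-pod.json` is just an example file name (one file is written per metric set name):

```
# Peek at the first few entries of a dupes file to sanity-check that they look
# like real duplicates (same timestamps and object names recurring).
import itertools
import json

with open('dupes-pod.json') as f:
    for line in itertools.islice(f, 5):
        doc = json.loads(line)
        print(doc['@timestamp'],
              doc['metricset']['name'],
              doc.get('kubernetes', {}).get('pod', {}).get('name'))
```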

Also generate a `documents-sorted-deduped-1k.json` file for easy testing:
```
head -n 1000 documents-sorted-deduped.json > documents-sorted-deduped-1k.json
```

Now compress everything with `pbzip2`:
```
pbzip2 documents-1k.json
pbzip2 documents-sorted-1k.json
pbzip2 documents.json
pbzip2 documents-sorted.json
pbzip2 documents-sorted-deduped.json
pbzip2 documents-sorted-deduped-1k.json
```

Now upload all of that to the AWS location from `track.json`.
@@ -145,6 +166,7 @@ This track allows overriding the following parameters using `--track-params`:
* `codec` (default: default): The codec to use compressing the index. `default` uses more space and less cpu. `best_compression` uses less space and more cpu.
* `ingest_order` (default: jumbled): Should the data be loaded in `sorted` order or a more `jumbled`, mostly random order.
* `synthetic_source` (default: false): Should we enable `synthetic` _source to save space?
* `ingest_mode` (default: index): Set to `data_stream` to benchmark ingesting into a tsdb data stream.

### License

111 changes: 111 additions & 0 deletions tsdb/_tools/dedupe.py
@@ -0,0 +1,111 @@
#!/usr/bin/env python3

####################################################################
#
# A tool that dedupes a sorted anonymized metricbeat dump.
#
####################################################################
#
# Expects a sorted anonymized metricbeat dump as input via standard
# input and writes a deduped sorted anonymized metricbeat dump to
# standard output. Also separately generates 'dupes-' prefixed files
# per metric set name containing the dupes for manual inspection.
#
####################################################################

import json
import sys

def generate_event_key(parsed_line):
    return parsed_line['kubernetes']['event']['involved_object']['uid']

def generate_state_container_key(parsed_line):
    key = parsed_line['kubernetes']['container']['name']
    key += parsed_line['kubernetes']['pod']['name']
    key += parsed_line['kubernetes']['node']['name']
    container_id = parsed_line.get('kubernetes', {}).get('container', {}).get('id')
    if container_id is not None:
        key += container_id
    return key

def generate_state_pod_key(parsed_line):
    return parsed_line['kubernetes']['pod']['name'] + generate_node_key(parsed_line)

def generate_container_key(parsed_line):
    return parsed_line['kubernetes']['container']['name'] + parsed_line['kubernetes']['pod']['name'] + generate_node_key(parsed_line)

def generate_volume_key(parsed_line):
    return parsed_line['kubernetes']['volume']['name'] + parsed_line['kubernetes']['pod']['name'] + generate_node_key(parsed_line)

def generate_pod_key(parsed_line):
    return parsed_line['kubernetes']['pod']['name'] + generate_node_key(parsed_line)

def generate_node_key(parsed_line):
    return parsed_line['kubernetes']['node']['name']

[Review comment] @michaelbaamonde (Contributor), Jun 28, 2022:
A minor thing, but this function and generate_state_node_key are identical. There are also several places where you could replace parsed_line['kubernetes']['node']['name'] with generate_node_key(parsed_line) just to cut down on duplication if you'd like. Not a big deal, though.

def generate_system_key(parsed_line):
    return generate_node_key(parsed_line) + parsed_line['kubernetes']['system']['container']

def generate_state_node_key(parsed_line):
    return generate_node_key(parsed_line)
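
As an aside, a minimal sketch of the refactor suggested in the review comment above could look like the following. It is shown only for illustration and is not part of this PR:

```
# Hypothetical refactor along the lines of the review comment: reuse
# generate_node_key() wherever the node name is needed and alias the
# state_node key function instead of duplicating it.
def generate_state_container_key(parsed_line):
    key = parsed_line['kubernetes']['container']['name']
    key += parsed_line['kubernetes']['pod']['name']
    key += generate_node_key(parsed_line)
    container_id = parsed_line.get('kubernetes', {}).get('container', {}).get('id')
    if container_id is not None:
        key += container_id
    return key

generate_state_node_key = generate_node_key
```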

generate_key_functions = {
    'event': generate_event_key,
    'state_container': generate_state_container_key,
    'state_pod': generate_state_pod_key,
    'container': generate_container_key,
    'volume': generate_volume_key,
    'pod': generate_pod_key,
    'node': generate_node_key,
    'system': generate_system_key,
    'state_node': generate_state_node_key
}

in_count = 0
error_count = 0
out_count = 0
current_timestamp = None
keys = set()

dupe_files = {}

with open('error_lines.json', 'a') as error_file:
    for line in sys.stdin:
        in_count += 1
        try:
            parsed = json.loads(line)
            line_timestamp = parsed['@timestamp']
            metric_set_name = parsed['metricset']['name']
            if parsed.get('error') is not None:
                error_count += 1
                print(line, file=error_file)
                continue

            generate_key_function = generate_key_functions[metric_set_name]
            key = metric_set_name + generate_key_function(parsed)
            if current_timestamp == line_timestamp:
                if key in keys:
                    dupe_file_name = f"dupes-{metric_set_name}.json"
                    dupe_file = dupe_files.get(dupe_file_name)
                    if dupe_file is None:
                        dupe_file = open(dupe_file_name, 'a')
                        dupe_files[dupe_file_name] = dupe_file

                    print(line, file=dupe_file)
                    continue
                else:
                    keys.add(key)
            else:
                current_timestamp = line_timestamp
                keys = set()
                keys.add(key)

            print(line, end='')
            out_count += 1
            if out_count % 100000 == 0:
                print(f"in {in_count:012d} docs, out {out_count:012d} docs, errors {error_count:012d}", file=sys.stderr)
        except Exception as e:
            raise Exception(f"Error processing {line}") from e

for dupe_file in dupe_files.values():
    dupe_file.close()
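
The loop above only treats a document as a duplicate when another document with the same metric-set-specific key has already been seen for the current `@timestamp`; the key set is reset whenever the timestamp changes. A minimal toy illustration of that rule (not part of the tool):

```
# Toy illustration of the dedup rule: within one timestamp "window", a
# (timestamp, key) pair is emitted only the first time it is seen.
def dedupe(docs):
    seen = set()
    current_ts = None
    for ts, key in docs:
        if ts != current_ts:
            current_ts = ts
            seen = set()
        if key in seen:
            continue  # duplicate within this timestamp window
        seen.add(key)
        yield ts, key

docs = [("t1", "pod|a"), ("t1", "pod|a"), ("t1", "pod|b"), ("t2", "pod|a")]
print(list(dedupe(docs)))  # the second ("t1", "pod|a") is dropped
```
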
50 changes: 50 additions & 0 deletions tsdb/challenges/default.json
@@ -3,6 +3,55 @@
  "description": "Indexes the whole document corpus.",
  "default": true,
  "schedule": [
    {%- if ingest_mode is defined and ingest_mode == "data_stream" %}
    {
      "name": "put-timestamp-pipeline",
      "operation": {
        "operation-type": "put-pipeline",
        "id": "timestamp_pipeline",
        "body": {
          "processors": [
            {
              "set": {
                "field": "now",
                "value": {{'"{{_ingest.timestamp}}"'}}
              }
            },
            {
              "script": {
                "source": "ZonedDateTime base = ZonedDateTime.parse('2021-04-28T17:18:23.410Z');long numDays = Duration.between(base, ZonedDateTime.parse(ctx['now'])).toDays();ZonedDateTime timestamp = ZonedDateTime.parse(ctx['@timestamp']);ctx['@timestamp']=timestamp.plusDays(numDays).truncatedTo(ChronoUnit.MILLIS).toString();"
              }
            },
            {
              "remove": {
                "field": "now"
              }
            }
          ]
        }
      }
    },
    {
      "name": "create-all-templates",
      "operation": {
        "operation-type": "create-composable-template",
        "request-params": {
          "create": "true"
        }
      }
    },
    {
      "name": "check-cluster-health",
      "operation": {
        "operation-type": "cluster-health",
        "request-params": {
          "wait_for_status": "{{cluster_health | default('green')}}",
          "wait_for_no_relocating_shards": "true"
        },
        "retry-until-success": true
      }
    },
    {%- else %}
    {
      "operation": "delete-index"
    },
@@ -24,6 +73,7 @@
        "retry-until-success": true
      }
    },
    {%- endif %}
    {
      "operation": "index",
      "warmup-time-period": 240,
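
For context, the `timestamp_pipeline` added above rebases each document's `@timestamp`: the Painless script computes the number of whole days between a fixed base date (2021-04-28T17:18:23.410Z) and the ingest time, then shifts the document's timestamp forward by that many days so the historical corpus lands near the present when ingesting into a data stream. A rough Python equivalent of that script, for illustration only and not part of the track:

```
# Rough, illustrative Python equivalent of the Painless script in
# timestamp_pipeline: shift @timestamp forward by the whole days elapsed
# between a fixed base date and "now" (the ingest timestamp).
from datetime import datetime, timedelta, timezone

BASE = datetime(2021, 4, 28, 17, 18, 23, 410000, tzinfo=timezone.utc)

def rebase_timestamp(doc_timestamp: str, now: datetime) -> str:
    num_days = (now - BASE).days
    original = datetime.fromisoformat(doc_timestamp.replace('Z', '+00:00'))
    shifted = original + timedelta(days=num_days)
    # timespec='milliseconds' mirrors truncatedTo(ChronoUnit.MILLIS)
    return shifted.isoformat(timespec='milliseconds').replace('+00:00', 'Z')

print(rebase_timestamp('2021-04-28T17:00:00.000Z', datetime.now(timezone.utc)))
```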