Skip to content

Commit

Permalink
Further work to support bulk delete, index create with type mapping m…
Browse files Browse the repository at this point in the history
…igration, and regex target index patterns.

Fix regex replace to use java's regex classes instead of the builtin ones.  The builtin filter uses Google's re2j library, which doesn't do backreferences.  Now both capture and replace use custom filters that use the builtin java regex library.  Tests are in place to do some index remapping w/ backreferenced captures.
For bulk APIs, only the delete command is fully supported, but the structure for the others should be in place.
I've also worked on improving the test for creating an index and fixed numerous issues there.

It's also CRITICAL to note that constructing dictionaries in jinjava with keys that are defined via variables is NOT SUPPORTED.  See HubSpot/jinjava#379.  The workaround that I'm currently using is to construct a map as a string in json and parse it into a map - or to construct maps dynamically.  Also note that python dictionary operations like 'update' or 'delete' are NOT present since we don't have the luxury of overriding the dictionary implementation easily.  This is also something that has a significant impact on the ease of use for jinjava and to maintain compatibility to the python implementation.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Dec 2, 2024
1 parent 4b32d42 commit 0712eae
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import java.util.function.Function;

import org.opensearch.migrations.transform.jinjava.DynamicMacroFunction;
import org.opensearch.migrations.transform.jinjava.JavaRegexCaptureFilter;
import org.opensearch.migrations.transform.jinjava.JavaRegexReplaceFilter;
import org.opensearch.migrations.transform.jinjava.NameMappingClasspathResourceLocator;
import org.opensearch.migrations.transform.jinjava.RegexCaptureFilter;
import org.opensearch.migrations.transform.jinjava.ThrowTag;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hubspot.jinjava.Jinjava;
Expand Down Expand Up @@ -38,17 +40,18 @@ public JinjavaTransformer(String templateString,
jinjava = new Jinjava();
this.createContextWithSourceFunction = createContextWithSource;
jinjava.setResourceLocator(resourceLocator);
jinjava.getGlobalContext().registerFilter(new RegexCaptureFilter());
var dynamicMacroFunction = new ELFunctionDefinition(
jinjava.getGlobalContext().registerFilter(new JavaRegexCaptureFilter());
jinjava.getGlobalContext().registerFilter(new JavaRegexReplaceFilter());

jinjava.getGlobalContext().registerFunction(new ELFunctionDefinition(
"",
"invoke_macro",
DynamicMacroFunction.class,
"invokeMacro",
String.class,
Object[].class
);

jinjava.getGlobalContext().registerFunction(dynamicMacroFunction);
));
jinjava.getGlobalContext().registerTag(new ThrowTag());
this.templateStr = templateString;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.hubspot.jinjava.lib.filter.Filter;
import lombok.SneakyThrows;

public class RegexCaptureFilter implements Filter {
public class JavaRegexCaptureFilter implements Filter {

private static LoadingCache<String, Pattern> regexCache =
CacheBuilder.newBuilder().build(CacheLoader.from((Function<String, Pattern>)Pattern::compile));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.opensearch.migrations.transform.jinjava;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Function;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.hubspot.jinjava.interpret.JinjavaInterpreter;
import com.hubspot.jinjava.lib.filter.Filter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JavaRegexReplaceFilter implements Filter {

private static LoadingCache<String, Pattern> regexCache =
CacheBuilder.newBuilder().build(CacheLoader.from((Function<String, Pattern>)Pattern::compile));

@SneakyThrows
private static Pattern getCompiledPattern(String pattern) {
return regexCache.get(pattern);
}

@Override
public String getName() {
return "regex_replace";
}

@Override
public Object filter(Object var, JinjavaInterpreter interpreter, String... args) {
if (var == null || args.length < 2) {
return null;
}

String input = var.toString();
String pattern = args[0];
String replacement = args[1];

try {
Matcher matcher = getCompiledPattern(pattern).matcher(input);
var rval = matcher.replaceAll(replacement);
log.atError().setMessage("replaced value {}").addArgument(rval).log();
return rval;
} catch (Exception e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.opensearch.migrations.transform.jinjava;

import com.hubspot.jinjava.doc.annotations.JinjavaDoc;
import com.hubspot.jinjava.doc.annotations.JinjavaParam;
import com.hubspot.jinjava.doc.annotations.JinjavaSnippet;
import com.hubspot.jinjava.interpret.JinjavaInterpreter;
import com.hubspot.jinjava.lib.tag.Tag;
import com.hubspot.jinjava.tree.TagNode;

@JinjavaDoc(
value = "Throws a runtime exception with the specified message",
params = {
@JinjavaParam(value = "message", type = "string", desc = "The error message to throw")
},
snippets = {
@JinjavaSnippet(
code = "{% throw 'Invalid input provided' %}"
)
}
)

public class ThrowTag implements Tag {
private static final String TAG_NAME = "throw";

@Override
public String getName() {
return TAG_NAME;
}

@Override
public String interpret(TagNode tagNode, JinjavaInterpreter interpreter) {
String message = interpreter.render(tagNode.getHelpers().trim());
throw new JinjavaThrowTagException(message);
}

public static class JinjavaThrowTagException extends RuntimeException {
public JinjavaThrowTagException(String message) {
super(message);
}
}

@Override
public String getEndTagName() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
{%- set ns = namespace(value=features) -%}
{%- set debug = namespace(log=[]) -%}
{%- for key in (path | split('.')) -%}
{% set debug.log = debug.log + ["k:"+key] -%}
{% set debug.log = debug.log + ["ismapping?:"+(ns.value is mapping)] -%}
{%- set debug.log = debug.log + ["k:"+key] -%}
{%- set debug.log = debug.log + ["ismapping?:"+(ns.value is mapping)] -%}
{%- if ns.value is mapping and key in ns.value -%}
{%- set ns.value = ns.value[key] -%}
{%- else -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{% import "common/featureEnabled.j2" as fscope %}
{% macro route(input, field_to_match, feature_flags, default_action, routes) %}
{%- import "common/featureEnabled.j2" as fscope -%}
{%- import "common/featureEnabled.j2" as fscope -%}
{%- macro route(input, field_to_match, feature_flags, default_action, routes) -%}
{%- set ns = namespace(result=none, matched=false) -%}
{%- for pattern, action_fn, feature_name_param in routes if not ns.matched -%}
{% set feature_name = feature_name_param | default(action_fn) %}
{%- set feature_name = feature_name_param | default(action_fn) -%}
{%- if not ns.matched -%} {# we haven't found a match yet, otherwise skip the rest #}
{%- set match = field_to_match | regex_capture(pattern) -%}
{%- if match is not none -%}
Expand All @@ -18,4 +19,4 @@
{%- else -%}
{{- ns.result -}}
{%- endif -%}
{% endmacro %}
{%- endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ dependencies {
testImplementation testFixtures(project(path: ':testHelperFixtures'))
testImplementation testFixtures(project(path: ':TrafficCapture:trafficReplayer'))

testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
testImplementation group: 'com.google.guava', name: 'guava'
testImplementation group: 'org.hamcrest', name: 'hamcrest'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-params'
testImplementation group: 'org.slf4j', name: 'slf4j-api'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{%- macro make_request() -%}
{ "method": "GET", "URI": "/" }
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{%- macro make_keep_json(source_and_mappings) -%}
{ "preserve": "*" }
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -1,99 +1,20 @@
{%- include "common/featureEnabled.j2" -%}

{%- macro preserve_original(source_and_mappings) -%}
{ "preserve": "*" }
{%- endmacro -%}

{%- macro make_no_op() -%}
{ "method": "GET", "URI": "/" }
{%- endmacro -%}

{%- macro rewrite_uri_to_strip_types(source_index, source_type, regex_index_mappings) -%}
{%- set ns = namespace(target_index=none) -%}
{%- for entry in regex_index_mappings -%}
{%- set idx_regex = entry.get(0) -%}
{%- set type_regex = entry[1] -%}
{%- set target_idx_pattern = entry[2] -%}

{%- if ns.target_index is none -%}
{%- set conjoined_source = source_index + "/" + source_type -%}
{%- set conjoined_regex = idx_regex + "/" + type_regex -%}
{%- set didMatch = conjoined_source | regex_capture(conjoined_regex) -%}
{%- if didMatch is not none -%}
{%- set ns.target_index = conjoined_source | regex_replace(conjoined_regex, target_idx_pattern) -%}
{%- endif -%}
{%- endif -%}
{%- endfor -%}
{{- ns.target_index -}}
{%- endmacro -%}

{%- macro rewrite_uri_for_types(match, input_map) -%}
{%- set target_index = (input_map.index_mappings[match.group1] | default({}))[match.group2] -%}
{%- if target_index is none %} {# not sure if default arguments would be eagerly evaluated #}
{%- set target_index = invoke_macro('rewrite_uri_to_strip_types', match.group1, match.group2, input_map.regex_index_mappings) -%}
{%- endif -%}
{%- if target_index is none -%}
{{ make_no_op() }}
{%- else -%}
{
"method": "{{ input_map.request.method }}",
"URI": "/{{ target_index }}/_doc/{{ match.group3 }}",
"preserve": ["headers","payload"]
}
{%- endif -%}
{%- endmacro -%}

{% macro rewrite_create_index_excise(match, input_map) -%}
{
"method": "{{ input_map.request.method }}",
"URI": "{{ input_map.index_mappings[match.group1] }}",
"payload": {
"inlinedJsonBody": {
{%- for key, value in input_map.request.body.items() -%}
{%- if key != "mappings" -%}
"{{ key }}": {{ value | tojson }},
{%- endif -%}
{%- endfor -%}
"mappings": {
"properties": {
"type": {
"type": "keyword"
}
{%- for type_name, type_props in input_map.request.body.mappings.items() -%}
{%- for prop_name, prop_def in type_props.properties.items() -%}
,
"{{- prop_name -}}": {{- prop_def | tojson -}}
{%- endfor -%}
{%- endfor -%}
}
}
}
}
}
{%- endmacro -%}
{%- import "common/route.j2" as rscope -%}
{%- import "typeMappings/preserveAll.j2" as preserve -%}
{%- include "typeMappings/rewriteDocumentRequest.j2" -%}
{%- include "typeMappings/rewriteBulkRequest.j2" -%}
{%- include "typeMappings/rewriteCreateIndexRequest.j2" -%}

{% macro rewrite_create_index(match, input_map) -%}
{% set target_mappings = input_map.index_mappings[match.group1] | default({}) | length %}
{% if target_mappings == 0 %}
{{ make_no_op() }}
{% elif num_mappings == 1 %}
{{ rewrite_create_index_excise(match, input_map) }}
{% elif num_mappings > 1 %}
{% else %}
{{ preserve_original(input_mappings) }}
{% endif %}
{%- endmacro -%}

{% set source_and_mappings = {
{%- set source_and_mappings = {
'request': request,
'index_mappings': index_mappings,
'regex_index_mappings': regex_index_mappings}
%}
{%- import "common/route.j2" as rscope -%}
{{- rscope.route(source_and_mappings, request.method + " " + request.URI, flags, 'preserve_original',
-%}
{{- rscope.route(source_and_mappings, request.method + " " + request.URI, flags, 'make_keep_json',
[
('(?:PUT|POST) /([^/]*)/([^/]*)/(.*)', 'rewrite_uri_for_types', 'rewrite_add_request_to_strip_types'),
( 'GET /([^/]*)/([^/]*)/.*', 'rewrite_uri_for_types', 'rewrite_get_request_to_strip_types'),
('(?:PUT|POST) /([^/]*)/([^/]*)/(.*)', 'rewrite_doc_request', 'rewrite_add_request_to_strip_types'),
( 'GET /([^/]*)/([^/]*)/.*', 'rewrite_doc_request', 'rewrite_get_request_to_strip_types'),
('(?:PUT|POST) /_bulk', 'rewrite_bulk', 'rewrite_bulk'),
('(?:PUT|POST) /([^/]*)', 'rewrite_create_index', 'rewrite_create_index')
])
-}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{%- include "typeMappings/rewriteIndexForTarget.j2" -%}
{%- import "typeMappings/rewriteIndexForTarget.j2" as transidx -%}

{%- macro run_create(command, target_index, doc) -%}
{%- endmacro -%}
{%- macro run_index(command, target_index, doc) -%}
{%- endmacro -%}

{%- macro run_delete(command, target_index) -%}
{%- if target_index -%}
{%- set ns = namespace(delete_inner={}) -%}
{%- for key, value in command.items() -%}
{%- if key != '_type' and key != '_index' -%}
{%- set inner_json = value | tojson -%}
{%- set jsonblob = ("{\"" + key + "\":" + inner_json + "}") | fromjson -%}
{%- set ns.delete_inner = ns.delete_inner + jsonblob -%}
{%- endif -%}
{%- endfor -%}
{%- set index_json = target_index | tojson -%}
{%- set index_blob = ("{\"_index\":" + index_json + "}") | fromjson -%}
{%- set ns.delete_inner = ns.delete_inner + index_blob -%}
{%- set final_json = ("{\"delete\":" + (ns.delete_inner | tojson) + "}") | fromjson -%}
{{ final_json | tojson }}
{%- endif -%}
{%- endmacro -%}

{%- macro run_update(command, target_index, doc) -%}
{%- endmacro -%}
{%- macro rewrite_bulk_for_default_source_index(uri_match, input_map, source_index) -%}
{
"preserve": ["headers","method","URI","protocol"],
"payload": {
"inlinedJsonSequenceBodies": [
{%- set operation_types = ['delete', 'update', 'index', 'create'] -%}
{%- for item in input_map.request.payload.inlinedJsonSequenceBodies -%}
{%- set operation = namespace(type=None) -%}
{%- for type in operation_types -%}
{%- if item is mapping and type in item -%}
{%- set operation.type = type -%}
{%- endif -%}
{%- endfor -%}

{%- if operation.type is not none -%}
{%- set command = item[operation.type] -%}
{%- set target_index = transidx.convert_source_index_to_target(command['_index'], command['_type'], input_map.index_mappings, input_map.regex_index_mappings) -%}
{# command['_index'] {{ command['_index'] }}, command['_type'] = {{ command['_type'] }}, input_map.index_mappings = {{ input_map.index_mappings }}, input_map.regex_index_mappings = {{ input_map.regex_index_mappings }})#}
{%- if operation.type == 'delete' -%}
{{ run_delete(command, target_index) }}
{%- else -%}
{%- if loop.index < operations|length -%}
{%- set next_item = operations[loop.index] -%}
{%- if operation.type == 'create' -%}
{{ run_create(command, target_index, next_item) }}
{%- elif operation.type == 'update' -%}
{{ run_update(command, target_index, next_item) }}
{%- elif operation.type == 'index' -%}
{{ run_index(command, target_index, next_item) }}
{%- endif -%}
{%- set loop.index = loop.index + 1 -%}
{%- else -%}
Handle case where there's no next item but one was expected
{# {{ throw_error('Expected document after ' + operation.type + ' operation') }}#}
{%- endif -%}
{%- endif -%}
{%- else -%}
Handle case where no valid operation type was found
{# {{ throw_error('Invalid operation type in item: ' + item|string) }}#}
{%- endif -%}
{%- endfor -%}
]
}
}
{%- endmacro -%}
{%- macro rewrite_bulk(match, input_map) -%}
{{ rewrite_bulk_for_default_source_index(match, input_map, none) }}
{%- endmacro -%}
Loading

0 comments on commit 0712eae

Please sign in to comment.