Skip to content

Commit

Permalink
Fix class_cast_exception when passing int to _version and other metad…
Browse files Browse the repository at this point in the history
…ata fields in ingest simulate API (opensearch-project#10101)

* Fix class_cast_exception when passing int to _version and other metadata fields in ingest simulate API

Signed-off-by: Gao Binlong <[email protected]>

* modify change log

Signed-off-by: Gao Binlong <[email protected]>

* Add more tests

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>
Co-authored-by: Daniel (dB.) Doubrovkine <[email protected]>
(cherry picked from commit fdaa438)
  • Loading branch information
gaobinlong committed Oct 7, 2023
1 parent 4cfcedd commit b7edbe7
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Deprecated
### Removed
### Fixed
- Fix class_cast_exception when passing int to _version and other metadata fields in ingest simulate API ([#10101](https://github.com/opensearch-project/OpenSearch/pull/10101))

### Security

[Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/1.3.12...HEAD
Original file line number Diff line number Diff line change
Expand Up @@ -992,3 +992,140 @@ teardown:
}
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }

---
"Test simulate with docs containing metadata fields":
- do:
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"field": "field2",
"value": "foo"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_routing": "foo",
"_version": 100,
"_if_seq_no": 12333333333333333,
"_if_primary_term": 1,
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "index" }
- match: { docs.0.doc._id: "id" }
- match: { docs.0.doc._routing: "foo" }
- match: { docs.0.doc._version: "100" }
- match: { docs.0.doc._if_seq_no: "12333333333333333" }
- match: { docs.0.doc._if_primary_term: "1" }
- match: { docs.0.doc._source.foo: "bar" }

- do:
catch: bad_request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "foo"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_routing": "foo",
"_version": "bar",
"_source": {
"foo": "bar"
}
}
]
}
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Failed to parse parameter [_version], only int or long is accepted" }

- do:
catch: bad_request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "foo"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_routing": "foo",
"_if_seq_no": "123",
"_source": {
"foo": "bar"
}
}
]
}
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Failed to parse parameter [_if_seq_no], only int or long is accepted" }

- do:
catch: bad_request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "foo"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_routing": "foo",
"_if_primary_term": "1",
"_source": {
"foo": "bar"
}
}
]
}
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Failed to parse parameter [_if_primary_term], only int or long is accepted" }
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
String routing = ConfigurationUtils.readOptionalStringOrIntProperty(null, null, dataMap, Metadata.ROUTING.getFieldName());
Long version = null;
if (dataMap.containsKey(Metadata.VERSION.getFieldName())) {
version = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.VERSION.getFieldName());
Object versionFieldValue = ConfigurationUtils.readObject(null, null, dataMap, Metadata.VERSION.getFieldName());
if (versionFieldValue instanceof Integer || versionFieldValue instanceof Long) {
version = ((Number) versionFieldValue).longValue();
} else {
throw new IllegalArgumentException("Failed to parse parameter [_version], only int or long is accepted");
}
}
VersionType versionType = null;
if (dataMap.containsKey(Metadata.VERSION_TYPE.getFieldName())) {
Expand All @@ -215,12 +220,25 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
}
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, document);
if (dataMap.containsKey(Metadata.IF_SEQ_NO.getFieldName())) {
Long ifSeqNo = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_SEQ_NO.getFieldName());
ingestDocument.setFieldValue(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo);
Object ifSeqNoFieldValue = ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_SEQ_NO.getFieldName());
if (ifSeqNoFieldValue instanceof Integer || ifSeqNoFieldValue instanceof Long) {
ingestDocument.setFieldValue(Metadata.IF_SEQ_NO.getFieldName(), ((Number) ifSeqNoFieldValue).longValue());
} else {
throw new IllegalArgumentException("Failed to parse parameter [_if_seq_no], only int or long is accepted");
}
}
if (dataMap.containsKey(Metadata.IF_PRIMARY_TERM.getFieldName())) {
Long ifPrimaryTerm = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_PRIMARY_TERM.getFieldName());
ingestDocument.setFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm);
Object ifPrimaryTermFieldValue = ConfigurationUtils.readObject(
null,
null,
dataMap,
Metadata.IF_PRIMARY_TERM.getFieldName()
);
if (ifPrimaryTermFieldValue instanceof Integer || ifPrimaryTermFieldValue instanceof Long) {
ingestDocument.setFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), ((Number) ifPrimaryTermFieldValue).longValue());
} else {
throw new IllegalArgumentException("Failed to parse parameter [_if_primary_term], only int or long is accepted");
}
}
ingestDocumentList.add(ingestDocument);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,28 @@ private void innerTestParseWithProvidedPipeline(boolean useExplicitType) throws
);
for (IngestDocument.Metadata field : fields) {
if (field == VERSION) {
Long value = randomLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
if (randomBoolean()) {
Long value = randomLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
Integer value = randomIntBetween(1, 1000000);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
}
} else if (field == VERSION_TYPE) {
String value = VersionType.toString(randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE));
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == IF_SEQ_NO || field == IF_PRIMARY_TERM) {
Long value = randomNonNegativeLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
if (randomBoolean()) {
Long value = randomNonNegativeLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
Integer value = randomIntBetween(1, 1000000);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == TYPE) {
if (useExplicitType) {
String value = randomAlphaOfLengthBetween(1, 10);
Expand Down Expand Up @@ -332,4 +343,40 @@ public void testNotValidDocs() {
);
assertThat(e3.getMessage(), containsString("required property is missing"));
}

public void testNotValidMetadataFields() {
List<IngestDocument.Metadata> fields = Arrays.asList(VERSION, IF_SEQ_NO, IF_PRIMARY_TERM);
for (IngestDocument.Metadata field : fields) {
String metadataFieldName = field.getFieldName();
Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Map<String, Object> doc = new HashMap<>();
doc.put(metadataFieldName, randomAlphaOfLengthBetween(1, 10));
doc.put(Fields.SOURCE, Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
docs.add(doc);

Map<String, Object> pipelineConfig = new HashMap<>();
List<Map<String, Object>> processors = new ArrayList<>();
Map<String, Object> processorConfig = new HashMap<>();
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int j = 0; j < numOnFailureProcessors; j++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
processorConfig.put("on_failure", onFailureProcessors);
}
processors.add(Collections.singletonMap("mock_processor", processorConfig));
pipelineConfig.put("processors", processors);

requestContent.put(Fields.PIPELINE, pipelineConfig);

assertThrows(
"Failed to parse parameter [" + metadataFieldName + "], only int or long is accepted",
IllegalArgumentException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService)
);
}
}
}

0 comments on commit b7edbe7

Please sign in to comment.