fix processState bug, and update readme
Signed-off-by: Haidong <[email protected]>
Haidong committed Nov 10, 2023
1 parent 94e216e commit e7b4e81
Showing 4 changed files with 72 additions and 28 deletions.
36 changes: 26 additions & 10 deletions data-prepper-plugins/kafka-connect-plugins/README.md
@@ -184,17 +184,26 @@ Snapshot Mode [ref](https://debezium.io/documentation/reference/stable/connector
| table_name | YES | | String | The table name to ingest, using *schemaName.tableName* format. |

### MongoDB
| Option | Required | Default | Type | Description |
|-----------------|----------|----------|--------------|---------------------------------------------------------------------------------------------------------------------|
| hostname        | YES      |          | String       | The hostname of MongoDB.                                                                                              |
| port            | NO       | 27017    | String       | The port of MongoDB.                                                                                                  |
| ssl | NO | FALSE | Boolean | Connector will use SSL to connect to MongoDB instances. |
| snapshot_mode | NO | initial | String | MongoDB snapshot mode. |
| credentials | YES | | Credentials | The Credentials to access the database. |
| collections | YES | | List\<Collection\> | The collections to ingest CDC data. |
| force_update | NO | FALSE | Boolean | When restarting or updating a pipeline, whether to force all connectors to update their config even if the connector name already exists. By default, if the connector name exists, the config will not be updated. The connector name is `<topic_prefix>.<table_name>`. |
| Option | Required | Default | Type | Description |
|----------------|----------|---------------|--------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hostname       | YES      |               | String             | The hostname of MongoDB.                                                                                                                                                 |
| port           | NO       | 27017         | String             | The port of MongoDB.                                                                                                                                                     |
| ssl            | NO       | FALSE         | Boolean            | Whether the connector uses SSL to connect to MongoDB instances.                                                                                                          |
| ingestion_mode | NO       | export_stream | String             | MongoDB ingestion mode. Available options: `export_stream`, `stream`, `export`.                                                                                          |
| export_config  | NO       |               | ExportConfig       | The export configuration. See ExportConfig below.                                                                                                                        |
| credentials | YES | | Credentials | The Credentials to access the database. |
| collections | YES | | List\<Collection\> | The collections to ingest CDC data. |
| force_update | NO | FALSE | Boolean | When restarting or updating a pipeline, whether to force all connectors to update their config even if the connector name already exists. By default, if the connector name exists, the config will not be updated. The connector name is `<topic_prefix>.<table_name>`. |
Snapshot Mode [ref](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-snapshot-mode)

#### ExportConfig
| Option | Required | Default | Type | Description |
|---------------------|------------|--------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| acknowledgments     | No         | FALSE              | Boolean | When true, enables the source to receive end-to-end acknowledgments when events are received by OpenSearch sinks.                                                                                                                              |
| items_per_partition | No         | 4000               | Long    | Number of items per partition during the initial export.                                                                                                                                                                                       |
| read_preference     | No         | secondaryPreferred | String  | Read preference used during the export. Operations read data from secondary members of the replica set when available; if the replica set has only a single primary member and no other members, operations read data from the primary member. |
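
For orientation, the snippet below is a minimal, hypothetical sketch of how the MongoDB options and the `export_config` block documented above might be combined in a Data Prepper pipeline definition. The pipeline name, the `mongodb` source key, and the `stdout` sink are illustrative assumptions rather than part of this README; only the option names and defaults are taken from the tables above, and the required `credentials` and `collections` settings are omitted because their sub-fields are documented in their own sections.

```yaml
# Hypothetical pipeline sketch; option names and defaults come from the tables above.
mongodb-export-pipeline:
  source:
    mongodb:                              # assumed source key for illustration
      hostname: "localhost"
      port: "27017"                       # default
      ssl: false
      ingestion_mode: "export_stream"     # or "stream" / "export"
      export_config:
        acknowledgments: false
        items_per_partition: 4000
        read_preference: "secondaryPreferred"
      # credentials: ...    # required; see the Credentials section
      # collections: ...    # required; see the Collection section below
  sink:
    - stdout:               # placeholder sink for illustration
```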


#### Collection

| Option | Required | Default | Type | Description |
@@ -272,7 +281,14 @@ Each connector contains following metrics:
- `source-record-active-count-avg`: Average number of records polled by the task but not yet completely written to Kafka
- `source-record-active-count`: Most recent number of records polled by the task but not yet completely written to Kafka

## Developer Guide
## MongoDB Export Metrics
MongoDB export has the following metrics:
- `exportRecordsSuccessTotal`: Number of records successfully written to the buffer.
- `exportRecordsFailedTotal`: Number of records that failed to be written to the buffer.
- `exportPartitionSuccessTotal`: Number of partitions processed successfully.
- `exportPartitionFailureTotal`: Number of partitions that failed to be processed.

# Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
@@ -4,15 +4,33 @@

public class MongoDBSnapshotProgressState {
@JsonProperty("totalRecords")
private int total;
private long total;
@JsonProperty("successRecords")
private long success;
@JsonProperty("failedRecords")
private long failed;

public int getTotal() {
public long getTotal() {
return total;
}

public long getSuccess() {
return success;
}

public long getFailed() {
return failed;
}

public void setTotal(int total) {
public void setTotal(long total) {
this.total = total;
}

public void setSuccess(long successRecords) {
this.success = successRecords;
}

public void setFailure(long failedRecords) {
this.failed = failedRecords;
}
}
@@ -46,6 +46,7 @@ public class MongoDBSnapshotWorker implements Runnable {
private static final String EVENT_SOURCE_COLLECTION_ATTRIBUTE = "__collection";
private static final String EVENT_SOURCE_DB_ATTRIBUTE = "__source_db";
private static final String EVENT_SOURCE_OPERATION = "__op";
private static final String EVENT_SOURCE_TS_MS = "__source_ts_ms";
private static final String EVENT_TYPE = "EXPORT";
private static int DEFAULT_BUFFER_WRITE_TIMEOUT_MS = 5000;
private final SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator;
@@ -139,36 +140,43 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState>
MongoCollection<Document> col = db.getCollection(collection.get(1));
Bson query = MongoDBHelper.buildQuery(gte, lte, className);
MongoCursor<Document> cursor = col.find(query).iterator();
int totalRecords = 0;
long totalRecords = 0L;
long successRecords = 0L;
long failedRecords = 0L;
try {
while (cursor.hasNext()) {
String record = cursor.next().toJson();
try {
String record = cursor.next().toJson();
Map<String, Object> data = convertToMap(record);
data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
data.putIfAbsent(EVENT_SOURCE_TS_MS, 0);
if (buffer.isByteBuffer()) {
buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
} else {
buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
}
successItemsCounter.increment();
successRecords += 1;
} catch (JsonProcessingException e) {
LOG.error("failed to add record to buffer with error {}", e.getMessage());
failureItemsCounter.increment();
continue;
failedRecords += 1;
} finally {
totalRecords += 1;
}
successItemsCounter.increment();
totalRecords += 1;
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
cursor.close();
final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
progressState.setTotal(totalRecords);
progressState.setSuccess(successRecords);
progressState.setFailure(failedRecords);
sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
}
final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
progressState.setTotal(totalRecords);
sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
}

private Optional<AcknowledgementSet> createAcknowledgementSet(SourcePartition<MongoDBSnapshotProgressState> partition) {
@@ -122,8 +122,8 @@ public void test_shouldProcessPartitionSuccess(final String partitionKey) throws
List<Record<Object>> capturedData = ingestDataCapture.getAllValues();
String data1 = ((Event) capturedData.get(0).getData()).jsonBuilder().includeTags(null).toJsonString();
String data2 = ((Event) capturedData.get(1).getData()).jsonBuilder().includeTags(null).toJsonString();
assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\",\"__source_ts_ms\":0}"));
assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\",\"__source_ts_ms\":0}"));
}

@Test
@@ -137,8 +137,8 @@ public void test_shouldProcessPartitionSuccess_byteBuffer() throws Exception {
List<byte[]> capturedData = ingestDataCapture.getAllValues();
String data1 = new String(capturedData.get(0));
String data2 = new String(capturedData.get(1));
assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\",\"__source_ts_ms\":0}"));
assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\",\"__source_ts_ms\":0}"));
}

@Test
@@ -161,8 +161,8 @@ public void test_shouldProcessPartitionSuccess_ackEnabled() throws InterruptedEx
List<Record<Object>> capturedData = ingestDataCapture.getAllValues();
String data1 = ((Event) capturedData.get(0).getData()).jsonBuilder().includeTags(null).toJsonString();
String data2 = ((Event) capturedData.get(1).getData()).jsonBuilder().includeTags(null).toJsonString();
assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\",\"__source_ts_ms\":0}"));
assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\",\"__source_ts_ms\":0}"));
}

@Test
Expand Down Expand Up @@ -223,7 +223,9 @@ private void mockDependencyAndProcessPartition(String partitionKey, boolean shou
final ArgumentCaptor<MongoDBSnapshotProgressState> progressStateCapture = ArgumentCaptor.forClass(MongoDBSnapshotProgressState.class);
verify(sourceCoordinator, times(1)).saveProgressStateForPartition(eq(partitionKey), progressStateCapture.capture());
List<MongoDBSnapshotProgressState> progressStates = progressStateCapture.getAllValues();
assertThat(progressStates.get(0).getTotal(), is(2));
assertThat(progressStates.get(0).getTotal(), is(2L));
assertThat(progressStates.get(0).getSuccess(), is(2L));
assertThat(progressStates.get(0).getFailed(), is(0L));
}
}
}