Skip to content

Commit

Permalink
New document Transformer feature
Browse files Browse the repository at this point in the history
  • Loading branch information
mhelmstetter committed Dec 5, 2017
1 parent 420f7ee commit fe1a40e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
46 changes: 43 additions & 3 deletions src/main/java/com/johnlpage/mongosyphon/DocumentGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.bson.Document;
import org.json.JSONObject;
Expand All @@ -24,6 +25,8 @@ public class DocumentGenerator {
Document section = null;
Document template = null;
Document params = null;
List<?> documentTransformersConfig = null;
List<IDocumentTransformer> documentTransformers = new ArrayList<IDocumentTransformer>();
String targetMode = null;
MongoBulkWriter mongoTarget = null;
Boolean hasRows = false;
Expand Down Expand Up @@ -56,6 +59,10 @@ public class DocumentGenerator {
}
this.section.put("source", parentSource);
}
this.documentTransformersConfig = this.section.get("documentTransformers", List.class);
if (this.documentTransformersConfig != null) {
initTransformers();
}
connectToSource(this.section.get("source", Document.class));
connectToTarget(this.section.get("target", Document.class));
}
Expand Down Expand Up @@ -90,7 +97,8 @@ private void connectToTarget(Document target) {
}
if (targetMode.equalsIgnoreCase("insert")
|| targetMode.equalsIgnoreCase("update")
|| targetMode.equalsIgnoreCase("upsert")) {
|| targetMode.equalsIgnoreCase("upsert")
|| targetMode.equalsIgnoreCase("save")) {
String targetURI = target.getString("uri");
if (targetURI == null) {
logger.error("Target needs a URI in section " + sectionName);
Expand Down Expand Up @@ -157,11 +165,13 @@ public void runConversion() {
int lastcount = 0;
int currcount = 0;
while (doc != null) {
for (IDocumentTransformer t : documentTransformers) {
t.transform(doc);
}
if (targetMode.equalsIgnoreCase("subsection")) {

String subsectionName = section.get("target", Document.class)
.getString("uri");

DocumentGenerator subgen;
if (docGens.containsKey(subsectionName)) {
subgen = docGens.get(subsectionName);
Expand Down Expand Up @@ -196,7 +206,9 @@ public void runConversion() {
mongoTarget.Update(doc, false);
} else if (targetMode.equalsIgnoreCase("upsert")) {
mongoTarget.Update(doc, true);
} else {
} else if (targetMode.equalsIgnoreCase("save")) {
mongoTarget.Save(doc);
} else {
logger.error("Unknown mode " + targetMode);
logger.error(
"Should be one of insert,update,upsert,JSON,XML");
Expand Down Expand Up @@ -439,4 +451,32 @@ private Document TemplateRow(Document template, Document row) {
}
return rval;
}

private void initTransformers() {
for (Object transConfig : documentTransformersConfig) {
if (transConfig instanceof Document) {
Document transConfigDoc = (Document)transConfig;
String className = transConfigDoc.getString("className");
if (className != null) {
try {
Object transformer = Class.forName(className).newInstance();
if (transformer instanceof IDocumentTransformer) {
this.documentTransformers.add((IDocumentTransformer)transformer);
} else {
logger.warn("documentTransformer not instance of IDocumentTransformer, ignoring");
}
} catch (Exception e) {
logger.error("Error instantiating documentTransformer " + className, e);
System.exit(1);
}
}
} else {
logger.warn(
String.format("Invalid documentTransformers config, expected Document but was %s. Ignoring.",
transConfig.getClass().getName()));
}
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.johnlpage.mongosyphon;

import org.bson.Document;

public interface IDocumentTransformer {

public void transform(Document source);

}
13 changes: 13 additions & 0 deletions src/main/java/com/johnlpage/mongosyphon/MongoBulkWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
Expand Down Expand Up @@ -79,6 +80,18 @@ public void Update(Document doc, boolean upsert)
FlushOpsIfFull();
}

public void Save(Document doc) {
if (!doc.containsKey("_id")) {
Create(doc);
return;
}
Document find = new Document("_id", doc.get("_id"));
UpdateOptions uo = new UpdateOptions();
uo.upsert(true);
ops.add(new ReplaceOneModel<Document>(find, doc, uo));
FlushOpsIfFull();
}

private void FlushOpsIfFull()
{
boolean fatalerror = false;
Expand Down

0 comments on commit fe1a40e

Please sign in to comment.