From fe1a40e2b85d3378819dc08fcf5fc974925bfe41 Mon Sep 17 00:00:00 2001 From: mhelmstetter Date: Tue, 5 Dec 2017 13:26:39 -0800 Subject: [PATCH] New document Transformer feature --- .../mongosyphon/DocumentGenerator.java | 46 +++++++++++++++++-- .../mongosyphon/IDocumentTransformer.java | 9 ++++ .../mongosyphon/MongoBulkWriter.java | 13 ++++++ 3 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/johnlpage/mongosyphon/IDocumentTransformer.java diff --git a/src/main/java/com/johnlpage/mongosyphon/DocumentGenerator.java b/src/main/java/com/johnlpage/mongosyphon/DocumentGenerator.java index ac30fed..0969913 100644 --- a/src/main/java/com/johnlpage/mongosyphon/DocumentGenerator.java +++ b/src/main/java/com/johnlpage/mongosyphon/DocumentGenerator.java @@ -3,6 +3,7 @@ import java.io.FileOutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import org.bson.Document; import org.json.JSONObject; @@ -24,6 +25,8 @@ public class DocumentGenerator { Document section = null; Document template = null; Document params = null; + List documentTransformersConfig = null; + List documentTransformers = new ArrayList(); String targetMode = null; MongoBulkWriter mongoTarget = null; Boolean hasRows = false; @@ -56,6 +59,10 @@ public class DocumentGenerator { } this.section.put("source", parentSource); } + this.documentTransformersConfig = this.section.get("documentTransformers", List.class); + if (this.documentTransformersConfig != null) { + initTransformers(); + } connectToSource(this.section.get("source", Document.class)); connectToTarget(this.section.get("target", Document.class)); } @@ -90,7 +97,8 @@ private void connectToTarget(Document target) { } if (targetMode.equalsIgnoreCase("insert") || targetMode.equalsIgnoreCase("update") - || targetMode.equalsIgnoreCase("upsert")) { + || targetMode.equalsIgnoreCase("upsert") + || targetMode.equalsIgnoreCase("save")) { String targetURI = target.getString("uri"); if (targetURI == null) { logger.error("Target needs a URI in section " + sectionName); @@ -157,11 +165,13 @@ public void runConversion() { int lastcount = 0; int currcount = 0; while (doc != null) { + for (IDocumentTransformer t : documentTransformers) { + t.transform(doc); + } if (targetMode.equalsIgnoreCase("subsection")) { String subsectionName = section.get("target", Document.class) .getString("uri"); - DocumentGenerator subgen; if (docGens.containsKey(subsectionName)) { subgen = docGens.get(subsectionName); @@ -196,7 +206,9 @@ public void runConversion() { mongoTarget.Update(doc, false); } else if (targetMode.equalsIgnoreCase("upsert")) { mongoTarget.Update(doc, true); - } else { + } else if (targetMode.equalsIgnoreCase("save")) { + mongoTarget.Save(doc); + } else { logger.error("Unknown mode " + targetMode); logger.error( "Should be one of insert,update,upsert,JSON,XML"); @@ -439,4 +451,32 @@ private Document TemplateRow(Document template, Document row) { } return rval; } + + private void initTransformers() { + for (Object transConfig : documentTransformersConfig) { + if (transConfig instanceof Document) { + Document transConfigDoc = (Document)transConfig; + String className = transConfigDoc.getString("className"); + if (className != null) { + try { + Object transformer = Class.forName(className).newInstance(); + if (transformer instanceof IDocumentTransformer) { + this.documentTransformers.add((IDocumentTransformer)transformer); + } else { + logger.warn("documentTransformer not instance of IDocumentTransformer, ignoring"); + } + } catch (Exception e) { + logger.error("Error instantiating documentTransformer " + className, e); + System.exit(1); + } + } + } else { + logger.warn( + String.format("Invalid documentTransformers config, expected Document but was %s. Ignoring.", + transConfig.getClass().getName())); + } + } + + + } } diff --git a/src/main/java/com/johnlpage/mongosyphon/IDocumentTransformer.java b/src/main/java/com/johnlpage/mongosyphon/IDocumentTransformer.java new file mode 100644 index 0000000..08b78b5 --- /dev/null +++ b/src/main/java/com/johnlpage/mongosyphon/IDocumentTransformer.java @@ -0,0 +1,9 @@ +package com.johnlpage.mongosyphon; + +import org.bson.Document; + +public interface IDocumentTransformer { + + public void transform(Document source); + +} diff --git a/src/main/java/com/johnlpage/mongosyphon/MongoBulkWriter.java b/src/main/java/com/johnlpage/mongosyphon/MongoBulkWriter.java index 64bed60..4eb1e70 100644 --- a/src/main/java/com/johnlpage/mongosyphon/MongoBulkWriter.java +++ b/src/main/java/com/johnlpage/mongosyphon/MongoBulkWriter.java @@ -17,6 +17,7 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.ReplaceOneModel; import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.WriteModel; @@ -79,6 +80,18 @@ public void Update(Document doc, boolean upsert) FlushOpsIfFull(); } + public void Save(Document doc) { + if (!doc.containsKey("_id")) { + Create(doc); + return; + } + Document find = new Document("_id", doc.get("_id")); + UpdateOptions uo = new UpdateOptions(); + uo.upsert(true); + ops.add(new ReplaceOneModel(find, doc, uo)); + FlushOpsIfFull(); + } + private void FlushOpsIfFull() { boolean fatalerror = false;