Skip to content

Commit

Permalink
Added dumping and restoring associated docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
payammeyer committed Apr 4, 2020
1 parent 75ed670 commit 92b2edc
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 27 deletions.
65 changes: 59 additions & 6 deletions zulia-client/src/main/java/io/zulia/client/ZuliaRESTClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zulia.client;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.client.DefaultHttpClient;
Expand All @@ -12,11 +14,13 @@
import io.zulia.util.StreamHelper;
import org.bson.Document;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
Expand All @@ -26,6 +30,8 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ZuliaRESTClient {
private String server;
Expand Down Expand Up @@ -75,12 +81,44 @@ public void fetchAssociated(String uniqueId, String indexName, String fileName,
}
}

protected HashMap<String, Object> createParameters(String uniqueId, String indexName, String fileName) {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put(ZuliaConstants.ID, uniqueId);
parameters.put(ZuliaConstants.FILE_NAME, fileName);
parameters.put(ZuliaConstants.INDEX, indexName);
return parameters;
/**
 * Fetches every associated document stored for {@code uniqueId} in {@code indexName} and
 * writes each one as its own entry of a zip archive on {@code destination}.
 *
 * <p>The list of file names is requested first; each named file is then downloaded with a
 * separate GET request and added to the archive under its file name.
 *
 * @param uniqueId    id of the document whose associated files are fetched
 * @param indexName   index the document belongs to
 * @param destination stream receiving the zip archive; a {@link ZipOutputStream} is used
 *                    directly, any other stream is wrapped in one
 * @throws IOException if a request fails or the archive cannot be written
 */
public void fetchAssociated(String uniqueId, String indexName, OutputStream destination) throws IOException {
    HttpURLConnection conn = null;
    InputStream source = null;

    try {
        // Accept any OutputStream instead of failing with a ClassCastException on non-zip streams.
        boolean wrapped = !(destination instanceof ZipOutputStream);
        ZipOutputStream zipOutputStream = wrapped ? new ZipOutputStream(destination) : (ZipOutputStream) destination;

        HashMap<String, Object> listParameters = createParameters(uniqueId, indexName);
        String listUrl = HttpHelper.createRequestUrl(server, restPort, ZuliaConstants.ASSOCIATED_DOCUMENTS_ALL_FOR_ID_URL, listParameters);
        conn = createGetConnection(listUrl);

        handlePossibleError(conn);

        // Keep the listing stream in 'source' so the finally block releases it.
        source = conn.getInputStream();
        BufferedReader br = new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8));
        String output;

        while ((output = br.readLine()) != null) {
            JsonArray filenames = new JsonParser().parse(output).getAsJsonObject().getAsJsonArray("filenames");
            for (int i = 0; i < filenames.size(); i++) {
                // Index with i: using a fixed index would fetch the first file over and over
                // and attempt to add a duplicate zip entry.
                String filename = filenames.get(i).getAsString();
                HashMap<String, Object> fileParameters = createParameters(uniqueId, indexName, filename);
                String fileUrl = HttpHelper.createRequestUrl(server, restPort, ZuliaConstants.ASSOCIATED_DOCUMENTS_URL, fileParameters);

                // Use a dedicated connection per file so the listing connection is still being
                // read from and every file stream is released as soon as it has been copied.
                HttpURLConnection fileConn = createGetConnection(fileUrl);
                try {
                    handlePossibleError(fileConn);
                    try (InputStream fileSource = fileConn.getInputStream()) {
                        zipOutputStream.putNextEntry(new ZipEntry(filename));
                        zipOutputStream.write(StreamHelper.getBytesFromStream(fileSource));
                        zipOutputStream.closeEntry();
                    }
                }
                finally {
                    fileConn.disconnect();
                }
            }
        }

        if (wrapped) {
            // Write the central directory for the wrapper we created; the underlying
            // destination itself is released by closeStreams below.
            zipOutputStream.finish();
        }
    }
    finally {
        closeStreams(source, destination, conn);
    }
}

public void storeAssociated(String uniqueId, String indexName, String fileName, File fileToStore) throws Exception {
Expand Down Expand Up @@ -109,6 +147,21 @@ public void storeAssociated(String uniqueId, String indexName, String fileName,

}

/**
 * Builds the request parameter map identifying a document by id and index.
 *
 * @param uniqueId  value stored under {@code ZuliaConstants.ID}
 * @param indexName value stored under {@code ZuliaConstants.INDEX}
 * @return a new mutable map holding both entries
 */
private HashMap<String, Object> createParameters(String uniqueId, String indexName) {
    final HashMap<String, Object> requestParams = new HashMap<>();

    requestParams.put(ZuliaConstants.ID, uniqueId);
    requestParams.put(ZuliaConstants.INDEX, indexName);

    return requestParams;
}

/**
 * Builds the request parameter map identifying a single associated file of a document.
 *
 * @param uniqueId  value stored under {@code ZuliaConstants.ID}
 * @param indexName value stored under {@code ZuliaConstants.INDEX}
 * @param fileName  value stored under {@code ZuliaConstants.FILE_NAME}
 * @return a new mutable map holding the three entries
 */
private HashMap<String, Object> createParameters(String uniqueId, String indexName, String fileName) {
    final HashMap<String, Object> requestParams = new HashMap<>();

    requestParams.put(ZuliaConstants.ID, uniqueId);
    requestParams.put(ZuliaConstants.FILE_NAME, fileName);
    requestParams.put(ZuliaConstants.INDEX, indexName);

    return requestParams;
}

private void handlePossibleError(HttpURLConnection conn) throws IOException {
if (conn.getResponseCode() != ZuliaConstants.SUCCESS) {
byte[] bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@ public class FetchLargeAssociated extends RESTCommand<FetchLargeAssociatedResult

/**
 * Creates a command that fetches one named associated file of a document into a local file.
 *
 * @param uniqueId   id of the document the associated file belongs to
 * @param indexName  index the document belongs to
 * @param fileName   name of the associated file to fetch
 * @param outputFile local file that receives the fetched content
 */
public FetchLargeAssociated(String uniqueId, String indexName, String fileName, File outputFile) {
    this.uniqueId = uniqueId;
    // indexName was previously assigned twice; a single assignment is sufficient.
    this.indexName = indexName;
    this.fileName = fileName;
    this.outputFile = outputFile;
}

/**
 * Creates a command that fetches one named associated file of a document into a stream.
 *
 * @param uniqueId    id of the document the associated file belongs to
 * @param indexName   index the document belongs to
 * @param fileName    name of the associated file to fetch
 * @param destination stream that receives the fetched content
 */
public FetchLargeAssociated(String uniqueId, String indexName, String fileName, OutputStream destination) {
    // Where the content goes.
    this.destination = destination;

    // What to fetch.
    this.fileName = fileName;
    this.indexName = indexName;
    this.uniqueId = uniqueId;
}

/**
 * Creates a command for a document's associated files without naming a specific file.
 * Only the id, index and destination are set by this constructor; no file name is assigned.
 *
 * @param uniqueId    id of the document whose associated files are fetched
 * @param indexName   index the document belongs to
 * @param destination stream that receives the fetched content
 */
public FetchLargeAssociated(String uniqueId, String indexName, OutputStream destination) {
    // Where the content goes.
    this.destination = destination;

    // What to fetch.
    this.indexName = indexName;
    this.uniqueId = uniqueId;
}

@Override
Expand All @@ -35,7 +41,12 @@ public FetchLargeAssociatedResult execute(ZuliaRESTClient zuliaRESTClient) throw
zuliaRESTClient.fetchAssociated(uniqueId, indexName, fileName, outputFile);
}
else if (destination != null) {
zuliaRESTClient.fetchAssociated(uniqueId, indexName, fileName, destination);
if (fileName != null) {
zuliaRESTClient.fetchAssociated(uniqueId, indexName, fileName, destination);
}
else {
zuliaRESTClient.fetchAssociated(uniqueId, indexName, destination);
}
}
else {
throw new Exception("A writer must be set");
Expand Down
1 change: 1 addition & 0 deletions zulia-common/src/main/java/io/zulia/ZuliaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public interface ZuliaConstants {
String POST = "POST";

String ASSOCIATED_DOCUMENTS_URL = "/associatedDocs";
String ASSOCIATED_DOCUMENTS_ALL_FOR_ID_URL = "/associatedDocs/allForId";
String QUERY_URL = "query";
String FETCH_URL = "fetch";
String FIELDS_URL = "fields";
Expand Down
29 changes: 27 additions & 2 deletions zulia-server/src/main/java/io/zulia/server/cmd/ZuliaCmdUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.mongodb.client.MongoClients;
import io.zulia.client.command.Query;
import io.zulia.client.command.Store;
import io.zulia.client.command.StoreLargeAssociated;
import io.zulia.client.pool.WorkPool;
import io.zulia.client.pool.ZuliaWorkPool;
import io.zulia.doc.ResultDocBuilder;
Expand All @@ -25,13 +26,18 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class ZuliaCmdUtil {

Expand Down Expand Up @@ -68,7 +74,8 @@ public static NodeService getNodeService(ZuliaConfig zuliaConfig) {
}
}

public static void writeOutput(String recordsFilename, String index, String q, int rows, ZuliaWorkPool workPool, AtomicInteger count) throws Exception {
public static void writeOutput(String recordsFilename, String index, String q, int rows, ZuliaWorkPool workPool, AtomicInteger count, String idField,
Set<String> uniqueIds) throws Exception {
try (FileWriter fileWriter = new FileWriter(new File(recordsFilename), Charsets.UTF_8)) {
Query zuliaQuery = new io.zulia.client.command.Query(index, q, rows);

Expand All @@ -78,6 +85,9 @@ public static void writeOutput(String recordsFilename, String index, String q, i

queryResult.getDocuments().forEach(doc -> {
try {
if (uniqueIds != null) {
uniqueIds.add(doc.getString(idField));
}
fileWriter.write(doc.toJson());
fileWriter.write(System.lineSeparator());

Expand All @@ -96,7 +106,8 @@ public static void writeOutput(String recordsFilename, String index, String q, i
}
}

public static void index(String recordsFilename, String idField, String index, ZuliaWorkPool workPool, AtomicInteger count) throws Exception {
public static void index(String inputDir, String recordsFilename, String idField, String index, ZuliaWorkPool workPool, AtomicInteger count)
throws Exception {
WorkPool threadPool = new WorkPool(4);
try (BufferedReader b = new BufferedReader(new FileReader(recordsFilename))) {
String line;
Expand Down Expand Up @@ -125,6 +136,20 @@ public static void index(String recordsFilename, String idField, String index, Z
store.setResultDocument(new ResultDocBuilder().setDocument(document));
workPool.store(store);

if (Files.exists(Paths.get(inputDir + File.separator + id.replaceAll("/", "_") + ".zip"))) {
try (ZipFile zipFile = new ZipFile(Paths.get(inputDir + File.separator + id.replaceAll("/", "_") + ".zip").toFile())) {
while (zipFile.entries().hasMoreElements()) {
ZipEntry entry = zipFile.entries().nextElement();
try {
workPool.storeLargeAssociated(new StoreLargeAssociated(id, index, entry.getName(), zipFile.getInputStream(entry)));
}
catch (Throwable t) {
LOG.log(Level.SEVERE, "Could not restore associated file <" + entry.getName() + ">", t);
}
}
}
}

int i = count.incrementAndGet();
if (i % 10000 == 0) {
LOG.info("So far indexed <" + i + "> for index <" + index + ">");
Expand Down
54 changes: 48 additions & 6 deletions zulia-server/src/main/java/io/zulia/server/cmd/ZuliaDump.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
import com.beust.jcommander.ParameterException;
import com.google.common.base.Charsets;
import com.google.protobuf.util.JsonFormat;
import io.zulia.client.command.FetchLargeAssociated;
import io.zulia.client.command.GetIndexConfig;
import io.zulia.client.config.ZuliaPoolConfig;
import io.zulia.client.pool.ZuliaWorkPool;
import io.zulia.client.result.GetIndexesResult;
import io.zulia.log.LogUtil;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.ZipOutputStream;

public class ZuliaDump {

Expand All @@ -36,6 +41,12 @@ public static class ZuliaDumpArgs extends ZuliaBaseArgs {

@Parameter(names = "--rows", description = "Number of records to return. [Defaults to 1000]")
private Integer rows = 1000;

@Parameter(names = "--includeAssociatedDocs", description = "Include Associated Documents in the dump.")
private boolean includeAssociatedDocs = false;

@Parameter(names = "--idField", description = "ID Field Name. [Defaults to id]")
private String idField = "id";
}

public static void main(String[] args) {
Expand All @@ -55,6 +66,7 @@ public static void main(String[] args) {

String index = zuliaDumpArgs.index;
String indexes = zuliaDumpArgs.indexes;
boolean includeAssociatedDocs = zuliaDumpArgs.includeAssociatedDocs;

if (index == null && indexes == null) {
LOG.log(Level.SEVERE, "Please pass in an index name.");
Expand All @@ -65,25 +77,37 @@ public static void main(String[] args) {
String q = zuliaDumpArgs.q;
Integer rows = zuliaDumpArgs.rows;
String out = zuliaDumpArgs.out;
String idField = zuliaDumpArgs.idField;

Set<String> uniqueIds = new HashSet<>();

if (indexes != null) {

if (indexes.contains(",")) {
for (String ind : indexes.split(",")) {
queryAndWriteOutput(workPool, ind, q, rows, out);
queryAndWriteOutput(workPool, ind, q, rows, out, idField, uniqueIds);
if (includeAssociatedDocs) {
fetchAssociatedDocs(workPool, ind, out, uniqueIds);
}
}
}
else if (indexes.contains("*")) {
GetIndexesResult indexesResult = workPool.getIndexes();
for (String ind : indexesResult.getIndexNames()) {
if (ind.startsWith(indexes.replace("*", ""))) {
queryAndWriteOutput(workPool, ind, q, rows, out);
queryAndWriteOutput(workPool, ind, q, rows, out, idField, uniqueIds);
if (includeAssociatedDocs) {
fetchAssociatedDocs(workPool, ind, out, uniqueIds);
}
}
}
}
}
else {
queryAndWriteOutput(workPool, index, q, rows, out);
queryAndWriteOutput(workPool, index, q, rows, out, idField, uniqueIds);
if (includeAssociatedDocs) {
fetchAssociatedDocs(workPool, index, out, uniqueIds);
}
}

}
Expand All @@ -104,10 +128,11 @@ else if (indexes.contains("*")) {

}

private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, String q, Integer rows, String out) throws Exception {
private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, String q, Integer rows, String outputDir, String idField,
Set<String> uniqueIds) throws Exception {

// create zuliadump dir first
String zuliaDumpDir = out + File.separator + "zuliadump";
String zuliaDumpDir = outputDir + File.separator + "zuliadump";
if (!Files.exists(Paths.get(zuliaDumpDir))) {
Files.createDirectory(Paths.get(zuliaDumpDir));
}
Expand All @@ -123,7 +148,7 @@ private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, St

AtomicInteger count = new AtomicInteger();
LOG.info("Dumping index <" + index + ">");
ZuliaCmdUtil.writeOutput(recordsFilename, index, q, rows, workPool, count);
ZuliaCmdUtil.writeOutput(recordsFilename, index, q, rows, workPool, count, idField, uniqueIds);
LOG.info("Finished dumping index <" + index + ">, total: " + count);

try (FileWriter fileWriter = new FileWriter(new File(settingsFilename), Charsets.UTF_8)) {
Expand All @@ -135,4 +160,21 @@ private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, St

}

/**
 * Dumps the associated documents of every id in {@code uniqueIds} into one zip file per id,
 * written to {@code <outputDir>/zuliadump/<index>/<id with '/' replaced by '_'>.zip}.
 *
 * @param workPool  pool used to run the fetch commands
 * @param index     index the ids are fetched from
 * @param outputDir base output directory of the dump
 * @param uniqueIds ids whose associated documents are dumped
 * @throws Exception if a zip file cannot be created or a fetch fails
 */
private static void fetchAssociatedDocs(ZuliaWorkPool workPool, String index, String outputDir, Set<String> uniqueIds) throws Exception {

    String zuliaDumpDir = outputDir + File.separator + "zuliadump";
    String indOutputDir = zuliaDumpDir + File.separator + index;

    // NOTE(review): main passes the same id set for every index it dumps, so ids collected for
    // earlier indexes are fetched against this index as well; confirm whether that is intended.
    LOG.info("Starting to dump associated docs for <" + uniqueIds.size() + "> documents.");
    AtomicInteger count = new AtomicInteger(0);
    for (String uniqueId : uniqueIds) {
        File zipFile = Paths.get(indOutputDir + File.separator + uniqueId.replaceAll("/", "_") + ".zip").toFile();
        // Own the stream here so the file handle is released even when the fetch throws.
        // Assumes fetchLargeAssociated completes before returning; TODO confirm it is synchronous.
        try (ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile))) {
            workPool.fetchLargeAssociated(new FetchLargeAssociated(uniqueId, index, zipOutputStream));
        }
        if (count.incrementAndGet() % 1000 == 0) {
            LOG.info("Associated docs dumped so far: " + count);
        }
    }
    LOG.info("Finished dumping associated docs for <" + uniqueIds.size() + "> documents.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -101,7 +102,11 @@ else if (indexes.contains("*")) {
}

/**
 * Convenience overload that exports an index without collecting document ids: it delegates to
 * the seven-argument variant with {@code null} for both {@code idField} and {@code uniqueIds}.
 *
 * @param workPool pool used to run the query
 * @param index    index to export from
 * @param q        query selecting the documents to export
 * @param rows     number of rows passed on to the query
 * @param out      base output directory
 * @throws Exception if the export fails
 */
private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, String q, Integer rows, String out) throws Exception {
    // The unused local query variable that used to be declared here has been dropped.
    queryAndWriteOutput(workPool, index, q, rows, out, null, null);
}

private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, String q, Integer rows, String out, String idField, Set<String> uniqueIds)
throws Exception {

// create zuliaexport dir first
String zuliaExportDir = out + File.separator + "zuliaexport";
Expand All @@ -119,7 +124,7 @@ private static void queryAndWriteOutput(ZuliaWorkPool workPool, String index, St

AtomicInteger count = new AtomicInteger();
LOG.info("Exporting from index <" + index + ">");
ZuliaCmdUtil.writeOutput(recordsFilename, index, q, rows, workPool, count);
ZuliaCmdUtil.writeOutput(recordsFilename, index, q, rows, workPool, count, idField, uniqueIds);
LOG.info("Finished exporting from index <" + index + ">, total: " + count);

}
Expand Down
Loading

0 comments on commit 92b2edc

Please sign in to comment.