Skip to content

Commit

Permalink
feat(emm): Add .ndjsonld.gz downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
olovy committed Nov 27, 2024
1 parent 2ae578d commit ffe7048
Showing 1 changed file with 105 additions and 10 deletions.
115 changes: 105 additions & 10 deletions emm/src/main/java/whelk/Dump.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import whelk.util.Unicode;
import whelk.util.http.HttpTools;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -26,12 +31,16 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;

import static whelk.EmmServlet.AS2_CONTENT_TYPE;
import static whelk.util.Jackson.mapper;

public class Dump {
/* Here is how these dumps work:
Expand All @@ -56,8 +65,10 @@ public class Dump {
* dump from one that is still being generated but just hasn't gotten any further yet.
*/
private static final Logger logger = LogManager.getLogger(Dump.class);
private static final String DUMP_END_MARKER = "_DUMP_END_MARKER\n"; // Must be 17 bytes
private static final String DUMP_END_MARKER_NO_NEWLINE = "_DUMP_END_MARKER";
private static final String DUMP_END_MARKER = DUMP_END_MARKER_NO_NEWLINE + "\n"; // Must be 17 bytes
private static final String JSON_CONTENT_TYPE = "application/json";
private static final int GZIP_BUF_SIZE = 64 * 1024;

public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException {
String selection = req.getParameter("selection");
Expand All @@ -67,23 +78,29 @@ public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletR
return;
}

boolean isDownload = Collections.list(req.getParameterNames()).contains("download");
String offset = req.getParameter("offset");
if (offset == null) {
if (offset == null && !isDownload) {
sendDumpEntryPoint(apiBaseUrl, selection, res);
return;
}

long offsetNumeric = Long.parseLong(offset);

String tmpDir = System.getProperty("java.io.tmpdir");
Path dumpsPath = Paths.get(tmpDir, "dumps");
Files.createDirectories(dumpsPath);
Path dumpFilePath = dumpsPath.resolve(selection+".dump");

invalidateIfOld(dumpFilePath);
if (!Files.exists(dumpFilePath))
if (!Files.exists(dumpFilePath)) {
generateDump(whelk, selection, dumpFilePath);
sendDumpPageResponse(whelk, apiBaseUrl, selection, dumpFilePath, offsetNumeric, res);
}

if (isDownload) {
sendDumpDownloadResponse(whelk, apiBaseUrl, selection, dumpFilePath, res);
} else {
long offsetNumeric = Long.parseLong(offset);
sendDumpPageResponse(whelk, apiBaseUrl, selection, dumpFilePath, offsetNumeric, res);
}
}

private static void sendDumpEntryPoint(String apiBaseUrl, String selection, HttpServletResponse res) throws IOException {
Expand All @@ -93,6 +110,7 @@ private static void sendDumpEntryPoint(String apiBaseUrl, String selection, Http
responseObject.put("@context", contexts);
responseObject.put("type", "Collection");
responseObject.put("id", apiBaseUrl + "?selection=" + selection);
responseObject.put("url", apiBaseUrl + "?selection=" + selection + "&download");
var first = new LinkedHashMap<>();
first.put("type", "CollectionPage");
first.put("id", apiBaseUrl + "?selection=" + selection + "&offset=0");
Expand Down Expand Up @@ -225,10 +243,7 @@ private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String

var contextDoc = contextDoc(whelk);
if (offset == 0) {
items.add(Map.of(
JsonLd.ID_KEY, contextDoc.getRecordIdentifiers().getFirst(),
JsonLd.CONTEXT_KEY, contextDoc.data.get(JsonLd.CONTEXT_KEY)
));
items.add(wrapContextDoc(contextDoc));
}

Map<String, Document> idsAndRecords = whelk.bulkLoad(recordIdsOnPage);
Expand Down Expand Up @@ -268,6 +283,79 @@ private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String
HttpTools.sendResponse(res, responseObject, JSON_CONTENT_TYPE);
}

/**
 * Streams a complete dump as a gzip-compressed attachment of newline-delimited JSON-LD.
 *
 * The wrapped context document is written first. After that the dump file is read line by line
 * (one record ID per line), and the records are loaded and written in batches of
 * {@code EmmChangeSet.TARGET_HITS_PER_PAGE}. Reading stops at the line equal to
 * {@code DUMP_END_MARKER_NO_NEWLINE}. The dump file may still be growing while it is read, so
 * whenever the reader catches up with the end of the file it waits for more data.
 *
 * Any failure is logged and ends the download; nothing is propagated to the caller.
 *
 * @param whelk        used to load the context document and the records
 * @param apiBaseUrl   not used by this method
 * @param dump         not used by this method
 * @param dumpFilePath the dump file; its file name, minus the ".dump" suffix, names the download
 * @param res          the response the attachment is streamed to
 */
private static void sendDumpDownloadResponse(Whelk whelk, String apiBaseUrl, String dump, Path dumpFilePath, HttpServletResponse res) {
    String filename = Unicode.stripSuffix(dumpFilePath.getFileName().toString(), ".dump") + ".ndjsonld.gz";
    res.setHeader("Content-Disposition", "attachment; filename=" + filename);
    res.setHeader("Content-Type", "application/octet-stream");

    int batchSize = EmmChangeSet.TARGET_HITS_PER_PAGE;
    try (GZIPOutputStream os = new GZIPOutputStream(new BufferedOutputStream(res.getOutputStream()), GZIP_BUF_SIZE)) {
        res.flushBuffer();

        var contextDoc = contextDoc(whelk);
        writeJsonLdLines(wrapContextDoc(contextDoc), os);

        // The dump file may not have been created yet; wait for it to appear.
        var t = new Timeout(60 * 1000);
        while (!Files.exists(dumpFilePath)) {
            t.sleep();
        }

        try (BufferedReader r = new BufferedReader(new FileReader(dumpFilePath.toFile()))) {
            var batch = new ArrayList<String>(batchSize);
            while (true) {
                String line = readCompleteDumpLine(r);

                // Reached end of dump
                if (DUMP_END_MARKER_NO_NEWLINE.equals(line)) {
                    break;
                }

                // Got ID
                batch.add(line.trim());

                if (batch.size() >= batchSize) {
                    writeJsonLdLines(whelk, batch, contextDoc, os);
                    batch = new ArrayList<>(batchSize);
                }
            }
            // Only the leftover IDs remain; skip the load entirely when there are none.
            if (!batch.isEmpty()) {
                writeJsonLdLines(whelk, batch, contextDoc, os);
            }
            res.flushBuffer();
        }
    } catch (Exception e) {
        // Keep the interrupt visible to whoever owns this thread.
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        // The exception is passed as the last argument so the stack trace is logged too.
        logger.info("Error sending dump download: {}", e.getMessage(), e);
    }
}

/**
 * Reads the next newline-terminated line from a dump file that may still be growing.
 *
 * BufferedReader.readLine() hands back whatever trails the last newline when it reaches the end
 * of the file, so a line that is only partly on disk would be returned truncated. This method
 * instead returns only once the terminating '\n' has been read, and waits whenever the end of
 * the file is reached before that.
 *
 * NOTE(review): relies on Timeout.sleep() giving up (throwing) once its limit has passed, as the
 * waiting loops in this class already do. Confirm against Timeout.
 *
 * @param reader reader positioned at the start of a line
 * @return the line without its line terminator
 * @throws Exception whatever the reader or Timeout.sleep() throws
 */
private static String readCompleteDumpLine(BufferedReader reader) throws Exception {
    var line = new StringBuilder();
    Timeout timeout = null;
    while (true) {
        int c = reader.read();
        if (c == '\n') {
            // Drop a carriage return before the newline, as readLine() would.
            int length = line.length();
            if (length > 0 && line.charAt(length - 1) == '\r') {
                line.setLength(length - 1);
            }
            return line.toString();
        }
        if (c == -1) {
            // Caught up with the writer; start the clock on the first miss and wait for more data.
            if (timeout == null) {
                timeout = new Timeout(60 * 1000);
            }
            timeout.sleep();
        } else {
            line.append((char) c);
        }
    }
}

/**
 * Loads the records for the given IDs in one bulk call and writes each one that is not deleted
 * as a JSON-LD line, wrapped together with the context document. The stream is flushed once
 * after the whole batch.
 *
 * @param whelk      used to bulk load the records
 * @param ids        IDs of the records to write
 * @param contextDoc context document passed along when wrapping each record
 * @param os         stream the lines are written to
 * @throws IOException if writing or flushing fails
 */
private static void writeJsonLdLines(Whelk whelk, Collection<String> ids, Document contextDoc, OutputStream os) throws IOException {
    Map<String, Document> loaded = whelk.bulkLoad(ids);
    for (Document record : loaded.values()) {
        // Deleted records are left out of the download.
        if (!record.getDeleted()) {
            writeJsonLdLines(wrapDoc(record, contextDoc), os);
        }
    }
    os.flush();
}

// TODO jackson2 can do json lines natively - upgrade?
// https://fasterxml.github.io/jackson-databind/javadoc/2.6/com/fasterxml/jackson/databind/SequenceWriter.html
/**
 * Serializes one object as a single line of JSON followed by a newline, encoded as UTF-8.
 * Any newline characters in the serialized text are removed so the object stays on one line.
 *
 * @param object the value to serialize
 * @param os     stream the line is written to
 * @throws IOException if serialization or writing fails
 */
private static void writeJsonLdLines(Object object, OutputStream os) throws IOException {
    String singleLine = mapper.writeValueAsString(object).replace("\n", "");
    byte[] payload = (singleLine + "\n").getBytes(StandardCharsets.UTF_8);
    os.write(payload);
}

private static Object wrapDoc(Document doc, Document contextDoc) {
var context = new ArrayList<>();
context.add(null);
Expand All @@ -279,6 +367,13 @@ private static Object wrapDoc(Document doc, Document contextDoc) {
);
}

/**
 * Builds the item that represents the context document: an immutable two-entry map holding the
 * document's first record identifier under JsonLd.ID_KEY and its context data under
 * JsonLd.CONTEXT_KEY.
 *
 * @param contextDoc the context document to wrap
 * @return the two-entry map described above
 */
private static Object wrapContextDoc(Document contextDoc) {
    var identifier = contextDoc.getRecordIdentifiers().getFirst();
    var context = contextDoc.data.get(JsonLd.CONTEXT_KEY);
    return Map.of(
            JsonLd.ID_KEY, identifier,
            JsonLd.CONTEXT_KEY, context
    );
}

private static Document contextDoc(Whelk whelk) {
// FIXME whelk load by IRI
var docs = whelk.bulkLoad(List.of(whelk.getSystemContextUri()));
Expand Down

0 comments on commit ffe7048

Please sign in to comment.