Skip to content

Commit

Permalink
Continue after the ingestion of a product fails (#88)
Browse files: browse the repository at this point in the history.
* Fixed overwrite option

* Ignore invalid product. Added failed product counter.

* Failed files counter

* Simplified if/else statement

* Changed '-o' option help text

Co-authored-by: Eugene <[email protected]>
Co-authored-by: thomas loubrieu <[email protected]>
  • Loading branch information
3 people authored Mar 31, 2022
1 parent ab55caf commit b8bb150
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 25 deletions.
3 changes: 2 additions & 1 deletion src/main/java/gov/nasa/pds/harvest/HarvestCli.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public static void printHelp()
System.out.println(" -V, --version Print Harvest version");
System.out.println();
System.out.println("Optional parameters:");
System.out.println(" -o <dir> Output directory. Default is /tmp/harvest/out");
System.out.println(" -o <dir> Output directory (applies to supplemental products only).");
System.out.println(" Default is /tmp/harvest/out");
System.out.println(" -l <file> Log file. Default is /tmp/harvest/harvest.log");
System.out.println(" -v <level> Logger verbosity: DEBUG, INFO (default), WARNING, ERROR");
System.out.println(" -O, --overwrite Overwrite registered products");
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/gov/nasa/pds/harvest/cmd/HarvestCmd.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,18 @@ private void printSummary()
int processedCount = counter.prodCounters.getTotal();

log.log(LogUtils.LEVEL_SUMMARY, "Skipped files: " + counter.skippedFileCount);
log.log(LogUtils.LEVEL_SUMMARY, "Processed files: " + processedCount);
log.log(LogUtils.LEVEL_SUMMARY, "Loaded files: " + processedCount);

if(processedCount > 0)
{
log.log(LogUtils.LEVEL_SUMMARY, "File counts by type:");
for(CounterMap.Item item: counter.prodCounters.getCounts())
{
log.log(LogUtils.LEVEL_SUMMARY, " " + item.name + ": " + item.count);
}

log.log(LogUtils.LEVEL_SUMMARY, "Package ID: " + PackageIdGenerator.getInstance().getPackageId());
}

log.log(LogUtils.LEVEL_SUMMARY, "Failed files: " + counter.failedFileCount);
log.log(LogUtils.LEVEL_SUMMARY, "Package ID: " + PackageIdGenerator.getInstance().getPackageId());
}

}
1 change: 1 addition & 0 deletions src/main/java/gov/nasa/pds/harvest/crawler/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class Counter
{
public CounterMap prodCounters;
public int skippedFileCount;
public int failedFileCount;

public Counter()
{
Expand Down
40 changes: 25 additions & 15 deletions src/main/java/gov/nasa/pds/harvest/crawler/FilesProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ private void onFile(File file) throws Exception
}
catch(Exception ex)
{
log.warn(ex.getMessage());
counter.skippedFileCount++;
log.error(ex.getMessage());
counter.failedFileCount++;
return;
}

Expand All @@ -173,7 +173,16 @@ else if(config.filters.prodClassExclude != null)
if(config.filters.prodClassExclude.contains(rootElement)) return;
}

processMetadata(file, doc);
// Process metadata
try
{
processMetadata(file, doc);
}
catch(Exception ex)
{
log.error(ex.getMessage());
counter.failedFileCount++;
}
}


Expand All @@ -193,22 +202,11 @@ private void processMetadata(File file, Document doc) throws Exception

String rootElement = doc.getDocumentElement().getNodeName();

// Process Collection specific data
if("Product_Collection".equals(rootElement))
{
processInventoryFiles(file, doc, meta);
}
// Process Bundle specific data
else if("Product_Bundle".equals(rootElement))
if("Product_Bundle".equals(rootElement))
{
addCollectionRefs(meta, doc);
}
// Process supplemental products
else if("Product_Metadata_Supplemental".equals(rootElement))
{
SupplementalWriter swriter = WriterManager.getInstance().getSupplementalWriter();
swriter.write(file);
}

// Internal references
refExtractor.addRefs(meta.intRefs, doc);
Expand All @@ -227,6 +225,18 @@ else if("Product_Metadata_Supplemental".equals(rootElement))

// Save data
save(meta, nsInfo);

// Process Collection inventory
if("Product_Collection".equals(rootElement))
{
processInventoryFiles(file, doc, meta);
}
// Process supplemental products
else if("Product_Metadata_Supplemental".equals(rootElement))
{
SupplementalWriter swriter = WriterManager.getInstance().getSupplementalWriter();
swriter.write(file);
}
}


Expand Down
13 changes: 11 additions & 2 deletions src/main/java/gov/nasa/pds/harvest/crawler/ProductProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void onFile(File file) throws Exception
catch(Exception ex)
{
log.warn(ex.getMessage());
counter.skippedFileCount++;
counter.failedFileCount++;
return;
}

Expand All @@ -156,7 +156,16 @@ else if(config.filters.prodClassExclude != null)
if(config.filters.prodClassExclude.contains(rootElement)) return;
}

processMetadata(file, doc);
// Process metadata
try
{
processMetadata(file, doc);
}
catch(Exception ex)
{
log.error(ex.getMessage());
counter.failedFileCount++;
}
}


Expand Down
22 changes: 19 additions & 3 deletions src/main/java/gov/nasa/pds/harvest/dao/MetadataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -89,6 +90,7 @@ private void writeBatch() throws Exception
}
}

// Build JSON documents for Elasticsearch
List<String> data = new ArrayList<>();

for(RegistryDocBatch.NJsonItem item: docBatch.getItems())
Expand All @@ -111,9 +113,25 @@ private void writeBatch() throws Exception
}
}

totalRecords += loader.loadBatch(data);
// Load batch
Set<String> failedIds = new TreeSet<>();
totalRecords += loader.loadBatch(data, failedIds);
log.info("Wrote " + totalRecords + " product(s)");

// Update failed counter
counter.failedFileCount += failedIds.size();

// Update product counters
for(RegistryDocBatch.NJsonItem item: docBatch.getItems())
{
if((nonRegisteredIds == null && !failedIds.contains(item.lidvid)) ||
(nonRegisteredIds != null && nonRegisteredIds.contains(item.lidvid) && !failedIds.contains(item.lidvid)))
{
counter.prodCounters.inc(item.prodClass);
}
}

// Clear batch
docBatch.clear();
}

Expand All @@ -122,8 +140,6 @@ private void addItem(List<String> data, RegistryDocBatch.NJsonItem item)
{
data.add(item.pkJson);
data.add(item.dataJson);

counter.prodCounters.inc(item.prodClass);
}


Expand Down

0 comments on commit b8bb150

Please sign in to comment.