Skip to content

Commit

Permalink
Updates per PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <[email protected]>
  • Loading branch information
chelma committed May 1, 2024
1 parent c3e13a1 commit 75691b9
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,10 @@ public static void main(String[] args) throws InterruptedException {
Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
String targetIndex = indexMetadata.getName() + indexSuffix;

int targetShardId = shardId; // Define in local context for the lambda
final int finalShardId = shardId; // Define in local context for the lambda
DocumentReindexer.reindex(targetIndex, documents, targetConnection)
.doOnError(error -> logger.error("Error during reindexing: " + error))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + targetShardId))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
// Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
// shouldn't be done quite this way in the real RFS Worker.
.block();
Expand Down
6 changes: 5 additions & 1 deletion RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

import lombok.Lombok;
import reactor.core.publisher.Flux;


public class LuceneDocumentsReader {
private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);

Expand All @@ -35,7 +38,8 @@ public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName,
try {
reader.close();
} catch (IOException e) {
throw new RuntimeException("Failed to close IndexReader", e);
logger.error("Failed to close IndexReader", e);
Lombok.sneakyThrow(e);
}
}
);
Expand Down
5 changes: 4 additions & 1 deletion RFS/src/main/java/com/rfs/common/OpenSearchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public boolean hasBadStatusCode() {
}

public boolean hasFailedOperations() {
// The OpenSearch Bulk API response body is JSON and contains a top-level "errors" field that indicates
// whether any of the individual operations in the bulk request failed. Rather than marshalling the entire
// response as JSON, just check for the string value.
return body.contains("\"errors\":true");
}

Expand Down Expand Up @@ -83,7 +86,7 @@ private boolean createObjectIdempotent(String objectPath, ObjectNode settings){
client.put(objectPath, settings.toString(), false);
return true;
} else if (response.code == HttpURLConnection.HTTP_OK) {
logger.warn(objectPath + " already exists. Skipping creation.");
logger.info(objectPath + " already exists. Skipping creation.");
} else {
logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation.");
}
Expand Down

0 comments on commit 75691b9

Please sign in to comment.