Skip to content

Commit

Permalink
Fix stream
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi committed Jan 30, 2025
1 parent b484146 commit 85b334e
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,6 @@ public ReducedQueryPhase reducedFromStream(
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Error processing tickets in parallel", e);
} finally {
streamManager.close();
}

TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void finish() throws IOException {
if (currentRow[0] > 0) {
flushBatch();
}
root.close();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private boolean searchWithCollector(
throw new RuntimeException("StreamManager not setup");
}
final boolean[] isCancelled = { false };
final Schema[] schema = { null };
StreamTicket ticket = streamManager.registerStream(new StreamProducer() {

@Override
Expand Down Expand Up @@ -174,7 +175,7 @@ public void onCancel() {

@Override
public boolean isCancelled() {
return searchContext.isCancelled() || isCancelled();
return searchContext.isCancelled() || isCancelled[0];
}
};
}
Expand All @@ -185,8 +186,8 @@ public VectorSchemaRoot createRoot(BufferAllocator allocator) {
Field countField = new Field("count", FieldType.nullable(new ArrowType.Int(64, false)), null);
arrowFields.put("count", countField);
arrowFields.put("ord", new Field("ord", FieldType.nullable(new ArrowType.Utf8()), null));
Schema schema = new Schema(arrowFields.values());
return VectorSchemaRoot.create(schema, allocator);
schema[0] = new Schema(arrowFields.values());
return VectorSchemaRoot.create(schema[0], allocator);
}

@Override
Expand Down

0 comments on commit 85b334e

Please sign in to comment.