Skip to content

Commit

Permalink
fix(scala): add chunked batch helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Fluf22 committed Jun 18, 2024
1 parent 966acf9 commit 05ac1d1
Showing 1 changed file with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,40 @@ package object extension {
Future.successful(true)
}

def chunkedBatch(
indexName: String,
records: Seq[Any],
action: Action = Action.AddObject,
waitForTasks: Boolean,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
var futures = Seq.empty[Future[BatchResponse]]
records.grouped(batchSize).foreach { chunk =>
val requests = chunk.map { record =>
BatchRequest(action = action, body = record)
}
val future = client.batch(
indexName = indexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions
)
futures = futures :+ future
}

val responses = Future.sequence(futures)

if (waitForTasks) {
responses.foreach { tasks =>
tasks.foreach { task =>
client.waitTask(indexName, task.taskID, requestOptions = requestOptions)
}
}
}

responses
}

/** Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime. Internally, this method copies the existing index
* settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
Expand All @@ -211,10 +245,9 @@ package object extension {
def replaceAllObjects(
indexName: String,
records: Seq[Any],
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[Long]] = {
if (records.isEmpty) return Future.successful(Seq.empty)

)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val requests = records.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
Expand All @@ -231,12 +264,15 @@ package object extension {
requestOptions = requestOptions
)

batch <- client.batch(
batchResponses <- chunkedBatch(
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
records = records,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions)

_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
Expand All @@ -250,13 +286,17 @@ package object extension {
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

replace <- client.operationIndex(
move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions)
} yield Seq(copy.taskID, batch.taskID, replace.taskID)
_ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
} yield ReplaceAllObjectsResponse(
copyOperationResponse = copy,
batchResponses = batchResponses,
moveOperationResponse = move
)
}
}
}

0 comments on commit 05ac1d1

Please sign in to comment.