Skip to content

Commit

Permalink
[issue_4153] Fixes PR review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrini-larus committed Nov 27, 2024
1 parent 8d7874a commit 631d738
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 24 deletions.
3 changes: 3 additions & 0 deletions docs/asciidoc/modules/ROOT/pages/ml/openai.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ If present, they take precedence over the analogous APOC configs.
By default, is `/embeddings`, `/completions` and `/chat/completions` for respectively the `apoc.ml.openai.embedding`, `apoc.ml.openai.completion` and `apoc.ml.openai.chat` procedures.
| jsonPath | To customize https://github.com/json-path/JsonPath[JSONPath] of the response.
The default is `$` for the `apoc.ml.openai.chat` and `apoc.ml.openai.completion` procedures, and `$.data` for the `apoc.ml.openai.embedding` procedure.
| enableBackOffRetries | If set to true, enables the backoff retry strategy for handling failures. (default: false)
| backOffRetries | Sets the maximum number of retry attempts before the operation throws an exception. (default: 5)
| exponentialBackoff | If set to true, applies an exponential progression to the wait time between retries. If set to false, the wait time increases linearly. (default: false)
|===


Expand Down
10 changes: 6 additions & 4 deletions extended/src/main/java/apoc/ml/OpenAI.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public class OpenAI {
public static final String JSON_PATH_CONF_KEY = "jsonPath";
public static final String PATH_CONF_KEY = "path";
public static final String GPT_4O_MODEL = "gpt-4o";
public static final String ENABLE_BACK_OFF_RETRIES = "enableBackOffRetries";
public static final String BACK_OFF_RETRIES = "backOffRetries";
public static final String ENABLE_BACK_OFF_RETRIES_CONF_KEY = "enableBackOffRetries";
public static final String ENABLE_EXPONENTIAL_BACK_OFF_CONF_KEY = "exponentialBackoff";
public static final String BACK_OFF_RETRIES_CONF_KEY = "backOffRetries";

@Context
public ApocConfig apocConfig;
Expand All @@ -62,8 +63,9 @@ public EmbeddingResult(long index, String text, List<Double> embedding) {

static Stream<Object> executeRequest(String apiKey, Map<String, Object> configuration, String path, String model, String key, Object inputs, String jsonPath, ApocConfig apocConfig, URLAccessChecker urlAccessChecker) throws JsonProcessingException, MalformedURLException {
apiKey = (String) configuration.getOrDefault(APIKEY_CONF_KEY, apocConfig.getString(APOC_OPENAI_KEY, apiKey));
Boolean enableBackOffRetries = (Boolean) configuration.getOrDefault(ENABLE_BACK_OFF_RETRIES, Boolean.TRUE);
Integer backOffRetries = (Integer) configuration.getOrDefault(BACK_OFF_RETRIES, 5);
boolean enableBackOffRetries = Util.toBoolean( configuration.get(ENABLE_BACK_OFF_RETRIES_CONF_KEY) );
Integer backOffRetries = Util.toInteger(configuration.getOrDefault(BACK_OFF_RETRIES_CONF_KEY, 5));
boolean exponentialBackoff = Util.toBoolean( configuration.get(ENABLE_EXPONENTIAL_BACK_OFF_CONF_KEY) );
if (apiKey == null || apiKey.isBlank())
throw new IllegalArgumentException("API Key must not be empty");

Expand Down
43 changes: 26 additions & 17 deletions extended/src/main/java/apoc/util/ExtendedUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,20 @@ public static float[] listOfNumbersToFloatArray(List<? extends Number> embedding
}

public static <T> T withBackOffRetries(
Supplier<T> func, boolean retry, Integer backoffRetry,
Supplier<T> func, boolean retry, int backoffRetry,
Consumer<Exception> exceptionHandler
){
return withBackOffRetries(func, retry, backoffRetry, exceptionHandler, Boolean.FALSE);
return withBackOffRetries(func, retry, backoffRetry, exceptionHandler, false);
}


public static <T> T withBackOffRetries(
Supplier<T> func, boolean retry, Integer backoffRetry,
Consumer<Exception> exceptionHandler, Boolean exponential
Supplier<T> func, boolean retry, int backoffRetry,
Consumer<Exception> exceptionHandler, boolean exponential
) {
T result = null;
backoffRetry = Objects.requireNonNullElse(backoffRetry, 5);
Integer countDown = backoffRetry < 1 ? 5 : backoffRetry;
backoffRetry = backoffRetry < 1 ? 5 : backoffRetry;
int countDown = backoffRetry;
exceptionHandler = Objects.requireNonNullElse(exceptionHandler, exe -> {});
while (true) {
try {
Expand All @@ -377,28 +377,37 @@ public static <T> T withBackOffRetries(
if(!retry || countDown < 1) throw e;
exceptionHandler.accept(e);
countDown--;
long sleepMultiplier = Objects.requireNonNullElse(exponential, Boolean.FALSE) ?
(long) Math.pow(2, backoffRetry - countDown) : // Exponential retry progression
backoffRetry - countDown; // Linear retry progression
long delay = Math.min(
Duration.ofSeconds(1)
.multipliedBy(sleepMultiplier)
.toMillis(),
Duration.ofSeconds(30).toMillis() // Max 30s
backoffSleep(
getDelay(backoffRetry, countDown, exponential)
);
sleep(delay);
}
}
return result;
}

public static void sleep(long millis) {
private static void backoffSleep(long millis){
sleep(millis, "Operation interrupted during backoff");
}

public static void sleep(long millis, String interruptedMessage) {
try {
Thread.sleep(millis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Operation interrupted during backoff", ie);
throw new RuntimeException(interruptedMessage, ie);
}
}

private static long getDelay(Integer backoffRetry, Integer countDown, boolean exponential){
long sleepMultiplier = exponential ?
(long) Math.pow(2, backoffRetry - countDown) : // Exponential retry progression
backoffRetry - countDown; // Linear retry progression
return Math.min(
Duration.ofSeconds(1)
.multipliedBy(sleepMultiplier)
.toMillis(),
Duration.ofSeconds(30).toMillis() // Max 30s
);
}

}
3 changes: 1 addition & 2 deletions extended/src/test/java/apoc/ml/OpenAIIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public OpenAIIT() {

@Before
public void setUp() throws Exception {
openaiKey = "OPENAI_KEY";
// openaiKey = System.getenv("OPENAI_KEY");
openaiKey = System.getenv("OPENAI_KEY");
Assume.assumeNotNull("No OPENAI_KEY environment configured", openaiKey);
TestUtil.registerProcedure(db, OpenAI.class);
}
Expand Down
8 changes: 7 additions & 1 deletion extended/src/test/java/apoc/util/ExtendedUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testWithExponentialBackOffRetriesWithSuccess() {
long start = System.currentTimeMillis();
int result = ExtendedUtil.withBackOffRetries(
this::testFunction,
true, null, // test backoffRetry default value -> 5
true, 0, // test backoffRetry default value -> 5
runEx -> {
if(!runEx.getMessage().contains("Expected"))
throw new RuntimeException("Some Bad News...");
Expand Down Expand Up @@ -72,6 +72,9 @@ public void testBackOffRetriesWithError() {
);
long time = System.currentTimeMillis() - start;

// The method is configured to retry the operation twice.
// So, it will make two extra-attempts, waiting for 1 second and 2 seconds before failing and throwing an exception.
// Resulting in an approximate execution time of 3 seconds.
assertTrue(time > 2500);
assertTrue(time < 3500);
}
Expand All @@ -87,6 +90,9 @@ public void testWithoutBackOffRetriesWithError() {
runEx -> {}
)
);

// Retry strategy is not active and the testFunction is executed only once by raising an exception.
assertEquals(1, i);
}

private int testFunction() {
Expand Down

0 comments on commit 631d738

Please sign in to comment.