diff --git a/distribution/lib/Standard/Base/0.0.0-dev/src/Enso_Cloud/Internal/Audit_Log.enso b/distribution/lib/Standard/Base/0.0.0-dev/src/Enso_Cloud/Internal/Audit_Log.enso index dc3882a15cb3..6998494e2f30 100644 --- a/distribution/lib/Standard/Base/0.0.0-dev/src/Enso_Cloud/Internal/Audit_Log.enso +++ b/distribution/lib/Standard/Base/0.0.0-dev/src/Enso_Cloud/Internal/Audit_Log.enso @@ -1,10 +1,13 @@ +import project.Any.Any import project.Data.Json.JS_Object import project.Data.Text.Text import project.Errors.Illegal_Argument.Illegal_Argument import project.Nothing.Nothing +import project.Panic.Panic from project.Data.Boolean import Boolean, False, True polyglot java import org.enso.base.enso_cloud.audit.AuditLog +polyglot java import org.enso.base.enso_cloud.audit.AuditLog.AuditLogError ## PRIVATE type Audit_Log @@ -21,6 +24,10 @@ type Audit_Log automatically. - async: Whether to submit the event asynchronously. Defaults to True. + If `async` is True, the operation returns immediately. There is no way + to know if the log message was successfully submitted. + If `async` is False, the operation blocks until the log message is + successfully submitted, and will raise a panic if submitting fails. ? Restricted Fields @@ -32,7 +39,19 @@ type Audit_Log - `projectName` - `projectSessionId` report_event event_type:Text message:Text (metadata:JS_Object = JS_Object.from_pairs []) (async : Boolean = True) -> Nothing = - Illegal_Argument.handle_java_exception <| + Illegal_Argument.handle_java_exception <| Audit_Log_Error.handle_java_exception <| case async of True -> AuditLog.logAsync event_type message metadata.object_node False -> AuditLog.logSynchronously event_type message metadata.object_node + +## PRIVATE +type Audit_Log_Error + ## PRIVATE + Error message:Text cause:Any + + ## PRIVATE + handle_java_exception = + on_error caught_panic = + cause = caught_panic.payload + Panic.throw (Audit_Log_Error.Error cause.getMessage cause) + Panic.catch AuditLogError handler=on_error diff --git a/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java b/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java index 793f42ff0d03..fdffbfef1ffe 100644 --- a/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java +++ b/lib/scala/downloader/src/test/java/org/enso/downloader/http/HttpDownloaderTest.java @@ -48,7 +48,7 @@ public class HttpDownloaderTest { @BeforeClass public static void initServer() throws URISyntaxException, IOException { serverExecutor = Executors.newSingleThreadExecutor(); - server = HTTPTestHelperServer.createServer("localhost", port, serverExecutor, false); + server = HTTPTestHelperServer.createServer("localhost", port, serverExecutor, false, null); server.addHandler("/texts", new TextHandler()); server.addHandler("/redirect", new RedirectHandler()); server.addHandler("/files", new BigFileHandler()); diff --git a/std-bits/base/src/main/java/org/enso/base/enso_cloud/CloudAPI.java b/std-bits/base/src/main/java/org/enso/base/enso_cloud/CloudAPI.java index edb1dc2ce04e..da55de0fd35a 100644 --- a/std-bits/base/src/main/java/org/enso/base/enso_cloud/CloudAPI.java +++ b/std-bits/base/src/main/java/org/enso/base/enso_cloud/CloudAPI.java @@ -1,6 +1,7 @@ package org.enso.base.enso_cloud; import org.enso.base.Environment_Utils; +import org.enso.base.enso_cloud.audit.AuditLog; public final class CloudAPI { /** @@ -38,5 +39,6 @@ public static void flushCloudCaches() { CloudRequestCache.clear(); AuthenticationProvider.reset(); EnsoSecretReader.flushCache(); + AuditLog.resetCache(); } } diff --git a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLog.java b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLog.java index 27941dbfe669..9e3800fd2c90 100644 --- a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLog.java +++ b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLog.java @@ -1,15 +1,42 @@ package org.enso.base.enso_cloud.audit; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +/** + * The high-level API for logging audit events. + * + *

The messages are sent on a single background thread in batches, but also as soon as possible. + * That means that we are never waiting for more messages to be batched to avoid delaying logs. + * However, if sending the previous batch took enough time that many messages have been scheduled in + * the meantime, all waiting messages (up to some limit) will be sent in a single request. + */ public final class AuditLog { + /** Schedules the log message to be sent in the next batch, and returns immediately. */ public static void logAsync(String type, String message, ObjectNode metadata) { var event = new AuditLogMessage(type, message, metadata); - AuditLogAPI.INSTANCE.logAsync(event); + AuditLogApiAccess.INSTANCE.logWithoutConfirmation(event); } + /** Schedules the log message to be sent in the next batch, and waits until it has been sent. */ public static void logSynchronously(String type, String message, ObjectNode metadata) { var event = new AuditLogMessage(type, message, metadata); - AuditLogAPI.INSTANCE.logSync(event); + Future future = AuditLogApiAccess.INSTANCE.logWithConfirmation(event); + try { + future.get(); + } catch (ExecutionException | InterruptedException e) { + throw new AuditLogError("Failed to send log message: " + e, e); + } + } + + public static class AuditLogError extends RuntimeException { + public AuditLogError(String message, Throwable cause) { + super(message, cause); + } + } + + public static void resetCache() { + AuditLogApiAccess.INSTANCE.resetCache(); } } diff --git a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogAPI.java b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogAPI.java deleted file mode 100644 index 9383eee86735..000000000000 --- a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogAPI.java +++ /dev/null @@ -1,113 +0,0 @@ -package org.enso.base.enso_cloud.audit; - -import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; -import org.enso.base.enso_cloud.AuthenticationProvider; -import org.enso.base.enso_cloud.CloudAPI; - -/** - * Gives access to the low-level log event API in the Cloud and manages asynchronously submitting - * the logs. - */ -class AuditLogAPI { - private static final Logger logger = Logger.getLogger(AuditLogAPI.class.getName()); - public static AuditLogAPI INSTANCE = new AuditLogAPI(); - private HttpClient httpClient; - private final ExecutorService executorService; - - private AuditLogAPI() { - // A thread pool that creates at most one thread, only when it is needed, and shuts it down - // after 60 seconds of inactivity. - // TODO we may want to set a maximum capacity to the queue and think how we deal with situations - // when logs are added faster than they can be processed - executorService = - new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); - } - - public void logSync(LogMessage message) { - var request = buildRequest(message); - sendLogRequest(request, 5); - } - - public Future logAsync(LogMessage message) { - // We build the request on the main thread where the Enso Context is readily accessible - as we - // need to access the `Authentication_Service`. - var request = buildRequest(message); - return executorService.submit( - () -> { - try { - sendLogRequest(request, 5); - } catch (RequestFailureException e) { - throw e; - } catch (Exception e) { - logger.severe("Unexpected exception when sending a log message: " + e.getMessage()); - throw e; - } - return null; - }); - } - - private HttpRequest buildRequest(LogMessage message) { - var apiUri = CloudAPI.getAPIRootURI() + "logs"; - return HttpRequest.newBuilder() - .uri(URI.create(apiUri)) - .header("Authorization", "Bearer " + AuthenticationProvider.getAccessToken()) - .POST(HttpRequest.BodyPublishers.ofString(message.payload(), StandardCharsets.UTF_8)) - .build(); - } - - private void sendLogRequest(HttpRequest request, int retryCount) throws RequestFailureException { - try { - try { - if (httpClient == null) { - httpClient = HttpClient.newBuilder().followRedirects(HttpClient.Redirect.ALWAYS).build(); - } - HttpResponse response = - httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - if (response.statusCode() < 200 || response.statusCode() >= 300) { - throw new RequestFailureException( - "Unexpected status code: " + response.statusCode() + " " + response.body(), null); - } - } catch (IOException | InterruptedException e) { - // Promote a checked exception to a runtime exception to simplify the code. - var errorMessage = e.getMessage() != null ? e.getMessage() : e.toString(); - throw new RequestFailureException("Failed to send log message: " + errorMessage, e); - } - } catch (RequestFailureException e) { - if (retryCount < 0) { - logger.severe("Failed to send log message after retrying: " + e.getMessage()); - failedLogCount++; - throw e; - } else { - logger.warning("Exception when sending a log message: " + e.getMessage() + ". Retrying..."); - sendLogRequest(request, retryCount - 1); - } - } - } - - public interface LogMessage { - String payload(); - } - - public static class RequestFailureException extends RuntimeException { - public RequestFailureException(String message, Throwable cause) { - super(message, cause); - } - } - - private int failedLogCount = 0; - - public int getFailedLogCount() { - return failedLogCount; - } -} diff --git a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogApiAccess.java b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogApiAccess.java new file mode 100644 index 000000000000..c65b88e689d2 --- /dev/null +++ b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogApiAccess.java @@ -0,0 +1,233 @@ +package org.enso.base.enso_cloud.audit; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import org.enso.base.enso_cloud.AuthenticationProvider; +import org.enso.base.enso_cloud.CloudAPI; + +/** + * Gives access to the low-level log event API in the Cloud and manages asynchronously submitting + * the logs. + */ +class AuditLogApiAccess { + private static final Logger logger = Logger.getLogger(AuditLogApiAccess.class.getName()); + + /** + * We still want to limit the batch size to some reasonable number - sending too many logs in one + * request could also be problematic. + */ + private static final int MAX_BATCH_SIZE = 100; + + private static final int MAX_RETRIES = 5; + + public static AuditLogApiAccess INSTANCE = new AuditLogApiAccess(); + + private HttpClient httpClient; + private final LogJobsQueue logQueue = new LogJobsQueue(); + private final ThreadPoolExecutor backgroundThreadService; + + private AuditLogApiAccess() { + // We set-up a thread 'pool' that will contain at most one thread. + // If the thread is idle for 60 seconds, it will be shut down. + backgroundThreadService = + new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + } + + public Future logWithConfirmation(LogMessage message) { + var currentRequestConfig = getRequestConfig(); + CompletableFuture completionNotification = new CompletableFuture<>(); + enqueueJob(new LogJob(message, completionNotification, currentRequestConfig)); + return completionNotification; + } + + public void logWithoutConfirmation(LogMessage message) { + var currentRequestConfig = getRequestConfig(); + enqueueJob(new LogJob(message, null, currentRequestConfig)); + } + + private void enqueueJob(LogJob job) { + int queuedJobs = logQueue.enqueue(job); + if (queuedJobs == 1 && backgroundThreadService.getQueue().isEmpty()) { + // If we are the first message in the queue, we need to start the background thread. + // It is possible that a job was already running, but adding a new one will not hurt - once + // the queue is empty, the currently running job will finish and any additional jobs will also + // terminate immediately. + backgroundThreadService.execute(this::logThreadEntryPoint); + } + + /* + * Liveness is guaranteed, because the queue size always increments exactly by 1, + * so `enqueue` returns 1 if and only if the queue was empty beforehand. + * + * If the queue was empty before adding a message, we always schedule a `logThreadEntryPoint` to run, + * unless it was already pending on the job queue. + * + * Any running `logThreadEntryPoint` will not finish until the queue is empty. + * So after every append, either a job is already running or scheduled to be run. + */ + } + + /** Runs as long as there are any pending log messages queued and sends them in batches. */ + private void logThreadEntryPoint() { + while (true) { + List pendingMessages = logQueue.popEnqueuedJobs(MAX_BATCH_SIZE); + if (pendingMessages.isEmpty()) { + // If there are no more pending messages, we can stop the thread for now. + // If during this teardown a new message is added, it will see no elements on `logQueue` and + // thus, + // `logQueue.enqueue` will return 1, thus ensuring that at least one new job is scheduled. + return; + } + + try { + // We use the request config from the first message. + // The request config may only change in test scenarios which just must take this into + // account. + var requestConfig = pendingMessages.get(0).requestConfig(); + var request = buildRequest(requestConfig, pendingMessages); + sendLogRequest(request, MAX_RETRIES); + notifyJobsAboutSuccess(pendingMessages); + } catch (RequestFailureException e) { + notifyJobsAboutFailure(pendingMessages, e); + } + } + } + + private void notifyJobsAboutSuccess(List jobs) { + for (var job : jobs) { + if (job.completionNotification() != null) { + job.completionNotification().complete(null); + } + } + } + + private void notifyJobsAboutFailure(List jobs, RequestFailureException e) { + for (var job : jobs) { + if (job.completionNotification() != null) { + job.completionNotification().completeExceptionally(e); + } + } + } + + private HttpRequest buildRequest(RequestConfig requestConfig, List messages) { + assert requestConfig != null + : "The request configuration must be set before building a request."; + var payload = buildPayload(messages); + return HttpRequest.newBuilder() + .uri(requestConfig.apiUri) + .header("Authorization", "Bearer " + requestConfig.accessToken) + .POST(HttpRequest.BodyPublishers.ofString(payload, StandardCharsets.UTF_8)) + .build(); + } + + private String buildPayload(List messages) { + var payload = new StringBuilder(); + payload.append("{\"logs\": ["); + for (var message : messages) { + payload.append(message.message().payload()).append(","); + } + + // Remove the trailing comma. + payload.deleteCharAt(payload.length() - 1); + payload.append("]}"); + return payload.toString(); + } + + private RequestConfig cachedRequestConfig = null; + + /** + * Builds a request configuration based on runtime information. + * + *

This method must be called from the main thread. + * + *

The same instance is returned every time after the first call, unless the caches were + * flushed (which is mostly used in tests). + */ + private RequestConfig getRequestConfig() { + if (cachedRequestConfig != null) { + return cachedRequestConfig; + } + + var uri = URI.create(CloudAPI.getAPIRootURI() + "logs"); + var config = new RequestConfig(uri, AuthenticationProvider.getAccessToken()); + cachedRequestConfig = config; + return config; + } + + /** + * Contains information needed to build a request to the Cloud Logs API. + * + *

This information must be gathered on the main Enso thread, as only there we have access to + * the {@link AuthenticationProvider}. + * + *

We associate an instance with every message to be sent. When sending multiple messages in a + * batch, we will use the config from one of them. This should not matter as in normal operations + * the configs will be the same, they only change during testing. Tests should this into account, + * by sending the last message in synchronous mode. + */ + private record RequestConfig(URI apiUri, String accessToken) {} + + private void sendLogRequest(HttpRequest request, int retryCount) throws RequestFailureException { + try { + try { + if (httpClient == null) { + httpClient = HttpClient.newBuilder().followRedirects(HttpClient.Redirect.ALWAYS).build(); + } + HttpResponse response = + httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() < 200 || response.statusCode() >= 300) { + throw new RequestFailureException( + "Unexpected status code: " + response.statusCode() + " " + response.body(), null); + } + } catch (IOException | InterruptedException e) { + // Promote a checked exception to a runtime exception to simplify the code. + var errorMessage = e.getMessage() != null ? e.getMessage() : e.toString(); + throw new RequestFailureException("Failed to send log messages: " + errorMessage, e); + } + } catch (RequestFailureException e) { + if (retryCount < 0) { + logger.severe("Failed to send log messages after retrying: " + e.getMessage()); + throw e; + } else { + logger.warning("Exception when sending log messages: " + e.getMessage() + ". Retrying..."); + sendLogRequest(request, retryCount - 1); + } + } + } + + interface LogMessage { + String payload(); + } + + static class RequestFailureException extends RuntimeException { + public RequestFailureException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * A record that represents a single log to be sent. + * + *

It may contain the `completionNotification` future that will be completed when the log is + * sent. If no-one is listening for confirmation, that field will be `null`. + */ + record LogJob( + LogMessage message, + CompletableFuture completionNotification, + RequestConfig requestConfig) {} + + void resetCache() { + cachedRequestConfig = null; + } +} diff --git a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogMessage.java b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogMessage.java index 4cde39fd00e4..7273661744cd 100644 --- a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogMessage.java +++ b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/AuditLogMessage.java @@ -8,7 +8,7 @@ import org.enso.base.CurrentEnsoProject; import org.enso.base.enso_cloud.CloudAPI; -public class AuditLogMessage implements AuditLogAPI.LogMessage { +class AuditLogMessage implements AuditLogApiAccess.LogMessage { /** * A reserved field that is currently added by the cloud backend. Duplicating it will lead to diff --git a/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/LogJobsQueue.java b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/LogJobsQueue.java new file mode 100644 index 000000000000..22861df02ae8 --- /dev/null +++ b/std-bits/base/src/main/java/org/enso/base/enso_cloud/audit/LogJobsQueue.java @@ -0,0 +1,36 @@ +package org.enso.base.enso_cloud.audit; + +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; + +class LogJobsQueue { + + private final Deque queue = new LinkedList<>(); + + /** Enqueues a log message to be sent and returns the number of messages in the queue. */ + synchronized int enqueue(AuditLogApiAccess.LogJob job) { + int previousSize = queue.size(); + queue.addLast(job); + int newSize = queue.size(); + assert newSize == previousSize + 1 + : "Appending to queue is synchronized, so the size is always incremented exactly by 1."; + return newSize; + } + + /** Removes and returns up to {@code limit} enqueued jobs. */ + synchronized List popEnqueuedJobs(int limit) { + assert limit > 0; + if (queue.isEmpty()) { + return List.of(); + } + + int n = Math.min(limit, queue.size()); + List result = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + result.add(queue.removeFirst()); + } + return result; + } +} diff --git a/test/Base_Tests/README.md b/test/Base_Tests/README.md index 7b91d6caf23a..23e2b103eb4d 100644 --- a/test/Base_Tests/README.md +++ b/test/Base_Tests/README.md @@ -36,3 +36,13 @@ following 3 environment variables: Note that some cloud tests (e.g. testing secrets in HTTP requests) still require the `ENSO_HTTP_TEST_HTTPBIN_URL` setup, even if running against a real cloud deployment. + +### Testing audit log batching + +Currently, we only have a manual scenario for testing that log messages are +actually batched. + +To test it, launch the `http-test-helper` with additional +`--enable-manual-log-batching-test` flag. For more information, see the comment +on `batchingTestModeEnabled` in +[PostLogHandler.java](../../tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/PostLogHandler.java). diff --git a/test/Base_Tests/src/Network/Enso_Cloud/Audit_Log_Spec.enso b/test/Base_Tests/src/Network/Enso_Cloud/Audit_Log_Spec.enso index 86778e234deb..27bfc3d1d1cf 100644 --- a/test/Base_Tests/src/Network/Enso_Cloud/Audit_Log_Spec.enso +++ b/test/Base_Tests/src/Network/Enso_Cloud/Audit_Log_Spec.enso @@ -53,6 +53,18 @@ add_specs suite_builder = group_builder.specify "does not allow restricted fields in metadata" <| setup.with_prepared_environment <| Audit_Log.report_event "TestEventType" "Message" (JS_Object.from_pairs [["type", "my type override?"]]) . should_fail_with Illegal_Argument + group_builder.specify "should be able to send many messages quickly" <| setup.with_prepared_environment <| + random_payload = "payload-" + Random.uuid + 0.up_to 120 . each ix-> + Audit_Log.report_event "TestEvent" "Message" (JS_Object.from_pairs [["my_field", random_payload], ["seq", ix]]) async=True . should_succeed + + # With the last log message we do `async=False`, so that we wait until this message (and all previous ones) are processed. + Audit_Log.report_event "TestEvent" "Message" (JS_Object.from_pairs [["my_field", random_payload]]) async=False . should_succeed + + Test.with_retries <| + events = get_audit_log_events . filter ev-> (ev.metadata.get "my_field") == random_payload + events.length . should_equal 121 + main filter=Nothing = suite = Test.build suite_builder-> add_specs suite_builder diff --git a/test/Base_Tests/src/Network/Enso_Cloud/Cloud_Tests_Setup.enso b/test/Base_Tests/src/Network/Enso_Cloud/Cloud_Tests_Setup.enso index 80edb7b239a4..0cee906ae2ae 100644 --- a/test/Base_Tests/src/Network/Enso_Cloud/Cloud_Tests_Setup.enso +++ b/test/Base_Tests/src/Network/Enso_Cloud/Cloud_Tests_Setup.enso @@ -20,10 +20,10 @@ type Cloud_Tests_Setup with_prepared_environment self ~action = case self of Cloud_Tests_Setup.Mock _ _ -> - Cloud_Tests_Setup.reset Panic.with_finalizer Cloud_Tests_Setup.reset <| Test_Environment.unsafe_with_environment_override "ENSO_CLOUD_API_URI" self.api_url.to_text <| Test_Environment.unsafe_with_environment_override "ENSO_CLOUD_CREDENTIALS_FILE" self.credentials_location.absolute.normalize.path <| + Cloud_Tests_Setup.reset action _ -> action diff --git a/tools/http-test-helper/src/main/java/org/enso/shttp/HTTPTestHelperServer.java b/tools/http-test-helper/src/main/java/org/enso/shttp/HTTPTestHelperServer.java index cea2574bbcd2..a1f222706199 100644 --- a/tools/http-test-helper/src/main/java/org/enso/shttp/HTTPTestHelperServer.java +++ b/tools/http-test-helper/src/main/java/org/enso/shttp/HTTPTestHelperServer.java @@ -5,6 +5,7 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; @@ -12,6 +13,7 @@ import org.enso.shttp.auth.BasicAuthTestHandler; import org.enso.shttp.auth.TokenAuthTestHandler; import org.enso.shttp.cloud_mock.CloudAuthRenew; +import org.enso.shttp.cloud_mock.CloudMockSetup; import org.enso.shttp.cloud_mock.CloudRoot; import org.enso.shttp.cloud_mock.ExpiredTokensCounter; import org.enso.shttp.test_helpers.*; @@ -21,16 +23,18 @@ public class HTTPTestHelperServer { public static void main(String[] args) { - if (args.length != 2) { - System.err.println("Usage: http-test-helper "); + if (args.length < 2) { + System.err.println("Usage: http-test-helper [additional test options]"); System.exit(1); } String host = args[0]; int port = Integer.parseInt(args[1]); + String[] remainingArgs = Arrays.copyOfRange(args, 2, args.length); final Semaphore stopNotification = new Semaphore(0, false); HybridHTTPServer server = null; try { - server = createServer(host, port, null, true); + CloudMockSetup cloudMockSetup = CloudMockSetup.fromArgs(remainingArgs); + server = createServer(host, port, null, true, cloudMockSetup); } catch (URISyntaxException | IOException e) { e.printStackTrace(); System.exit(1); @@ -67,16 +71,21 @@ public static void main(String[] args) { * @return The created server */ public static HybridHTTPServer createServer( - String host, int port, Executor executor, boolean withSSLServer) + String host, + int port, + Executor executor, + boolean withSSLServer, + CloudMockSetup cloudMockSetup) throws URISyntaxException, IOException { Path projectRoot = findProjectRoot(); Path keyStorePath = projectRoot.resolve("tools/http-test-helper/target/keystore.jks"); var server = new HybridHTTPServer(host, port, port + 1, keyStorePath, executor, withSSLServer); - setupEndpoints(server, projectRoot); + setupEndpoints(server, projectRoot, cloudMockSetup); return server; } - private static void setupEndpoints(HybridHTTPServer server, Path projectRoot) { + private static void setupEndpoints( + HybridHTTPServer server, Path projectRoot, CloudMockSetup cloudMockSetup) { for (HttpMethod method : HttpMethod.values()) { String path = "/" + method.toString().toLowerCase(); server.addHandler(path, new TestHandler(method)); @@ -91,11 +100,13 @@ private static void setupEndpoints(HybridHTTPServer server, Path projectRoot) { server.addHandler("/test_redirect", new RedirectTestHandler("/testfiles/js.txt")); // Cloud mock - var expiredTokensCounter = new ExpiredTokensCounter(); - server.addHandler("/COUNT-EXPIRED-TOKEN-FAILURES", expiredTokensCounter); - CloudRoot cloudRoot = new CloudRoot(expiredTokensCounter); - server.addHandler(cloudRoot.prefix, cloudRoot); - server.addHandler("/enso-cloud-auth-renew", new CloudAuthRenew()); + if (cloudMockSetup != null) { + var expiredTokensCounter = new ExpiredTokensCounter(); + server.addHandler("/COUNT-EXPIRED-TOKEN-FAILURES", expiredTokensCounter); + CloudRoot cloudRoot = new CloudRoot(expiredTokensCounter, cloudMockSetup); + server.addHandler(cloudRoot.prefix, cloudRoot); + server.addHandler("/enso-cloud-auth-renew", new CloudAuthRenew()); + } // Data link helpers server.addHandler("/dynamic-datalink", new GenerateDataLinkHandler(true)); diff --git a/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudMockSetup.java b/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudMockSetup.java new file mode 100644 index 000000000000..aacef3b9e314 --- /dev/null +++ b/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudMockSetup.java @@ -0,0 +1,11 @@ +package org.enso.shttp.cloud_mock; + +import java.util.List; + +public record CloudMockSetup(boolean logBatchingTestModeEnabled) { + public static CloudMockSetup fromArgs(String[] remainingArgs) { + List args = List.of(remainingArgs); + boolean logBatchingTestModeEnabled = args.contains("--enable-manual-log-batching-test"); + return new CloudMockSetup(logBatchingTestModeEnabled); + } +} diff --git a/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudRoot.java b/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudRoot.java index 27124896f525..921ebf99a89b 100644 --- a/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudRoot.java +++ b/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/CloudRoot.java @@ -12,7 +12,7 @@ public class CloudRoot extends HandlerWithTokenAuth { private final ExpiredTokensCounter expiredTokensCounter; private final CloudHandler[] handlers; - public CloudRoot(ExpiredTokensCounter expiredTokensCounter) { + public CloudRoot(ExpiredTokensCounter expiredTokensCounter, CloudMockSetup setup) { this.expiredTokensCounter = expiredTokensCounter; AssetStore assetStore = new AssetStore(); UsersService usersService = new UsersService(); @@ -26,7 +26,7 @@ public CloudRoot(ExpiredTokensCounter expiredTokensCounter) { new PathResolver(assetStore), new DirectoriesHandler(assetStore), new GetLogsHandler(eventsService), - new PostLogHandler(usersService, eventsService) + new PostLogHandler(usersService, eventsService, setup.logBatchingTestModeEnabled()) }; } diff --git a/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/PostLogHandler.java b/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/PostLogHandler.java index 963c01487752..e2fdebc769ca 100644 --- a/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/PostLogHandler.java +++ b/tools/http-test-helper/src/main/java/org/enso/shttp/cloud_mock/PostLogHandler.java @@ -5,16 +5,34 @@ import java.io.IOException; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; import org.enso.shttp.HttpMethod; public class PostLogHandler implements CloudHandler { - private final UsersService users; - private final EventsService events; + private final UsersService usersService; + private final EventsService eventsService; private final ObjectMapper jsonMapper = new ObjectMapper(); - public PostLogHandler(UsersService users, EventsService events) { - this.users = users; - this.events = events; + /** + * Parameter indicating if manual batching test mode is enabled. + * + *

In this mode, the server will sleep during processing of log messages, delaying the + * processing, giving the Enso test process time to gather more messages that need to be sent. The + * process will also log how many log messages were received in each batch. Enabling this flag and + * inspecting these logs allow to manually test that messages are indeed sent in batches if the + * log processing is slower. + */ + private final boolean batchingTestModeEnabled; + + public PostLogHandler( + UsersService usersService, EventsService eventsService, boolean batchingTestModeEnabled) { + this.usersService = usersService; + this.eventsService = eventsService; + this.batchingTestModeEnabled = batchingTestModeEnabled; + if (batchingTestModeEnabled) { + System.out.println("Manual audit log batching test mode enabled."); + } } @Override @@ -37,16 +55,54 @@ public void handleCloudAPI(CloudExchange exchange) throws IOException { } JsonNode root = jsonMapper.readTree(exchange.decodeBodyAsText()); - String message = root.get("message").asText(); - String organizationId = users.currentUserOrganizationId(); - String userEmail = users.currentUserEmail(); - String timestamp = ZonedDateTime.now().withZoneSameInstant(ZoneId.of("UTC")).toString(); - JsonNode metadata = root.get("metadata"); - String projectId = root.get("projectId").asText(); - EventsService.LogEvent event = - new EventsService.LogEvent( - organizationId, userEmail, timestamp, metadata, message, projectId); - events.recordEvent(event); + var incomingEvents = decodeLogEvents(root); + if (batchingTestModeEnabled) { + System.out.println("Received a batch of " + incomingEvents.size() + " audit log messages."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore the interruption + } + } + if (incomingEvents.isEmpty()) { + exchange.sendResponse(400, "Empty array was sent."); + return; + } + for (var event : incomingEvents) { + eventsService.recordEvent(event); + } exchange.sendEmptyResponse(204); } + + private List decodeLogEvents(JsonNode root) { + if (!root.isObject()) { + throw new IllegalArgumentException("Invalid JSON structure: " + root); + } + + if (root.has("logs") && root.size() == 1) { + var array = root.get("logs"); + if (!array.isArray()) { + throw new IllegalArgumentException("Invalid JSON structure: " + root); + } + + List events = new ArrayList<>(array.size()); + for (JsonNode event : array) { + events.add(parseLogEvent(event)); + } + return events; + } else { + return List.of(parseLogEvent(root)); + } + } + + private EventsService.LogEvent parseLogEvent(JsonNode json) { + String message = json.get("message").asText(); + String organizationId = usersService.currentUserOrganizationId(); + String userEmail = usersService.currentUserEmail(); + String timestamp = ZonedDateTime.now().withZoneSameInstant(ZoneId.of("UTC")).toString(); + JsonNode metadata = json.get("metadata"); + String projectId = json.get("projectId").asText(); + return new EventsService.LogEvent( + organizationId, userEmail, timestamp, metadata, message, projectId); + } }