Skip to content

Commit

Permalink
Azure Monitor Ingestion: migrate to test proxy (Azure#34415)
Browse files Browse the repository at this point in the history
* Azure Monitor Ingestion: migrate to test proxy

* debug missing recordings

* re-record with regression bug fix

* reduce recording size and add custom timeout
  • Loading branch information
srnagar authored Jun 5, 2023
1 parent 4096d8d commit eb5d469
Show file tree
Hide file tree
Showing 20 changed files with 1,961 additions and 1,318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,11 @@ private LogsIngestionRequest createRequest(boolean last) throws IOException {
generator.writeStartArray();
generator.writeRaw(serializedLogs.stream().collect(Collectors.joining(",")));
generator.writeEndArray();
generator.close();

byte[] zippedRequestBody = gzipRequest(byteArrayOutputStream.toByteArray());

return new LogsIngestionRequest(originalLogsRequest, zippedRequestBody);
} finally {
generator.close();
byteArrayOutputStream.close();

if (!last) {
originalLogsRequest = new ArrayList<>();
serializedLogs.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.test.annotation.RecordWithoutRequestBody;
import com.azure.core.util.BinaryData;
import com.azure.monitor.ingestion.models.LogsUploadOptions;
import com.azure.monitor.ingestion.models.LogsUploadException;
import com.azure.monitor.ingestion.models.LogsUploadOptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import reactor.test.StepVerifier;

import java.util.List;
Expand All @@ -26,44 +28,69 @@ public class LogsIngestionAsyncClientTest extends LogsIngestionTestBase {
@Test
public void testUploadLogs() {
List<Object> logs = getObjects(10);
LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient();
DataValidationPolicy dataValidationPolicy = new DataValidationPolicy(logs);

LogsIngestionAsyncClient client = clientBuilder.addPolicy(dataValidationPolicy).buildAsyncClient();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs))
.verifyComplete();
.verifyComplete();
}

@Test
public void testUploadLogsInBatches() {
List<Object> logs = getObjects(10000);

AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(new BatchCountPolicy(count))
.buildAsyncClient();
.addPolicy(logsCountPolicy)
.addPolicy(new BatchCountPolicy(count))
.buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs))
.verifyComplete();
.verifyComplete();

assertEquals(2, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsInBatchesConcurrently() {
List<Object> logs = getObjects(10000);

AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();
LogsIngestionAsyncClient client = clientBuilder
.addPolicy(new BatchCountPolicy(count))
.addPolicy(logsCountPolicy)
.buildAsyncClient();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3)))
.verifyComplete();
assertEquals(2, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsPartialFailure() {
List<Object> logs = getObjects(100000);
AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();
.addPolicy(logsCountPolicy)
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs))
.verifyErrorSatisfies(error -> {
assertTrue(error instanceof LogsUploadException);
if (error instanceof LogsUploadException) {
LogsUploadException ex = (LogsUploadException) error;
assertEquals(49460, ex.getFailedLogsCount());
assertEquals(5, ex.getLogsUploadErrors().size());
}
});
.verifyErrorSatisfies(error -> {
assertTrue(error instanceof LogsUploadException);
if (error instanceof LogsUploadException) {
LogsUploadException ex = (LogsUploadException) error;
assertEquals(49460, ex.getFailedLogsCount());
assertEquals(5, ex.getLogsUploadErrors().size());
}
});
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

@Test
Expand All @@ -72,55 +99,64 @@ public void testUploadLogsPartialFailureWithErrorHandler() {
AtomicInteger count = new AtomicInteger();
AtomicLong failedLogsCount = new AtomicLong();
LogsUploadOptions logsUploadOptions = new LogsUploadOptions()
.setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size()));
.setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size()));
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();
.addPolicy(logsCountPolicy)
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions))
.verifyComplete();
.verifyComplete();
assertEquals(49460, failedLogsCount.get());
assertEquals(11, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsStopOnFirstError() {
List<Object> logs = getObjects(100000);
AtomicInteger count = new AtomicInteger();
LogsUploadOptions logsUploadOptions = new LogsUploadOptions()
.setLogsUploadErrorConsumer(error -> {
// throw on first error
throw error.getResponseException();
});
.setLogsUploadErrorConsumer(error -> {
// throw on first error
throw error.getResponseException();
});
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();
.addPolicy(logsCountPolicy)
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions))
.verifyErrorSatisfies(ex -> assertTrue(ex instanceof HttpResponseException));
.verifyErrorSatisfies(ex -> assertTrue(ex instanceof HttpResponseException));
assertEquals(2, count.get());
// this should stop on first error, so, only one request should be sent that contains a subset of logs
assertTrue(logs.size() > logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsProtocolMethod() {
List<Object> logs = getObjects(10);
LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient();
StepVerifier.create(client.uploadWithResponse(dataCollectionRuleId, streamName,
BinaryData.fromObject(logs), new RequestOptions().setHeader("Content-Encoding", "gzip")))
.assertNext(response -> assertEquals(204, response.getStatusCode()))
.verifyComplete();
BinaryData.fromObject(logs), new RequestOptions()))
.assertNext(response -> assertEquals(204, response.getStatusCode()))
.verifyComplete();
}

@Test
@RecordWithoutRequestBody
@EnabledIfEnvironmentVariable(named = "AZURE_TEST_MODE", matches = "LIVE", disabledReason = "Test proxy network connection is timing out for this test in playback mode.")
public void testUploadLargeLogsProtocolMethod() {
List<Object> logs = getObjects(1000000);
List<Object> logs = getObjects(375000);
LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient();
StepVerifier.create(client.uploadWithResponse(dataCollectionRuleId, streamName,
BinaryData.fromObject(logs), new RequestOptions()))
.verifyErrorMatches(responseException -> (responseException instanceof HttpResponseException)
&& ((HttpResponseException) responseException).getResponse().getStatusCode() == 413);
BinaryData.fromObject(logs), new RequestOptions()))
.verifyErrorMatches(responseException -> (responseException instanceof HttpResponseException)
&& ((HttpResponseException) responseException).getResponse().getStatusCode() == 413);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.http.rest.Response;
import com.azure.core.test.annotation.RecordWithoutRequestBody;
import com.azure.core.util.BinaryData;
import com.azure.monitor.ingestion.models.LogsUploadException;
import com.azure.monitor.ingestion.models.LogsUploadOptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test cases for {@link LogsIngestionClient}.
Expand All @@ -26,7 +29,8 @@ public class LogsIngestionClientTest extends LogsIngestionTestBase {
@Test
public void testUploadLogs() {
List<Object> logs = getObjects(10);
LogsIngestionClient client = clientBuilder.buildClient();
DataValidationPolicy dataValidationPolicy = new DataValidationPolicy(logs);
LogsIngestionClient client = clientBuilder.addPolicy(dataValidationPolicy).buildClient();
client.upload(dataCollectionRuleId, streamName, logs);
}

Expand All @@ -35,39 +39,49 @@ public void testUploadLogsInBatches() {
List<Object> logs = getObjects(10000);

AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();
LogsIngestionClient client = clientBuilder
.addPolicy(new BatchCountPolicy(count))
.buildClient();
.addPolicy(new BatchCountPolicy(count))
.addPolicy(logsCountPolicy)
.buildClient();
client.upload(dataCollectionRuleId, streamName, logs);
assertEquals(2, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsInBatchesConcurrently() {
List<Object> logs = getObjects(10000);

AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();
LogsIngestionClient client = clientBuilder
.addPolicy(new BatchCountPolicy(count))
.buildClient();
.addPolicy(new BatchCountPolicy(count))
.addPolicy(logsCountPolicy)
.buildClient();
client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3));
assertEquals(2, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsPartialFailure() {
List<Object> logs = getObjects(100000);
AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionClient client = clientBuilder
.addPolicy(new PartialFailurePolicy(count))
.buildClient();
.addPolicy(new PartialFailurePolicy(count))
.addPolicy(logsCountPolicy)
.buildClient();

LogsUploadException uploadLogsException = assertThrows(LogsUploadException.class, () -> {
client.upload(dataCollectionRuleId, streamName, logs);
});
assertEquals(49460, uploadLogsException.getFailedLogsCount());
assertEquals(5, uploadLogsException.getLogsUploadErrors().size());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());

}

@Test
Expand All @@ -76,53 +90,63 @@ public void testUploadLogsPartialFailureWithErrorHandler() {
AtomicInteger count = new AtomicInteger();
AtomicLong failedLogsCount = new AtomicLong();
LogsUploadOptions logsUploadOptions = new LogsUploadOptions()
.setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size()));
.setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size()));
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionClient client = clientBuilder
.addPolicy(new PartialFailurePolicy(count))
.buildClient();
.addPolicy(new PartialFailurePolicy(count))
.addPolicy(logsCountPolicy)
.buildClient();

client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions);
assertEquals(11, count.get());
assertEquals(49460, failedLogsCount.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());

}

@Test
public void testUploadLogsStopOnFirstError() {
List<Object> logs = getObjects(100000);
AtomicInteger count = new AtomicInteger();
LogsUploadOptions logsUploadOptions = new LogsUploadOptions()
.setLogsUploadErrorConsumer(error -> {
// throw on first error
throw error.getResponseException();
});
.setLogsUploadErrorConsumer(error -> {
// throw on first error
throw error.getResponseException();
});
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionClient client = clientBuilder
.addPolicy(new PartialFailurePolicy(count))
.buildClient();
.addPolicy(new PartialFailurePolicy(count))
.addPolicy(logsCountPolicy)
.buildClient();

assertThrows(HttpResponseException.class, () -> client.upload(dataCollectionRuleId, streamName, logs,
logsUploadOptions));
logsUploadOptions));
assertEquals(2, count.get());

// only a subset of logs should be sent
assertTrue(logs.size() > logsCountPolicy.getTotalLogsCount());
}

@Test
public void testUploadLogsProtocolMethod() {
List<Object> logs = getObjects(10);
LogsIngestionClient client = clientBuilder.buildClient();
Response<Void> response = client.uploadWithResponse(dataCollectionRuleId, streamName,
BinaryData.fromObject(logs), new RequestOptions().setHeader("Content-Encoding", "gzip"));
BinaryData.fromObject(logs), new RequestOptions());
assertEquals(204, response.getStatusCode());
}

@Test
@RecordWithoutRequestBody
@EnabledIfEnvironmentVariable(named = "AZURE_TEST_MODE", matches = "LIVE", disabledReason = "Test proxy network connection is timing out for this test in playback mode.")
public void testUploadLargeLogsProtocolMethod() {
List<Object> logs = getObjects(1000000);
List<Object> logs = getObjects(375000);
LogsIngestionClient client = clientBuilder.buildClient();

HttpResponseException responseException = assertThrows(HttpResponseException.class,
() -> client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs),
new RequestOptions()));
() -> client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs), new RequestOptions()));
assertEquals(413, responseException.getResponse().getStatusCode());
}
}
Loading

0 comments on commit eb5d469

Please sign in to comment.