Skip to content

Commit

Permalink
from review
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Nov 14, 2024
1 parent 4274d26 commit fabf6fe
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo) throws IOException
Thread.sleep(10_000);
// Verify offset positions
final Map<String, Object> offsetKeyPositions = verifyOffsetPositions(offsetKeys, 1);
offsetKeyPositions.forEach((key, value) -> assertThat(value).isEqualTo(5));
assertThat(offsetKeyPositions).values().containsOnly(5);
}

@Test
Expand Down Expand Up @@ -255,7 +255,8 @@ void avroTest(final TestInfo testInfo) throws IOException, InterruptedException
Thread.sleep(10_000);
// Verify offset positions
final Map<String, Object> offsetKeyPositions = verifyOffsetPositions(offsetKeys, 5);
offsetKeyPositions.forEach((key, value) -> assertThat(value).isEqualTo(100));
// offsetKeyPositions.forEach((key, value) -> assertThat(value).isEqualTo(100));
assertThat(offsetKeyPositions).values().containsOnly(100);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.aiven.kafka.connect.s3.source.utils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -50,7 +48,7 @@ class FileReaderTest {

private FileReader fileReader;

private static Map<String, Object> getObjectMap(final int maxTasks, final int taskId) {
private static Map<String, Object> getConfigMap(final int maxTasks, final int taskId) {
final Map<String, Object> configMap = new HashMap<>();
configMap.put("tasks.max", String.valueOf(maxTasks));
configMap.put("task.id", String.valueOf(taskId));
Expand All @@ -65,7 +63,7 @@ void testFetchObjectSummariesWithNoObjects() {
when(offsetManager.getOffsets()).thenReturn(new HashMap<>());

final Iterator<S3ObjectSummary> summaries = fileReader.fetchObjectSummaries(s3Client);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

@Test
Expand All @@ -74,7 +72,7 @@ void testFetchObjectSummariesWithOneObjectWithBasicConfig() {

initializeWithTaskConfigs(1, 0);
final Iterator<S3ObjectSummary> summaries = getS3ObjectSummaryIterator(objectKey);
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);
}

Expand All @@ -84,20 +82,20 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskId1() {

initializeWithTaskConfigs(4, 1);
Iterator<S3ObjectSummary> summaries = getS3ObjectSummaryIterator(objectKey);
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);

initializeWithTaskConfigs(4, 0);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 2);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 3);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

@Test
Expand All @@ -106,20 +104,20 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskId2() throws IOExce

initializeWithTaskConfigs(4, 2);
Iterator<S3ObjectSummary> summaries = getS3ObjectSummaryIterator(objectKey);
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);

initializeWithTaskConfigs(4, 1);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 3);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 0);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

@Test
Expand All @@ -128,20 +126,20 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskId3() {

initializeWithTaskConfigs(4, 3);
Iterator<S3ObjectSummary> summaries = getS3ObjectSummaryIterator(objectKey);
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);

initializeWithTaskConfigs(4, 1);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 2);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 0);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

@Test
Expand All @@ -150,20 +148,20 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskId4() {

initializeWithTaskConfigs(4, 0);
Iterator<S3ObjectSummary> summaries = getS3ObjectSummaryIterator(objectKey);
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);

initializeWithTaskConfigs(4, 1);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 2);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(4, 3);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

@Test
Expand All @@ -172,24 +170,24 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskId2MaxTasks5() thro

initializeWithTaskConfigs(5, 3);
Iterator<S3ObjectSummary> summaries = getS3ObjectSummaryIterator(objectKey);
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);

initializeWithTaskConfigs(5, 1);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(5, 2);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(5, 0);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

initializeWithTaskConfigs(5, 4);
summaries = getS3ObjectSummaryIterator(objectKey);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

// Has 3 objects with one of them zero bytes. Zero bytes object will be ignored
Expand All @@ -203,9 +201,9 @@ void testFetchObjectSummariesWithZeroByteObject() {
Iterator<S3ObjectSummary> summaries = fileReader.fetchObjectSummaries(s3Client);

// assigned 1 object to taskid 3
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();

// assigned 1 object to taskid 3
initializeWithTaskConfigs(4, 0);
Expand All @@ -215,9 +213,9 @@ void testFetchObjectSummariesWithZeroByteObject() {
summaries = fileReader.fetchObjectSummaries(s3Client);

// assigned to taskid 4
assertTrue(summaries.hasNext());
assertThat(summaries.hasNext()).isTrue();
assertThat(summaries.next().getSize()).isEqualTo(1);
assertFalse(summaries.hasNext());
assertThat(summaries.hasNext()).isFalse();
}

@Test
Expand Down Expand Up @@ -267,7 +265,7 @@ private Iterator<S3ObjectSummary> getS3ObjectSummaryIterator(final String object

public void initializeWithTaskConfigs(final int maxTasks, final int taskId) {
final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class);
final Map<String, Object> configMap = getObjectMap(maxTasks, taskId);
final Map<String, Object> configMap = getConfigMap(maxTasks, taskId);

when(s3SourceConfig.originals()).thenReturn(configMap);
offsetManager = mock(OffsetManager.class);
Expand Down

0 comments on commit fabf6fe

Please sign in to comment.