Skip to content

Commit

Permalink
[HUDI-4615] Return checkpoint as null for empty data from events queu…
Browse files Browse the repository at this point in the history
…e. (#6387)


Co-authored-by: sivabalan <[email protected]>
  • Loading branch information
vinishjail97 and nsivabalan authored Sep 7, 2022
1 parent 27c7efb commit d2d1cb8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ public Pair<List<String>, String> getNextEventsFromQueue(AmazonSQS sqs,
for (Map<String, Object> eventRecord : eventRecords) {
filteredEventRecords.add(new ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "="));
}
return new ImmutablePair<>(filteredEventRecords, String.valueOf(newCheckpointTime));
// Return the old checkpoint if no messages to consume from queue.
String newCheckpoint = newCheckpointTime == 0 ? lastCheckpointStr.orElse(null) : String.valueOf(newCheckpointTime);
return new ImmutablePair<>(filteredEventRecords, newCheckpoint);
} catch (JSONException | IOException e) {
throw new HoodieException("Unable to read from SQS: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import org.apache.hadoop.fs.Path;
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
Expand All @@ -43,8 +46,12 @@

import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
import static org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

public class TestS3EventsMetaSelector extends HoodieClientTestHarness {

Expand Down Expand Up @@ -102,4 +109,21 @@ public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class<?> clazz)
.getString("key"));
assertEquals("1627376736755", eventFromQueue.getRight());
}

@Test
public void testEventsFromQueueNoMessages() {
S3EventsMetaSelector selector = new S3EventsMetaSelector(props);
when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
.thenReturn(
new GetQueueAttributesResult()
.addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, "0"));

List<Message> processed = new ArrayList<>();
Pair<List<String>, String> eventFromQueue =
selector.getNextEventsFromQueue(sqs, Option.empty(), processed);

assertEquals(0, eventFromQueue.getLeft().size());
assertEquals(0, processed.size());
assertNull(eventFromQueue.getRight());
}
}

0 comments on commit d2d1cb8

Please sign in to comment.