Skip to content

Commit

Permalink
spring-projectsGH-249 Addressing code review comments related to code…
Browse files Browse the repository at this point in the history
… style.
  • Loading branch information
siddharthjain210 committed Dec 14, 2024
1 parent 377dbe6 commit 3d7de0f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
6 changes: 5 additions & 1 deletion src/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>

</module>

<!-- Enable this in the future to restrict a Line Length Check.-->
<!-- <module name="LineLength">-->
<!-- <property name="max" value="120"/>-->
<!-- </module>-->
</module>
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implement

private volatile ScheduledFuture<?> flushFuture;

private long maxRecordsInFlight = 0;
private long maxInFlightRecords = 0;

private int maxRecordInFlightsSleepDurationInMillis = 100;
private int maxInFlightRecordsDuration = 100;

public KplMessageHandler(KinesisProducer kinesisProducer) {
Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null.");
Expand All @@ -121,23 +121,31 @@ public void setConverter(Converter<Object, byte[]> converter) {
}

/**
* When in KPL mode, the setting allows handling backpressure on the KPL native process. Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in maxRecordInFlightsSleepDurationInMillis.
* When in KPL mode, the setting allows handling backpressure on the KPL native process.
* Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in
* maxRecordInFlightsSleepDurationInMillis.
*
* @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. Specify a positive value to enable back pressure.
* @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled.
* Specify a positive value to enable back pressure.
* @since 3.0.9
*/
public void setMaxOutstandingRecordsInFlight(long maxRecordsInFlight) {
public void setMaxRecordsInFlight(long maxRecordsInFlight) {
Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0.");
this.maxRecordsInFlight = maxRecordsInFlight;
this.maxInFlightRecords = maxRecordsInFlight;
}

/**
* The setting allows handling backpressure on the KPL native process. Enabled when maxOutstandingRecordsCount is greater than 0. The configurations puts the KPL Thread to sleep for the specified number of milliseconds.
* The setting allows handling backpressure on the KPL native process.
* Enabled when maxOutstandingRecordsCount is greater than 0.
* The configurations puts the KPL Thread to sleep for the specified number of milliseconds.
*
* @param maxRecordInFlightsSleepDurationInMillis Default is 100ms.
* @param maxInFlightRecordsDuration Default is 100ms.
* @since 3.0.9
*/
public void setMaxRecordInFlightsSleepDurationInMillis(int maxRecordInFlightsSleepDurationInMillis) {
Assert.isTrue(maxRecordInFlightsSleepDurationInMillis > 0, "'maxRecordInFlightsSleepDurationInMillis must be greater than 0.");
this.maxRecordInFlightsSleepDurationInMillis = maxRecordInFlightsSleepDurationInMillis;
public void setMaxInFlightRecordsDuration(int maxInFlightRecordsDuration) {
Assert.isTrue(maxInFlightRecordsDuration > 0,
"'maxRecordInFlightsSleepDurationInMillis must be greater than 0.");
this.maxInFlightRecordsDuration = maxInFlightRecordsDuration;
}

/**
Expand Down Expand Up @@ -394,9 +402,10 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
}

private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
if (this.maxRecordsInFlight != -1 && this.kinesisProducer.getOutstandingRecordsCount() > this.maxRecordsInFlight) {
if (this.maxInFlightRecords != -1 &&
this.kinesisProducer.getOutstandingRecordsCount() > this.maxInFlightRecords) {
try {
Thread.sleep(this.maxRecordInFlightsSleepDurationInMillis);
Thread.sleep(this.maxInFlightRecordsDuration);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -439,7 +448,8 @@ private PutRecordRequest buildPutRecordRequest(Message<?> message) {
if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
}
Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. "
Assert.state(partitionKey != null,
"'partitionKey' must not be null for sending a Kinesis record. "
+ "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " +
"or supply an 'aws_partitionKey' message header.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@
/**
* @author Siddharth Jain
*
* @since 4.0
* @since 3.0.9
*/
@SpringJUnitConfig
@DirtiesContext
public class KplMessageHandlerTest {
public class KplMessageHandlerTests {

@Autowired
protected KinesisProducer kinesisProducer;
Expand Down Expand Up @@ -92,8 +92,8 @@ void testKPLMessageHandler_raw_payload_success() {
void testKPLMessageHandler_raw_payload_success_backpressure_test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxOutstandingRecordsInFlight(1);
this.kplMessageHandler.setMaxRecordInFlightsSleepDurationInMillis(100);
this.kplMessageHandler.setMaxRecordsInFlight(1);
this.kplMessageHandler.setMaxInFlightRecordsDuration(100);
given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(2);
final Message<?> message = MessageBuilder
.withPayload("message1")
Expand Down Expand Up @@ -139,4 +139,5 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
return kplMessageHandler;
}
}

}

0 comments on commit 3d7de0f

Please sign in to comment.