major: Extend Consumer and Function for more cohesive API #303

Closed
wants to merge 54 commits into from

Changes from 1 commit
Commits (54):
21f08f6  START: the basics of a single queue (astubbs, Mar 8, 2022)
3f1f3ff  step: remove work mailbox manager (astubbs, Mar 8, 2022)
92b50cb  step: BROKEN: assign epoch to record immediately (astubbs, Mar 8, 2022)
1653bc2  step - trying to test perf (astubbs, Mar 17, 2022)
103c677  update (astubbs, Mar 17, 2022)
eed2190  logs (astubbs, Mar 17, 2022)
02ee3cd  fix: Debug output for sorted encoding pairs (astubbs, Mar 17, 2022)
83fda73  save (astubbs, Mar 17, 2022)
2a37d46  rebase update (astubbs, Mar 25, 2022)
b17c838  step (astubbs, Apr 4, 2022)
e1141e4  save (astubbs, Apr 5, 2022)
5a3bb55  save (astubbs, Apr 5, 2022)
6a1464c  save: unit test version of offset encoding backpressure test (astubbs, Apr 5, 2022)
0604934  save (astubbs, Apr 5, 2022)
4eeb008  omg - hashsets vs queues, wow (astubbs, Apr 5, 2022)
bcfc9c1  review (astubbs, Apr 6, 2022)
6054ac5  review (astubbs, Apr 6, 2022)
c968629  review (astubbs, Apr 6, 2022)
0f993dd  review (astubbs, Apr 6, 2022)
416fd2f  Merge remote-tracking branch 'confluent/master' into features/single-… (astubbs, Apr 21, 2022)
189dc59  step (astubbs, Apr 21, 2022)
908d8ed  step (astubbs, Apr 21, 2022)
7547ec6  fix test (astubbs, Apr 21, 2022)
939a15e  step - test fix? (astubbs, Apr 21, 2022)
3fa6ae3  step - test fix? (astubbs, Apr 21, 2022)
c44f50a  step - test fix? - make sure unit test also cleans up (astubbs, Apr 21, 2022)
eff0b13  step - test fix? - make sure unit test also cleans up (astubbs, Apr 21, 2022)
a6dc167  START: Explicit retry exception for cleaner logging (astubbs, Apr 21, 2022)
74e0efb  step: reduce consumer max poll (astubbs, Apr 21, 2022)
ae1ce22  step: loosen duplicate check a bit for jenkins (astubbs, Apr 21, 2022)
62ffa63  step: fix generics (astubbs, Apr 21, 2022)
bf4452e  step: Experiment: synchronisation no longer needed due to stronger ep… (astubbs, Apr 21, 2022)
94ebc5c  turn max poll back to default (500) (astubbs, Apr 21, 2022)
1e8fcd9  license (astubbs, Apr 21, 2022)
b85fd2d  review (astubbs, Apr 22, 2022)
c6056fe  review (astubbs, Apr 22, 2022)
333ccac  review (astubbs, Apr 22, 2022)
d751aa4  review (astubbs, Apr 22, 2022)
aa5c0e1  fix (astubbs, Apr 22, 2022)
3b51ffe  START: Rename PartitionMonitor to PartitionStateManager (astubbs, Apr 22, 2022)
1b087e5  Merge branch 'features/single-queue' into features/retry-exception (astubbs, Apr 22, 2022)
88e9f0e  START: Experiment: extend Consumer and Function - centralises documen… (astubbs, May 13, 2022)
6862a56  step (astubbs, May 13, 2022)
14f1d47  refactor #409: Clarify truncation code (astubbs, Oct 19, 2022)
24c61f1  refactor #409: Clarify truncation code - tests passing (astubbs, Oct 19, 2022)
62be60b  refactor #409: Clarify truncation code - tests passing (astubbs, Oct 19, 2022)
2122e9e  bump jabel for j19 (astubbs, Oct 19, 2022)
0131d35  cleanup (astubbs, Oct 19, 2022)
158ca87  fix (astubbs, Oct 19, 2022)
a66b673  Merge remote-tracking branch 'origin/features/retry-exception' into e… (astubbs, Oct 19, 2022)
40ba4e3  merge fix (astubbs, Oct 19, 2022)
f97c3cb  migrate (astubbs, Oct 19, 2022)
8a996af  Merge branch 'features/retry-exception-simple' into features/extend-f… (astubbs, Oct 20, 2022)
f831137  step (astubbs, Oct 20, 2022)

Changes from commit 4eeb0086b9942825e0a388d41603a804757c9dfb: "omg - hashsets vs queues, wow"
astubbs committed Apr 6, 2022
==== changed file ====
@@ -889,7 +889,7 @@ private void transitionToClosing() {
      */
     private void processWorkCompleteMailBox() {
         log.trace("Processing mailbox (might block waiting for results)...");
-        Set<ActionItem> results = new HashSet<>();
+        Queue<ActionItem> results = new ArrayDeque<>();

         final Duration timeToBlockFor = getTimeToBlockFor();

@@ -927,7 +927,7 @@ private void processWorkCompleteMailBox() {
         for (var action : results) {
             WorkContainer<K, V> work = action.getWorkContainer();
             if (work == null) {
-                EpochAndRecords consumerRecords = action.getConsumerRecords();
+                EpochAndRecords<?, ?> consumerRecords = action.getConsumerRecords();
                 wm.registerWork(consumerRecords);
             } else {
                 MDC.put("offset", work.toString());
@@ -1138,6 +1138,7 @@ protected void addToMailbox(WorkContainer<K, V> wc) {

     public void registerWork(EpochAndRecords polledRecords) {
         log.debug("Adding {} to mailbox...", polledRecords);
+
         workMailBox.add(new ActionItem(null, polledRecords));
     }

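The first change above swaps the drained mailbox buffer from a HashSet to an ArrayDeque. A queue preserves arrival order and keeps duplicates, while a hash set iterates in no particular order and silently de-duplicates, which is presumably what the commit message is reacting to. A minimal standalone sketch of that behavioural difference (class name and values are illustrative, not part of the project):

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class CollectionOrderDemo {
    public static void main(String[] args) {
        // A queue keeps work results in the order they were added, duplicates included
        Queue<String> queue = new ArrayDeque<>();
        // A hash set gives no ordering guarantee and drops repeated elements
        Set<String> set = new HashSet<>();

        for (String result : new String[]{"offset-10", "offset-2", "offset-7", "offset-2"}) {
            queue.add(result);
            set.add(result);
        }

        System.out.println("queue (insertion order, duplicates kept): " + queue);
        System.out.println("set (unordered, de-duplicated): " + set);
    }
}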
==== changed file ====
@@ -186,9 +186,15 @@ private void updateHighestSucceededOffsetSoFar(WorkContainer<K, V> work) {
     }

     public void addWorkContainer(WorkContainer<K, V> wc) {
+        long offsetHighestSeen = getOffsetHighestSeen();
+        if (wc.offset() != offsetHighestSeen + 1) {
+            log.error("");
+        }
+
         maybeRaiseHighestSeenOffset(wc.offset());
         commitQueue.put(wc.offset(), wc);
         incompleteOffsets.add(wc.offset());
+
     }

     /**
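The guard added to addWorkContainer flags a work container whose offset is not exactly one above the highest offset seen so far, though the committed version still logs an empty string, apparently a placeholder. A hedged, self-contained sketch of that kind of contiguity check, with simplified types and an invented message (the real class is generic and logs through slf4j):

import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch only: field and method shapes echo the diff; everything else is illustrative.
class PartitionStateSketch {
    private long offsetHighestSeen = -1;
    private final NavigableMap<Long, String> commitQueue = new TreeMap<>();
    private final Set<Long> incompleteOffsets = new TreeSet<>();

    void addWork(long offset, String work) {
        if (offset != offsetHighestSeen + 1) {
            // The commit logs an empty string here; a real message might look like this.
            System.err.printf("Non-contiguous offset added: got %d, expected %d%n",
                    offset, offsetHighestSeen + 1);
        }
        offsetHighestSeen = Math.max(offsetHighestSeen, offset);
        commitQueue.put(offset, work);
        incompleteOffsets.add(offset);
    }
}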
==== changed file ====
@@ -35,6 +35,7 @@ public class ProcessingShard<K, V> {
      * Uses a ConcurrentSkipListMap instead of a TreeMap as under high pressure there appears to be some concurrency
      * errors (missing WorkContainers).
      */
+    @Getter
     private final NavigableMap<Long, WorkContainer<K, V>> entries = new ConcurrentSkipListMap<>();

     @Getter(PRIVATE)
@@ -74,9 +75,19 @@ public Optional<WorkContainer<K, V>> getWorkForOffset(long offset) {
     }

     public long getCountOfWorkAwaitingSelection() {
-        return entries.values().parallelStream()
+        return entries.values().stream()
                 // todo missing pm.isBlocked(topicPartition) ?
-                .filter(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork())
+                .filter(WorkContainer::isAvailableToTakeAsWork)
                 .count();
     }

+    public long getCountOfWorkTracked() {
+        return entries.size();
+    }
+
+    public long getCountWorkInFlight() {
+        return entries.values().stream()
+                .filter(WorkContainer::isInFlight)
+                .count();
+    }
+
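getCountOfWorkAwaitingSelection also moves from parallelStream() to a plain sequential stream() with a method reference; for a small in-memory map, splitting the traversal across the common fork-join pool generally costs more than it saves. A standalone sketch of the same counting pattern over a ConcurrentSkipListMap, as the class comment above describes (types simplified, and the predicate is supplied by the caller):

import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;

class ShardCountsSketch {
    // Keyed by offset, mirroring ProcessingShard#entries
    private final NavigableMap<Long, String> entries = new ConcurrentSkipListMap<>();

    void track(long offset, String work) {
        entries.put(offset, work);
    }

    long countMatching(Predicate<String> predicate) {
        // A sequential stream is enough here; the skip-list's weakly consistent
        // iterator lets this run safely while other threads add or remove entries.
        return entries.values().stream()
                .filter(predicate)
                .count();
    }
}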
==== changed file ====
@@ -58,6 +58,7 @@ public class ShardManager<K, V> {
      * @see WorkManager#getWorkIfAvailable()
      */
     // todo performance: disable/remove if using partition order
+    @Getter
     private final Map<Object, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();

     private final NavigableSet<WorkContainer<?, ?>> retryQueue = new TreeSet<>(Comparator.comparing(wc -> wc.getDelayUntilRetryDue()));
==== changed file ====
@@ -79,7 +79,7 @@ public abstract class AbstractParallelEoSStreamProcessorTestBase {

     protected AbstractParallelEoSStreamProcessor<String, String> parentParallelConsumer;

-    public static int defaultTimeoutSeconds = 30;
+    public static int defaultTimeoutSeconds = 3000;

     public static Duration defaultTimeout = ofSeconds(defaultTimeoutSeconds);
     protected static long defaultTimeoutMs = defaultTimeout.toMillis();
@@ -287,7 +287,7 @@ private void blockingLoopLatchTrigger(int waitForCount) {
         loopLatchV = new CountDownLatch(waitForCount);
         try {
             boolean timeout = !loopLatchV.await(defaultTimeoutSeconds, SECONDS);
-            if (timeout)
+            if (timeout || parentParallelConsumer.isClosedOrFailed())
                 throw new TimeoutException(msg("Timeout of {}, waiting for {} counts, on latch with {} left", defaultTimeout, waitForCount, loopLatchV.getCount()));
         } catch (InterruptedException e) {
             log.error("Interrupted while waiting for loop latch - timeout was {}", defaultTimeout);
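The second hunk makes the latch wait fail when the consumer has already closed or failed, instead of letting a dead consumer look like a clean countdown. A rough standalone sketch of that await-then-check pattern (isClosedOrFailed is the project's check; the names and wrapper here are illustrative):

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

class LatchAwaitSketch {
    // Wait for the latch; afterwards, also fail if the system under test has already died,
    // so the caller never mistakes a crashed consumer for a successful countdown.
    static void awaitOrFail(CountDownLatch latch, Duration timeout, BooleanSupplier alreadyFailed)
            throws InterruptedException, TimeoutException {
        boolean timedOut = !latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (timedOut || alreadyFailed.getAsBoolean()) {
            throw new TimeoutException("Gave up waiting: timedOut=" + timedOut
                    + ", alreadyFailed=" + alreadyFailed.getAsBoolean());
        }
    }
}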
==== changed file ====
@@ -9,10 +9,9 @@
 import io.confluent.parallelconsumer.FakeRuntimeError;
 import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
 import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes;
-import io.confluent.parallelconsumer.state.PartitionMonitor;
-import io.confluent.parallelconsumer.state.WorkContainer;
-import io.confluent.parallelconsumer.state.WorkManager;
+import io.confluent.parallelconsumer.state.*;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.AfterAll;
@@ -23,12 +22,11 @@
 import pl.tlinkowski.unij.api.UniLists;

 import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;

 import static io.confluent.csid.utils.JavaUtils.getLast;
 import static io.confluent.csid.utils.JavaUtils.getOnlyOne;
@@ -83,9 +81,12 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O
         OffsetMapCodecManager.DefaultMaxMetadataSize = 40; // reduce available to make testing easier
         OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetV2); // force one that takes a predictable large amount of space

-        ktu.send(consumerSpy, ktu.generateRecords(numberOfRecords));
+        List<ConsumerRecord<String, String>> records = ktu.generateRecords(numberOfRecords);
+        ktu.send(consumerSpy, records);

+        AtomicInteger userFuncFinishedCount = new AtomicInteger();
+        AtomicInteger userFuncStartCount = new AtomicInteger();
+
-        AtomicInteger userFuncFinishedCount = new AtomicInteger(0);
         CountDownLatch msgLock = new CountDownLatch(1);
         CountDownLatch msgLockTwo = new CountDownLatch(1);
         CountDownLatch msgLockThree = new CountDownLatch(1);
@@ -94,32 +95,40 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O
         List<Long> blockedOffsets = UniLists.of(0L, 2L);
         final int numberOfBlockedMessages = blockedOffsets.size();

-        parallelConsumer.poll((rec) -> {
+        WorkManager<String, String> wm = parallelConsumer.getWm();
+        final PartitionState<String, String> partitionState = wm.getPm().getPartitionState(topicPartition);
+
+        ConcurrentLinkedQueue<Long> seen = new ConcurrentLinkedQueue<>();
+
+        parallelConsumer.poll((rec) -> {
+            seen.add(rec.offset());
+            userFuncStartCount.incrementAndGet();
             // block the partition to create bigger and bigger offset encoding blocks
             // don't let offset 0 finish
             if (rec.offset() == offsetToBlock) {
                 int attemptNumber = attempts.incrementAndGet();
                 if (attemptNumber == 1) {
                     log.debug("Force first message to 'never' complete, causing a large offset encoding (lots of messages completing above the low water mark. Waiting for msgLock countdown.");
-                    int sleepFor = 120;
-                    awaitLatch(msgLock, sleepFor);
+                    int timeout = 120;
+                    awaitLatch(msgLock, timeout);
                     log.debug("Very slow message awoken, throwing exception");
                     throw new FakeRuntimeError("Fake error");
                 } else {
                     log.debug("Second attempt, waiting for msgLockTwo countdown");
                     awaitLatch(msgLockTwo, 60);
                     log.debug("Second attempt, unlocked, succeeding");
                 }
-            } else if (rec.offset() == 2l) {
+            } else if (rec.offset() == 2L) {
                 awaitLatch(msgLockThree);
                 log.debug("// msg 2L unblocked");
             } else {
                 sleepQuietly(1);
             }
-            userFuncFinishedCount.getAndIncrement();
+            userFuncFinishedCount.incrementAndGet();
         });

+        ShardManager<String, String> sm = wm.getSm();

         try {

             // wait for all pre-produced messages to be processed and produced
@@ -129,7 +138,17 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O
                     //, () -> parallelConsumer.getFailureCause()) // requires https://github.com/awaitility/awaitility/issues/178#issuecomment-734769761
                     .pollInterval(1, SECONDS)
                     .untilAsserted(() -> {
+                        ProcessingShard<String, String> stringStringProcessingShard = sm.getProcessingShards().get(topicPartition);
+                        if (stringStringProcessingShard != null) {
+                            long countOfWorkAwaitingSelection = stringStringProcessingShard.getCountOfWorkAwaitingSelection();
+                            NavigableMap<Long, WorkContainer<String, String>> entries = stringStringProcessingShard.getEntries();
+                            boolean b = sm.workIsWaitingToBeProcessed();
+                            long countWorkInFlight = stringStringProcessingShard.getCountWorkInFlight();
+                            long countOfWorkTracked = stringStringProcessingShard.getCountOfWorkTracked();
+                            long numberOfWorkQueuedInShardsAwaitingSelection = sm.getNumberOfWorkQueuedInShardsAwaitingSelection();
+                        }
                         assertThat(userFuncFinishedCount.get()).isEqualTo(numberOfRecords - numberOfBlockedMessages);
+                        // Truth.assertThat(numberOfWorkQueuedInShardsAwaitingSelection).isEqualTo(-4);
                     });

         // # assert commit ok - nothing blocked
@@ -159,7 +178,6 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O
             assertThat(highestSeenOffset).as("offset 99 is encoded as having been seen").isEqualTo(expectedHighestSeenOffset);
         }

-        WorkManager<String, String> wm = parallelConsumer.getWm();

         // partition not blocked
         {
@@ -202,25 +220,30 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O
                             .deserialiseIncompleteOffsetMapFromBase64(0L, meta);
                     Truth.assertWithMessage("The only incomplete record now is offset zero, which we are blocked on")
                             .that(incompletes.getIncompleteOffsets()).containsExactlyElementsIn(blockedOffsets);
-                    Truth8.assertThat(incompletes.getHighestSeenOffset()).hasValue(numberOfRecords + extraRecordsToBlockWithThresholdBlocks - 1);
+                    int expectedHighestSeen = numberOfRecords + extraRecordsToBlockWithThresholdBlocks - 1;
+                    Truth8.assertThat(incompletes.getHighestSeenOffset()).hasValue(expectedHighestSeen);
                 }
             );
         }

         // recreates the situation where the payload size is too large and must be dropped
         log.debug("// test max payload exceeded, payload dropped");
-        int processedBeforePartitionBlock = userFuncFinishedCount.get();
-        int extraMessages = numberOfRecords + extraRecordsToBlockWithThresholdBlocks / 2;
-        log.debug("// messages already sent {}, sending {} more", processedBeforePartitionBlock, extraMessages);

+        // log.debug("// messages already sent {}, sending {} more", processedBeforePartitionBlock, extraMessages);
         {
+            long numberOfWorkQueuedInShardsAwaitingSelection = sm.getNumberOfWorkQueuedInShardsAwaitingSelection();
             log.debug("// force system to allow more records (i.e. the actual system attempts to never allow the payload to grow this big)");
-            PartitionMonitor.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(2);
+            PartitionMonitor.setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(30);
             parallelConsumer.requestCommitAsap();
             awaitForOneLoopCycle();

+
             //
             log.debug("// unlock to make state dirty to get a commit");
+
             msgLockThree.countDown();
+            int processedBeforePartitionBlock = userFuncFinishedCount.get();
+            int extraMessages = numberOfRecords + extraRecordsToBlockWithThresholdBlocks / 2;
+            log.debug("// send {} more messages", extraMessages);
             ktu.send(consumerSpy, ktu.generateRecords(extraMessages));

@@ -229,7 +252,18 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O

         log.debug("// wait for the new message to be processed");
         await().atMost(defaultTimeout).untilAsserted(() ->
-                assertThat(userFuncFinishedCount.get()).isEqualTo(processedBeforePartitionBlock + extraMessages + 1)
+                {
+                    long numberOfWorkQueuedInShardsAwaitingSelection1 = sm.getNumberOfWorkQueuedInShardsAwaitingSelection();
+                    ShardManager<String, String> sm1 = sm;
+                    List<Long> seen1 = seen.stream().sorted().collect(Collectors.toList());
+                    long offsetHighestSucceeded = partitionState.getOffsetHighestSucceeded();
+                    long offsetHighestSeen = partitionState.getOffsetHighestSeen();
+                    long offsetHighestSequentialSucceeded = partitionState.getOffsetHighestSequentialSucceeded();
+                    int i = userFuncStartCount.get();
+                    int i1 = userFuncFinishedCount.get();
+                    int expectedUserFunctionFinishedCount = processedBeforePartitionBlock + extraMessages + 1;
+                    assertThat(userFuncFinishedCount.get()).isEqualTo(expectedUserFunctionFinishedCount);
+                }
         );

         log.debug("// assert payload missing from commit now");
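The polling assertions in this test use Awaitility's DSL: await().atMost(...).pollInterval(...).untilAsserted(...) re-runs the lambda until it stops throwing or the deadline passes. A minimal usage sketch with an illustrative counter (Awaitility and AssertJ are the libraries already visible in the diff; the class and method here are not):

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitilitySketch {
    // Polls the shared counter until the assertion holds or 30 seconds elapse.
    void waitForCount(AtomicInteger finished, int expected) {
        await().atMost(Duration.ofSeconds(30))
                .pollInterval(1, TimeUnit.SECONDS)
                .untilAsserted(() -> assertThat(finished.get()).isEqualTo(expected));
    }
}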
==== changed file ====
@@ -4,6 +4,7 @@
  * Copyright (C) 2020-2022 Confluent, Inc.
  */

+import com.google.common.truth.Truth;
 import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
 import io.confluent.parallelconsumer.state.PartitionMonitor;
 import io.confluent.parallelconsumer.state.PartitionState;
@@ -207,6 +208,7 @@ private void sendRecordsToWM(int numberOfRecords, WorkManager<String, String> wm
         log.debug("~Sending {} more records", numberOfRecords);
         List<ConsumerRecord<String, String>> records = ktu.generateRecords(numberOfRecords);
         wm.registerWork(new ConsumerRecords<>(UniMaps.of(topicPartition, records)));
+        Truth.assertThat(wm.getTotalWorkAwaitingIngestion()).isEqualTo(numberOfRecords);
     }