Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java/C++] Archive variable length recordings. #1069

Merged
merged 52 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
600ba64
[Java] Migrate `RecordingDescriptorHeader.valid` field from byte to e…
vyazelenko Aug 11, 2020
a6403e5
[Java] Add CatalogIndex.
vyazelenko Aug 17, 2020
a720465
[Java] Remove limitation on max size of combined Stringx in the recor…
vyazelenko Aug 17, 2020
93b0e1b
[Java] Use `capacity` instead of `maxEntries` in the Catalog.
vyazelenko Aug 17, 2020
5ef3681
[Java] Validate recording length before growing Catalog.
vyazelenko Aug 18, 2020
2324d5a
[Java] Fix interceptor code for Catalog resize logging.
vyazelenko Aug 18, 2020
757c3d4
[Java] Increase major Archive version to reflect changes due to varia…
vyazelenko Aug 20, 2020
a6cf2e1
[Java] Make RecordingDescriptors variable-length.
vyazelenko Aug 20, 2020
ea958f2
[Java] Validate minimal Catalog capacity.
vyazelenko Aug 20, 2020
ffac1e0
[Java] Adjust test.
vyazelenko Aug 20, 2020
62ee53e
[Java] Make ArchiveConductor#findLastMatchingRecording return `UNKNOW…
vyazelenko Aug 20, 2020
44253c9
[Java] Delete CatalogIndex#forEach.
vyazelenko Aug 20, 2020
b304f77
[Java] Use CatalogIndex to find last matching element.
vyazelenko Aug 20, 2020
1bde54f
[Java] Add InvalidateRecordingRequest and enable agent logging for th…
vyazelenko Aug 20, 2020
36d018c
[Java] Implement Catalog#invalidateRecording.
vyazelenko Aug 20, 2020
d25e577
[Java] Rename `invalidateRecording` to `purgeRecording`.
vyazelenko Aug 21, 2020
0f43b0e
[Java] Implement ArchiveConductor#purgeRecording.
vyazelenko Aug 21, 2020
ee35a85
[Java] Align an entire recording descriptor frame length (header + de…
vyazelenko Aug 22, 2020
d03ca4b
[Java] Implement Catalog from version 2 to version 3.
vyazelenko Aug 25, 2020
e4b3ddc
[Java] Initialize CatalogHeaderEncoder/Decoder upon `growCatalog`.
vyazelenko Aug 25, 2020
5ee5f99
[Java] Add ArchiveTool#compact operation.
vyazelenko Aug 25, 2020
a566a28
[Java] Add ArchiveTool#capacity operations.
vyazelenko Aug 25, 2020
2eb1367
[Java] Compute checksum on the recording descriptor.
vyazelenko Aug 27, 2020
8e3f54c
[Java] Compute Catalog checksum in ArchiveTool.
vyazelenko Aug 27, 2020
a8b9657
[Java] Verify Catalog checksum in ArchiveTool.
vyazelenko Aug 27, 2020
57eb64d
[Java] Clarify that `count-entries` only counts `VALID` entries.
vyazelenko Aug 27, 2020
b0d966c
[Java] Move function to Tests.
vyazelenko Aug 28, 2020
d5b6a9f
[Java] Add support for jumbo descriptors to ControlResponseProxy#send…
vyazelenko Aug 28, 2020
0ac4615
[Java] Switch to `offer` instead of `tryClaim` in RecordingEventsProx…
vyazelenko Aug 28, 2020
a4507f7
[Java] Simplify Catalog#invalidateRecording.
vyazelenko Aug 28, 2020
cc93c63
[Java] Use fieldAccessBuffer with proper endianness in Catalog#wrapDe…
vyazelenko Aug 28, 2020
b6555c8
[Java] Add a test to ensure that capacity can never grow beyond maxCa…
vyazelenko Aug 28, 2020
2311db8
[Java] Refactor listRecordings.
vyazelenko Aug 28, 2020
8da1323
[Java] Remove RecordingDescriptorDecoder from ListRecordingsSession.
vyazelenko Aug 30, 2020
005e865
[Java] Use CatalogIndex to implement AbstractListRecordingsSession re…
vyazelenko Aug 30, 2020
d3332ac
[C++] Add `purgeRecording` command.
vyazelenko Aug 30, 2020
2c373ef
[Java] Fix truncate on Windows.
vyazelenko Aug 30, 2020
ae38e95
[Java] Unmap MappedByteBuffers before closing the FileChannel.
vyazelenko Aug 30, 2020
8e993b1
[Java] Unmap ByteBuffer used for creating ArchiveMarkFile.
vyazelenko Aug 30, 2020
514b266
[Java] Unmap MappedByteBuffers to allow deletion of the Catalog file …
vyazelenko Aug 30, 2020
a5d1af9
[Java] Pretty-print C driver output.
vyazelenko Aug 31, 2020
912df92
[Java] Enable all logging events in C driver.
vyazelenko Aug 31, 2020
5b99772
[C] Allocate dynamic buffers when message does not fit in a static bu…
vyazelenko Sep 3, 2020
8ed50c4
[C] Inline static_buffer_length to make code compile on Windows.
vyazelenko Sep 7, 2020
864e137
[C] Allocate static buffer only if message fits.
vyazelenko Sep 7, 2020
5a015eb
[C++/Java] Fix purgeRecording doc.
vyazelenko Sep 14, 2020
93bf8fd
[C++] Add tests for purgeRecording and jumbo RecordingDescriptor.
vyazelenko Sep 14, 2020
5e70fde
[C++] Increase `driver_proxy_command_buffer_t` size to 4K to make jum…
vyazelenko Sep 14, 2020
be2bbbd
[Java] Fix merge error.
vyazelenko Oct 1, 2020
9c0c103
[Java] Fix merge conflict.
vyazelenko Oct 20, 2020
d057a1d
[C] Fix merge conflicts and rename raw_log types.
vyazelenko Oct 20, 2020
3d1f621
[Java] Fix merge conflicts.
vyazelenko Nov 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ public enum ArchiveEventCode implements EventCode
REPLAY_SESSION_ERROR(36, -1,
(event, buffer, offset, builder) -> dissectReplaySessionError(buffer, offset, builder)),
CATALOG_RESIZE(37, -1,
(event, buffer, offset, builder) -> dissectCatalogResize(buffer, offset, builder));
(event, buffer, offset, builder) -> dissectCatalogResize(buffer, offset, builder)),

CMD_IN_PURGE_RECORDING(38, PurgeRecordingRequestDecoder.TEMPLATE_ID,
ArchiveEventDissector::dissectControlRequest);

static final int EVENT_CODE_TYPE = EventCodeType.ARCHIVE.getTypeCode();
private static final ArchiveEventCode[] EVENT_CODE_BY_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ final class ArchiveEventDissector
new TaggedReplicateRequestDecoder();
private static final StopRecordingByIdentityRequestDecoder STOP_RECORDING_BY_IDENTITY_REQUEST_DECODER =
new StopRecordingByIdentityRequestDecoder();
private static final PurgeRecordingRequestDecoder PURGE_RECORDING_REQUEST_DECODER =
new PurgeRecordingRequestDecoder();
private static final ControlResponseDecoder CONTROL_RESPONSE_DECODER = new ControlResponseDecoder();

private ArchiveEventDissector()
Expand Down Expand Up @@ -388,6 +390,15 @@ static void dissectControlRequest(
appendStopRecordingByIdentity(builder);
break;

case CMD_IN_PURGE_RECORDING:
PURGE_RECORDING_REQUEST_DECODER.wrap(
buffer,
offset + relativeOffset,
HEADER_DECODER.blockLength(),
HEADER_DECODER.version());
appendPurgeRecording(builder);
break;

default:
builder.append(": unknown command");
}
Expand Down Expand Up @@ -785,4 +796,12 @@ private static void appendTaggedReplicate(final StringBuilder builder)
builder.append(", liveDestination=");
TAGGED_REPLICATE_REQUEST_DECODER.getLiveDestination(builder);
}

private static void appendPurgeRecording(final StringBuilder builder)
{
builder.append(": controlSessionId=").append(PURGE_RECORDING_REQUEST_DECODER.controlSessionId())
.append(", correlationId=").append(PURGE_RECORDING_REQUEST_DECODER.correlationId())
.append(", recordingId=").append(PURGE_RECORDING_REQUEST_DECODER.recordingId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,14 @@ static void encodeCatalogResize(
final int offset,
final int captureLength,
final int length,
final int maxEntries,
final long catalogLength,
final int newMaxEntries,
final long newCatalogLength)
{
int relativeOffset = encodeLogHeader(encodingBuffer, offset, captureLength, length);

encodingBuffer.putInt(offset + relativeOffset, maxEntries, LITTLE_ENDIAN);
relativeOffset += SIZE_OF_INT;

encodingBuffer.putLong(offset + relativeOffset, catalogLength, LITTLE_ENDIAN);
relativeOffset += SIZE_OF_LONG;

encodingBuffer.putInt(offset + relativeOffset, newMaxEntries, LITTLE_ENDIAN);
relativeOffset += SIZE_OF_INT;

encodingBuffer.putLong(offset + relativeOffset, newCatalogLength, LITTLE_ENDIAN);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ public void logReplaySessionError(final long sessionId, final long recordingId,
}
}

public void logCatalogResize(
final int maxEntries, final long catalogLength, final int newMaxEntries, final long newCatalogLength)
public void logCatalogResize(final long catalogLength, final long newCatalogLength)
{
final int length = SIZE_OF_LONG * 2 + 2 * SIZE_OF_INT;
final int length = SIZE_OF_LONG * 2;
final int captureLength = captureLength(length);
final int encodedLength = encodedLength(captureLength);
final ManyToOneRingBuffer ringBuffer = this.ringBuffer;
Expand All @@ -153,9 +152,7 @@ public void logCatalogResize(
index,
captureLength,
length,
maxEntries,
catalogLength,
newMaxEntries,
newCatalogLength);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ static void onPendingError(final long sessionId, final long recordingId, final S
static class Catalog
{
@Advice.OnMethodEnter
static void catalogResized(
final int maxEntries, final long catalogLength, final int newMaxEntries, final long newCatalogLength)
static void catalogResized(final long catalogLength, final long newCatalogLength)
{
LOGGER.logCatalogResize(maxEntries, catalogLength, newMaxEntries, newCatalogLength);
LOGGER.logCatalogResize(catalogLength, newCatalogLength);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -833,4 +833,24 @@ void catalogResize()
" 24 entries (100 bytes) => 777 entries (10000000000 bytes)",
builder.toString());
}

@Test
void controlRequestPurgeRecording()
{
internalEncodeLogHeader(buffer, 0, 56, 901, () -> 1_125_000_000L);
final PurgeRecordingRequestEncoder requestEncoder = new PurgeRecordingRequestEncoder();
requestEncoder.wrapAndApplyHeader(buffer, LOG_HEADER_LENGTH, headerEncoder)
.controlSessionId(15)
.correlationId(421)
.recordingId(6);

dissectControlRequest(CMD_IN_PURGE_RECORDING, buffer, 0, builder);

assertEquals("[1.125] " + CONTEXT + ": " + CMD_IN_PURGE_RECORDING.name() + " [56/901]:" +
" controlSessionId=15" +
", correlationId=421" +
", recordingId=6",
builder.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,16 @@ void testEncodeCatalogResize()
final int offset = 24;
final int length = SIZE_OF_LONG * 2 + SIZE_OF_INT * 2;
final int captureLength = captureLength(length);
final int maxEntries = 3;
final long catalogLength = 128;
final int newMaxEntries = 7;
final long newCatalogLength = 1024;

encodeCatalogResize(
buffer, offset, captureLength, length, maxEntries, catalogLength, newMaxEntries, newCatalogLength);
buffer, offset, captureLength, length, catalogLength, newCatalogLength);

assertEquals(captureLength, buffer.getInt(offset, LITTLE_ENDIAN));
assertEquals(length, buffer.getInt(offset + SIZE_OF_INT, LITTLE_ENDIAN));
assertNotEquals(0, buffer.getLong(offset + SIZE_OF_INT * 2, LITTLE_ENDIAN));
assertEquals(maxEntries, buffer.getInt(offset + LOG_HEADER_LENGTH));
assertEquals(catalogLength, buffer.getLong(offset + LOG_HEADER_LENGTH + SIZE_OF_INT));
assertEquals(newMaxEntries, buffer.getInt(offset + LOG_HEADER_LENGTH + SIZE_OF_INT + SIZE_OF_LONG));
assertEquals(newCatalogLength, buffer.getLong(offset + LOG_HEADER_LENGTH + 2 * SIZE_OF_INT + SIZE_OF_LONG));
assertEquals(catalogLength, buffer.getLong(offset + LOG_HEADER_LENGTH));
assertEquals(newCatalogLength, buffer.getLong(offset + LOG_HEADER_LENGTH + SIZE_OF_LONG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,21 +189,16 @@ void logCatalogResize()
{
final int offset = ALIGNMENT * 3;
logBuffer.putLong(CAPACITY + TAIL_POSITION_OFFSET, offset);
final int captureLength = SIZE_OF_LONG * 2 + SIZE_OF_INT * 2;
final int maxEntries = 21;
final int captureLength = SIZE_OF_LONG * 2;
final long catalogLength = 42;
final int newMaxEntries = 121;
final long newCatalogLength = 142;

logger.logCatalogResize(maxEntries, catalogLength, newMaxEntries, newCatalogLength);
logger.logCatalogResize(catalogLength, newCatalogLength);

verifyLogHeader(logBuffer, offset, toEventCodeId(CATALOG_RESIZE), captureLength, captureLength);
assertEquals(maxEntries, logBuffer.getInt(encodedMsgOffset(offset + LOG_HEADER_LENGTH), LITTLE_ENDIAN));
assertEquals(catalogLength,
logBuffer.getLong(encodedMsgOffset(offset + LOG_HEADER_LENGTH + SIZE_OF_INT), LITTLE_ENDIAN));
assertEquals(newMaxEntries, logBuffer.getInt(
encodedMsgOffset(offset + LOG_HEADER_LENGTH + SIZE_OF_INT + SIZE_OF_LONG), LITTLE_ENDIAN));
assertEquals(newCatalogLength, logBuffer.getLong(
encodedMsgOffset(offset + LOG_HEADER_LENGTH + SIZE_OF_INT * 2 + SIZE_OF_LONG), LITTLE_ENDIAN));
logBuffer.getLong(encodedMsgOffset(offset + LOG_HEADER_LENGTH), LITTLE_ENDIAN));
assertEquals(newCatalogLength,
logBuffer.getLong(encodedMsgOffset(offset + LOG_HEADER_LENGTH + SIZE_OF_LONG), LITTLE_ENDIAN));
}
}
1 change: 1 addition & 0 deletions aeron-archive/src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ set(GENERATED_CODECS
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/ExtendRecordingRequest2.h
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/RecordingPositionRequest.h
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/TruncateRecordingRequest.h
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/PurgeRecordingRequest.h
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/StopRecordingSubscriptionRequest.h
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/StartPositionRequest.h
${ARCHIVE_CODEC_TARGET_DIR}/aeron_archive_client/StopPositionRequest.h
Expand Down
24 changes: 24 additions & 0 deletions aeron-archive/src/main/cpp/client/AeronArchive.h
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,30 @@ class AeronArchive
pollForResponse<IdleStrategy>(m_lastCorrelationId);
}

/**
* Purge a stopped recording, i.e. mark recording as 'RecordingState#INVALID' and delete the corresponding segment
* files. The space in the Catalog will be reclaimed upon compaction.
*
* @param recordingId of the stopped recording to be purged.
* @tparam IdleStrategy to use for polling operations.
*/
template<typename IdleStrategy = aeron::concurrent::BackoffIdleStrategy>
inline void purgeRecording(std::int64_t recordingId)
{
std::lock_guard<std::recursive_mutex> lock(m_lock);
ensureOpen();
ensureNotReentrant();

m_lastCorrelationId = m_aeron->nextCorrelationId();

if (!m_archiveProxy->purgeRecording<IdleStrategy>(recordingId, m_lastCorrelationId, m_controlSessionId))
{
throw ArchiveException("failed to send purge recording request", SOURCEINFO);
}

pollForResponse<IdleStrategy>(m_lastCorrelationId);
}

/**
* List active recording subscriptions in the archive. These are the result of requesting one of
* #startRecording(String, int, SourceLocation) or a
Expand Down
17 changes: 17 additions & 0 deletions aeron-archive/src/main/cpp/client/ArchiveProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "aeron_archive_client/KeepAliveRequest.h"
#include "aeron_archive_client/ChallengeResponse.h"
#include "aeron_archive_client/TaggedReplicateRequest.h"
#include "aeron_archive_client/PurgeRecordingRequest.h"

using namespace aeron;
using namespace aeron::concurrent;
Expand Down Expand Up @@ -461,6 +462,22 @@ util::index_t ArchiveProxy::truncateRecording(
return messageAndHeaderLength(request);
}

util::index_t ArchiveProxy::purgeRecording(
AtomicBuffer &buffer,
std::int64_t recordingId,
std::int64_t correlationId,
std::int64_t controlSessionId)
{
PurgeRecordingRequest request;

wrapAndApplyHeader(request, buffer)
.controlSessionId(controlSessionId)
.correlationId(correlationId)
.recordingId(recordingId);

return messageAndHeaderLength(request);
}

util::index_t ArchiveProxy::listRecordingSubscriptions(
AtomicBuffer &buffer,
std::int32_t pseudoIndex,
Expand Down
26 changes: 26 additions & 0 deletions aeron-archive/src/main/cpp/client/ArchiveProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,26 @@ class ArchiveProxy
return offer<IdleStrategy>(m_buffer, 0, length);
}

/**
* Purge a stopped recording, i.e. mark recording as 'RecordingState#INVALID' and delete the corresponding segment
* files. The space in the Catalog will be reclaimed upon compaction.
*
* @param recordingId of the stopped recording to be purged.
* @param correlationId for this request.
* @param controlSessionId for this request.
* @return true if successfully offered otherwise false.
*/
template<typename IdleStrategy = aeron::concurrent::BackoffIdleStrategy>
bool purgeRecording(
std::int64_t recordingId,
std::int64_t correlationId,
std::int64_t controlSessionId)
{
const util::index_t length = purgeRecording(m_buffer, recordingId, correlationId, controlSessionId);

return offer<IdleStrategy>(m_buffer, 0, length);
}

/**
* List registered subscriptions in the archive which have been used to record streams.
*
Expand Down Expand Up @@ -1042,6 +1062,12 @@ class ArchiveProxy
std::int64_t correlationId,
std::int64_t controlSessionId);

static util::index_t purgeRecording(
AtomicBuffer &buffer,
std::int64_t recordingId,
std::int64_t correlationId,
std::int64_t controlSessionId);

static util::index_t listRecordingSubscriptions(
AtomicBuffer &buffer,
std::int32_t pseudoIndex,
Expand Down
Loading