Skip to content

Commit

Permalink
[Java] Changed publication log rotation algorithm so that if a publis…
Browse files Browse the repository at this point in the history
…her dies, or takes a long pause, mid rotation then other publishers can complete the operation and make progress. Issue #377.
  • Loading branch information
mjpt777 committed Sep 26, 2017
1 parent 6c43b8d commit c715c19
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 97 deletions.
41 changes: 19 additions & 22 deletions aeron-client/src/main/java/io/aeron/ExclusivePublication.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,44 +637,41 @@ private long newPosition(final int resultingOffset)

return termBeginPosition + resultingOffset;
}
else

if ((termBeginPosition + termBufferLength) >= maxPossiblePosition)
{
if ((termBeginPosition + termBufferLength) >= maxPossiblePosition)
{
return MAX_POSITION_EXCEEDED;
}
return MAX_POSITION_EXCEEDED;
}

final int nextIndex = nextPartitionIndex(activePartitionIndex);
final int nextTermId = termId + 1;
final int nextIndex = nextPartitionIndex(activePartitionIndex);
final int nextTermId = termId + 1;

activePartitionIndex = nextIndex;
termOffset = 0;
termId = nextTermId;
termBeginPosition = computeTermBeginPosition(nextTermId, positionBitsToShift, initialTermId);
activePartitionIndex = nextIndex;
termOffset = 0;
termId = nextTermId;
termBeginPosition = computeTermBeginPosition(nextTermId, positionBitsToShift, initialTermId);

final int termCount = nextTermId - initialTermId;
final int termCount = nextTermId - initialTermId;

initialiseTailWithTermId(logMetaDataBuffer, nextIndex, nextTermId);
activeTermCountOrdered(logMetaDataBuffer, termCount);
initialiseTailWithTermId(logMetaDataBuffer, nextIndex, nextTermId);
activeTermCountOrdered(logMetaDataBuffer, termCount);

return ADMIN_ACTION;
}
return ADMIN_ACTION;
}

private long backPressureStatus(final long currentPosition, final int messageLength)
{
long status = NOT_CONNECTED;

if ((currentPosition + messageLength) >= maxPossiblePosition)
{
status = MAX_POSITION_EXCEEDED;
return MAX_POSITION_EXCEEDED;
}
else if (LogBufferDescriptor.isConnected(logMetaDataBuffer))

if (LogBufferDescriptor.isConnected(logMetaDataBuffer))
{
status = BACK_PRESSURED;
return BACK_PRESSURED;
}

return status;
return NOT_CONNECTED;
}

private void checkForMaxPayloadLength(final int length)
Expand Down
35 changes: 11 additions & 24 deletions aeron-client/src/main/java/io/aeron/Publication.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,47 +594,34 @@ public void removeDestination(final String endpointChannel)
private long newPosition(
final int termCount, final int termOffset, final int termId, final long position, final int resultingOffset)
{
long newPosition = ADMIN_ACTION;
if (resultingOffset > 0)
{
newPosition = (position - termOffset) + resultingOffset;
return (position - termOffset) + resultingOffset;
}
else if ((position + termOffset) > maxPossiblePosition)

if ((position + termOffset) > maxPossiblePosition)
{
newPosition = MAX_POSITION_EXCEEDED;
return MAX_POSITION_EXCEEDED;
}
else if (resultingOffset == TermAppender.TRIPPED)
{
final int nextTermCount = termCount + 1;
final int nextIndex = indexByTermCount(nextTermCount);

initialiseTailWithTermId(logMetaDataBuffer, nextIndex, termId + 1);
rotateLog(logMetaDataBuffer, termCount, termId);

if (!casActiveTermCount(logMetaDataBuffer, termCount, nextTermCount))
{
throw new IllegalStateException(
"CAS failed: expected=" + termCount +
" update=" + nextTermCount + " actual=" + activeTermCount(logMetaDataBuffer));
}
}

return newPosition;
return ADMIN_ACTION;
}

private long backPressureStatus(final long currentPosition, final int messageLength)
{
long status = NOT_CONNECTED;

if ((currentPosition + messageLength) >= maxPossiblePosition)
{
status = MAX_POSITION_EXCEEDED;
return MAX_POSITION_EXCEEDED;
}
else if (LogBufferDescriptor.isConnected(logMetaDataBuffer))

if (LogBufferDescriptor.isConnected(logMetaDataBuffer))
{
status = BACK_PRESSURED;
return BACK_PRESSURED;
}

return status;
return NOT_CONNECTED;
}

private void checkForMaxPayloadLength(final int length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ExclusiveTermAppender
/**
* The append operation tripped the end of the buffer and needs to rotate.
*/
public static final int TRIPPED = -1;
public static final int FAILED = -1;

private final long tailAddressOffset;
private final byte[] tailBuffer;
Expand Down Expand Up @@ -80,7 +80,7 @@ public ExclusiveTermAppender(
* @param header for writing the default header.
* @param length of the message to be written.
* @param bufferClaim to be updated with the claimed region.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int claim(
final int termId,
Expand Down Expand Up @@ -117,7 +117,7 @@ public int claim(
* @param termOffset in the term at which to append.
* @param header for writing the default header.
* @param length of the padding to be written.
* @return the resulting offset of the term after success otherwise {@link #TRIPPED}.
* @return the resulting offset of the term after success otherwise {@link #FAILED}.
*/
public int appendPadding(
final int termId,
Expand Down Expand Up @@ -157,7 +157,7 @@ public int appendPadding(
* @param srcOffset at which the message begins.
* @param length of the message in the source buffer.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendUnfragmentedMessage(
final int termId,
Expand Down Expand Up @@ -206,7 +206,7 @@ public int appendUnfragmentedMessage(
* @param vectors to the buffers.
* @param length of the message as a sum of the vectors.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendUnfragmentedMessage(
final int termId,
Expand Down Expand Up @@ -263,7 +263,7 @@ public int appendUnfragmentedMessage(
* @param length of the message in the source buffer.
* @param maxPayloadLength that the message will be fragmented into.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendFragmentedMessage(
final int termId,
Expand Down Expand Up @@ -343,7 +343,7 @@ public int appendFragmentedMessage(
* @param length of the message in the source buffer.
* @param maxPayloadLength that the message will be fragmented into.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendFragmentedMessage(
final int termId,
Expand Down Expand Up @@ -447,7 +447,7 @@ private int handleEndOfLogCondition(
frameLengthOrdered(termBuffer, offset, paddingLength);
}

return TRIPPED;
return FAILED;
}

private void putRawTailOrdered(final int termId, final int termOffset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ public static void applyDefaultHeader(
}

/**
* Rotate the log and update the default headers for the new term.
* Rotate the log and update the tail counter for the new term.
*
* This method is safe for concurrent use.
*
* @param logMetaDataBuffer for the meta data.
* @param currentTermCount from which to rotate.
Expand All @@ -551,10 +553,23 @@ public static void applyDefaultHeader(
public static void rotateLog(
final UnsafeBuffer logMetaDataBuffer, final int currentTermCount, final int currentTermId)
{
final int nextTermId = currentTermId + 1;
final int nextTermCount = currentTermCount + 1;
final int nextIndex = indexByTermCount(nextTermCount);
initialiseTailWithTermId(logMetaDataBuffer, nextIndex, currentTermId + 1);
activeTermCountOrdered(logMetaDataBuffer, nextTermCount);
final int expectedTermId = nextTermId - PARTITION_COUNT;

long rawTail;
do
{
rawTail = rawTail(logMetaDataBuffer, nextIndex);
if (expectedTermId != termId(rawTail))
{
break;
}
}
while (!casRawTail(logMetaDataBuffer, nextIndex, rawTail, packTail(nextTermId, 0)));

casActiveTermCount(logMetaDataBuffer, currentTermCount, nextTermCount);
}

/**
Expand All @@ -567,7 +582,7 @@ public static void rotateLog(
public static void initialiseTailWithTermId(
final UnsafeBuffer logMetaData, final int partitionIndex, final int termId)
{
logMetaData.putLongOrdered(TERM_TAIL_COUNTERS_OFFSET + (partitionIndex * SIZE_OF_LONG), packTail(termId, 0));
logMetaData.putLong(TERM_TAIL_COUNTERS_OFFSET + (partitionIndex * SIZE_OF_LONG), packTail(termId, 0));
}

/**
Expand Down Expand Up @@ -679,4 +694,23 @@ public static long rawTailVolatile(final UnsafeBuffer logMetaDataBuffer)
final int partitionIndex = indexByTermCount(activeTermCount(logMetaDataBuffer));
return logMetaDataBuffer.getLongVolatile(TERM_TAIL_COUNTERS_OFFSET + (SIZE_OF_LONG * partitionIndex));
}

/**
* Compare and set the raw value of the tail for the given partition.
*
* @param logMetaDataBuffer containing the tail counters.
* @param partitionIndex for the tail counter.
* @param expectedRawTail expected current value.
* @param updateRawTail to be applied.
* @return true if the update was successful otherwise false.
*/
public static boolean casRawTail(
final UnsafeBuffer logMetaDataBuffer,
final int partitionIndex,
final long expectedRawTail,
final long updateRawTail)
{
final int index = TERM_TAIL_COUNTERS_OFFSET + (SIZE_OF_LONG * partitionIndex);
return logMetaDataBuffer.compareAndSetLong(index, expectedRawTail, updateRawTail);
}
}
41 changes: 12 additions & 29 deletions aeron-client/src/main/java/io/aeron/logbuffer/TermAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@
*/
public class TermAppender
{
/**
* The append operation tripped the end of the buffer and needs to rotate.
*/
public static final int TRIPPED = -1;

/**
* The append operation failed because it was past the end of the buffer.
*/
Expand Down Expand Up @@ -98,8 +93,7 @@ public long rawTailVolatile()
* @param length of the message to be written.
* @param bufferClaim to be updated with the claimed region.
* @param activeTermId used for flow control.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}
* or {@link #FAILED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int claim(
final HeaderWriter header,
Expand Down Expand Up @@ -141,8 +135,7 @@ public int claim(
* @param length of the message in the source buffer.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @param activeTermId used for flow control.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED} or
* {@link #FAILED}
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}
*/
public int appendUnfragmentedMessage(
final HeaderWriter header,
Expand Down Expand Up @@ -193,8 +186,7 @@ public int appendUnfragmentedMessage(
* @param length of the message as a sum of the vectors.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @param activeTermId used for flow control.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED} or
* {@link #FAILED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendUnfragmentedMessage(
final HeaderWriter header,
Expand Down Expand Up @@ -253,8 +245,7 @@ public int appendUnfragmentedMessage(
* @param maxPayloadLength that the message will be fragmented into.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @param activeTermId used for flow control.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}
* or {@link #FAILED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendFragmentedMessage(
final HeaderWriter header,
Expand Down Expand Up @@ -336,8 +327,7 @@ public int appendFragmentedMessage(
* @param maxPayloadLength that the message will be fragmented into.
* @param reservedValueSupplier {@link ReservedValueSupplier} for the frame.
* @param activeTermId used for flow control.
* @return the resulting offset of the term after the append on success otherwise {@link #TRIPPED}
* or {@link #FAILED}.
* @return the resulting offset of the term after the append on success otherwise {@link #FAILED}.
*/
public int appendFragmentedMessage(
final HeaderWriter header,
Expand Down Expand Up @@ -443,23 +433,16 @@ private int handleEndOfLogCondition(
final int termLength,
final int termId)
{
int resultingOffset = FAILED;

if (termOffset <= termLength)
if (termOffset < termLength)
{
resultingOffset = TRIPPED;

if (termOffset < termLength)
{
final int offset = (int)termOffset;
final int paddingLength = termLength - offset;
header.write(termBuffer, offset, paddingLength, termId);
frameType(termBuffer, offset, PADDING_FRAME_TYPE);
frameLengthOrdered(termBuffer, offset, paddingLength);
}
final int offset = (int)termOffset;
final int paddingLength = termLength - offset;
header.write(termBuffer, offset, paddingLength, termId);
frameType(termBuffer, offset, PADDING_FRAME_TYPE);
frameLengthOrdered(termBuffer, offset, paddingLength);
}

return resultingOffset;
return FAILED;
}

private long getAndAddRawTail(final int alignedLength)
Expand Down
Loading

0 comments on commit c715c19

Please sign in to comment.