Skip to content

Commit

Permalink
[ISSUE #3983] Optimize MessageQueue (#3984)
Browse files Browse the repository at this point in the history
* [ISSUE #3983] Optimize MessageQueue

* modify public of attribute items  to private

* optimize code readability

* optimize code logic
  • Loading branch information
mxsm authored Jun 7, 2023
1 parent d374551 commit c5bbd67
Showing 1 changed file with 40 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@

import com.google.common.base.Preconditions;

import lombok.Getter;

/**
* This is a block queue, can get entity by offset. The queue is a FIFO data structure.
*/
public class MessageQueue {

@Getter
private final MessageEntity[] items;

private int takeIndex;
private volatile int takeIndex;

private int putIndex;
private volatile int putIndex;

private int count;
private volatile int count;

private final ReentrantLock lock;

Expand All @@ -59,9 +62,10 @@ public MessageQueue(int capacity) {
}

/**
* Insert the message at the tail of this queue, waiting for space to become available if the queue is full
* Inserts the specified MessageEntity object into the queue.
*
* @param messageEntity
* @param messageEntity The MessageEntity object to be inserted into the queue.
* @throws InterruptedException if the current thread is interrupted while waiting for space to become available in the queue.
*/
public void put(MessageEntity messageEntity) throws InterruptedException {
Preconditions.checkNotNull(messageEntity);
Expand All @@ -80,8 +84,8 @@ public void put(MessageEntity messageEntity) throws InterruptedException {
/**
* Get the first message at this queue, waiting for the message is available if the queue is empty, this method will not remove the message
*
* @return MessageEntity
* @throws InterruptedException
* @return The MessageEntity object at the head of the queue.
* @throws InterruptedException if the current thread is interrupted while waiting for an element to become available in the queue.
*/
public MessageEntity take() throws InterruptedException {
ReentrantLock reentrantLock = this.lock;
Expand Down Expand Up @@ -145,21 +149,26 @@ public MessageEntity getTail() {
/**
* Get the message by offset, since the offset is increment, so we can get the first message in this queue and calculate the index of this offset
*
* @param offset
* @return MessageEntity
* @param offset The offset of the MessageEntity object to be retrieved.
* @return The MessageEntity object with the specified offset, or null if no such object exists in the queue.
* @throws RuntimeException if the specified offset is less than the offset of the head MessageEntity object.
*/
public MessageEntity getByOffset(long offset) {
ReentrantLock reentrantLock = this.lock;
reentrantLock.lock();
try {
MessageEntity head = getHead();
if (head == null) {
if (count == 0) {
return null;
}
int tailIndex = putIndex - 1;
MessageEntity head = itemAt(takeIndex);
if (head.getOffset() > offset) {
throw new RuntimeException(String.format("The message has been deleted, offset: %s", offset));
}
MessageEntity tail = getTail();
if (tailIndex < 0) {
tailIndex += items.length;
}
MessageEntity tail = itemAt(tailIndex);
if (tail == null || tail.getOffset() < offset) {
return null;
}
Expand All @@ -174,6 +183,9 @@ public MessageEntity getByOffset(long offset) {
}
}

/**
* Removes the MessageEntity object at the head of the queue.
*/
public void removeHead() {
ReentrantLock reentrantLock = this.lock;
reentrantLock.lock();
Expand All @@ -195,11 +207,21 @@ public int getSize() {
return count;
}


/**
* Returns the MessageEntity object at the specified index.
*
* @param index The index of the MessageEntity object to be returned.
* @return The MessageEntity object at the specified index.
*/
private MessageEntity itemAt(int index) {
return items[index];
}

/**
* Insert the message at the tail of this queue, waiting for space to become available if the queue is full
*
* @param messageEntity The MessageEntity object to be inserted into the queue.
*/
private void enqueue(MessageEntity messageEntity) {
items[putIndex++] = messageEntity;
if (putIndex == items.length) {
Expand All @@ -209,6 +231,11 @@ private void enqueue(MessageEntity messageEntity) {
notEmpty.signalAll();
}

/**
* Removes and returns the MessageEntity object at the head of the queue.
*
* @return The MessageEntity object at the head of the queue.
*/
private MessageEntity dequeue() {
MessageEntity item = items[takeIndex++];
if (takeIndex == items.length) {
Expand All @@ -225,8 +252,4 @@ public int getTakeIndex() {
public int getPutIndex() {
return putIndex;
}

public MessageEntity[] getItems() {
return items;
}
}

0 comments on commit c5bbd67

Please sign in to comment.