Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persisted unsafe queues implementation #691

Merged
merged 14 commits into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.moquette.broker.unsafequeues;

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Properties;

/**
* Default implementation of SegmentAllocator. It uses a series of files (named pages) and split them in segments.
*
* This class is not thread safe.
* */
class PagedFilesAllocator implements SegmentAllocator {

interface AllocationListener {
void segmentedCreated(String name, Segment segment);
}

public static final int MB = 1024 * 1024;
public static final int PAGE_SIZE = 64 * MB;
private final Path pagesFolder;
private final int segmentSize;
private int lastSegmentAllocated;
private int lastPage;
private MappedByteBuffer currentPage;
private FileChannel currentPageFile;

PagedFilesAllocator(Path pagesFolder, int segmentSize, int lastPage, int lastSegmentAllocated) throws QueueException {
this.pagesFolder = pagesFolder;
this.segmentSize = segmentSize;
this.lastPage = lastPage;
this.lastSegmentAllocated = lastSegmentAllocated;
this.currentPage = openRWPageFile(this.pagesFolder, this.lastPage);
}

private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws QueueException {
final Path pageFile = pagesFolder.resolve(String.format("%d.page", pageId));
boolean createNew = false;
if (!Files.exists(pageFile)) {
try {
pageFile.toFile().createNewFile();
createNew = true;
} catch (IOException ex) {
throw new QueueException("Reached an IO error during the bootstrapping of empty 'checkpoint.properties'", ex);
}
}

try (FileChannel fileChannel = FileChannel.open(pageFile, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
this.currentPageFile = fileChannel;
final MappedByteBuffer mappedPage = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, PAGE_SIZE);
// DBG
if (createNew && QueuePool.queueDebug) {
for (int i = 0; i < PAGE_SIZE; i++){
mappedPage.put(i, (byte) 'C');
}
}
// DBG
return mappedPage;
} catch (IOException e) {
throw new QueueException("Can't open page file " + pageFile, e);
}
}

@Override
public Segment nextFreeSegment() throws QueueException {
if (currentPageIsExhausted()) {
lastPage ++;
currentPage = openRWPageFile(pagesFolder, lastPage);
lastSegmentAllocated = 0;
}

final int beginOffset = lastSegmentAllocated * segmentSize;
final int endOffset = ((lastSegmentAllocated + 1) * segmentSize) - 1;

lastSegmentAllocated += 1;
return new Segment(currentPage, new SegmentPointer(lastPage, beginOffset), new SegmentPointer(lastPage, endOffset));
}

@Override
public Segment reopenSegment(int pageId, int beginOffset) throws QueueException {
final MappedByteBuffer page = openRWPageFile(pagesFolder, pageId);
final SegmentPointer begin = new SegmentPointer(pageId, beginOffset);
final SegmentPointer end = new SegmentPointer(pageId, beginOffset + segmentSize - 1);
return new Segment(page, begin, end);
}

@Override
public void close() throws QueueException {
if (currentPageFile != null) {
try {
currentPageFile.close();
} catch (IOException ex) {
throw new QueueException("Problem closing current page file", ex);
}
}
}

@Override
public void dumpState(Properties checkpoint) {
checkpoint.setProperty("segments.last_page", String.valueOf(this.lastPage));
checkpoint.setProperty("segments.last_segment", String.valueOf(this.lastSegmentAllocated));
}

private boolean currentPageIsExhausted() {
return lastSegmentAllocated * segmentSize == PAGE_SIZE;
}
}
Loading