Skip to content

Commit

Permalink
[#436] feat(client,server): Introduce multi-part LocalStorageManager (#…
Browse files Browse the repository at this point in the history
…2253)

### What changes were proposed in this pull request?

- Introduce a factory to create specific LocalStorageManager by config.
- Introduce multiply disk LocalStorageManager.

### Why are the changes needed?

Fix: #436 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Existing UTs and new added UT
- Tested on our pressure test cluster.
  - new client -> old server ✓
  - new client -> new server ✓
  - old client -> new server ❌, so we have to upgraded client first, than upgrade the servers
  • Loading branch information
maobaolong authored Nov 26, 2024
1 parent 54611f3 commit b7d391c
Show file tree
Hide file tree
Showing 40 changed files with 1,526 additions and 248 deletions.
142 changes: 142 additions & 0 deletions common/src/main/java/io/netty/util/CompositeFileRegion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.util;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

import io.netty.channel.FileRegion;

import org.apache.uniffle.common.netty.protocol.AbstractFileRegion;

public class CompositeFileRegion extends AbstractFileRegion {
private final FileRegion[] regions;
private long totalSize = 0;
private long bytesTransferred = 0;

public CompositeFileRegion(FileRegion... regions) {
this.regions = regions;
for (FileRegion region : regions) {
totalSize += region.count();
}
}

@Override
public long position() {
return bytesTransferred;
}

@Override
public long count() {
return totalSize;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
long totalBytesTransferred = 0;

for (FileRegion region : regions) {
if (position >= region.count()) {
position -= region.count();
} else {
long currentBytesTransferred = region.transferTo(target, position);
totalBytesTransferred += currentBytesTransferred;
bytesTransferred += currentBytesTransferred;

if (currentBytesTransferred < region.count() - position) {
break;
}
position = 0;
}
}

return totalBytesTransferred;
}

@Override
public long transferred() {
return bytesTransferred;
}

@Override
public AbstractFileRegion retain() {
super.retain();
for (FileRegion region : regions) {
region.retain();
}
return this;
}

@Override
public AbstractFileRegion retain(int increment) {
super.retain(increment);
for (FileRegion region : regions) {
region.retain(increment);
}
return this;
}

@Override
public boolean release() {
boolean released = super.release();
for (FileRegion region : regions) {
if (!region.release()) {
released = false;
}
}
return released;
}

@Override
public boolean release(int decrement) {
boolean released = super.release(decrement);
for (FileRegion region : regions) {
if (!region.release(decrement)) {
released = false;
}
}
return released;
}

@Override
protected void deallocate() {
for (FileRegion region : regions) {
if (region instanceof AbstractReferenceCounted) {
((AbstractReferenceCounted) region).deallocate();
}
}
}

@Override
public AbstractFileRegion touch() {
super.touch();
for (FileRegion region : regions) {
region.touch();
}
return this;
}

@Override
public AbstractFileRegion touch(Object hint) {
super.touch(hint);
for (FileRegion region : regions) {
region.touch(hint);
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
public class ShuffleDataSegment {
private final long offset;
private final int length;

private final int storageId;
private final List<BufferSegment> bufferSegments;

public ShuffleDataSegment(long offset, int length, List<BufferSegment> bufferSegments) {
public ShuffleDataSegment(
long offset, int length, int storageId, List<BufferSegment> bufferSegments) {
this.offset = offset;
this.length = length;
this.storageId = storageId;
this.bufferSegments = bufferSegments;
}

Expand All @@ -46,4 +50,8 @@ public int getLength() {
public List<BufferSegment> getBufferSegments() {
return bufferSegments;
}

public int getStorageId() {
return storageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@

public class ShuffleIndexResult {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleIndexResult.class);
private static final int[] DEFAULT_STORAGE_IDS = new int[] {0};

private final ManagedBuffer buffer;
private final int[] storageIds;
private long dataFileLen;
private String dataFileName;

Expand All @@ -44,15 +46,28 @@ public ShuffleIndexResult(byte[] data, long dataFileLen) {
}

public ShuffleIndexResult(ByteBuffer data, long dataFileLen) {
this.buffer =
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER);
this.dataFileLen = dataFileLen;
this(
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER),
dataFileLen,
null,
DEFAULT_STORAGE_IDS);
}

public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen, String dataFileName) {
this(buffer, dataFileLen, dataFileName, DEFAULT_STORAGE_IDS);
}

public ShuffleIndexResult(
ManagedBuffer buffer, long dataFileLen, String dataFileName, int storageId) {
this(buffer, dataFileLen, dataFileName, new int[] {storageId});
}

public ShuffleIndexResult(
ManagedBuffer buffer, long dataFileLen, String dataFileName, int[] storageIds) {
this.buffer = buffer;
this.dataFileLen = dataFileLen;
this.dataFileName = dataFileName;
this.storageIds = storageIds;
}

public byte[] getData() {
Expand Down Expand Up @@ -99,4 +114,8 @@ public ManagedBuffer getManagedBuffer() {
public String getDataFileName() {
return dataFileName;
}

public int[] getStorageIds() {
return storageIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
header.writeInt(bodyLength);
in.encode(header);
if (header.writableBytes() != 0) {
throw new RssException("header's writable bytes should be 0");
throw new RssException(
"header's writable bytes should be 0, but it is " + header.writableBytes());
}

if (body != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.netty.buffer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.util.CompositeFileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A wrapper of multiple {@link FileSegmentManagedBuffer}, used for combine shuffle index files. */
public class MultiFileSegmentManagedBuffer extends ManagedBuffer {

private static final Logger LOG = LoggerFactory.getLogger(MultiFileSegmentManagedBuffer.class);
private final List<ManagedBuffer> managedBuffers;

public MultiFileSegmentManagedBuffer(List<ManagedBuffer> managedBuffers) {
this.managedBuffers = managedBuffers;
}

@Override
public int size() {
return managedBuffers.stream().mapToInt(ManagedBuffer::size).sum();
}

@Override
public ByteBuf byteBuf() {
return Unpooled.wrappedBuffer(this.nioByteBuffer());
}

@Override
public ByteBuffer nioByteBuffer() {
ByteBuffer merged = ByteBuffer.allocate(size());
for (ManagedBuffer managedBuffer : managedBuffers) {
ByteBuffer buffer = managedBuffer.nioByteBuffer();
merged.put(buffer.slice());
}
merged.flip();
return merged;
}

@Override
public ManagedBuffer retain() {
return this;
}

@Override
public ManagedBuffer release() {
return this;
}

@Override
public Object convertToNetty() {
List<FileRegion> fileRegions = new ArrayList<>(managedBuffers.size());
for (ManagedBuffer managedBuffer : managedBuffers) {
Object object = managedBuffer.convertToNetty();
if (object instanceof FileRegion) {
fileRegions.add((FileRegion) object);
}
}
return new CompositeFileRegion(fileRegions.toArray(new FileRegion[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class GetLocalShuffleDataRequest extends RequestMessage {
private long offset;
private int length;
private long timestamp;
private int storageId;

public GetLocalShuffleDataRequest(
long requestId,
Expand All @@ -41,6 +42,30 @@ public GetLocalShuffleDataRequest(
long offset,
int length,
long timestamp) {
this(
requestId,
appId,
shuffleId,
partitionId,
partitionNumPerRange,
partitionNum,
offset,
length,
-1,
timestamp);
}

protected GetLocalShuffleDataRequest(
long requestId,
String appId,
int shuffleId,
int partitionId,
int partitionNumPerRange,
int partitionNum,
long offset,
int length,
int storageId,
long timestamp) {
super(requestId);
this.appId = appId;
this.shuffleId = shuffleId;
Expand All @@ -49,6 +74,7 @@ public GetLocalShuffleDataRequest(
this.partitionNum = partitionNum;
this.offset = offset;
this.length = length;
this.storageId = storageId;
this.timestamp = timestamp;
}

Expand Down Expand Up @@ -132,6 +158,10 @@ public long getTimestamp() {
return timestamp;
}

public int getStorageId() {
return storageId;
}

@Override
public String getOperationType() {
return "getLocalShuffleData";
Expand Down
Loading

0 comments on commit b7d391c

Please sign in to comment.