Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ALLUXIO-2226] Add a LocalFirstPolicy that without evict action #4445

Merged
merged 19 commits into from
Jan 25, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.policy;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;

import javax.annotation.concurrent.ThreadSafe;
/**
* A policy that returns local host first, and if the local worker doesn't have enough availability,
* it randomly picks a worker from the active workers list for each block write.
* If No worker meets the demands, return local host.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also specify the behavior of USER_FILE_WRITE_CAPACITY_RESERVED_SIZE_BYTES?

* USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES is used to reserve some space of the worker
* to store the block, for the values mCapacityBytes minus mUsedBytes is not the available bytes.
*/
@ThreadSafe
public final class LocalFirstAvoidEvictionPolicy implements FileWriteLocationPolicy {
private String mLocalHostName;
/**
* Constructs a {@link LocalFirstAvoidEvictionPolicy}.
*/
public LocalFirstAvoidEvictionPolicy() {
mLocalHostName = NetworkAddressUtils.getLocalHostName();
}

@Override
public WorkerNetAddress getWorkerForNextBlock(Iterable<BlockWorkerInfo> workerInfoList,
long blockSizeBytes) {
// try the local host first
WorkerNetAddress localWorkerNetAddress = null;
for (BlockWorkerInfo workerInfo : workerInfoList) {
if (workerInfo.getNetAddress().getHost().equals(mLocalHostName)) {
localWorkerNetAddress = workerInfo.getNetAddress();
if (getAvailableBytes(workerInfo) >= blockSizeBytes) {
return localWorkerNetAddress;
}
}
}

// otherwise randomly pick a worker that has enough availability
List<BlockWorkerInfo> shuffledWorkers = Lists.newArrayList(workerInfoList);
Collections.shuffle(shuffledWorkers);
for (BlockWorkerInfo workerInfo : shuffledWorkers) {
if (getAvailableBytes(workerInfo) >= blockSizeBytes) {
return workerInfo.getNetAddress();
}
}
if (localWorkerNetAddress == null && shuffledWorkers.size() > 0) {
return shuffledWorkers.get(0).getNetAddress();
}
return localWorkerNetAddress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there was no worker on the local host? That means localWorkerNetAddress would have been null. Should null be returned, or should a random worker still be picked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gpang After long deliberation, if no worker on the local host and have on worker have available capacity to store the block, it should pick a random worker. When no worker in this cluster, it should return null. Thanks for you remind, it make sense.

}

/**
* The information of BlockWorkerInfo is update after a file complete write. To avoid evict,
* user should configure "alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes"
* to reserve some space to store the block.
*
* @param workerInfo BlockWorkerInfo of the worker
* @return the available bytes of the worker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please comment how USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES is used to compute the available bytes of the worker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original idea is use the variable mCapacityBytes minus mUsedBytes of class BlockWorkerInfo to get the valid Bytes of the worker. But yupeng9 tell me that this way does not actually get the available Bytes for the information of class BlockWorkerInfo is update only after a file is completely write. So, he suggest me to reserve some space to store the block.

Copy link
Contributor Author

@gjhkael gjhkael Dec 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will comment above describe to the method. Thanks for you advise.

*/
private long getAvailableBytes(BlockWorkerInfo workerInfo) {
long mUserFileWriteCapacityReserved = Configuration
.getBytes(PropertyKey.USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES);
long mCapacityBytes = workerInfo.getCapacityBytes();
long mUsedBytes = workerInfo.getUsedBytes();
return mCapacityBytes - mUsedBytes - mUserFileWriteCapacityReserved;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof LocalFirstAvoidEvictionPolicy)) {
return false;
}
LocalFirstAvoidEvictionPolicy that = (LocalFirstAvoidEvictionPolicy) o;
return Objects.equal(mLocalHostName, that.mLocalHostName);
}

@Override
public int hashCode() {
return Objects.hashCode(mLocalHostName);
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("localHostName", mLocalHostName)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.policy;

import alluxio.CommonTestUtils;
import alluxio.Constants;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerNetAddress;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
* Tests {@link LocalFirstAvoidEvictionPolicy}.
*/
public class LocalFirstAvoidEvictionPolicyTest {
private static final int PORT = 1;

/**
* Tests that the local host is returned first.
*/
@Test
public void getLocalFirst() {
String localhostName = NetworkAddressUtils.getLocalHostName();
LocalFirstAvoidEvictionPolicy policy = new LocalFirstAvoidEvictionPolicy();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>();
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost(localhostName)
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
Assert.assertEquals(localhostName,
policy.getWorkerForNextBlock(workerInfoList, Constants.MB).getHost());
}

/**
* Tests that another worker is picked in case the local host does not have enough space.
*/
@Test
public void getOthersWhenNotEnoughSpaceOnLocal() {
String localhostName = NetworkAddressUtils.getLocalHostName();
LocalFirstAvoidEvictionPolicy policy = new LocalFirstAvoidEvictionPolicy();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>();
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost(localhostName)
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.MB, Constants.MB));
Assert.assertEquals("worker1",
policy.getWorkerForNextBlock(workerInfoList, Constants.MB).getHost());
}

/**
* Tests that local host is picked if none of the workers has enough availability.
*/
@Test
public void getLocalWhenNoneHasSpace() {
String localhostName = NetworkAddressUtils.getLocalHostName();
LocalFirstAvoidEvictionPolicy policy = new LocalFirstAvoidEvictionPolicy();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>();
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, Constants.MB));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost(localhostName)
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, Constants.MB));
Assert.assertEquals(localhostName,
policy.getWorkerForNextBlock(workerInfoList, Constants.GB).getHost());
}

@Test
public void equalsTest() throws Exception {
CommonTestUtils.testEquals(LocalFirstAvoidEvictionPolicy.class);
}

}
4 changes: 4 additions & 0 deletions core/common/src/main/java/alluxio/PropertyKey.java
Original file line number Diff line number Diff line change
@@ -270,6 +270,8 @@ public enum PropertyKey {
Name.USER_FILE_WORKER_CLIENT_POOL_GC_THRESHOLD_MS, 300 * Constants.SECOND_MS),
USER_FILE_WRITE_LOCATION_POLICY(Name.USER_FILE_WRITE_LOCATION_POLICY,
"alluxio.client.file.policy.LocalFirstPolicy"),
USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES(
Name.USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES, "0MB"),
USER_FILE_WRITE_TYPE_DEFAULT(Name.USER_FILE_WRITE_TYPE_DEFAULT, "MUST_CACHE"),
USER_FILE_WRITE_TIER_DEFAULT(Name.USER_FILE_WRITE_TIER_DEFAULT, Constants.FIRST_TIER),
USER_HEARTBEAT_INTERVAL_MS(Name.USER_HEARTBEAT_INTERVAL_MS, 1000),
@@ -710,6 +712,8 @@ public static final class Name {
"alluxio.user.file.worker.client.pool.gc.threshold.ms";
public static final String USER_FILE_WRITE_LOCATION_POLICY =
"alluxio.user.file.write.location.policy.class";
public static final String USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES =
"alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes";
public static final String USER_FILE_WRITE_TYPE_DEFAULT = "alluxio.user.file.writetype.default";
public static final String USER_FILE_WRITE_TIER_DEFAULT =
"alluxio.user.file.write.tier.default";
2 changes: 2 additions & 0 deletions docs/_data/table/cn/user-configuration.yml
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@ alluxio.user.file.worker.client.threads:
文件worker client从worker读取数据时使用的线程数目。
alluxio.user.file.write.location.policy.class:
选择worker进行写文件数据块时的默认定位机制。
alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes:
当用户选择LocalFirstAvoidEvictionPolicy作为写文件数据块的定位机制时,用户需要配置worker预留一些数据量来保证数据的存储,默认是0MB。
alluxio.user.file.write.tier.default:
数据块写入的默认存储层。可选值为整型数值。非负值代表从高层到底层的存储层(0代表第一层存储层,1代表第二层存储层,以此类推)。如果给定值大于存储层数量,这个数字代表最底层的存储层。负值代表从底层到高层的存储层(-1代表最底层存储层,-2代表次底层存储层,以此类推)如果给定值的绝对值大于存储层数量,这个数字代表最高层存储层。
alluxio.user.file.readtype.default:
5 changes: 4 additions & 1 deletion docs/_data/table/en/user-configuration.yml
Original file line number Diff line number Diff line change
@@ -36,7 +36,10 @@ alluxio.user.file.waitcompleted.poll.ms:
alluxio.user.file.worker.client.threads:
How many threads to use for file worker clients to read from workers.
alluxio.user.file.write.location.policy.class:
The default location policy for choosing workers for writing a file's blocks.
The default location policy for choosing workers for writing a file's blocks
alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes:
The portion of space reserved in worker when user use the LocalFirstAvoidEvictionPolicy class
as file write location policy, default 0 MB.
alluxio.user.file.write.tier.default:
The default tier for choosing a where to write a block. Valid option is any integer. Non-negative
values identify tiers starting from top going down (0 identifies the first tier, 1 identifies the
1 change: 1 addition & 0 deletions docs/_data/table/user-configuration.csv
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ alluxio.user.file.master.client.threads,10
alluxio.user.file.waitcompleted.poll.ms,1000
alluxio.user.file.worker.client.threads,10
alluxio.user.file.write.location.policy.class,alluxio.client.file.policy.LocalFirstPolicy
alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes,0MB
alluxio.user.file.readtype.default,CACHE_PROMOTE
alluxio.user.file.writetype.default,MUST_CACHE
alluxio.user.file.write.tier.default,0