Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ALLUXIO-2226] Add a LocalFirstPolicy that without evict action #4445

Merged
merged 19 commits into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.policy;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;

import javax.annotation.concurrent.ThreadSafe;
/**
* A policy that returns local host first, and if the local worker doesn't have enough availability,
* it randomly picks a worker from the active workers list for each block write.
* If No worker meets the demands, return local host.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also specify the behavior of USER_FILE_WRITE_CAPACITY_RESERVED_SIZE_BYTES?

* USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES is used to reserve some space of the worker
* to store the block, for the values mCapacityBytes minus mUsedBytes is not the available bytes.
*/
@ThreadSafe
public final class LocalFirstAvoidEvictionPolicy implements FileWriteLocationPolicy {
private String mLocalHostName;
/**
* Constructs a {@link LocalFirstAvoidEvictionPolicy}.
*/
public LocalFirstAvoidEvictionPolicy() {
mLocalHostName = NetworkAddressUtils.getLocalHostName();
}

@Override
public WorkerNetAddress getWorkerForNextBlock(Iterable<BlockWorkerInfo> workerInfoList,
long blockSizeBytes) {
// try the local host first
WorkerNetAddress localWorkerNetAddress = null;
for (BlockWorkerInfo workerInfo : workerInfoList) {
if (workerInfo.getNetAddress().getHost().equals(mLocalHostName)) {
localWorkerNetAddress = workerInfo.getNetAddress();
if (getAvailableBytes(workerInfo) >= blockSizeBytes) {
return localWorkerNetAddress;
}
}
}

// otherwise randomly pick a worker that has enough availability
List<BlockWorkerInfo> shuffledWorkers = Lists.newArrayList(workerInfoList);
Collections.shuffle(shuffledWorkers);
for (BlockWorkerInfo workerInfo : shuffledWorkers) {
if (getAvailableBytes(workerInfo) >= blockSizeBytes) {
return workerInfo.getNetAddress();
}
}
return localWorkerNetAddress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there was no worker on the local host? That means localWorkerNetAddress would have been null. Should null be returned, or should a random worker still be picked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gpang After long deliberation, if no worker on the local host and have on worker have available capacity to store the block, it should pick a random worker. When no worker in this cluster, it should return null. Thanks for you remind, it make sense.

}

/**
* @param workerInfo BlockWorkerInfo of the worker
* @return the available bytes of the worker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please comment how USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES is used to compute the available bytes of the worker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original idea is use the variable mCapacityBytes minus mUsedBytes of class BlockWorkerInfo to get the valid Bytes of the worker. But yupeng9 tell me that this way does not actually get the available Bytes for the information of class BlockWorkerInfo is update only after a file is completely write. So, he suggest me to reserve some space to store the block.

Copy link
Contributor Author

@gjhkael gjhkael Dec 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will comment above describe to the method. Thanks for you advise.

*/
private long getAvailableBytes(BlockWorkerInfo workerInfo) {
long mUserFileWriteCapacityReserved = Configuration
.getBytes(PropertyKey.USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES);
long mCapacityBytes = workerInfo.getCapacityBytes();
long mUsedBytes = workerInfo.getUsedBytes();
return mCapacityBytes - mUsedBytes - mUserFileWriteCapacityReserved;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof LocalFirstAvoidEvictionPolicy)) {
return false;
}
LocalFirstAvoidEvictionPolicy that = (LocalFirstAvoidEvictionPolicy) o;
return Objects.equal(mLocalHostName, that.mLocalHostName);
}

@Override
public int hashCode() {
return Objects.hashCode(mLocalHostName);
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("localHostName", mLocalHostName)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.policy;

import alluxio.CommonTestUtils;
import alluxio.Constants;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerNetAddress;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
* Tests {@link LocalFirstAvoidEvictionPolicy}.
*/
public class LocalFirstAvoidEvictionPolicyTest {
private static final int PORT = 1;

/**
* Tests that the local host is returned first.
*/
@Test
public void getLocalFirst() {
String localhostName = NetworkAddressUtils.getLocalHostName();
LocalFirstAvoidEvictionPolicy policy = new LocalFirstAvoidEvictionPolicy();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>();
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost(localhostName)
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
Assert.assertEquals(localhostName,
policy.getWorkerForNextBlock(workerInfoList, Constants.MB).getHost());
}

/**
* Tests that another worker is picked in case the local host does not have enough space.
*/
@Test
public void getOthersWhenNotEnoughSpaceOnLocal() {
String localhostName = NetworkAddressUtils.getLocalHostName();
LocalFirstAvoidEvictionPolicy policy = new LocalFirstAvoidEvictionPolicy();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>();
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, 0));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost(localhostName)
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.MB, Constants.MB));
Assert.assertEquals("worker1",
policy.getWorkerForNextBlock(workerInfoList, Constants.MB).getHost());
}

/**
* Tests that local host is picked if none of the workers has enough availability.
*/
@Test
public void getLocalWhenNoneHasSpace() {
String localhostName = NetworkAddressUtils.getLocalHostName();
LocalFirstAvoidEvictionPolicy policy = new LocalFirstAvoidEvictionPolicy();
List<BlockWorkerInfo> workerInfoList = new ArrayList<>();
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost("worker1")
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, Constants.MB));
workerInfoList.add(new BlockWorkerInfo(new WorkerNetAddress().setHost(localhostName)
.setRpcPort(PORT).setDataPort(PORT).setWebPort(PORT), Constants.GB, Constants.MB));
Assert.assertEquals(localhostName,
policy.getWorkerForNextBlock(workerInfoList, Constants.GB).getHost());
}

@Test
public void equalsTest() throws Exception {
CommonTestUtils.testEquals(LocalFirstAvoidEvictionPolicy.class);
}

}
4 changes: 4 additions & 0 deletions core/common/src/main/java/alluxio/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public enum PropertyKey {
Name.USER_FILE_WORKER_CLIENT_POOL_GC_THRESHOLD_MS, 300 * Constants.SECOND_MS),
USER_FILE_WRITE_LOCATION_POLICY(Name.USER_FILE_WRITE_LOCATION_POLICY,
"alluxio.client.file.policy.LocalFirstPolicy"),
USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES(
Name.USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES, "0MB"),
USER_FILE_WRITE_TYPE_DEFAULT(Name.USER_FILE_WRITE_TYPE_DEFAULT, "MUST_CACHE"),
USER_HEARTBEAT_INTERVAL_MS(Name.USER_HEARTBEAT_INTERVAL_MS, 1000),
USER_LINEAGE_ENABLED(Name.USER_LINEAGE_ENABLED, false),
Expand Down Expand Up @@ -661,6 +663,8 @@ public static final class Name {
"alluxio.user.file.worker.client.pool.gc.threshold.ms";
public static final String USER_FILE_WRITE_LOCATION_POLICY =
"alluxio.user.file.write.location.policy.class";
public static final String USER_FILE_WRITE_AVOID_EVICTION_POLICY_RESERVED_BYTES =
"alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes";
public static final String USER_FILE_WRITE_TYPE_DEFAULT = "alluxio.user.file.writetype.default";
public static final String USER_HEARTBEAT_INTERVAL_MS = "alluxio.user.heartbeat.interval.ms";
public static final String USER_LINEAGE_ENABLED = "alluxio.user.lineage.enabled";
Expand Down
2 changes: 2 additions & 0 deletions docs/_data/table/cn/user-configuration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ alluxio.user.file.worker.client.threads:
文件worker client从worker读取数据时使用的线程数目。
alluxio.user.file.write.location.policy.class:
选择worker进行写文件数据块时的默认定位机制。
alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes:
当用户选择LocalFirstAvoidEvictionPolicy作为写文件数据块的定位机制时,用户需要配置worker预留一些数据量来保证数据的存储,默认是0MB。
alluxio.user.file.readtype.default:
创建Alluxio文件时的默认读类型。可选值为`CACHE_PROMOTE` (如果数据已经在Alluxio存储内,将其移动到最高存储层,如果数据需要从底层存储进行读取,将其写到本地Alluxio的最高存储层)、`CACHE` (如果数据需要从底层存储进行读取,将其写到本地Alluxio的最高存储层), `NO_CACHE` (数据不与Alluxio交互,如果是从Alluxio中进行读取,将不会发生数据块迁移或者剔除)。
alluxio.user.file.writetype.default:
Expand Down
3 changes: 3 additions & 0 deletions docs/_data/table/en/user-configuration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ alluxio.user.file.worker.client.threads:
How many threads to use for file worker clients to read from workers.
alluxio.user.file.write.location.policy.class:
The default location policy for choosing workers for writing a file's blocks
alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes:
The portion of space reserved in worker when user use the LocalFirstAvoidEvictionPolicy class
as file write location policy, default 0 MB.
alluxio.user.file.readtype.default:
Default read type when creating Alluxio files.
Valid options are `CACHE_PROMOTE` (move data to highest tier if already in Alluxio storage,
Expand Down
1 change: 1 addition & 0 deletions docs/_data/table/user-configuration.csv
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ alluxio.user.file.master.client.threads,10
alluxio.user.file.waitcompleted.poll.ms,1000
alluxio.user.file.worker.client.threads,10
alluxio.user.file.write.location.policy.class,alluxio.client.file.policy.LocalFirstPolicy
alluxio.user.file.write.avoid.eviction.policy.reserved.size.bytes,0MB
alluxio.user.file.readtype.default,CACHE_PROMOTE
alluxio.user.file.writetype.default,MUST_CACHE
alluxio.user.heartbeat.interval.ms,1000
Expand Down