Commit 28afdce
Showing 7 changed files with 7 additions and 261 deletions.
@@ -188,7 +188,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
Path qualifiedPath = makeQualified(f);

try {
- OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
+ OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
Path qualifiedPath = makeQualified(f);

try {
- OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
+ OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
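
Aside, not part of the diff: in both hunks above the store-level call drops the FileSystem.Statistics argument, while the unchanged return new FSDataOutputStream(outputStream, statistics) line still attaches the per-filesystem statistics object at the wrapper level. A minimal, hypothetical sketch of reading those per-scheme counters through the standard FileSystem API; the URI, class name, and probe path are placeholders, not from this commit:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative probe of per-filesystem counters; which counters actually
    // move depends on where the remaining increment calls live.
    public class AbfsStatisticsProbe {
      public static void main(String[] args) throws Exception {
        // Placeholder URI; point this at a real ABFS account to run it.
        URI uri = new URI("abfs://container@account.dfs.core.windows.net/");
        try (FileSystem fs = FileSystem.get(uri, new Configuration())) {
          // Perform one write so there is something to count.
          fs.create(new Path("/tmp/probe.txt")).close();
          FileSystem.Statistics stats =
              FileSystem.getStatistics(uri.getScheme(), fs.getClass());
          System.out.println("bytesWritten=" + stats.getBytesWritten()
              + ", writeOps=" + stats.getWriteOps()
              + ", readOps=" + stats.getReadOps());
        }
      }
    }

The sketch only shows how the per-scheme Statistics object is obtained and read; it makes no claim about which ABFS code paths still increment it after this commit.
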
@@ -412,10 +412,8 @@ public void deleteFilesystem() throws AzureBlobFileSystemException {
}
}

- public OutputStream createFile(final Path path,
- final FileSystem.Statistics statistics,
- final boolean overwrite, final FsPermission permission,
- final FsPermission umask) throws AzureBlobFileSystemException {
+ public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
+ final FsPermission umask) throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled();
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -438,7 +436,6 @@ public OutputStream createFile(final Path path,

return new AbfsOutputStream(
client,
- statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
0,
abfsConfiguration.getWriteBufferSize(),
@@ -499,7 +496,7 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
}
}

- public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
+ public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -532,7 +529,6 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic

return new AbfsOutputStream(
client,
- statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset,
abfsConfiguration.getWriteBufferSize(),
@@ -101,7 +101,6 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
- incrementReadOps();
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
@@ -202,7 +201,6 @@ private int readInternal(final long position, final byte[] b, final int offset,
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
- incrementReadOps();
return receivedBytes;
}

@@ -238,7 +236,6 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
- incrementReadOps();
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -255,15 +252,6 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
return (int) bytesRead;
}

- /**
- * Increment Read Operations.
- */
- private void incrementReadOps() {
- if (statistics != null) {
- statistics.incrementReadOps(1);
- }
- }
-
/**
* Seek to given position in stream.
* @param n position to seek to
@@ -39,7 +39,6 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.io.ElasticByteBufferPool;
- import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
@@ -81,11 +80,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final ElasticByteBufferPool byteBufferPool
= new ElasticByteBufferPool();

- private final Statistics statistics;
-
public AbfsOutputStream(
final AbfsClient client,
- final Statistics statistics,
final String path,
final long position,
final int bufferSize,
@@ -94,7 +90,6 @@ public AbfsOutputStream(
final boolean supportAppendWithFlush,
final boolean appendBlob) {
this.client = client;
- this.statistics = statistics;
this.path = path;
this.position = position;
this.closed = false;
@@ -192,16 +187,6 @@ public synchronized void write(final byte[] data, final int off, final int lengt

writableBytes = bufferSize - bufferIndex;
}
- incrementWriteOps();
- }
-
- /**
- * Increment Write Operations.
- */
- private void incrementWriteOps() {
- if (statistics != null) {
- statistics.incrementWriteOps(1);
- }
}

/**
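
Aside, not part of the diff: the incrementReadOps()/incrementWriteOps() helpers removed from the two stream classes above follow the same null-guarded FileSystem.Statistics pattern. A self-contained, illustrative consolidation of that pattern; the StatisticsCounters class name is hypothetical and nothing below is added by this commit:

    import org.apache.hadoop.fs.FileSystem;

    /** Illustrative only: the null-guarded counter pattern this commit backs out. */
    class StatisticsCounters {
      // May be null when no per-filesystem statistics tracking is wired in.
      private final FileSystem.Statistics statistics;

      StatisticsCounters(FileSystem.Statistics statistics) {
        this.statistics = statistics;
      }

      /** Count one read operation if a Statistics instance was supplied. */
      void incrementReadOps() {
        if (statistics != null) {
          statistics.incrementReadOps(1);
        }
      }

      /** Count one write operation if a Statistics instance was supplied. */
      void incrementWriteOps() {
        if (statistics != null) {
          statistics.incrementWriteOps(1);
        }
      }
    }
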
@@ -17,19 +17,12 @@
*/
package org.apache.hadoop.fs.azurebfs;

- import java.io.IOException;
-
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;

@@ -38,9 +31,6 @@
* This class does not attempt to bind to Azure.
*/
public class AbstractAbfsTestWithTimeout extends Assert {
- private static final Logger LOG =
- LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
-
/**
* The name of the current method.
*/
@@ -77,53 +67,4 @@ public void nameThread() {
protected int getTestTimeoutMillis() {
return TEST_TIMEOUT;
}
-
- /**
- * Describe a test in the logs.
- *
- * @param text text to print
- * @param args arguments to format in the printing
- */
- protected void describe(String text, Object... args) {
- LOG.info("\n\n{}: {}\n",
- methodName.getMethodName(),
- String.format(text, args));
- }
-
- /**
- * Validate Contents written on a file in Abfs.
- *
- * @param fs AzureBlobFileSystem
- * @param path Path of the file
- * @param originalByteArray original byte array
- * @return if content is validated true else, false
- * @throws IOException
- */
- protected boolean validateContent(AzureBlobFileSystem fs, Path path,
- byte[] originalByteArray)
- throws IOException {
- int pos = 0;
- int lenOfOriginalByteArray = originalByteArray.length;
-
- try (FSDataInputStream in = fs.open(path)) {
- byte valueOfContentAtPos = (byte) in.read();
-
- while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
- if (originalByteArray[pos] != valueOfContentAtPos) {
- assertEquals("Mismatch in content validation at position {}", pos,
- originalByteArray[pos], valueOfContentAtPos);
- return false;
- }
- valueOfContentAtPos = (byte) in.read();
- pos++;
- }
- if (valueOfContentAtPos != -1) {
- assertEquals("Expected end of file", -1, valueOfContentAtPos);
- return false;
- }
- return true;
- }
-
- }
-
}

This file was deleted.

@@ -22,21 +22,18 @@
import java.io.InputStream;
import java.util.Map;

- import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.junit.Assume;
import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
- import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
@@ -55,8 +52,6 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
private static final Path FILE_PATH = new Path("/testFile");
private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
- private static final Logger LOG =
- LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);

public ITestAzureBlobFileSystemOauth() throws Exception {
Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
@@ -148,11 +143,9 @@ public void testBlobDataReader() throws Exception {

// TEST WRITE FILE
try {
- abfsStore.openFileForWrite(EXISTED_FILE_PATH, fs.getFsStatistics(), true);
+ abfsStore.openFileForWrite(EXISTED_FILE_PATH, true);
} catch (AbfsRestOperationException e) {
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
- } finally {
- IOUtils.cleanupWithLogger(LOG, abfsStore);
}

}
