Skip to content

Commit

Permalink
minor fixes and upload file added
Browse files Browse the repository at this point in the history
  • Loading branch information
alebastrov committed Aug 30, 2023
1 parent 7a85bdd commit 8b63e3f
Show file tree
Hide file tree
Showing 34 changed files with 130 additions and 90 deletions.
10 changes: 7 additions & 3 deletions oap-fs/src/main/java/oap/fs/LocalFileManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Map;
import java.util.Optional;

import static oap.io.content.ContentWriter.ofBytes;

@Slf4j
public class LocalFileManager extends AbstractFileManager implements FileManager<Data> {

Expand All @@ -43,19 +45,21 @@ public LocalFileManager( Map<String, Path> buckets ) {
public String write( String bucket, Data data ) {
var name = data.nameOrConstruct( cuid.next() );
var path = getBucket( bucket ).resolve( name );
Files.write( path, data.decoded() );
Files.write( path, data.decoded(), ofBytes() );
return name;
}

@Override
public Optional<byte[]> read( String bucket, String relativePath ) {
var path = getBucket( bucket ).resolve( relativePath );
return Files.exists( path ) ? Optional.of( Files.read( path, ContentReader.ofBytes() ) ) : Optional.empty();
return Files.exists( path )
? Optional.of( Files.read( path, ContentReader.ofBytes() ) )
: Optional.empty();
}

@Override
public void copyFromTo( String src, String dist ) {
log.debug( "Copy files from {} to {}", src, dist );
log.trace( "Copying file from {} to {}", src, dist );
Files.copyContent( Path.of( src ), Path.of( dist ) );
}
}
6 changes: 4 additions & 2 deletions oap-fs/src/main/java/oap/fs/S3FileManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public S3FileManager( Map<String, Path> buckets, String region ) {
super( buckets );
this.region = region;
log.info( "Init s3-file-manager" );
var s3Client = AmazonS3ClientBuilder.standard().withRegion( this.region ).withCredentials( new ProfileCredentialsProvider() ).build();
if( this.s3client == null ) this.s3client = s3Client;
if( this.s3client == null ) {
this.s3client = AmazonS3ClientBuilder.standard().withRegion( this.region ).withCredentials( new ProfileCredentialsProvider() ).build();
}
transferManager = TransferManagerBuilder.standard().withS3Client( this.s3client ).build();
}

Expand Down Expand Up @@ -79,5 +80,6 @@ public Download download( String bucket, String relativePath, File destination )
}

public void copyFromTo( String src, String dist ) {
log.warn( "Not implemented for S3" );
}
}
1 change: 1 addition & 0 deletions oap-jpath/src/main/java/oap/jpath/JPathOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package oap.jpath;

@FunctionalInterface
public interface JPathOutput {
void write( Pointer pointer );
}
3 changes: 2 additions & 1 deletion oap-jpath/src/main/java/oap/jpath/MapPointer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public Pointer resolve( AbstractPathNode n ) {
if( n.type == PathType.FIELD ) {
var ret = v.get( n.name );
return ret != null ? Pointer.get( ret ) : NullPointer.INSTANCE;
} else return super.resolve( n );
}
return super.resolve( n );
}
}
2 changes: 0 additions & 2 deletions oap-jpath/src/main/java/oap/jpath/PathExpression.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public void add( AbstractPathNode path ) {
@SuppressWarnings( "unchecked" )
public void evaluate( Map<String, Object> variables, JPathOutput output ) {
Pointer pointer = new MapPointer( ( Map<Object, Object> ) ( Object ) variables );

for( var n : list ) pointer = pointer.resolve( n );

output.write( pointer );
}
}
6 changes: 3 additions & 3 deletions oap-jpath/src/main/java/oap/jpath/PathNodeArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ protected PathNodeArray( String name, int index ) {
}

@Override
public Object evaluate( Object v, Reflection reflect ) throws PathNotFound {
public Object evaluate( Object v, Reflection reflect ) throws PathNotFoundException {
log.trace( "array -> {}[{}]", name, index );
var field = reflect.field( name ).orElse( null );
if( field == null ) throw new PathNotFound();
if( field == null ) throw new PathNotFoundException( "Field not found: " + name );

if( field.isArray() ) {
return Array.get( field.get( v ), index );
} else if( field.underlying.getType().isAssignableFrom( List.class ) ) {
return ( ( List<?> ) field.get( v ) ).get( index );
}

throw new PathNotFound();
throw new PathNotFoundException( "Field not found: " + name );
}
}
4 changes: 2 additions & 2 deletions oap-jpath/src/main/java/oap/jpath/PathNodeField.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ protected PathNodeField( String name ) {
}

@Override
public Object evaluate( Object v, Reflection reflect ) throws PathNotFound {
public Object evaluate( Object v, Reflection reflect ) throws PathNotFoundException {
log.trace( "field -> {}", name );
var field = reflect.field( name ).orElse( null );
if( field == null ) throw new PathNotFound();
if( field == null ) throw new PathNotFoundException( "Field not found: " + name );
return field.get( v );
}
}
8 changes: 6 additions & 2 deletions oap-jpath/src/main/java/oap/jpath/PathNodeMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ protected PathNodeMethod( String name, List<Object> arguments ) {
public Object evaluate( Object v, Reflection reflect ) {
log.trace( "method -> {}", name );
var method = reflect.method( m -> m.name().equals( name ) && equals( m.parameters, arguments ) ).orElse( null );
if( method == null ) throw new PathNotFound();
method.underlying.setAccessible( true );
if( method == null ) throw new PathNotFoundException( "method " + name + " not found on object of class " + v.getClass().getCanonicalName() );
try {
method.underlying.setAccessible( true );
} catch ( Exception ex ) {
throw new ReflectionException( ex );
}
return method.invoke( v, arguments.toArray( new Object[0] ) );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@

package oap.jpath;

public class PathNotFound extends RuntimeException {
public class PathNotFoundException extends RuntimeException {
public PathNotFoundException( String message ) {
super( message );
}
}
7 changes: 7 additions & 0 deletions oap-jpath/src/main/java/oap/jpath/ReflectionException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package oap.jpath;

public class ReflectionException extends RuntimeException {
public ReflectionException( Exception ex ) {
super( ex );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public void write( Pointer pointer ) {
}

public void reset() {
sb.delete( 0, sb.length() );
sb.setLength( 0 );
}
}
4 changes: 3 additions & 1 deletion oap-pnio/src/main/java/oap/http/pnio/PnioBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.io.ByteStreams;
import oap.io.FixedLengthArrayOutputStream;

import javax.annotation.concurrent.NotThreadSafe;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -36,6 +37,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

@NotThreadSafe
public class PnioBuffer {
byte[] buffer;
public int length;
Expand All @@ -46,7 +48,7 @@ public PnioBuffer( int capacity ) {
}

public final void copyFrom( InputStream inputStream ) throws IOException, BufferOverflowException {
FixedLengthArrayOutputStream to = new FixedLengthArrayOutputStream( buffer );
var to = new FixedLengthArrayOutputStream( buffer );
ByteStreams.copy( inputStream, to );
length = to.size();
}
Expand Down
30 changes: 12 additions & 18 deletions oap-pnio/src/main/java/oap/http/pnio/PnioExchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class PnioExchange<WorkflowState> {
public final PnioBuffer requestBuffer;
public final PnioBuffer responseBuffer;

public CompletableFuture<Void> future;
public volatile CompletableFuture<Void> future;
public Throwable throwable;
public volatile ProcessState processState = ProcessState.RUNNING;

Expand Down Expand Up @@ -71,15 +71,15 @@ public String getCurrentTaskName() {
return currentTaskNode.handler.getClass().getSimpleName();
}

private void readFully(InputStream body) {
private void readFully( InputStream body ) {
try {
requestBuffer.copyFrom(body);
} catch (BufferOverflowException e) {
completeWithBufferOverflow(true);
} catch (SocketException e) {
completeWithConnectionClosed(e);
} catch (Exception e) {
completeWithFail(e);
requestBuffer.copyFrom( body );
} catch ( BufferOverflowException e ) {
completeWithBufferOverflow( true );
} catch ( SocketException e ) {
completeWithConnectionClosed( e );
} catch ( Exception e ) {
completeWithFail( e );
}
}

Expand All @@ -89,43 +89,37 @@ public String getRequestAsString() {

public void completeWithBufferOverflow(boolean request) {
processState = request ? ProcessState.REQUEST_BUFFER_OVERFLOW : ProcessState.RESPONSE_BUFFER_OVERFLOW;

completeFuture();
}

public void completeWithTimeout() {
processState = ProcessState.TIMEOUT;

completeFuture();
}

public void completeWithConnectionClosed(Throwable throwable) {
this.throwable = throwable;
this.processState = ProcessState.CONNECTION_CLOSED;

completeFuture();
}

void completeFuture() {
if (future != null) future.complete(null);
if (future != null) future.complete( null );
}

public void completeWithInterrupted() {
processState = ProcessState.INTERRUPTED;

completeFuture();
}

public void completeWithFail(Throwable throwable) {
this.throwable = throwable;
this.processState = ProcessState.EXCEPTION;

completeFuture();
}

public void completeWithRejected() {
processState = ProcessState.REJECTED;

completeFuture();
}

Expand Down Expand Up @@ -189,9 +183,9 @@ public long getTimeLeft(double percent) {

public long getTimeLeft() {
long now = System.nanoTime();
long duration = (now - startTimeNano) / 1000000;
long durationInMillis = (now - startTimeNano) / 1_000_000;

return timeout - duration;
return timeout - durationInMillis;
}

public void runTasks(@Nullable PnioRequestHandler.Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ public void run() {
PnioExchange<WorkflowState> pnioExchange = queue.take();
try {
pnioExchange.runTasks( COMPUTE );

pnioExchange.completeFuture();
} catch( Throwable e ) {
pnioExchange.completeWithFail( e );
}
} catch( InterruptedException e ) {
Thread.currentThread().interrupt();
done = true;
interrupt();
break;
}
}
Expand Down
3 changes: 0 additions & 3 deletions oap-pnio/src/main/java/oap/http/pnio/RequestWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,11 @@ private RequestWorkflow( Node<WorkflowState> root ) {

public <T> List<T> map( Function<PnioRequestHandler<WorkflowState>, T> mapFunc ) {
var ret = new ArrayList<T>();

var current = root;

while( current != null ) {
ret.add( mapFunc.apply( current.handler ) );
current = current.next;
}

return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void send( Message p ) {
try {
guaranteedDeliveryTransport.send( p, transport );
} catch( Exception e ) {
log.error( "Unexpected exception", e );
log.error( "Cannot send a message: ", p, e );
}
}
}
1 change: 1 addition & 0 deletions oap-stdlib/src/main/java/oap/alert/MessageStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package oap.alert;

@FunctionalInterface
public interface MessageStream<Message> extends MessageTransport<Message> {

}
9 changes: 5 additions & 4 deletions oap-stdlib/src/main/java/oap/application/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

@ToString
public class Configuration<T> {
public static final String META_INF = "META-INF/";
private final Class<T> clazz;
private final String name;

Expand All @@ -52,10 +53,10 @@ public List<T> fromClassPath() {

public List<URL> urlsFromClassPath() {
var ret = new ArrayList<URL>();
ret.addAll( Resources.urls( "META-INF/" + name + ".json" ) );
ret.addAll( Resources.urls( "META-INF/" + name + ".conf" ) );
ret.addAll( Resources.urls( "META-INF/" + name + ".yaml" ) );
ret.addAll( Resources.urls( "META-INF/" + name + ".yml" ) );
ret.addAll( Resources.urls( META_INF + name + ".json" ) );
ret.addAll( Resources.urls( META_INF + name + ".conf" ) );
ret.addAll( Resources.urls( META_INF + name + ".yaml" ) );
ret.addAll( Resources.urls( META_INF + name + ".yml" ) );

return ret;
}
Expand Down
Loading

0 comments on commit 8b63e3f

Please sign in to comment.