Skip to content

Commit

Permalink
change API
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Oct 26, 2023
1 parent 10e2180 commit 86b03e2
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 68 deletions.
20 changes: 5 additions & 15 deletions bindings/java/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,20 @@ use jni::sys::{jboolean, jfloat, jlong};
use jni::JNIEnv;

use opendal::layers::RetryLayer;
use opendal::raw::{FusedAccessor, Layer};

pub(crate) enum NativeLayer {
Retry(RetryLayer),
}

impl NativeLayer {
pub(crate) fn into_inner(self) -> impl Layer<FusedAccessor> {
match self {
NativeLayer::Retry(layer) => layer,
}
}
}
use opendal::Operator;

#[no_mangle]
pub extern "system" fn Java_org_apache_opendal_layer_RetryNativeLayerSpec_makeNativeLayer(
pub extern "system" fn Java_org_apache_opendal_layer_RetryNativeLayer_doLayer(
_: JNIEnv,
_: JClass,
op: *mut Operator,
jitter: jboolean,
factor: jfloat,
min_delay: jlong,
max_delay: jlong,
max_times: jlong,
) -> jlong {
let op = unsafe { Box::from_raw(op) };
let mut retry = RetryLayer::new();
retry = retry.with_factor(factor);
retry = retry.with_min_delay(Duration::from_nanos(min_delay as u64));
Expand All @@ -56,5 +46,5 @@ pub extern "system" fn Java_org_apache_opendal_layer_RetryNativeLayerSpec_makeNa
if max_times >= 0 {
retry = retry.with_max_times(max_times as usize);
}
Box::into_raw(Box::new(NativeLayer::Retry(retry))) as jlong
Box::into_raw(Box::new(op.layer(retry))) as jlong
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.opendal.layer.NativeLayerSpec;

/**
* BlockingOperator represents an underneath OpenDAL operator that
Expand All @@ -49,10 +48,10 @@ public static BlockingOperator of(String schema, Map<String, String> map) {
*
* @param schema the name of the underneath service to access data from.
* @param map a map of properties to construct the underneath operator.
* @param specs a list of native layer specs to construct layers onto the op.
* @param layers a list of native layer specs to construct layers onto the op.
*/
public static BlockingOperator of(String schema, Map<String, String> map, List<NativeLayerSpec> specs) {
try (final Operator operator = Operator.of(schema, map, specs)) {
public static BlockingOperator of(String schema, Map<String, String> map, List<NativeLayer> layers) {
try (final Operator operator = Operator.of(schema, map, layers)) {
return operator.blocking();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
* under the License.
*/

package org.apache.opendal.layer;
package org.apache.opendal;

public abstract class NativeLayerSpec {

/**
* This method is called from native code. It returns the pointer of the constructed native layer,
* which is immediately used (moved) to make the operator.
*/
@SuppressWarnings("unused")
protected abstract long makeNativeLayer();
public abstract class NativeLayer {
protected abstract long layer(long operatorNativeHandle);
}
23 changes: 15 additions & 8 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.opendal.layer.NativeLayerSpec;

/**
* Operator represents an underneath OpenDAL operator that
Expand Down Expand Up @@ -122,15 +121,23 @@ public static Operator of(String schema, Map<String, String> map) {
*
* @param schema the name of the underneath service to access data from.
* @param map a map of properties to construct the underneath operator.
* @param specs a list of native layer specs to construct layers onto the op.
* @param layers a list of native layer specs to construct layers onto the op.
*/
public static Operator of(String schema, Map<String, String> map, List<NativeLayerSpec> specs) {
final long nativeHandle = constructor(schema, map, specs.toArray(new NativeLayerSpec[0]));
final OperatorInfo info = makeOperatorInfo(nativeHandle);
return new Operator(nativeHandle, info);
public static Operator of(String schema, Map<String, String> map, List<NativeLayer> layers) {
final long nativeHandle = constructor(schema, map);
if (layers.isEmpty()) {
final OperatorInfo info = makeOperatorInfo(nativeHandle);
return new Operator(nativeHandle, info);
} else {
long op = nativeHandle;
for (NativeLayer layer : layers) {
op = layer.layer(op);
}
return new Operator(op, makeOperatorInfo(op));
}
}

Operator(long nativeHandle, OperatorInfo info) {
private Operator(long nativeHandle, OperatorInfo info) {
super(nativeHandle);
this.info = info;
}
Expand Down Expand Up @@ -234,7 +241,7 @@ public CompletableFuture<List<Entry>> list(String path) {

private static native long duplicate(long nativeHandle);

private static native long constructor(String schema, Map<String, String> map, NativeLayerSpec[] specs);
private static native long constructor(String schema, Map<String, String> map);

private static native long read(long nativeHandle, String path);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import java.time.Duration;
import lombok.Builder;
import org.apache.opendal.NativeLayer;

@Builder
public class RetryNativeLayerSpec extends NativeLayerSpec {
public class RetryNativeLayer extends NativeLayer {

private final boolean jitter;

Expand All @@ -40,10 +41,10 @@ public class RetryNativeLayerSpec extends NativeLayerSpec {
private final long maxTimes = 3;

@Override
protected long makeNativeLayer() {
return makeNativeLayer(jitter, factor, minDelay.toNanos(), maxDelay.toNanos(), maxTimes);
protected long layer(long op) {
return doLayer(op, jitter, factor, minDelay.toNanos(), maxDelay.toNanos(), maxTimes);
}

private static native long makeNativeLayer(
boolean jitter, float factor, long minDelay, long maxDelay, long maxTimes);
private static native long doLayer(
long nativeHandle, boolean jitter, float factor, long minDelay, long maxDelay, long maxTimes);
}
26 changes: 3 additions & 23 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
use std::str::FromStr;
use std::time::Duration;

use jni::objects::JByteArray;
use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JString;
use jni::objects::JValue;
use jni::objects::JValueOwned;
use jni::objects::{JByteArray, JObjectArray};
use jni::sys::jsize;
use jni::sys::{jlong, jobject};
use jni::JNIEnv;
Expand All @@ -37,7 +37,6 @@ use crate::convert::jmap_to_hashmap;
use crate::convert::jstring_to_string;
use crate::get_current_env;
use crate::get_global_runtime;
use crate::layer::NativeLayer;
use crate::make_entry;
use crate::make_metadata;
use crate::make_operator_info;
Expand All @@ -50,40 +49,21 @@ pub extern "system" fn Java_org_apache_opendal_Operator_constructor(
_: JClass,
scheme: JString,
map: JObject,
specs: JObjectArray,
) -> jlong {
intern_constructor(&mut env, scheme, map, specs).unwrap_or_else(|e| {
intern_constructor(&mut env, scheme, map).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_constructor(
env: &mut JNIEnv,
scheme: JString,
map: JObject,
specs: JObjectArray,
) -> Result<jlong> {
fn intern_constructor(env: &mut JNIEnv, scheme: JString, map: JObject) -> Result<jlong> {
let scheme = Scheme::from_str(jstring_to_string(env, &scheme)?.as_str())?;
let map = jmap_to_hashmap(env, &map)?;

let mut op = Operator::via_map(scheme, map)?;

// auto add the blocking layer on demand
if !op.info().full_capability().blocking {
let _guard = unsafe { get_global_runtime() }.enter();
op = op.layer(BlockingLayer::create()?);
}

// add user-specified layers
let specs_len = env.get_array_length(&specs)?;
for i in 0..specs_len {
let spec = env.get_object_array_element(&specs, i)?;
let layer = env.call_method(&spec, "makeNativeLayer", "()J", &[])?.j()?;
let layer = unsafe { Box::from_raw(layer as *mut NativeLayer) };
op = op.layer(layer.into_inner());
}

Ok(Box::into_raw(Box::new(op)) as jlong)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import java.util.List;
import java.util.Map;
import org.apache.opendal.Operator;
import org.apache.opendal.layer.NativeLayerSpec;
import org.apache.opendal.layer.RetryNativeLayerSpec;
import org.apache.opendal.NativeLayer;
import org.apache.opendal.layer.RetryNativeLayer;
import org.junit.jupiter.api.Test;

public class NativeLayerTest {
@Test
void testOperatorWithRetryLayer() {
final Map<String, String> conf = new HashMap<>();
conf.put("root", "/opendal/");
final NativeLayerSpec retryLayerSpec = RetryNativeLayerSpec.builder().build();
final List<NativeLayerSpec> nativeLayerSpecs = Collections.singletonList(retryLayerSpec);
final NativeLayer retryLayerSpec = RetryNativeLayer.builder().build();
final List<NativeLayer> nativeLayerSpecs = Collections.singletonList(retryLayerSpec);
try (final Operator op = Operator.of("memory", conf, nativeLayerSpecs)) {
assertThat(op.info).isNotNull();
}
Expand Down

0 comments on commit 86b03e2

Please sign in to comment.