Skip to content

Commit

Permalink
Merge branch 'apache-3.2' into apache-3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Oct 11, 2023
2 parents c72c55a + a4ca070 commit 8bdcaf6
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 94 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/build-and-test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ env:
VERSIONS_LIMIT: 4
JACOCO_ENABLE: true
CANDIDATE_VERSIONS: '
spring.version:4.3.30.RELEASE;
spring-boot.version:1.5.22.RELEASE;
spring-boot.version:2.4.1;
spring.version:5.3.24;
spring-boot.version:2.7.6;
'

jobs:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.reactive;

import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.mockito.Mockito;

import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

public class CreateObserverAdapter {

private ServerCallToObserverAdapter<String> responseObserver;
private AtomicInteger nextCounter;
private AtomicInteger completeCounter;
private AtomicInteger errorCounter;

CreateObserverAdapter() {

nextCounter = new AtomicInteger();
completeCounter = new AtomicInteger();
errorCounter = new AtomicInteger();

responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));

}

public AtomicInteger getCompleteCounter() {
return completeCounter;
}

public AtomicInteger getNextCounter() {
return nextCounter;
}

public AtomicInteger getErrorCounter() {
return errorCounter;
}

public ServerCallToObserverAdapter<String> getResponseObserver() {
return this.responseObserver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,31 @@

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.handler.ManyToManyMethodHandler;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

/**
* Unit test for ManyToManyMethodHandler
*/
public final class ManyToManyMethodHandlerTest {

@Test
void testInvoke() throws ExecutionException, InterruptedException {
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
CreateObserverAdapter creator = new CreateObserverAdapter();

ManyToManyMethodHandler<String, String> handler = new ManyToManyMethodHandler<>(requestFlux ->
requestFlux.map(r -> r + "0"));
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{responseObserver});
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{creator.getResponseObserver()});
StreamObserver<String> requestObserver = future.get();
for (int i = 0; i < 10; i++) {
requestObserver.onNext(String.valueOf(i));
}
requestObserver.onCompleted();
Assertions.assertEquals(10, nextCounter.get());
Assertions.assertEquals(0, errorCounter.get());
Assertions.assertEquals(1, completeCounter.get());
Assertions.assertEquals(10, creator.getNextCounter().get());
Assertions.assertEquals(0, creator.getErrorCounter().get());
Assertions.assertEquals(1, creator.getCompleteCounter().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,72 +22,54 @@
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

/**
* Unit test for ManyToOneMethodHandler
*/
public final class ManyToOneMethodHandlerTest {

@Test
void testInvoker() throws ExecutionException, InterruptedException {
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
private StreamObserver<String> requestObserver;
private CreateObserverAdapter creator;

@BeforeEach
void init() throws ExecutionException, InterruptedException {
creator = new CreateObserverAdapter();
ManyToOneMethodHandler<String, String> handler = new ManyToOneMethodHandler<>(requestFlux ->
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{responseObserver});
StreamObserver<String> requestObserver = future.get();
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{creator.getResponseObserver()});
requestObserver = future.get();
}

@Test
void testInvoker() {
for (int i = 0; i < 10; i++) {
requestObserver.onNext(String.valueOf(i));
}
requestObserver.onCompleted();
Assertions.assertEquals(1, nextCounter.get());
Assertions.assertEquals(0, errorCounter.get());
Assertions.assertEquals(1, completeCounter.get());
Assertions.assertEquals(1, creator.getNextCounter().get());
Assertions.assertEquals(0, creator.getErrorCounter().get());
Assertions.assertEquals(1, creator.getCompleteCounter().get());
}

@Test
void testError() throws ExecutionException, InterruptedException {
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
ManyToOneMethodHandler<String, String> handler = new ManyToOneMethodHandler<>(requestFlux ->
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{responseObserver});
StreamObserver<String> requestObserver = future.get();
void testError() {
for (int i = 0; i < 10; i++) {
if (i == 6) {
requestObserver.onError(new Throwable());
}
requestObserver.onNext(String.valueOf(i));
}
requestObserver.onCompleted();
Assertions.assertEquals(0, nextCounter.get());
Assertions.assertEquals(1, errorCounter.get());
Assertions.assertEquals(0, completeCounter.get());
Assertions.assertEquals(0, creator.getNextCounter().get());
Assertions.assertEquals(1, creator.getErrorCounter().get());
Assertions.assertEquals(0, creator.getCompleteCounter().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,42 @@
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

/**
* Unit test for OneToManyMethodHandler
*/
public final class OneToManyMethodHandlerTest {

private CreateObserverAdapter creator;

@BeforeEach
void init() {
creator = new CreateObserverAdapter();
}

@Test
void testInvoke() {
String request = "1,2,3,4,5,6,7";
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
OneToManyMethodHandler<String, String> handler = new OneToManyMethodHandler<>(requestMono ->
requestMono.flatMapMany(r -> Flux.fromArray(r.split(","))));
CompletableFuture<?> future = handler.invoke(new Object[]{request, responseObserver});
CompletableFuture<?> future = handler.invoke(new Object[]{request, creator.getResponseObserver()});
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(7, nextCounter.get());
Assertions.assertEquals(0, errorCounter.get());
Assertions.assertEquals(1, completeCounter.get());
Assertions.assertEquals(7, creator.getNextCounter().get());
Assertions.assertEquals(0, creator.getErrorCounter().get());
Assertions.assertEquals(1, creator.getCompleteCounter().get());
}

@Test
void testError() {
String request = "1,2,3,4,5,6,7";
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
OneToManyMethodHandler<String, String> handler = new OneToManyMethodHandler<>(requestMono ->
Flux.create(emitter -> {
for (int i = 0; i < 10; i++) {
Expand All @@ -82,10 +67,10 @@ void testError() {
}
}
}));
CompletableFuture<?> future = handler.invoke(new Object[]{request, responseObserver});
CompletableFuture<?> future = handler.invoke(new Object[]{request, creator.getResponseObserver()});
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(6, nextCounter.get());
Assertions.assertEquals(1, errorCounter.get());
Assertions.assertEquals(0, completeCounter.get());
Assertions.assertEquals(6, creator.getNextCounter().get());
Assertions.assertEquals(1, creator.getErrorCounter().get());
Assertions.assertEquals(0, creator.getCompleteCounter().get());
}
}

0 comments on commit 8bdcaf6

Please sign in to comment.