Skip to content

Commit

Permalink
feat(WebFlux): integrate and configure WebFlux
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnymillergh committed Oct 19, 2020
1 parent 03a27a4 commit 4b14363
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 11 deletions.
8 changes: 4 additions & 4 deletions media-streaming-sample-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
<description>Media Streaming :: Sample App</description>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.jmframework.boot</groupId>
<artifactId>media-streaming-spring-boot-starter</artifactId>
Expand Down
30 changes: 29 additions & 1 deletion media-streaming-spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,41 @@
</parent>
<artifactId>media-streaming-spring-boot-autoconfigure</artifactId>
<name>Media Streaming :: Spring Boot Autoconfigure</name>
<description>Demo project for Spring Boot</description>
<description>Media Streaming :: Spring Boot Autoconfigure</description>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.drewnoakes</groupId>
<artifactId>metadata-extractor</artifactId>
<version>2.15.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;

/**
* Description: MediaStreamingAutoConfiguration, change description here.
*
* @author 钟俊, email: [email protected]
* date 10/19/2020 2:51 PM
**/
@Configuration
@EnableWebFlux
@EnableConfigurationProperties(MediaStreamingProperties.class)
public class MediaStreamingAutoConfiguration {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* Description: MediaStreamingProperties, change description here.
*
* @author Johnny Miller (锺俊), email: [email protected]
* date 10/19/2020 3:28 PM
**/
@Data
@ConfigurationProperties(prefix = MediaStreamingProperties.PREFIX)
public class MediaStreamingProperties {
public static final String PREFIX = "media-streaming";
/**
* Initial Cache Capacity
*/
private int initialCacheCapacity = 4096;
/**
* Maximum Cache Capacity
*/
private int maximumCacheCapacity = 8192;
/**
* Expiration Time
*/
private int expirationTime = 60 * 10;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration;

import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.ResourceRegionEncoder;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.http.*;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.lang.Math.min;

/**
* {@code HttpMessageWriter} that can write a {@link ResourceRegion}.
* <p>
* This class was directly converted from the given {@link org.springframework.http.codec.ResourceHttpMessageWriter}
* and adapted to write multiple resource regions in a single header
*
* @author Johnny Miller (锺俊), email: [email protected]
* date 10/19/2020 2:32 PM
* @see
* <a href='https://github.com/Recks11/Webflux-Streaming-Service/blob/master/src/main/java/com/rexijie/webflixstreamingservice/config/ResourceRegionMessageWriter.java'>Source code by Rex Ijiekhuamen</a>
**/
@Slf4j
public class ResourceRegionMessageWriter implements HttpMessageWriter<ResourceRegion> {
private static final ResolvableType REGION_TYPE = ResolvableType.forClass(ResourceRegion.class);
private final ResourceRegionEncoder resourceRegionEncoder;
private final List<MediaType> mediaTypes;

public ResourceRegionMessageWriter() {
this.resourceRegionEncoder = new ResourceRegionEncoder();
this.mediaTypes = MediaType.asMediaTypes(resourceRegionEncoder.getEncodableMimeTypes());
log.info("Media types registered: {}", mediaTypes);
}

private static MediaType getResourceMediaType(MediaType mediaType, Resource resource) {
if (mediaType != null && mediaType.isConcrete() && mediaType != MediaType.APPLICATION_OCTET_STREAM) {
return mediaType;
} else {
return MediaTypeFactory.getMediaType(resource).orElse(MediaType.APPLICATION_OCTET_STREAM);
}
}

private static Optional<Mono<Void>> zeroCopy(Resource resource, ResourceRegion resourceRegion,
ReactiveHttpOutputMessage message) {
if (message instanceof ZeroCopyHttpOutputMessage && resource.isFile()) {
try {
File file = resource.getFile();
long position = resourceRegion.getPosition();
long count = resourceRegion.getCount();
return Optional.of(((ZeroCopyHttpOutputMessage) message).writeWith(file, position, count));
} catch (IOException e) {
// Ignored
}
}
return Optional.empty();
}

private static long lengthOf(ResourceRegion resourceRegion) {
if (resourceRegion.getResource().getClass() != InputStreamResource.class) {
try {
return resourceRegion.getResource().contentLength();
} catch (Exception e) {
// Ignore Exception
}
}
return -1;
}

@Override
public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
return resourceRegionEncoder.canEncode(elementType, mediaType);
}

@Override
public List<MediaType> getWritableMediaTypes() {
return this.mediaTypes;
}

/**
* Write an given stream of object to the output message.
*
* @param inputStream the objects to write
* @param elementType the type of objects in the stream which must have been
* previously checked via {@link #canWrite(ResolvableType, MediaType)}
* @param mediaType the content type for the write (possibly {@code null} to
* indicate that the default content type of the writer must be used)
* @param message the message to write to
* @param hints additional information about how to encode and write
* @return indicates completion or error
*/
@Override
public Mono<Void> write(Publisher<? extends ResourceRegion> inputStream,
ResolvableType elementType, MediaType mediaType,
ReactiveHttpOutputMessage message,
Map<String, Object> hints) {

return Mono.from(inputStream)
.flatMap(resource ->
writeResource(resource, elementType, mediaType, message, hints));
}

private Mono<Void> writeResource(ResourceRegion resourceRegion,
ResolvableType elementType, MediaType mediaType,
ReactiveHttpOutputMessage message,
Map<String, Object> hints) {

HttpHeaders headers = message.getHeaders();

Mono<MediaType> mediaTypeMono = Mono.fromCallable(() -> getResourceMediaType(mediaType,
resourceRegion.getResource()));

Mono<Long> headersMono = Mono.fromCallable(() -> lengthOf(resourceRegion))
.filter(length -> length > -1)
.doOnNext(headers::setContentLength);


return headersMono.zipWith(mediaTypeMono)
.map((Tuple2<Long, MediaType> objects) -> {
MediaType resourceType = objects.getT2();
headers.setContentType(resourceType);
return resourceType;
})
.flatMap(resourceType ->
zeroCopy(resourceRegion.getResource(), resourceRegion, message)
.orElseGet(() -> {

Mono<ResourceRegion> input = Mono.just(resourceRegion);
DataBufferFactory bufferFactory = message.bufferFactory();
Flux<DataBuffer> body =
this.resourceRegionEncoder.encode(input, bufferFactory, elementType,
resourceType, hints);
return message.writeWith(body);
}));
}

/**
* Server-side only alternative to
* {@link #write(Publisher, ResolvableType, MediaType, ReactiveHttpOutputMessage, Map)}
* with additional context available.
*
* @param actualType the actual return type of the method that returned the
* value; for annotated controllers, the {@link MethodParameter} can be
* accessed via {@link ResolvableType#getSource()}.
* @param elementType the type of Objects in the input stream
* @param mediaType the content type to use (possibly {@code null} indicating
* the default content type of the writer should be used)
* @param request the current request
* @param response the current response
* @return a {@link Mono} that indicates completion of writing or error
*/
@Override
public Mono<Void> write(Publisher<? extends ResourceRegion> inputStream,
ResolvableType actualType, ResolvableType elementType,
MediaType mediaType, ServerHttpRequest request,
ServerHttpResponse response, Map<String, Object> hints) {

HttpHeaders headers = response.getHeaders();
headers.set(HttpHeaders.ACCEPT_RANGES, "bytes");

Mono<ResourceRegion> inputStreamMono = Mono.from(inputStream);

return inputStreamMono.flatMap(resourceRegion -> {
long contentLength = lengthOf(resourceRegion);
long startPosition = resourceRegion.getPosition(); //Where zero copy starts
long endPosition = min(startPosition + resourceRegion.getCount() - 1, contentLength - 1); //where zero
// copy ends relative to start position

MediaType resourceMediaType = getResourceMediaType(mediaType, resourceRegion.getResource());


headers.setContentType(resourceMediaType);
headers.add(HttpHeaders.CONTENT_RANGE,
"bytes " + startPosition + '-' + endPosition + '/' + contentLength);
headers.setContentLength(endPosition - startPosition + 1);

response.setStatusCode(HttpStatus.PARTIAL_CONTENT);
return writeEncodedRegion(resourceRegion, response, resourceMediaType, hints);
});
}

private Mono<Void> writeEncodedRegion(ResourceRegion region, ReactiveHttpOutputMessage message,
MediaType mediaType, Map<String, Object> hints) {
return zeroCopy(region.getResource(), region, message)
.orElseGet(() -> {
Mono<ResourceRegion> input = Mono.just(region);
Flux<DataBuffer> body = this.resourceRegionEncoder
.encode(input, message.bufferFactory(), REGION_TYPE, mediaType, hints);
return message.writeWith(body);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration;

import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.config.WebFluxConfigurer;

/**
* Description: Test, change description here.
*
* @author Johnny Miller (锺俊), email: [email protected]
* date 10/19/2020 2:33 PM
**/
@Configuration
public class WebFluxConfiguration implements WebFluxConfigurer {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
// for resource region
configurer.customCodecs().register(new ResourceRegionMessageWriter());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration.MediaStreamingAutoConfiguration,\
com.jmframework.boot.mediastreamingspringbootautoconfigure.configuration.WebFluxConfiguration
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@

<!-- Global Dependencies -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down

0 comments on commit 4b14363

Please sign in to comment.