Skip to content

Commit

Permalink
perf: optimize s3link interfaces using index mechanisms (#127)
Browse files Browse the repository at this point in the history
* perf: optimize s3link list queries using index mechanisms
  • Loading branch information
longjuan authored Apr 3, 2024
1 parent c5d4e71 commit f4ec56b
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 58 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repositories {
}

dependencies {
implementation platform('run.halo.tools.platform:plugin:2.12.0-SNAPSHOT')
implementation platform('run.halo.tools.platform:plugin:2.13.0-SNAPSHOT')
compileOnly 'run.halo.app:api'

implementation platform('software.amazon.awssdk:bom:2.19.8')
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/run/halo/s3os/LinkRequest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package run.halo.s3os;

import java.util.Set;
import lombok.Data;
import lombok.RequiredArgsConstructor;

import java.util.List;

@Data
@RequiredArgsConstructor
public class LinkRequest {
private String policyName;
private List<String> objectKeys;
private Set<String> objectKeys;
}
55 changes: 8 additions & 47 deletions src/main/java/run/halo/s3os/S3LinkController.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
package run.halo.s3os;

import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.core.extension.attachment.Attachment;
import run.halo.app.core.extension.attachment.Policy;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.plugin.ApiVersion;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

@ApiVersion("s3os.halo.run/v1alpha1")
@RestController
@RequiredArgsConstructor
public class S3LinkController {
private final S3LinkService s3LinkService;
private final ReactiveExtensionClient client;

/**
* Map of linking file, used as a lock, key is policyName/objectKey, value is policyName/objectKey.
*/
private final Map<String, Object> linkingFile = new ConcurrentHashMap<>();

@GetMapping("/policies/s3")
public Flux<Policy> listS3Policies() {
Expand All @@ -48,38 +40,7 @@ public Mono<S3ListResult> listObjects(@PathVariable(value = "policyName") String

@PostMapping("/attachments/link")
public Mono<LinkResult> addAttachmentRecord(@RequestBody LinkRequest linkRequest) {
return Flux.fromIterable(linkRequest.getObjectKeys())
.filter(objectKey -> linkingFile.put(linkRequest.getPolicyName() + "/" + objectKey,
linkRequest.getPolicyName() + "/" + objectKey) == null)
.collectList()
.flatMap(operableObjectKeys -> client.list(Attachment.class,
attachment -> Objects.equals(attachment.getSpec().getPolicyName(),
linkRequest.getPolicyName())
&& StringUtils.isNotEmpty(attachment.getMetadata().getAnnotations()
.get(S3OsAttachmentHandler.OBJECT_KEY))
&& linkRequest.getObjectKeys().contains(attachment.getMetadata()
.getAnnotations().get(S3OsAttachmentHandler.OBJECT_KEY)),
null)
.collectList()
.flatMap(existingAttachments -> Flux.fromIterable(linkRequest.getObjectKeys())
.flatMap((objectKey) -> {
if (operableObjectKeys.contains(objectKey) && existingAttachments.stream()
.noneMatch(attachment -> Objects.equals(
attachment.getMetadata().getAnnotations().get(
S3OsAttachmentHandler.OBJECT_KEY), objectKey))) {
return s3LinkService
.addAttachmentRecord(linkRequest.getPolicyName(), objectKey)
.onErrorResume((throwable) -> Mono.just(
new LinkResult.LinkResultItem(objectKey, false,
throwable.getMessage())));
} else {
return Mono.just(new LinkResult.LinkResultItem(objectKey, false,
"附件库中已存在该对象"));
}
})
.doOnNext(linkResultItem -> linkingFile.remove(
linkRequest.getPolicyName() + "/" + linkResultItem.getObjectKey()))
.collectList()
.map(LinkResult::new)));
return s3LinkService.addAttachmentRecords(linkRequest.getPolicyName(),
linkRequest.getObjectKeys());
}
}
3 changes: 2 additions & 1 deletion src/main/java/run/halo/s3os/S3LinkService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package run.halo.s3os;


import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.core.extension.attachment.Policy;
Expand All @@ -11,7 +12,7 @@ public interface S3LinkService {
Mono<S3ListResult> listObjects(String policyName, String continuationToken,
Integer pageSize);

Mono<LinkResult.LinkResultItem> addAttachmentRecord(String policyName, String objectKey);
Mono<LinkResult> addAttachmentRecords(String policyName, Set<String> objectKeys);

Mono<S3ListResult> listObjectsUnlinked(String policyName, String continuationToken,
String continuationObject, Integer pageSize);
Expand Down
74 changes: 69 additions & 5 deletions src/main/java/run/halo/s3os/S3LinkServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand All @@ -23,7 +26,11 @@
import run.halo.app.core.extension.attachment.Attachment;
import run.halo.app.core.extension.attachment.Policy;
import run.halo.app.extension.ConfigMap;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.MetadataUtil;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.index.query.QueryFactory;
import run.halo.app.extension.router.selector.FieldSelector;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
Expand All @@ -37,6 +44,11 @@ public class S3LinkServiceImpl implements S3LinkService {
private final ReactiveExtensionClient client;
private final S3OsAttachmentHandler handler;

/**
* Map of linking file, used as a lock, key is policyName/objectKey, value is policyName/objectKey.
*/
private final Map<String, Object> linkingFile = new ConcurrentHashMap<>();


@Override
public Flux<Policy> listS3Policies() {
Expand Down Expand Up @@ -73,9 +85,10 @@ public Mono<S3ListResult> listObjects(String policyName, String continuationToke
.stream().map(S3ListResult.ObjectVo::fromS3Object)
.filter(objectVo -> !objectVo.getKey().endsWith("/"))
.collect(Collectors.toMap(S3ListResult.ObjectVo::getKey, o -> o));
return client.list(Attachment.class,
attachment -> policyName.equals(
attachment.getSpec().getPolicyName()), null)
ListOptions listOptions = new ListOptions();
listOptions.setFieldSelector(
FieldSelector.of(QueryFactory.equal("spec.policyName", policyName)));
return client.listAll(Attachment.class, listOptions, null)
.doOnNext(attachment -> {
S3ListResult.ObjectVo objectVo =
objectVos.get(attachment.getMetadata().getAnnotations()
Expand All @@ -95,6 +108,59 @@ public Mono<S3ListResult> listObjects(String policyName, String continuationToke
.onErrorMap(S3ExceptionHandler::map);
}

@Override
public Mono<LinkResult> addAttachmentRecords(String policyName, Set<String> objectKeys) {
return getOperableObjectKeys(objectKeys, policyName)
.flatMap(operableObjectKeys -> getExistingAttachments(objectKeys, policyName)
.flatMap(existingAttachments -> getLinkResultItems(objectKeys, operableObjectKeys,
existingAttachments, policyName)
.collectList()
.map(LinkResult::new)));
}

private Mono<Set<String>> getOperableObjectKeys(Set<String> objectKeys, String policyName) {
return Flux.fromIterable(objectKeys)
.filter(objectKey ->
linkingFile.put(policyName + "/" + objectKey, policyName + "/" + objectKey) == null)
.collect(Collectors.toSet());
}

private Mono<Set<String>> getExistingAttachments(Set<String> objectKeys,
String policyName) {
ListOptions listOptions = new ListOptions();
listOptions.setFieldSelector(
FieldSelector.of(QueryFactory.equal("spec.policyName", policyName)));
return client.listAll(Attachment.class, listOptions, null)
.filter(attachment -> StringUtils.isNotBlank(
MetadataUtil.nullSafeAnnotations(attachment).get(S3OsAttachmentHandler.OBJECT_KEY))
&& objectKeys.contains(
MetadataUtil.nullSafeAnnotations(attachment).get(S3OsAttachmentHandler.OBJECT_KEY)))
.map(attachment -> MetadataUtil.nullSafeAnnotations(attachment)
.get(S3OsAttachmentHandler.OBJECT_KEY))
.collect(Collectors.toSet());
}

private Flux<LinkResult.LinkResultItem> getLinkResultItems(Set<String> objectKeys,
Set<String> operableObjectKeys,
Set<String> existingAttachments,
String policyName) {
return Flux.fromIterable(objectKeys)
.flatMap((objectKey) -> {
if (operableObjectKeys.contains(objectKey) &&
!existingAttachments.contains(objectKey)) {
return addAttachmentRecord(policyName, objectKey)
.onErrorResume((throwable) -> Mono.just(
new LinkResult.LinkResultItem(objectKey, false,
throwable.getMessage())));
} else {
return Mono.just(
new LinkResult.LinkResultItem(objectKey, false, "附件库中已存在该对象"));
}
})
.doFinally(signalType -> operableObjectKeys.forEach(
objectKey -> linkingFile.remove(policyName + "/" + objectKey)));
}

@Override
public Mono<S3ListResult> listObjectsUnlinked(String policyName, String continuationToken,
String continuationObject, Integer pageSize) {
Expand Down Expand Up @@ -151,8 +217,6 @@ public Mono<S3ListResult> listObjectsUnlinked(String policyName, String continua
record TokenState(String currToken, String nextToken) {
}


@Override
public Mono<LinkResult.LinkResultItem> addAttachmentRecord(String policyName,
String objectKey) {
return authenticationConsumer(authentication -> client.fetch(Policy.class, policyName)
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: PluginS3ObjectStorage
spec:
enabled: true
requires: ">=2.12.0"
requires: ">=2.13.0"
author:
name: Halo OSS Team
website: https://github.com/halo-dev
Expand Down

0 comments on commit f4ec56b

Please sign in to comment.