Skip to content

Commit

Permalink
Offloader: add API to scan objects on Tiered Storage: Move to JAX-RS …
Browse files Browse the repository at this point in the history
…StreamingOutput

(cherry picked from commit 2d1f85f)
  • Loading branch information
eolivelli authored and nicoloboschi committed Mar 31, 2022
1 parent 549fa14 commit 55add94
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.Map;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.util.Map;

@Builder
@Getter
@ToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -42,12 +40,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -57,9 +53,6 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -2774,7 +2767,8 @@ protected void internalSetNamespaceResourceGroup(String rgName) {
internalSetPolicies("resource_group_name", rgName);
}

protected Map<String, Object> internalScanOffloadedLedgers() throws Exception {
protected void internalScanOffloadedLedgers(OffloaderObjectsScannerUtils.ScannerResultSink sink)
throws Exception {
log.info("internalScanOffloadedLedgers {}", namespaceName);
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);

Expand All @@ -2783,101 +2777,10 @@ protected Map<String, Object> internalScanOffloadedLedgers() throws Exception {
.getManagedLedgerOffloader(namespaceName, (OffloadPoliciesImpl) policies.offload_policies);

String localClusterName = pulsar().getConfiguration().getClusterName();
Map<String, Object> topLevelResult = new HashMap<>();
List<Map<String, Object>> objects = new ArrayList<>();
topLevelResult.put("objects", objects);
AtomicInteger totalCount = new AtomicInteger();
AtomicInteger totalErrors = new AtomicInteger();
AtomicInteger totalUnknown = new AtomicInteger();
managedLedgerOffloader.scanLedgers((md -> {
log.info("Found ledger {}", md);
Map<String, Object> objectInfo = new HashMap<>();
objectInfo.put("ledger", md.getLedgerId());
objectInfo.put("name", md.getName());
objectInfo.put("uri", md.getUri());
objectInfo.put("uuid", md.getUuid());
objectInfo.put("size", md.getSize());
objectInfo.put("lastModified", md.getLastModified());
objectInfo.put("userMetadata", md.getUserMetadata());

String status = "UNKNOWN";

if (md.getUserMetadata() != null) {
// non case sensistive
TreeMap<String, String> userMetadata = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
userMetadata.putAll(md.getUserMetadata());
String clusterName = userMetadata.get(LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME);
if (localClusterName.equals(clusterName)) {
String managedLedgerName = userMetadata.get("managedledgername");
if (managedLedgerName != null) {
objectInfo.put("managedLedgerName", managedLedgerName);
try {
status = checkLedgerShouldBeOnTieredStorage(md.getLedgerId(), md.getUuid(),
managedLedgerName, objectInfo, pulsar().getManagedLedgerFactory());
} catch (InterruptedException err) {
Thread.currentThread().interrupt();
throw new RuntimeException(err);
} catch (ManagedLedgerException err) {
log.error("Error while checking managed ledger {}", managedLedgerName);
throw new RuntimeException(err);
}
}
}
}
totalCount.incrementAndGet();
objectInfo.put("status", status);
switch (status) {
case "OK":
break;
case "UNKNOWN":
totalUnknown.incrementAndGet();
break;
default:
totalErrors.incrementAndGet();
break;
}

objects.add(objectInfo);
return true;
}), managedLedgerOffloader.getOffloadDriverMetadata());
topLevelResult.put("errors", totalErrors.intValue());
topLevelResult.put("total", totalCount.intValue());
topLevelResult.put("unknownObjects", totalUnknown.intValue());
log.info("internalScanOffloadedLedgers {} scan finished");

return topLevelResult;
}
OffloaderObjectsScannerUtils.scanOffloadedLedgers(managedLedgerOffloader,
localClusterName, pulsar().getManagedLedgerFactory(), sink);

private static String checkLedgerShouldBeOnTieredStorage(long ledgerId, String uuid,
String managedLedgerName,
Map<String, Object> data,
ManagedLedgerFactory managedLedgerFactory)
throws InterruptedException, ManagedLedgerException {
try {
ManagedLedgerInfo managedLedgerInfo = managedLedgerFactory.getManagedLedgerInfo(managedLedgerName);
ManagedLedgerInfo.LedgerInfo ledgerInfo = managedLedgerInfo
.ledgers.stream().filter(l -> l.ledgerId == ledgerId).findAny().orElse(null);
if (ledgerInfo == null) {
log.info("Managed ledger {} does not contain ledger {}", managedLedgerName, ledgerId);
return "NOT-FOUND";
}
data.put("numEntries", ledgerInfo.entries);
data.put("offloaded", ledgerInfo.isOffloaded);
if (!ledgerInfo.isOffloaded) {
log.info("Ledger {} is not marked as OFFLOADED in {}", ledgerId, managedLedgerName);
return "NOT-OFFLOADED";
}
String uuidOnMetadata = ledgerInfo.offloadedContextUuid;
if (!Objects.equals(uuidOnMetadata, uuid)) {
log.info("Ledger {} uuid {} does not match name uuid {}", ledgerId, uuidOnMetadata, uuid);
return "BAD-UUID";
}
return "OK";
} catch (ManagedLedgerException.ManagedLedgerNotFoundException
| ManagedLedgerException.MetadataNotFoundException notFound) {
log.info("Managed ledger {} does not exist (maybe the topic has been deleted)", managedLedgerName);
return "NOT-FOUND";
}
}

private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;

@Slf4j
@UtilityClass
public class OffloaderObjectsScannerUtils {

private static final String STATUS_OK = "OK";
private static final String STATUS_UNKNOWN = "UNKNOWN";
private static final String STATUS_BAD_UUID = "BAD-UUID";
private static final String STATUS_NOT_FOUND = "NOT-FOUND";
private static final String STATUS_NOT_OFFLOADED = "NOT-OFFLOADED";

public interface ScannerResultSink {

void object(Map<String, Object> data) throws Exception;
void finished(int total, int errors, int unknown) throws Exception;
}

static void scanOffloadedLedgers(LedgerOffloader managedLedgerOffloader,
String localClusterName,
ManagedLedgerFactory managedLedgerFactory,
ScannerResultSink sink) throws Exception {
AtomicInteger totalCount = new AtomicInteger();
AtomicInteger totalErrors = new AtomicInteger();
AtomicInteger totalUnknown = new AtomicInteger();
managedLedgerOffloader.scanLedgers((md -> {
log.info("Found ledger {}", md);
Map<String, Object> objectInfo = new HashMap<>();
objectInfo.put("ledger", md.getLedgerId());
objectInfo.put("name", md.getName());
objectInfo.put("uri", md.getUri());
objectInfo.put("uuid", md.getUuid());
objectInfo.put("size", md.getSize());
objectInfo.put("lastModified", md.getLastModified());
objectInfo.put("userMetadata", md.getUserMetadata());

String status = STATUS_UNKNOWN;

if (md.getUserMetadata() != null) {
// non case sensitive
TreeMap<String, String> userMetadata = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
userMetadata.putAll(md.getUserMetadata());
String clusterName = userMetadata.get(LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME);
if (localClusterName.equals(clusterName)) {
String managedLedgerName = userMetadata.get("managedledgername");
if (managedLedgerName != null) {
objectInfo.put("managedLedgerName", managedLedgerName);
try {
status = checkLedgerShouldBeOnTieredStorage(md.getLedgerId(), md.getUuid(),
managedLedgerName, objectInfo, managedLedgerFactory);
} catch (InterruptedException err) {
Thread.currentThread().interrupt();
throw new RuntimeException(err);
} catch (ManagedLedgerException err) {
log.error("Error while checking managed ledger {}", managedLedgerName);
throw new RuntimeException(err);
}
}
}
}
totalCount.incrementAndGet();
objectInfo.put("status", status);
switch (status) {
case STATUS_OK:
break;
case STATUS_UNKNOWN:
totalUnknown.incrementAndGet();
break;
default:
totalErrors.incrementAndGet();
break;
}

sink.object(objectInfo);
return true;
}), managedLedgerOffloader.getOffloadDriverMetadata());

sink.finished(totalCount.get(), totalErrors.get(), totalUnknown.get());
}

private static String checkLedgerShouldBeOnTieredStorage(long ledgerId, String uuid,
String managedLedgerName,
Map<String, Object> data,
ManagedLedgerFactory managedLedgerFactory)
throws InterruptedException, ManagedLedgerException {
try {
ManagedLedgerInfo managedLedgerInfo = managedLedgerFactory.getManagedLedgerInfo(managedLedgerName);
ManagedLedgerInfo.LedgerInfo ledgerInfo = managedLedgerInfo
.ledgers.stream().filter(l -> l.ledgerId == ledgerId).findAny().orElse(null);
if (ledgerInfo == null) {
log.info("Managed ledger {} does not contain ledger {}", managedLedgerName, ledgerId);
return STATUS_NOT_FOUND;
}
data.put("numEntries", ledgerInfo.entries);
data.put("offloaded", ledgerInfo.isOffloaded);
if (!ledgerInfo.isOffloaded) {
log.info("Ledger {} is not marked as OFFLOADED in {}", ledgerId, managedLedgerName);
return STATUS_NOT_OFFLOADED;
}
String uuidOnMetadata = ledgerInfo.offloadedContextUuid;
if (!Objects.equals(uuidOnMetadata, uuid)) {
log.info("Ledger {} uuid {} does not match name uuid {}", ledgerId, uuidOnMetadata, uuid);
return STATUS_BAD_UUID;
}
return "OK";
} catch (ManagedLedgerException.ManagedLedgerNotFoundException
| ManagedLedgerException.MetadataNotFoundException notFound) {
log.info("Managed ledger {} does not exist (maybe the topic has been deleted)", managedLedgerName);
return STATUS_NOT_FOUND;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -42,7 +44,9 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
Expand Down Expand Up @@ -70,6 +74,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1877,11 +1882,44 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
@ApiOperation(value = "Trigger the scan of offloaded Ledgers on the LedgerOffloader for the given namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
public Map<String, Object> scanOffloadedLedgers(@PathParam("tenant") String tenant,
public Response scanOffloadedLedgers(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
try {
return internalScanOffloadedLedgers();
StreamingOutput output = (outputStream) -> {
try {
OutputStreamWriter out = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
out.append("{\"objects\":[\n");
internalScanOffloadedLedgers(new OffloaderObjectsScannerUtils.ScannerResultSink() {
boolean first = true;
@Override
public void object(Map<String, Object> data) throws Exception {
if (!first) {
out.write(',');
} else {
first = false;
}
String json = ObjectMapperFactory.getThreadLocal().writeValueAsString(data);
out.write(json);
}

@Override
public void finished(int total, int errors, int unknown) throws Exception {
out.append("],\n");
out.append("\"total\": " + total + ",\n");
out.append("\"errors\": " + errors + ",\n");
out.append("\"unknown\": " + unknown + "\n");
}
});
out.append("}");
out.flush();
outputStream.flush();
} catch (Exception err) {
log.error("error", err);
throw new RuntimeException(err);
}
};
return Response.ok(output).type(MediaType.APPLICATION_JSON_TYPE).build();
} catch (Throwable err) {
log.error("Error while scanning offloaded ledgers for namespace {}", namespaceName, err);
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ public void close() {
}

@Override
public void scanLedgers(OffloadedLedgerMetadataConsumer consumer, Map<String, String> offloadDriverMetadata) throws ManagedLedgerException {
public void scanLedgers(OffloadedLedgerMetadataConsumer consumer, Map<String,
String> offloadDriverMetadata) throws ManagedLedgerException {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String endpoint = bsKey.getEndpoint();
String readBucket = bsKey.getBucket();
Expand Down Expand Up @@ -669,7 +670,7 @@ private String scanContainer(OffloadedLedgerMetadataConsumer consumer, BlobStore
}
PageSet<? extends StorageMetadata> pages = readBlobstore.list(bucketName, options);
for (StorageMetadata md : pages) {
log.info("Found {} ",md);
log.info("Found {} ", md);
String name = md.getName();
Long size = md.getSize();
Date lastModified = md.getLastModified();
Expand Down

0 comments on commit 55add94

Please sign in to comment.