
Commit f77c34e: refactoring

sahibamittal committed Oct 10, 2023
1 parent deaa045 commit f77c34e
Showing 4 changed files with 212 additions and 125 deletions.

RepositoryMetaAnalyzerTopology.java
@@ -22,6 +22,7 @@

import static org.hyades.kstreams.util.KafkaStreamsUtil.processorNameConsume;
import static org.hyades.kstreams.util.KafkaStreamsUtil.processorNameProduce;
import static org.hyades.util.PurlUtil.parsePurlCoordinates;

public class RepositoryMetaAnalyzerTopology {

@@ -41,13 +42,7 @@ public Topology topology(final RepositoryAnalyzerFactory analyzerFactory,
.withName(processorNameConsume(KafkaTopic.REPO_META_ANALYSIS_COMMAND)))
.filter((key, scanCommand) -> scanCommand.hasComponent() && isValidPurl(scanCommand.getComponent().getPurl()),
Named.as("filter_components_with_valid_purl"))
// Re-key to PURL coordinates WITHOUT VERSION. As we are fetching data for packages,
// but not specific package versions, including the version here would make our caching
// largely ineffective. We want events for the same package to be sent to the same partition.
//
// Because we can't enforce this format on the keys of the input topic without causing
// serialization exceptions, we're left with this mandatory key change.
.selectKey((key, command) -> mustParsePurlCoordinatesWithoutVersion(command.getComponent().getPurl()),
.selectKey((key, command) -> parsePurlCoordinates(command.getComponent().getPurl()),
Named.as("re-key_to_purl_coordinates"))
// Force a repartition to ensure that the ordering guarantees we want, based on the
// previous re-keying operation, are effective.
@@ -84,18 +79,4 @@ private boolean isValidPurl(final String purl) {
return false;
}
}

private PackageURL mustParsePurlCoordinatesWithoutVersion(final String purl) {
try {
final var parsedPurl = new PackageURL(purl);
return new PackageURL(parsedPurl.getType(), parsedPurl.getNamespace(),
parsedPurl.getName(), null, null, null);
} catch (MalformedPackageURLException e) {
throw new IllegalStateException("""
The provided PURL is invalid, even though it should have been
validated in a previous processing step
""", e);
}
}

}
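
The selectKey helper removed above stripped the version and qualifiers from the PURL before re-keying; the topology now re-keys with PurlUtil.parsePurlCoordinates, which parses the PURL as-is, and the package-level versus version-level distinction moves into MetaAnalyzerProcessor's cache keys (next file). A minimal, hypothetical sketch of the difference in key shape, assuming the packageurl-java API already used in this class and an illustrative Maven PURL:

// Sketch only, not part of the commit.
import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;

public class ReKeyingSketch {

    public static void main(String[] args) throws MalformedPackageURLException {
        final String purl = "pkg:maven/org.apache.commons/commons-lang3@3.12.0";

        // New behavior: the key carries the full coordinates.
        final PackageURL newKey = new PackageURL(purl);
        System.out.println(newKey.canonicalize()); // pkg:maven/org.apache.commons/commons-lang3@3.12.0

        // Old behavior (helper removed in this commit): version and qualifiers were dropped.
        final PackageURL oldKey = new PackageURL(newKey.getType(), newKey.getNamespace(),
                newKey.getName(), null, null, null);
        System.out.println(oldKey.canonicalize()); // pkg:maven/org.apache.commons/commons-lang3
    }
}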

MetaAnalyzerProcessor.java
@@ -25,6 +25,9 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.TreeMap;

import static org.hyades.util.PurlUtil.parsePurlCoordinates;

class MetaAnalyzerProcessor extends ContextualFixedKeyProcessor<PackageURL, AnalysisCommand, AnalysisResult> {

@@ -47,13 +50,13 @@ class MetaAnalyzerProcessor extends ContextualFixedKeyProcessor<PackageURL, Anal

@Override
public void process(final FixedKeyRecord<PackageURL, AnalysisCommand> record) {
final PackageURL purlWithoutVersion = record.key();
final PackageURL purlKey = record.key();
final Component component = record.value().getComponent();

// NOTE: Do not use purlWithoutVersion for the analysis!
// It only contains the type, namespace and name, but is missing the
// version and other qualifiers. Some analyzers require the version.
final PackageURL purl = mustParsePurl(component.getPurl());
final PackageURL purl = parsePurlCoordinates(component.getPurl());

final Optional<IMetaAnalyzer> optionalAnalyzer = analyzerFactory.createAnalyzer(purl);
if (optionalAnalyzer.isEmpty()) {
@@ -78,110 +81,69 @@ public void process(final FixedKeyRecord<PackageURL, AnalysisCommand> record) {
continue;
}

// NOTE: The cache key currently does not take the PURL version into consideration.
// At the time of writing this, the meta analysis result is not version-specific.
// When that circumstance changes, the cache key must also include the PURL version.
final var cacheKey = new MetaAnalyzerCacheKey(analyzer.getName(), purlWithoutVersion.canonicalize(), repository.getUrl());

// Populate results from cache if possible.
final var cachedResult = getCachedResult(cacheKey);
if (cachedResult.isPresent()) {
LOGGER.debug("Cache hit (analyzer: {}, purl: {}, repository: {})", analyzer.getName(), purl, repository.getIdentifier());
context().forward(record
.withValue(cachedResult.get())
.withTimestamp(context().currentSystemTimeMs()));
return;
} else {
LOGGER.debug("Cache miss (analyzer: {}, purl: {}, repository: {})", analyzer.getName(), purl, repository.getIdentifier());
AnalysisResult.Builder resultBuilder = AnalysisResult.newBuilder()
.setComponent(component)
.setRepository(repository.getIdentifier());

// NOTE: When only the latest version is to be fetched, the cache key does not take the PURL version into consideration,
// because the latest-version analysis result is not version-specific.
// If integrity metadata is required, the cache key must also include the PURL version and qualifiers.
if (record.value().getFetchLatestVersion()) {
var cacheKeyWithoutVersion = new MetaAnalyzerCacheKey(analyzer.getName(), filterPurlCoordinates(purlKey, false).canonicalize(), repository.getUrl());
var cachedResult = getCachedResult(cacheKeyWithoutVersion);
if (cachedResult.isPresent()) {
LOGGER.debug("Cache hit for latest version (analyzer: {}, purl: {}, repository: {})", analyzer.getName(), purl, repository.getIdentifier());
resultBuilder.setLatestVersion(cachedResult.get().getLatestVersion());
resultBuilder.setPublished(cachedResult.get().getPublished());
} else {
LOGGER.debug("Cache miss for latest version (analyzer: {}, purl: {}, repository: {})", analyzer.getName(), purl, repository.getIdentifier());
final var repoMeta = fetchRepoMeta(analyzer, repository, record.value());
if (repoMeta != null) {
Optional.ofNullable(repoMeta.getLatestVersion()).ifPresent(
version -> resultBuilder.setLatestVersion(version));
Optional.ofNullable(repoMeta.getPublishedTimestamp()).ifPresent(
version -> resultBuilder.setPublished(Timestamp.newBuilder()
.setSeconds(repoMeta.getPublishedTimestamp().getTime() / 1000)));
cacheResult(cacheKeyWithoutVersion, resultBuilder.build());
}
}
}

final Optional<AnalysisResult> optionalResult = analyze(analyzer, repository, record.value());
if (optionalResult.isPresent()) {
context().forward(record
.withValue(optionalResult.get())
.withTimestamp(context().currentSystemTimeMs()));
cacheResult(cacheKey, optionalResult.get());
return;
if (record.value().getFetchIntegrityData()) {
var cacheKeyWithVersion = new MetaAnalyzerCacheKey(analyzer.getName(), filterPurlCoordinates(purlKey, true).canonicalize(), repository.getUrl());
var cachedResult = getCachedResult(cacheKeyWithVersion);
if (cachedResult.isPresent()) {
LOGGER.debug("Cache hit for integrity meta (analyzer: {}, purl: {}, repository: {})", analyzer.getName(), purl, repository.getIdentifier());
resultBuilder.setIntegrityMeta(cachedResult.get().getIntegrityMeta());
} else {
LOGGER.debug("Cache miss for integrity meta (analyzer: {}, purl: {}, repository: {})", analyzer.getName(), purl, repository.getIdentifier());
var integrityMeta = fetchIntegrityMeta(analyzer, repository, record.value());
if (integrityMeta != null) {
var metaBuilder = IntegrityMeta.newBuilder();
Optional.ofNullable(integrityMeta.getMd5()).ifPresent(hash -> metaBuilder.setMd5(hash));
Optional.ofNullable(integrityMeta.getSha1()).ifPresent(hash -> metaBuilder.setSha1(hash));
Optional.ofNullable(integrityMeta.getSha256()).ifPresent(hash -> metaBuilder.setSha256(hash));
Optional.ofNullable(integrityMeta.getSha512()).ifPresent(hash -> metaBuilder.setSha512(hash));
Optional.ofNullable(integrityMeta.getMetaSourceUrl()).ifPresent(url -> metaBuilder.setMetaSourceUrl(url));
Optional.ofNullable(integrityMeta.getCurrentVersionLastModified()).ifPresent(date ->
metaBuilder.setCurrentVersionLastModified(Timestamp.newBuilder()
.setSeconds(date.getTime() / 1000)));
resultBuilder.setIntegrityMeta(metaBuilder);
cacheResult(cacheKeyWithVersion, resultBuilder.build());
}
}
}
}

context().forward(record
.withValue(resultBuilder.build())
.withTimestamp(context().currentSystemTimeMs()));
return;
}
// Produce an "empty" result in case no repository yielded a satisfactory result.
context().forward(record
.withValue(AnalysisResult.newBuilder().setComponent(component).build())
.withTimestamp(context().currentSystemTimeMs()));
}

private Optional<AnalysisResult> analyze(final IMetaAnalyzer analyzer, final Repository repository, final AnalysisCommand analysisCommand) {
analyzer.setRepositoryBaseUrl(repository.getUrl());
boolean isAuthenticationRequired = Optional.ofNullable(repository.isAuthenticationRequired()).orElse(false);
if (isAuthenticationRequired) {
try {
analyzer.setRepositoryUsernameAndPassword(repository.getUsername(), secretDecryptor.decryptAsString(repository.getPassword()));
} catch (Exception e) {
LOGGER.error("Failed decrypting password for repository: " + repository.getIdentifier(), e);
}
}

// Analyzers still work with "legacy" data models,
// allowing us to avoid major refactorings of the original code.
final var component = new org.hyades.persistence.model.Component();
component.setPurl(analysisCommand.getComponent().getPurl());
LOGGER.debug("Performing meta analysis on purl: {}", component.getPurl());
MetaModel metaModel = null;
org.hyades.model.IntegrityMeta integrityMeta = null;
try {
if (analysisCommand.getFetchLatestVersion()) {
metaModel = analyzer.analyze(component);
}
if (analysisCommand.getFetchIntegrityData()) {
integrityMeta = analyzer.getIntegrityMeta(component);
}
} catch (Exception e) {
LOGGER.error("Failed to analyze {} using {} with repository {}",
component.getPurl(), analyzer.getName(), repository.getIdentifier(), e);
return Optional.empty();
}

final AnalysisResult.Builder resultBuilder = AnalysisResult.newBuilder()
.setComponent(analysisCommand.getComponent())
.setRepository(repository.getIdentifier());
if (metaModel != null && metaModel.getLatestVersion() != null) {
resultBuilder.setLatestVersion(metaModel.getLatestVersion());
if (metaModel.getPublishedTimestamp() != null) {
resultBuilder.setPublished(Timestamp.newBuilder()
.setSeconds(metaModel.getPublishedTimestamp().getTime() / 1000));
}
LOGGER.debug("Found component metadata for: {} using repository: {} ({})",
component.getPurl(), repository.getIdentifier(), repository.getType());
}
if (integrityMeta != null) {
IntegrityMeta.Builder metaBuilder = IntegrityMeta.newBuilder();
Optional.ofNullable(integrityMeta.getMd5()).ifPresent(hash -> metaBuilder.setMd5(hash));
Optional.ofNullable(integrityMeta.getSha1()).ifPresent(hash -> metaBuilder.setSha1(hash));
Optional.ofNullable(integrityMeta.getSha256()).ifPresent(hash -> metaBuilder.setSha256(hash));
Optional.ofNullable(integrityMeta.getSha512()).ifPresent(hash -> metaBuilder.setSha512(hash));
Optional.ofNullable(integrityMeta.getMetaSourceUrl()).ifPresent(url -> metaBuilder.setMetaSourceUrl(url));
Optional.ofNullable(integrityMeta.getCurrentVersionLastModified()).ifPresent(date ->
metaBuilder.setCurrentVersionLastModified(Timestamp.newBuilder()
.setSeconds(date.getTime() / 1000)));
resultBuilder.setIntegrityMeta(metaBuilder);
LOGGER.debug("Found integrity metadata for: {} using repository: {} ({})",
component.getPurl(), repository.getIdentifier(), repository.getType());
}
return Optional.of(resultBuilder.build());
}

private PackageURL mustParsePurl(final String purl) {
try {
return new PackageURL(purl);
} catch (MalformedPackageURLException e) {
throw new IllegalStateException("""
The provided PURL is invalid, even though it should have been
validated in a previous processing step
""", e);
}
}

private List<Repository> getApplicableRepositories(final RepositoryType repositoryType) {
// Hibernate requires an active transaction to perform any sort of interaction
// with the database. Because processors can't be CDI beans, we cannot use
Expand Down Expand Up @@ -216,4 +178,68 @@ private void cacheResult(final MetaAnalyzerCacheKey cacheKey, final AnalysisResu
cache.get(cacheKey, key -> result).await().indefinitely();
}

private PackageURL filterPurlCoordinates(final PackageURL purl, final boolean forIntegrityMeta) {
try {
return new PackageURL(purl.getType(), purl.getNamespace(), purl.getName(),
forIntegrityMeta ? purl.getVersion() : null,
forIntegrityMeta ? (TreeMap<String, String>) purl.getQualifiers() : null,
null);
} catch (MalformedPackageURLException e) {
throw new IllegalStateException("""
The provided PURL is invalid, even though it should have been
validated in a previous processing step
""", e);
}
}

private org.hyades.model.IntegrityMeta fetchIntegrityMeta(IMetaAnalyzer analyzer, final Repository repository, final AnalysisCommand analysisCommand) {
configureAnalyzer(analyzer, repository);
final var component = new org.hyades.persistence.model.Component();
component.setPurl(analysisCommand.getComponent().getPurl());
LOGGER.debug("Performing integrity meta fetch on purl: {}", component.getPurl());
org.hyades.model.IntegrityMeta integrityMeta;
try {
integrityMeta = analyzer.getIntegrityMeta(component);
} catch (Exception e) {
LOGGER.error("Failed to analyze {} using {} with repository {}",
component.getPurl(), analyzer.getName(), repository.getIdentifier(), e);
return null;
}
LOGGER.debug("Found integrity metadata for: {} using repository: {} ({})",
component.getPurl(), repository.getIdentifier(), repository.getType());
return integrityMeta;
}

private MetaModel fetchRepoMeta(IMetaAnalyzer analyzer, final Repository repository, final AnalysisCommand analysisCommand) {
configureAnalyzer(analyzer, repository);
// Analyzers still work with "legacy" data models,
// allowing us to avoid major refactorings of the original code.
final var component = new org.hyades.persistence.model.Component();
component.setPurl(analysisCommand.getComponent().getPurl());
LOGGER.debug("Performing meta analysis on purl: {}", component.getPurl());
MetaModel metaModel = null;
try {
if (analysisCommand.getFetchLatestVersion()) {
metaModel = analyzer.analyze(component);
LOGGER.debug("Found component metadata for: {} using repository: {} ({})",
component.getPurl(), repository.getIdentifier(), repository.getType());
}
} catch (Exception e) {
LOGGER.error("Failed to analyze {} using {} with repository {}",
component.getPurl(), analyzer.getName(), repository.getIdentifier(), e);
}
return metaModel;
}

private void configureAnalyzer(final IMetaAnalyzer analyzer, final Repository repository) {
analyzer.setRepositoryBaseUrl(repository.getUrl());
boolean isAuthenticationRequired = Optional.ofNullable(repository.isAuthenticationRequired()).orElse(false);
if (isAuthenticationRequired) {
try {
analyzer.setRepositoryUsernameAndPassword(repository.getUsername(), secretDecryptor.decryptAsString(repository.getPassword()));
} catch (Exception e) {
LOGGER.error("Failed decrypting password for repository: " + repository.getIdentifier(), e);
}
}
}
}
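
To follow the new caching split above: when only the latest version is fetched, the result is cached under a PURL key without version (latest-version results are not version-specific); when integrity metadata is fetched, the key produced by the new filterPurlCoordinates helper keeps version and qualifiers. A small self-contained sketch of the two key shapes, using a hypothetical CacheKey record in place of MetaAnalyzerCacheKey and an illustrative PURL and repository URL:

// Sketch only, not part of the commit.
import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;

public class CacheKeySketch {

    // Hypothetical stand-in for MetaAnalyzerCacheKey (analyzer name, canonical PURL,
    // repository URL), used only to keep this sketch self-contained.
    record CacheKey(String analyzer, String purl, String repositoryUrl) {
    }

    public static void main(String[] args) throws MalformedPackageURLException {
        final PackageURL purl = new PackageURL("pkg:maven/com.acme/acme-lib@1.2.3");

        // Latest-version lookups: the version is dropped from the cache key.
        final String packageCoordinates = new PackageURL(purl.getType(), purl.getNamespace(),
                purl.getName(), null, null, null).canonicalize();

        // Integrity-metadata lookups: the version (and, in the real helper, the qualifiers) is kept.
        final String versionedCoordinates = new PackageURL(purl.getType(), purl.getNamespace(),
                purl.getName(), purl.getVersion(), null, null).canonicalize();

        System.out.println(new CacheKey("maven", packageCoordinates, "https://repo.example.org/"));
        // CacheKey[analyzer=maven, purl=pkg:maven/com.acme/acme-lib, repositoryUrl=https://repo.example.org/]
        System.out.println(new CacheKey("maven", versionedCoordinates, "https://repo.example.org/"));
        // CacheKey[analyzer=maven, purl=pkg:maven/com.acme/acme-lib@1.2.3, repositoryUrl=https://repo.example.org/]
    }
}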

PurlUtil.java (new file)
@@ -0,0 +1,26 @@
package org.hyades.util;

import com.github.packageurl.MalformedPackageURLException;
import com.github.packageurl.PackageURL;

public final class PurlUtil {

private PurlUtil() {
}

/**
* @param purl the purl string to parse
* @return a PackageURL object
* @since 3.1.0
*/
public static PackageURL parsePurlCoordinates(final String purl) {
try {
return new PackageURL(purl);
} catch (MalformedPackageURLException e) {
throw new IllegalStateException("""
The provided PURL is invalid, even though it should have been
validated in a previous processing step
""", e);
}
}
}
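
Usage of the new helper is straightforward; it centralizes the parse-or-fail logic that was previously duplicated in the topology and the processor. An illustrative example with one valid and one invalid input (both inputs are hypothetical, not from the commit):

import com.github.packageurl.PackageURL;
import org.hyades.util.PurlUtil;

public class PurlUtilSketch {

    public static void main(String[] args) {
        // Valid PURL: parsed and returned.
        final PackageURL parsed = PurlUtil.parsePurlCoordinates("pkg:npm/left-pad@1.3.0");
        System.out.println(parsed.getName() + " " + parsed.getVersion()); // left-pad 1.3.0

        // Invalid PURL: MalformedPackageURLException is wrapped in an IllegalStateException,
        // since upstream validation is expected to have rejected it already.
        try {
            PurlUtil.parsePurlCoordinates("not-a-purl");
        } catch (IllegalStateException e) {
            System.out.println("rejected invalid purl");
        }
    }
}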
(Diff of the fourth changed file is not shown.)
