Skip to content

Commit

Permalink
send integrity meta events outside of transaction and handle integrit…
Browse files Browse the repository at this point in the history
…y violation (#380)

* send integrity meta events outside of transaction and handle conflict

Signed-off-by: vithikashukla <[email protected]>

* remove unwanted import

Signed-off-by: vithikashukla <[email protected]>

---------

Signed-off-by: vithikashukla <[email protected]>
Co-authored-by: vithikashukla <[email protected]>
  • Loading branch information
VithikaS and vithikashukla authored Oct 23, 2023
1 parent 06c4ddc commit 2ab3c63
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.dependencytrack.model.Component;
import org.dependencytrack.model.ComponentIdentity;
import org.dependencytrack.model.ConfigPropertyConstants;
import org.dependencytrack.model.IntegrityMetaComponent;
import org.dependencytrack.model.Project;
import org.dependencytrack.model.RepositoryMetaComponent;
import org.dependencytrack.model.RepositoryType;
Expand Down Expand Up @@ -811,8 +810,4 @@ private void getDirectDependenciesForPathDependencies(Map<String, Component> dep
}
dependencyGraph.putAll(addToDependencyGraph);
}

public IntegrityMetaComponent createIntegrityMetaComponent(IntegrityMetaComponent integrityMetaComponent) {
return persist(integrityMetaComponent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,34 @@ public synchronized IntegrityMetaComponent updateIntegrityMetaComponent(final In
}
}

public IntegrityMetaComponent createIntegrityMetaComponent(IntegrityMetaComponent integrityMetaComponent) {
return persist(integrityMetaComponent);
}

public void createIntegrityMetaHandlingConflict(IntegrityMetaComponent integrityMetaComponent) {
final String createQuery = """
INSERT INTO "INTEGRITY_META_COMPONENT" ("PURL", "STATUS", "LAST_FETCH")
VALUES (?, ?, ?)
ON CONFLICT DO NOTHING
""";
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = (Connection) pm.getDataStoreConnection();
preparedStatement = connection.prepareStatement(createQuery);
preparedStatement.setString(1, integrityMetaComponent.getPurl().toString());
preparedStatement.setString(2, integrityMetaComponent.getStatus().toString());
preparedStatement.setTimestamp(3, new java.sql.Timestamp(integrityMetaComponent.getLastFetch().getTime()));
preparedStatement.execute();
} catch (Exception ex) {
LOGGER.error("Error in creating integrity meta component", ex);
throw new RuntimeException(ex);
} finally {
DbUtil.close(preparedStatement);
DbUtil.close(connection);
}
}

/**
* Synchronizes IntegrityMetaComponent with purls from COMPONENT. This is part of initializer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,11 @@ public void batchUpdateIntegrityMetaComponent(List<IntegrityMetaComponent> purls
}

public IntegrityMetaComponent createIntegrityMetaComponent(IntegrityMetaComponent integrityMetaComponent) {
return getComponentQueryManager().createIntegrityMetaComponent(integrityMetaComponent);
return getIntegrityMetaQueryManager().createIntegrityMetaComponent(integrityMetaComponent);
}

public void createIntegrityMetaHandlingConflict(IntegrityMetaComponent integrityMetaComponent) {
getIntegrityMetaQueryManager().createIntegrityMetaHandlingConflict(integrityMetaComponent);
}

public IntegrityAnalysis getIntegrityAnalysisByComponentUuid(UUID uuid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import alpine.event.framework.Subscriber;
import alpine.notification.Notification;
import alpine.notification.NotificationLevel;
import com.google.common.collect.ImmutableMap;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
Expand Down Expand Up @@ -263,6 +264,8 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump

final var vulnAnalysisEvents = new ArrayList<ComponentVulnerabilityAnalysisEvent>();
final var repoMetaAnalysisEvents = new ArrayList<ComponentRepositoryMetaAnalysisEvent>();
final var integrityMetaAnalysisEvents = new ArrayList<ComponentRepositoryMetaAnalysisEvent>();
Map<ComponentIdentity, Component> copyOfPersistentComponents;

try (final var qm = new QueryManager()) {
final PersistenceManager pm = qm.getPersistenceManager();
Expand Down Expand Up @@ -311,6 +314,7 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
processServices(qm, project, services, identitiesByBomRef, bomRefsByIdentity);
processDependencyGraph(ctx, pm, cdxBom, project, persistentComponents, persistentServices, identitiesByBomRef);
recordBomImport(ctx, pm, project);
copyOfPersistentComponents = ImmutableMap.copyOf(persistentComponents);

// BOM ref <-> ComponentIdentity indexes are no longer needed.
// Let go of their contents to make it eligible for GC sooner.
Expand All @@ -322,15 +326,8 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
// The constructors of ComponentRepositoryMetaAnalysisEvent and ComponentVulnerabilityAnalysisEvent
// merely call a few getters on it, but the component object itself is not passed around.
// Detaching would imply additional database interactions that we'd rather not do.
if (component.getPurl() != null) {
boolean result = SUPPORTED_PACKAGE_URLS_FOR_INTEGRITY_CHECK.contains(component.getPurl().getType());
ComponentRepositoryMetaAnalysisEvent event;
if (result) {
event = createRepoMetaAnalysisEvent(component, qm);
} else {
event = new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurlCoordinates().toString(), component.isInternal(), FetchMeta.FETCH_META_LATEST_VERSION);
}
repoMetaAnalysisEvents.add(event);
if(component.getPurl() != null) {
repoMetaAnalysisEvents.add(new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurlCoordinates().toString(), component.isInternal(), FetchMeta.FETCH_META_LATEST_VERSION));
}
vulnAnalysisEvents.add(new ComponentVulnerabilityAnalysisEvent(
ctx.uploadToken, component, VulnerabilityAnalysisLevel.BOM_UPLOAD_ANALYSIS, component.isNew()));
Expand All @@ -343,6 +340,23 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
}
}

//only integrity metadata events have to be sent because latest version events
//have been sent already
for (final Component component : copyOfPersistentComponents.values()) {
// Note: component does not need to be detached.
// The constructors of ComponentRepositoryMetaAnalysisEvent and ComponentVulnerabilityAnalysisEvent
// merely call a few getters on it, but the component object itself is not passed around.
// Detaching would imply additional database interactions that we'd rather not do
if (component.getPurl() != null) {
if(SUPPORTED_PACKAGE_URLS_FOR_INTEGRITY_CHECK.contains(component.getPurl().getType())) {
ComponentRepositoryMetaAnalysisEvent event = integrityRepoMetaAnalysisEvent(component, qm);
if (event != null) {
integrityMetaAnalysisEvents.add(event);
}
}
}
}

var vulnAnalysisState = qm.getWorkflowStateByTokenAndStep(ctx.uploadToken, WorkflowStep.VULN_ANALYSIS);
if (!vulnAnalysisEvents.isEmpty()) {
qm.createVulnerabilityScan(TargetType.PROJECT, ctx.project.getUuid(), ctx.uploadToken.toString(), vulnAnalysisEvents.size());
Expand Down Expand Up @@ -389,6 +403,13 @@ private void processBom(final Context ctx, final File bomFile) throws BomConsump
}
}));

integrityMetaAnalysisEvents.forEach(event -> kafkaEventDispatcher.dispatchAsync(event, (metadata, exception) -> {
if (exception != null) {
// Include context in the log message to make log correlation easier.
LOGGER.error("Failed to produce %s to Kafka (%s)".formatted(event, ctx), exception);
}
}));

// TODO: Trigger index updates
}
}
Expand Down Expand Up @@ -1013,20 +1034,20 @@ public String toString() {

}

private ComponentRepositoryMetaAnalysisEvent createRepoMetaAnalysisEvent(Component component, QueryManager qm) {

private ComponentRepositoryMetaAnalysisEvent integrityRepoMetaAnalysisEvent(Component component, QueryManager qm) {
IntegrityMetaComponent integrityMetaComponent = qm.getIntegrityMetaComponent(component.getPurl().toString());
if (integrityMetaComponent == null) {
qm.getPersistenceManager().makePersistent(AbstractMetaHandler.createIntegrityMetaComponent(component.getPurl().toString()));
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA_AND_LATEST_VERSION);
qm.createIntegrityMetaHandlingConflict(AbstractMetaHandler.createIntegrityMetaComponent(component.getPurl().toString()));
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA);
}
if (integrityMetaComponent.getStatus() == null || (integrityMetaComponent.getStatus() == FetchStatus.IN_PROGRESS && (Date.from(Instant.now()).getTime() - integrityMetaComponent.getLastFetch().getTime()) > TIME_SPAN)) {
else if (integrityMetaComponent.getStatus() == null || (integrityMetaComponent.getStatus() == FetchStatus.IN_PROGRESS && (Date.from(Instant.now()).getTime() - integrityMetaComponent.getLastFetch().getTime()) > TIME_SPAN)) {
integrityMetaComponent.setLastFetch(Date.from(Instant.now()));
qm.getPersistenceManager().makePersistent(integrityMetaComponent);
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA_AND_LATEST_VERSION);
} else {
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurlCoordinates().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_LATEST_VERSION);
return new ComponentRepositoryMetaAnalysisEvent(component.getUuid(), component.getPurl().canonicalize(), component.isInternal(), FetchMeta.FETCH_META_INTEGRITY_DATA);
}
//don't send event because integrity metadata would be in db already in Processed state or sent recently
//and don't want to send again
return null;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.dependencytrack.persistence;

import org.dependencytrack.AbstractPostgresEnabledTest;
import org.dependencytrack.model.FetchStatus;
import org.dependencytrack.model.IntegrityMetaComponent;
import org.junit.Test;

import java.util.Date;

import static org.assertj.core.api.Assertions.assertThat;

public class IntegrityMetaQueryManagerPostgresTest extends AbstractPostgresEnabledTest {

@Test
public void testCreateIntegrityMetadataHandlingConflict() {
var integrityMeta = new IntegrityMetaComponent();
integrityMeta.setPurl("pkg:maven/acme/[email protected]?type=jar");
integrityMeta.setStatus(FetchStatus.IN_PROGRESS);
integrityMeta.setLastFetch(new Date());
qm.createIntegrityMetaHandlingConflict(integrityMeta);

var integrityMeta2 = new IntegrityMetaComponent();
//inserting same purl twice should not cause exception
integrityMeta2.setPurl("pkg:maven/acme/[email protected]?type=jar");
integrityMeta2.setStatus(FetchStatus.IN_PROGRESS);
integrityMeta2.setLastFetch(new Date());
assertThat(qm.getIntegrityMetaComponentCount()).isEqualTo(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.github.packageurl.PackageURL;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.awaitility.Awaitility;
import org.dependencytrack.AbstractPostgresEnabledTest;
import org.dependencytrack.event.BomUploadEvent;
import org.dependencytrack.event.kafka.KafkaEventDispatcher;
Expand Down Expand Up @@ -101,6 +100,7 @@ public void informTest() throws Exception {
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name())
);
qm.getPersistenceManager().refresh(project);
Expand Down Expand Up @@ -186,6 +186,7 @@ public void informTestWithComponentAlreadyExistsForIntegrityCheck() throws Excep
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name())
);
qm.getPersistenceManager().refresh(project);
Expand Down Expand Up @@ -520,7 +521,7 @@ public void informWithBloatedBomTest() throws Exception {
.map(ProducerRecord::topic)
.filter(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()::equals)
.count();
assertThat(repoMetaAnalysisCommandsSent).isEqualTo(9056);
assertThat(repoMetaAnalysisCommandsSent).isEqualTo(18112);
}

@Test // https://github.com/DependencyTrack/dependency-track/issues/2519
Expand Down Expand Up @@ -656,6 +657,7 @@ public void informWithDelayedBomProcessedNotification() throws Exception {
assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED);
},
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()),
event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name())
// BOM_PROCESSED notification should not have been sent.
);
Expand Down

0 comments on commit 2ab3c63

Please sign in to comment.