Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compress and index in background #32

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand All @@ -20,34 +21,39 @@
public class ReportConverter implements AttributeConverter<String, byte[]> {
@Override
public byte[] convertToDatabaseColumn(String report) {
byte[] reportBytes = report.getBytes(StandardCharsets.UTF_8);
if (isCompressed(reportBytes)) {
return reportBytes;
}
return compress(report);
}

@Override
public String convertToEntityAttribute(byte[] zippedReport) {
if (!isCompressed(zippedReport)) {
return new String(zippedReport, StandardCharsets.UTF_8);
if (isCompressed(zippedReport)) {
return decompress(zippedReport);
}
return decompress(zippedReport);
}

private boolean isCompressed(byte[] data) {
return (data != null && data.length >= 2 &&
(data[0] == (byte) 0x1f && data[1] == (byte) 0x8b));
return new String(zippedReport, StandardCharsets.UTF_8);
}

private byte[] compress(String report) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
InputStream inputStream = new ByteArrayInputStream(report.getBytes(StandardCharsets.UTF_8));
byte[] buffer = new byte[1024];
int bytesRead;

gzipOutputStream.write(report.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.finish();
return byteArrayOutputStream.toByteArray();
while ((bytesRead = inputStream.read(buffer)) != -1) {
gzipOutputStream.write(buffer, 0, bytesRead);
}
gzipOutputStream.finish();
return byteArrayOutputStream.toByteArray();

} catch (IOException e) {
throw new RuntimeException("Failed to compress report content", e);
} catch (IOException e) {
throw new RuntimeException("Failed to compress report content", e);
}
}
}


private String decompress(byte[] compressedData) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData);
Expand All @@ -60,4 +66,9 @@ private String decompress(byte[] compressedData) {
throw new RuntimeException("Failed to decompress report content", e);
}
}

private boolean isCompressed(byte[] data) {
return (data != null && data.length >= 2 &&
(data[0] == (byte) 0x1f && data[1] == (byte) 0x8b));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ public List<Long> idsByKeywordInReport(String keyword) {
.toList();

}

public int count() {
Query whatQuery = new TermQuery(new Term(WHAT, SCENARIO_EXECUTION_REPORT));
return indexRepository.count(whatQuery);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-FileCopyrightText: 2017-2024 Enedis
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package com.chutneytesting.migration.domain;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class DataMigrationExecutor implements CommandLineRunner {

private final List<DataMigrator> dataMigrators;
private final ExecutorService executorService;


public DataMigrationExecutor(List<DataMigrator> dataMigrators) {
this.dataMigrators = dataMigrators;
executorService = Executors.newFixedThreadPool(dataMigrators.size());
}

@Override
public void run(String... args) throws Exception {
dataMigrators.forEach(dataMigrator -> executorService.submit(dataMigrator::migrate));
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-FileCopyrightText: 2017-2024 Enedis
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package com.chutneytesting.migration.domain;

public interface DataMigrator {
void migrate();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-FileCopyrightText: 2017-2024 Enedis
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package com.chutneytesting.migration.domain;

import com.chutneytesting.execution.infra.storage.ScenarioExecutionReportJpaRepository;
import com.chutneytesting.execution.infra.storage.jpa.ScenarioExecutionReportEntity;
import com.chutneytesting.index.infra.ScenarioExecutionReportIndexRepository;
import com.chutneytesting.migration.infra.ExecutionReportRepository;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Component;

@Component
public class ExecutionReportMigrator implements DataMigrator {

private final ScenarioExecutionReportJpaRepository scenarioExecutionReportJpaRepository;
private final ScenarioExecutionReportIndexRepository scenarioExecutionReportIndexRepository;
private final ExecutionReportRepository executionReportRepository;
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionReportMigrator.class);

public ExecutionReportMigrator(ExecutionReportRepository executionReportRepository,
ScenarioExecutionReportJpaRepository scenarioExecutionReportJpaRepository, ScenarioExecutionReportIndexRepository scenarioExecutionReportIndexRepository) {
this.scenarioExecutionReportJpaRepository = scenarioExecutionReportJpaRepository;
this.executionReportRepository = executionReportRepository;
this.scenarioExecutionReportIndexRepository = scenarioExecutionReportIndexRepository;
}

@Override
public void migrate() {
if (isMigrationDone()) {
LOGGER.info("Report index not empty. Skipping indexing and in-db compression...");
return;
}
LOGGER.info("Start indexing and in-db compression...");
PageRequest firstPage = PageRequest.of(0, 10);
int count = 0;
migrate(firstPage, count);
}

private void migrate(Pageable pageable, int previousCount) {
LOGGER.debug("Indexing and compressing reports in page n° {}", pageable.getPageNumber());
Slice<ScenarioExecutionReportEntity> slice = scenarioExecutionReportJpaRepository.findAll(pageable);
List<ScenarioExecutionReportEntity> reports = slice.getContent();

executionReportRepository.compressAndSaveInDb(reports);
index(reports);

int count = previousCount + slice.getNumberOfElements();
if (slice.hasNext()) {
migrate(slice.nextPageable(), count);
} else {
LOGGER.info("{} report(s) successfully compressed and indexed", count);
}
}

private void index(List<ScenarioExecutionReportEntity> reportsInDb) {
scenarioExecutionReportIndexRepository.saveAll(reportsInDb);
}

private boolean isMigrationDone() {
int indexedReports = scenarioExecutionReportIndexRepository.count();
return indexedReports > 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-FileCopyrightText: 2017-2024 Enedis
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package com.chutneytesting.migration.infra;

import com.chutneytesting.execution.infra.storage.jpa.ScenarioExecutionReportEntity;
import jakarta.persistence.EntityManager;
import java.util.List;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class ExecutionReportRepository {

private final EntityManager entityManager;

public ExecutionReportRepository(EntityManager entityManager ) {
this.entityManager = entityManager;
}

@Transactional
public void compressAndSaveInDb(List<ScenarioExecutionReportEntity> reportsInDb) {
reportsInDb.forEach(report -> {
entityManager.createQuery(
"UPDATE SCENARIO_EXECUTIONS_REPORTS SET report = :report WHERE id = :id")
.setParameter("report", report.getReport())
.setParameter("id", report.scenarioExecutionId())
.executeUpdate();
entityManager.detach(report);
});
}
}
Loading