Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compress and index in background #32

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,34 @@
public class ReportConverter implements AttributeConverter<String, byte[]> {
@Override
public byte[] convertToDatabaseColumn(String report) {
byte[] reportBytes = report.getBytes(StandardCharsets.UTF_8);
if (isCompressed(reportBytes)) {
return reportBytes;
}
return compress(report);
}

@Override
public String convertToEntityAttribute(byte[] zippedReport) {
if (!isCompressed(zippedReport)) {
return new String(zippedReport, StandardCharsets.UTF_8);
if (isCompressed(zippedReport)) {
return decompress(zippedReport);
}
return decompress(zippedReport);
}

private boolean isCompressed(byte[] data) {
return (data != null && data.length >= 2 &&
(data[0] == (byte) 0x1f && data[1] == (byte) 0x8b));
return new String(zippedReport, StandardCharsets.UTF_8);
}

private byte[] compress(String report) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {

gzipOutputStream.write(report.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.finish();
return byteArrayOutputStream.toByteArray();
gzipOutputStream.write(report.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.finish();
return byteArrayOutputStream.toByteArray();

} catch (IOException e) {
throw new RuntimeException("Failed to compress report content", e);
} catch (IOException e) {
throw new RuntimeException("Failed to compress report content", e);
}
}
}


private String decompress(byte[] compressedData) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData);
Expand All @@ -60,4 +60,9 @@ private String decompress(byte[] compressedData) {
throw new RuntimeException("Failed to decompress report content", e);
}
}

private boolean isCompressed(byte[] data) {
return (data != null && data.length >= 2 &&
(data[0] == (byte) 0x1f && data[1] == (byte) 0x8b));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ public List<Long> idsByKeywordInReport(String keyword) {
.toList();

}

public int count() {
Query whatQuery = new TermQuery(new Term(WHAT, SCENARIO_EXECUTION_REPORT));
return indexRepository.count(whatQuery);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-FileCopyrightText: 2017-2024 Enedis
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package com.chutneytesting.migration.domain;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class DataMigrationExecutor implements CommandLineRunner {

private final List<DataMigrator> dataMigrators;
private final ExecutorService executorService;


public DataMigrationExecutor(List<DataMigrator> dataMigrators) {
this.dataMigrators = dataMigrators;
executorService = Executors.newFixedThreadPool(dataMigrators.size());
}

@Override
public void run(String... args) throws Exception {
dataMigrators.forEach(dataMigrator -> executorService.submit(dataMigrator::migrate));
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-FileCopyrightText: 2017-2024 Enedis
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package com.chutneytesting.migration.domain;

public interface DataMigrator {
void migrate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,45 @@
*
*/

package com.chutneytesting.execution.infra.migration;

import static com.chutneytesting.index.infra.ScenarioExecutionReportIndexRepository.SCENARIO_EXECUTION_REPORT;
import static com.chutneytesting.index.infra.ScenarioExecutionReportIndexRepository.WHAT;
package com.chutneytesting.migration.infra;

import com.chutneytesting.execution.infra.storage.ScenarioExecutionReportJpaRepository;
import com.chutneytesting.execution.infra.storage.jpa.ScenarioExecutionReportEntity;
import com.chutneytesting.index.infra.IndexRepository;
import com.chutneytesting.index.infra.ScenarioExecutionReportIndexRepository;
import com.chutneytesting.migration.domain.DataMigrator;
import jakarta.persistence.EntityManager;
import java.util.List;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class ZipReportMigration implements CommandLineRunner {
public class ExecutionReportMigrator implements DataMigrator {


private final ScenarioExecutionReportIndexRepository scenarioExecutionReportIndexRepository;
private final ScenarioExecutionReportJpaRepository scenarioExecutionReportJpaRepository;
private final IndexRepository indexRepository;
private final EntityManager entityManager;
private static final Logger LOGGER = LoggerFactory.getLogger(ZipReportMigration.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionReportMigrator.class);


public ZipReportMigration(ScenarioExecutionReportIndexRepository scenarioExecutionReportIndexRepository, ScenarioExecutionReportJpaRepository scenarioExecutionReportJpaRepository, IndexRepository indexRepository, EntityManager entityManager) {
public ExecutionReportMigrator(ScenarioExecutionReportIndexRepository scenarioExecutionReportIndexRepository,
ScenarioExecutionReportJpaRepository scenarioExecutionReportJpaRepository,
EntityManager entityManager) {
this.scenarioExecutionReportIndexRepository = scenarioExecutionReportIndexRepository;
this.scenarioExecutionReportJpaRepository = scenarioExecutionReportJpaRepository;
this.indexRepository = indexRepository;
this.entityManager = entityManager;
}

@Override
@Transactional
public void run(String... args) {
public void migrate() {
if (isMigrationDone()) {
LOGGER.info("Report compression & indexing already done, skipping...");
LOGGER.info("Report index not empty. Skipping indexing and in-db compression...");
return;
}
PageRequest firstPage = PageRequest.of(0, 10);
Expand All @@ -74,24 +67,21 @@ private void compressAndIndex(Pageable pageable, int previousCount) {
}

private void compressAndSaveInDb(List<ScenarioExecutionReportEntity> reportsInDb) {
// calling scenarioExecutionReportJpaRepository find() and then save() doesn't call ReportConverter
// ReportConverter will be called by entityManager update. So compression will be done
reportsInDb.forEach(report -> {
entityManager.createQuery(
"UPDATE SCENARIO_EXECUTIONS_REPORTS SET report = :report WHERE id = :id")
.setParameter("report", report.getReport())
.setParameter("id", report.scenarioExecutionId())
.executeUpdate();
});
// calling scenarioExecutionReportJpaRepository find() and then save() doesn't call ReportConverter.
// ReportConverter will be called by entityManager update. So compression will be done.
reportsInDb.forEach(report -> entityManager.createQuery(
"UPDATE SCENARIO_EXECUTIONS_REPORTS SET report = :report WHERE id = :id")
.setParameter("report", report.getReport())
.setParameter("id", report.scenarioExecutionId())
.executeUpdate());
}

private void index(List<ScenarioExecutionReportEntity> reportsInDb) {
scenarioExecutionReportIndexRepository.saveAll(reportsInDb);
}

private boolean isMigrationDone() {
Query whatQuery = new TermQuery(new Term(WHAT, SCENARIO_EXECUTION_REPORT));
int indexedReports = indexRepository.count(whatQuery);
int indexedReports = scenarioExecutionReportIndexRepository.count();
return indexedReports > 0;
}
}
Loading