Skip to content

Commit

Permalink
feat: pending and KO files are saved to S3 if S3 is active
Browse files Browse the repository at this point in the history
  • Loading branch information
Nolife999 committed Nov 2, 2024
1 parent 8a64630 commit 3280be7
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 53 deletions.
111 changes: 58 additions & 53 deletions arc-batch/src/main/java/fr/insee/arc/batch/BatchARC.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import fr.insee.arc.utils.exception.ArcException;
import fr.insee.arc.utils.exception.ArcExceptionMessage;
import fr.insee.arc.utils.files.FileUtilsArc;
import fr.insee.arc.utils.minio.S3Template;
import fr.insee.arc.utils.ressourceUtils.PropertiesHandler;
import fr.insee.arc.utils.security.SecurityDao;
import fr.insee.arc.utils.utils.FormatSQL;
Expand Down Expand Up @@ -385,29 +384,6 @@ private void executorsDatabaseCreate() throws ArcException {
message(ApiManageExecutorDatabase.create().toString());
Sleep.sleep(waitExecutorTimerInMS);
}

/**
 * Drop the volatile executor database. This can only happen when executors
 * are declared on kubernetes.
 *
 * @throws ArcException if the deletion fails
 */
private void executorsDatabaseDelete() throws ArcException {
	String deletionReport = ApiManageExecutorDatabase.delete().toString();
	message(deletionReport);
}


/**
 * Wipe every archive directory of the execution environment and recreate it
 * empty, so the next batch starts from a clean archive area.
 *
 * @throws ArcException if a directory cannot be deleted or recreated
 */
private void archiveDirectoryDelete() throws ArcException {
	for (File archiveDirectory : FileSystemManagement.getArchiveDirectories(repertoire, envExecution)) {
		FileUtilsArc.deleteDirectory(archiveDirectory);
		FileUtilsArc.createDirIfNotexist(archiveDirectory);
	}
}


/***
* Delete files, export to parquet
Expand All @@ -421,10 +397,6 @@ private void batchFinalize() throws ArcException {

executeIfParquetActive(this::exportToParquet);

executeIfVolatile(this::executorsDatabaseDelete);

executeIfVolatile(this::archiveDirectoryDelete);

message("Traitement Fin");

}
Expand Down Expand Up @@ -595,6 +567,36 @@ private void synchronizeExecutorsMetadata() throws ArcException {
message("Synchronization terminé");
}

/**
 * Mark if the batch has been interrupted: get the list of archives whose
 * processing was interrupted and move them back to the input (OK) directory
 * so they are picked up again by the next batch.
 * Additionally, when S3 storage is active, save the pending or KO archives
 * of the previous batch to the S3 KO directory.
 *
 * @throws ArcException on database access or file copy failure
 */
private void deplacerFichiersNonTraites() throws ArcException {

	// archives to move back: not-yet-exported ones when export is on,
	// otherwise the ones still marked as in progress
	List<String> aBouger = exportOn ? //
			BatchArcDao.execQuerySelectArchiveNotExported(envExecution) //
			: BatchArcDao.execQuerySelectArchiveEnCours(envExecution);

	dejaEnCours = (!aBouger.isEmpty());

	// si oui, on essaie de recopier les archives dans chargement OK
	if (dejaEnCours) {
		copyPendingFilesOfLastBatchFromArchiveDirectoryToOKDirectory(envExecution, repertoire, aBouger);
	}

	// si le s3 est actif, on sauvegarde les archives pending ou KO vers le s3
	// (guard avoids both the DB query and a throwaway empty-list allocation when S3 is off)
	if (!ArcS3.INPUT_BUCKET.isS3Off()) {
		List<String> aBougerToS3 = BatchArcDao.execQuerySelectArchivePendingOrKO(envExecution);
		if (!aBougerToS3.isEmpty()) {
			savePendingOrKOArchivesToS3(envExecution, repertoire, aBougerToS3);
		}
	}

	message("Fin des déplacements de fichiers");

}

/**
* Copy the files from the archive directory to ok directory
*
Expand All @@ -603,7 +605,7 @@ private void synchronizeExecutorsMetadata() throws ArcException {
* @param aBouger
* @throws IOException
*/
private void copyFileInErrorLastBatchFromArchiveDirectoryToOKDirectory(String envExecution, String repertoire, List<String> aBouger)
private void copyPendingFilesOfLastBatchFromArchiveDirectoryToOKDirectory(String envExecution, String repertoire, List<String> aBouger)
throws ArcException {

for (String container : aBouger) {
Expand All @@ -623,38 +625,41 @@ private void copyFileInErrorLastBatchFromArchiveDirectoryToOKDirectory(String en
throw new ArcException(ArcExceptionMessage.FILE_COPY_FAILED, fIn.getAbsolutePath(),
fOut.getAbsolutePath());
}

// if s3 in exists, copy files to s3 ko directory if error had occured in batch before
String s3ArchiveDirectory = DirectoryPath.s3ReceptionEntrepotKO(envExecution, entrepotContainer);
ArcS3.INPUT_BUCKET.createDirectory(s3ArchiveDirectory);
ArcS3.INPUT_BUCKET.upload(fIn, s3ArchiveDirectory + File.separator + originalContainer);
ArcS3.INPUT_BUCKET.closeMinioClient();

}
}

/**
* mark if the batch has been interrupted get the list of archives which process
* was interrupted to move them back in the input directory
*
* Copy files to s3 KO directory if archive was KO or pending from previous batch
* @param envExecution2
* @param repertoire2
* @param aBougerToS3
* @throws ArcException
* @throws IOException
*/
private void deplacerFichiersNonTraites() throws ArcException {

List<String> aBouger = exportOn ? //
BatchArcDao.execQuerySelectArchiveNotExported(envExecution) //
: BatchArcDao.execQuerySelectArchiveEnCours(envExecution);

dejaEnCours = (!aBouger.isEmpty());
private void savePendingOrKOArchivesToS3(String envExecution2, String repertoire2, List<String> aBougerToS3) throws ArcException {
for (String container : aBougerToS3) {
String entrepotContainer = ManipString.substringBeforeFirst(container, "_");
String originalContainer = ManipString.substringAfterFirst(container, "_");

// si oui, on essaie de recopier les archives dans chargement OK
if (dejaEnCours) {
copyFileInErrorLastBatchFromArchiveDirectoryToOKDirectory(envExecution, repertoire, aBouger);
File fIn = Paths
.get(DirectoryPath.directoryReceptionEntrepotArchive(repertoire, envExecution, entrepotContainer),
originalContainer)
.toFile();

if (!fIn.exists())
continue;

// save files to s3 if not already exist
String s3ArchiveDirectory = DirectoryPath.s3ReceptionEntrepotKO(envExecution, entrepotContainer);
ArcS3.INPUT_BUCKET.createDirectory(s3ArchiveDirectory);
String targetS3File= s3ArchiveDirectory + File.separator + originalContainer;
if (!ArcS3.INPUT_BUCKET.isExists(targetS3File))
{
ArcS3.INPUT_BUCKET.upload(fIn, targetS3File);
}
ArcS3.INPUT_BUCKET.closeMinioClient();

}

message("Fin des déplacements de fichiers");


}

/**
Expand Down
19 changes: 19 additions & 0 deletions arc-batch/src/main/java/fr/insee/arc/batch/dao/BatchArcDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ public static List<String> execQuerySelectArchiveNotExported(String envExecution
return new GenericBean(UtilitaireDao.get(ArcDatabase.COORDINATOR.getIndex()).executeRequest(null, query))
.getColumnValues(ColumnEnum.CONTAINER.getColumnName());
}

/**
 * Select the archives that are either still pending (previous phase finished
 * but pipeline not finished) or whose pipeline finished with KO data.
 * The result is the union of both conditions; it is used to decide which
 * archives must be saved to S3 when S3 storage is active.
 *
 * @param envExecution the execution environment (sandbox) schema name
 * @return the container (archive) names matching either condition
 * @throws ArcException on database access failure
 */
public static List<String> execQuerySelectArchivePendingOrKO(String envExecution) throws ArcException {

ArcPreparedStatementBuilder query = new ArcPreparedStatementBuilder();
query.append(queryPipelineNotFinished(envExecution, ConditionExecution.PHASE_PRECEDENTE_TERMINE_PIPELINE_NON_TERMINE));
query.build(SQL.UNION);
query.append(queryPipelineNotFinished(envExecution, ConditionExecution.PIPELINE_TERMINE_DONNEES_KO));

return new GenericBean(UtilitaireDao.get(ArcDatabase.COORDINATOR.getIndex()).executeRequest(null, query))
.getColumnValues(ColumnEnum.CONTAINER.getColumnName());
}


/**
* Reset the status of interrupted archives in the pilotage table Archives entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum ConditionExecution {
AUCUN_PREREQUIS("null"),
PHASE_PRECEDENTE_TERMINE_PIPELINE_NON_TERMINE("etape=1"),
PIPELINE_TERMINE_DONNEES_NON_EXPORTEES("etape=2 AND client is null AND etat_traitement='{OK}'"),
PIPELINE_TERMINE_DONNEES_KO("etape=2 AND etat_traitement='{KO}'"),
;

private String sqlFilter;
Expand Down

0 comments on commit 3280be7

Please sign in to comment.