Skip to content

Commit

Permalink
Chunk in queue
Browse files Browse the repository at this point in the history
Signed-off-by: Dan Harrin <[email protected]>
  • Loading branch information
danharrin committed Feb 18, 2025
1 parent 00d9065 commit d63a216
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 326 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@

class FinalizeStudentDataImport
{
public function __construct(
protected CleanUpFailedStudentDataImportTables $cleanUpFailedStudentDataImportTables,
) {}

public function execute(
Import $studentsImport,
?Import $programsImport = null,
Expand All @@ -64,24 +60,32 @@ public function execute(

$totalFailedRowsCount = $studentsImportFailedRowsCount + $programsImportFailedRowsCount + $enrollmentsImportFailedRowsCount;

// if (! $totalFailedRowsCount) {
// DB::transaction(function () use ($studentsImport, $programsImport, $enrollmentsImport) {
// DB::statement('drop table "students"');
// DB::statement("alter table \"import_{$studentsImport->getKey()}_students\" rename to \"students\"");

// if ($programsImport) {
// DB::statement('drop table "programs"');
// DB::statement("alter table \"import_{$programsImport->getKey()}_programs\" rename to \"programs\"");
// }

// if ($enrollmentsImport) {
// DB::statement('drop table "enrollments"');
// DB::statement("alter table \"import_{$enrollmentsImport->getKey()}_enrollments\" rename to \"enrollments\"");
// }
// });
// } else {
// $this->cleanUpFailedStudentDataImportTables->execute($studentsImport, $programsImport, $enrollmentsImport);
// }
if (! $totalFailedRowsCount) {
DB::transaction(function () use ($studentsImport, $programsImport, $enrollmentsImport) {
DB::statement('drop table "students"');
DB::statement("alter table \"import_{$studentsImport->getKey()}_students\" rename to \"students\"");

if ($programsImport) {
DB::statement('drop table "programs"');
DB::statement("alter table \"import_{$programsImport->getKey()}_programs\" rename to \"programs\"");
}

if ($enrollmentsImport) {
DB::statement('drop table "enrollments"');
DB::statement("alter table \"import_{$enrollmentsImport->getKey()}_enrollments\" rename to \"enrollments\"");
}
});
} else {
DB::statement("drop table if exists import_{$studentsImport->getKey()}_students");

if ($programsImport) {
DB::statement("drop table if exists import_{$programsImport->getKey()}_programs");
}

if ($enrollmentsImport) {
DB::statement("drop table if exists import_{$enrollmentsImport->getKey()}_enrollments");
}
}

if (! $studentsImport->user instanceof Authenticatable) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@

namespace AdvisingApp\StudentDataModel\Filament\Actions;

use AdvisingApp\StudentDataModel\Actions\CleanUpFailedStudentDataImportTables;
use AdvisingApp\StudentDataModel\Actions\CreateTemporaryStudentDataImportTables;
use AdvisingApp\StudentDataModel\Actions\FinalizeStudentDataImport;
use AdvisingApp\StudentDataModel\Filament\Imports\StudentEnrollmentImporter;
use AdvisingApp\StudentDataModel\Filament\Imports\StudentImporter;
use AdvisingApp\StudentDataModel\Filament\Imports\StudentProgramImporter;
use AdvisingApp\StudentDataModel\Jobs\PrepareStudentDataCsvImport;
use AdvisingApp\StudentDataModel\Models\Enrollment;
use AdvisingApp\StudentDataModel\Models\Program;
use AdvisingApp\StudentDataModel\Models\Student;
Expand All @@ -55,21 +55,17 @@
use Filament\Forms\Components\FileUpload;
use Filament\Forms\Components\Select;
use Filament\Notifications\Notification;
use Filament\Support\ChunkIterator;
use Illuminate\Bus\PendingBatch;
use Illuminate\Contracts\Support\Htmlable;
use Illuminate\Foundation\Bus\PendingChain;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\HtmlString;
use Illuminate\Support\Number;
use Illuminate\Support\Str;
use Illuminate\Validation\ValidationException;
use League\Csv\ByteSequence;
use League\Csv\Reader as CsvReader;
use League\Csv\Statement;
use League\Csv\TabularDataReader;
use League\Csv\Writer;
use Livewire\Component;
use Livewire\Features\SupportFileUploads\TemporaryUploadedFile;
Expand Down Expand Up @@ -266,53 +262,11 @@ public static function make(): ImportAction
$programsCsvFile = $data['programsFile'] ?? null;
$enrollmentsCsvFile = $data['enrollmentsFile'] ?? null;

$getCsvResults = function (?TemporaryUploadedFile $csvFile) use ($action): ?TabularDataReader {
if (! $csvFile) {
return null;
}

$csvStream = $action->getUploadedFileStream($csvFile);

if (! $csvStream) {
return null;
}

$csvReader = CsvReader::createFromStream($csvStream);

if (filled($csvDelimiter = $action->getCsvDelimiter($csvReader))) {
$csvReader->setDelimiter($csvDelimiter);
}

$csvReader->setHeaderOffset($action->getHeaderOffset() ?? 0);
$csvResults = Statement::create()->process($csvReader);

$totalRows = $csvResults->count();
$maxRows = $action->getMaxRows() ?? $totalRows;

if ($maxRows < $totalRows) {
Notification::make()
->title(__('filament-actions::import.notifications.max_rows.title'))
->body(trans_choice('filament-actions::import.notifications.max_rows.body', $maxRows, [
'count' => Number::format($maxRows),
]))
->danger()
->send();

$action->halt();
}

return $csvResults;
};

$csvResults = $getCsvResults($csvFile);
$programsCsvResults = $getCsvResults($programsCsvFile);
$enrollmentsCsvResults = $getCsvResults($enrollmentsCsvFile);

$user = auth()->user();

[$import, $programsImport, $enrollmentsImport] = DB::transaction(function () use ($action, $csvFile, $programsCsvFile, $enrollmentsCsvFile, $csvResults, $programsCsvResults, $enrollmentsCsvResults, $user) {
$makeImport = function (?TabularDataReader $csvResults, ?TemporaryUploadedFile $csvFile, ?string $importer = null) use ($action, $user): ?Import {
if (! $csvResults) {
[$import, $programsImport, $enrollmentsImport] = DB::transaction(function () use ($action, $csvFile, $programsCsvFile, $enrollmentsCsvFile, $user) {
$makeImport = function (?TemporaryUploadedFile $csvFile = null, ?string $importer = null) use ($action, $user): ?Import {
if (! $csvFile) {
return null;
}

Expand All @@ -321,21 +275,19 @@ public static function make(): ImportAction
$import->file_name = $csvFile->getClientOriginalName();
$import->file_path = $csvFile->getRealPath();
$import->importer = $importer ?? $action->getImporter();
$import->total_rows = $csvResults->count();
$import->total_rows = 0;
$import->save();

return $import;
};

return [
$makeImport($csvResults, $csvFile),
$makeImport($programsCsvResults, $programsCsvFile, StudentProgramImporter::class),
$makeImport($enrollmentsCsvResults, $enrollmentsCsvFile, StudentEnrollmentImporter::class),
$makeImport($csvFile),
$makeImport($programsCsvFile, StudentProgramImporter::class),
$makeImport($enrollmentsCsvFile, StudentEnrollmentImporter::class),
];
});

$job = $action->getJob();

$options = array_merge(
$action->getOptions(),
Arr::except($data, [
Expand All @@ -351,26 +303,6 @@ public static function make(): ImportAction
$programsImport?->unsetRelation('user');
$enrollmentsImport?->unsetRelation('user');

$makeImportJobs = function (?TabularDataReader $csvResults, ?Import $import, ?array $columnMap) use ($action, $job, $options): ?array {
if (! $csvResults) {
return null;
}

$importChunkIterator = new ChunkIterator($csvResults->getRecords(), chunkSize: $action->getChunkSize());

/** @var array<array<array<string, string>>> $importChunks */
$importChunks = $importChunkIterator->get();

return collect($importChunks)
->map(fn (array $importChunk): object => app($job, [
'import' => $import,
'rows' => base64_encode(serialize($importChunk)),
'columnMap' => $columnMap,
'options' => $options,
]))
->all();
};

$columnMap = $data['columnMap'];
$programsColumnMap = $data['programsColumnMap'] ?? null;
$enrollmentsColumnMap = $data['enrollmentsColumnMap'] ?? null;
Expand All @@ -396,9 +328,9 @@ public static function make(): ImportAction
app(CreateTemporaryStudentDataImportTables::class)->execute($import, $programsImport, $enrollmentsImport);

Bus::batch([
...$makeImportJobs($csvResults, $import, $columnMap),
...($programsImport ? $makeImportJobs($programsCsvResults, $programsImport, $programsColumnMap) : []),
...($enrollmentsImport ? $makeImportJobs($enrollmentsCsvResults, $enrollmentsImport, $enrollmentsColumnMap) : []),
new PrepareStudentDataCsvImport($import, $columnMap, $options),
...($programsImport ? [new PrepareStudentDataCsvImport($programsImport, $programsColumnMap, $options)] : []),
...($enrollmentsImport ? [new PrepareStudentDataCsvImport($enrollmentsImport, $enrollmentsColumnMap, $options)] : []),
])
->allowFailures()
->when(
Expand All @@ -414,16 +346,11 @@ public static function make(): ImportAction
fn (PendingChain $chain) => $chain->onConnection($jobConnection),
)
->finally(fn () => app(FinalizeStudentDataImport::class)->execute($import, $programsImport, $enrollmentsImport))
->catch(fn () => app(CleanUpFailedStudentDataImportTables::class)->execute($import, $programsImport, $enrollmentsImport))
->dispatch();

$totalRows = $import->total_rows + ($programsImport?->total_rows ?? 0) + ($enrollmentsImport?->total_rows ?? 0);

Notification::make()
->title($action->getSuccessNotificationTitle())
->body(trans_choice('filament-actions::import.notifications.started.body', $totalRows, [
'count' => Number::format($totalRows),
]))
->title('Import started')
->body('Your import has begun and will be processed in the background.')
->success()
->send();
})
Expand Down
Loading

0 comments on commit d63a216

Please sign in to comment.