From 239ae85a5bd2d40ea4464b6e47a626f76913a6e1 Mon Sep 17 00:00:00 2001 From: Zhang Ziqing <69516975+ziqing26@users.noreply.github.com> Date: Sun, 31 Mar 2024 22:26:34 +0800 Subject: [PATCH] Add patch data migration script for usage stats (#12970) * Add patch data migration script for usage stats * Fix hibernate batch query * Revert log --- ...tchDataMigrationForUsageStatisticsSql.java | 293 ++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 src/client/java/teammates/client/scripts/sql/PatchDataMigrationForUsageStatisticsSql.java diff --git a/src/client/java/teammates/client/scripts/sql/PatchDataMigrationForUsageStatisticsSql.java b/src/client/java/teammates/client/scripts/sql/PatchDataMigrationForUsageStatisticsSql.java new file mode 100644 index 00000000000..96474589c15 --- /dev/null +++ b/src/client/java/teammates/client/scripts/sql/PatchDataMigrationForUsageStatisticsSql.java @@ -0,0 +1,293 @@ +package teammates.client.scripts.sql; + +// CHECKSTYLE.OFF:ImportOrder +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import com.google.cloud.datastore.Cursor; +import com.google.cloud.datastore.QueryResults; +import com.googlecode.objectify.cmd.Query; + +import jakarta.persistence.TypedQuery; +import jakarta.persistence.criteria.CriteriaBuilder; +import jakarta.persistence.criteria.CriteriaQuery; +import jakarta.persistence.criteria.Root; + +import teammates.client.connector.DatastoreClient; +import teammates.client.util.ClientProperties; +import teammates.common.util.Const; +import teammates.common.util.HibernateUtil; +import teammates.storage.sqlentity.UsageStatistics; +import teammates.test.FileHelper; + +// CHECKSTYLE.ON:ImportOrder +/** + * Patch Migration Script for Usage Statistics + * + * Batch select datastore usage statistics, and then batch select SQL entities + * with the same start timestamps. + * Compare the size of the two list, if is not equal, find the missing one in + * SQL and migrate it. + */ +@SuppressWarnings("PMD") +public class PatchDataMigrationForUsageStatisticsSql extends DatastoreClient { + // the folder where the cursor position and console output is saved as a file + private static final String BASE_LOG_URI = "src/client/java/teammates/client/scripts/log/"; + + // 100 is the optimal batch size as there won't be too much time interval + // between read and save (if transaction is not used) + // cannot set number greater than 300 + // see + // https://stackoverflow.com/questions/41499505/objectify-queries-setting-limit-above-300-does-not-work + private static final int BATCH_SIZE = 100; + + // Creates the folder that will contain the stored log. + static { + new File(BASE_LOG_URI).mkdir(); + } + + AtomicLong numberOfScannedKey; + AtomicLong numberOfAffectedEntities; + AtomicLong numberOfUpdatedEntities; + + private List entitiesSavingBuffer; + + private PatchDataMigrationForUsageStatisticsSql() { + numberOfScannedKey = new AtomicLong(); + numberOfAffectedEntities = new AtomicLong(); + numberOfUpdatedEntities = new AtomicLong(); + + entitiesSavingBuffer = new ArrayList<>(); + + String connectionUrl = ClientProperties.SCRIPT_API_URL; + String username = ClientProperties.SCRIPT_API_NAME; + String password = ClientProperties.SCRIPT_API_PASSWORD; + + HibernateUtil.buildSessionFactory(connectionUrl, username, password); + } + + public static void main(String[] args) { + new PatchDataMigrationForUsageStatisticsSql().doOperationRemotely(); + } + + /** + * Returns the log prefix. + */ + protected String getLogPrefix() { + return String.format("Usage Statistics Patch Migration:"); + } + + private boolean isPreview() { + return false; + } + + /** + * Returns whether the account has been migrated. + */ + protected boolean isMigrationNeeded(teammates.storage.entity.UsageStatistics entity) { + return true; + } + + /** + * Returns the filter query. + */ + protected Query getFilterQuery() { + return ofy().load().type(teammates.storage.entity.UsageStatistics.class); + } + + private void doMigration(teammates.storage.entity.UsageStatistics entity) { + try { + if (!isMigrationNeeded(entity)) { + return; + } + if (!isPreview()) { + migrateEntity(entity); + } + } catch (Exception e) { + logError("Problem migrating usage stats " + entity); + logError(e.getMessage()); + } + } + + /** + * Migrates the entity. In this case, add entity to buffer. + */ + protected void migrateEntity(teammates.storage.entity.UsageStatistics oldEntity) { + UsageStatistics newEntity = new UsageStatistics( + oldEntity.getStartTime(), + oldEntity.getTimePeriod(), + oldEntity.getNumResponses(), + oldEntity.getNumCourses(), + oldEntity.getNumStudents(), + oldEntity.getNumInstructors(), + oldEntity.getNumAccountRequests(), + oldEntity.getNumEmails(), + oldEntity.getNumSubmissions()); + + entitiesSavingBuffer.add(newEntity); + } + + @Override + protected void doOperation() { + log("Running " + getClass().getSimpleName() + "..."); + log("Preview: " + isPreview()); + + Cursor cursor = readPositionOfCursorFromFile().orElse(null); + if (cursor == null) { + log("Start from the beginning"); + } else { + log("Start from cursor position: " + cursor.toUrlSafe()); + } + + boolean shouldContinue = true; + while (shouldContinue) { + shouldContinue = false; + Query filterQueryKeys = getFilterQuery().limit(BATCH_SIZE); + if (cursor != null) { + filterQueryKeys = filterQueryKeys.startAt(cursor); + } + QueryResults iterator; + + iterator = filterQueryKeys.iterator(); + + while (iterator.hasNext()) { + shouldContinue = true; + + doMigration(iterator.next()); + + numberOfScannedKey.incrementAndGet(); + } + + if (shouldContinue) { + cursor = iterator.getCursorAfter(); + flushEntitiesSavingBuffer(); + savePositionOfCursorToFile(cursor); + log(String.format("Cursor Position: %s", cursor.toUrlSafe())); + log(String.format("Number Of Entity Key Scanned: %d", numberOfScannedKey.get())); + log(String.format("Number Of Entity affected: %d", numberOfAffectedEntities.get())); + log(String.format("Number Of Entity updated: %d", numberOfUpdatedEntities.get())); + } + } + + deleteCursorPositionFile(); + log(isPreview() ? "Preview Completed!" : "Migration Completed!"); + log("Total number of entities: " + numberOfScannedKey.get()); + log("Number of affected entities: " + numberOfAffectedEntities.get()); + log("Number of updated entities: " + numberOfUpdatedEntities.get()); + } + + /** + * Flushes the saving buffer by issuing Cloud SQL save request. + */ + private void flushEntitiesSavingBuffer() { + if (!entitiesSavingBuffer.isEmpty() && !isPreview()) { + log("Checking usage stats in batch..." + entitiesSavingBuffer.size()); + + // batch query from SQL (migrate if startTime in the datastore but not in sql) + List instantList = entitiesSavingBuffer.stream().map(entity -> entity.getStartTime()) + .collect(Collectors.toList()); + + HibernateUtil.beginTransaction(); + CriteriaBuilder cb = HibernateUtil.getCriteriaBuilder(); + CriteriaQuery cr = cb.createQuery( + teammates.storage.sqlentity.UsageStatistics.class); + Root root = cr + .from(teammates.storage.sqlentity.UsageStatistics.class); + + cr.select(root).where(root.get("startTime").in(instantList)); + TypedQuery query = HibernateUtil.createQuery(cr); + List sqlEntitiesFound = query.getResultList(); + if (sqlEntitiesFound.size() != entitiesSavingBuffer.size()) { + Set foundInstants = sqlEntitiesFound.stream().map(entity -> entity.getStartTime()) + .collect(Collectors.toSet()); + for (teammates.storage.sqlentity.UsageStatistics entity : entitiesSavingBuffer) { + if (foundInstants.contains(entity.getStartTime())) { + continue; + } + // entity is not found in SQL + log("Migrating missing usage stats: it's start time is: " + entity.getStartTime().toString()); + numberOfAffectedEntities.incrementAndGet(); + numberOfUpdatedEntities.incrementAndGet(); + HibernateUtil.persist(entity); + } + } + + HibernateUtil.flushSession(); + HibernateUtil.clearSession(); + HibernateUtil.commitTransaction(); + + } + entitiesSavingBuffer.clear(); + } + + /** + * Saves the cursor position to a file so it can be used in the next run. + */ + private void savePositionOfCursorToFile(Cursor cursor) { + try { + FileHelper.saveFile( + BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor", cursor.toUrlSafe()); + } catch (IOException e) { + logError("Fail to save cursor position " + e.getMessage()); + } + } + + /** + * Reads the cursor position from the saved file. + * + * @return cursor if the file can be properly decoded. + */ + private Optional readPositionOfCursorFromFile() { + try { + String cursorPosition = FileHelper.readFile(BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor"); + return Optional.of(Cursor.fromUrlSafe(cursorPosition)); + } catch (IOException | IllegalArgumentException e) { + return Optional.empty(); + } + } + + /** + * Deletes the cursor position file. + */ + private void deleteCursorPositionFile() { + FileHelper.deleteFile(BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor"); + } + + /** + * Logs a comment. + */ + protected void log(String logLine) { + System.out.println(String.format("%s %s", getLogPrefix(), logLine)); + + Path logPath = Paths.get(BASE_LOG_URI + this.getClass().getSimpleName() + ".log"); + try (OutputStream logFile = Files.newOutputStream(logPath, + StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) { + logFile.write((logLine + System.lineSeparator()).getBytes(Const.ENCODING)); + } catch (Exception e) { + System.err.println("Error writing log line: " + logLine); + System.err.println(e.getMessage()); + } + } + + /** + * Logs an error and persists it to the disk. + */ + protected void logError(String logLine) { + System.err.println(logLine); + + log("[ERROR]" + logLine); + } + +}