forked from TEAMMATES/teammates
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add patch data migration script for usage stats (TEAMMATES#12970)
* Add patch data migration script for usage stats * Fix hibernate batch query * Revert log
- Loading branch information
Showing
1 changed file
with
293 additions
and
0 deletions.
There are no files selected for viewing
293 changes: 293 additions & 0 deletions
293
src/client/java/teammates/client/scripts/sql/PatchDataMigrationForUsageStatisticsSql.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
package teammates.client.scripts.sql; | ||
|
||
// CHECKSTYLE.OFF:ImportOrder | ||
import java.io.File; | ||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.nio.file.StandardOpenOption; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.stream.Collectors; | ||
|
||
import com.google.cloud.datastore.Cursor; | ||
import com.google.cloud.datastore.QueryResults; | ||
import com.googlecode.objectify.cmd.Query; | ||
|
||
import jakarta.persistence.TypedQuery; | ||
import jakarta.persistence.criteria.CriteriaBuilder; | ||
import jakarta.persistence.criteria.CriteriaQuery; | ||
import jakarta.persistence.criteria.Root; | ||
|
||
import teammates.client.connector.DatastoreClient; | ||
import teammates.client.util.ClientProperties; | ||
import teammates.common.util.Const; | ||
import teammates.common.util.HibernateUtil; | ||
import teammates.storage.sqlentity.UsageStatistics; | ||
import teammates.test.FileHelper; | ||
|
||
// CHECKSTYLE.ON:ImportOrder | ||
/** | ||
* Patch Migration Script for Usage Statistics | ||
* | ||
* Batch select datastore usage statistics, and then batch select SQL entities | ||
* with the same start timestamps. | ||
* Compare the size of the two list, if is not equal, find the missing one in | ||
* SQL and migrate it. | ||
*/ | ||
@SuppressWarnings("PMD") | ||
public class PatchDataMigrationForUsageStatisticsSql extends DatastoreClient { | ||
// the folder where the cursor position and console output is saved as a file | ||
private static final String BASE_LOG_URI = "src/client/java/teammates/client/scripts/log/"; | ||
|
||
// 100 is the optimal batch size as there won't be too much time interval | ||
// between read and save (if transaction is not used) | ||
// cannot set number greater than 300 | ||
// see | ||
// https://stackoverflow.com/questions/41499505/objectify-queries-setting-limit-above-300-does-not-work | ||
private static final int BATCH_SIZE = 100; | ||
|
||
// Creates the folder that will contain the stored log. | ||
static { | ||
new File(BASE_LOG_URI).mkdir(); | ||
} | ||
|
||
AtomicLong numberOfScannedKey; | ||
AtomicLong numberOfAffectedEntities; | ||
AtomicLong numberOfUpdatedEntities; | ||
|
||
private List<teammates.storage.sqlentity.UsageStatistics> entitiesSavingBuffer; | ||
|
||
private PatchDataMigrationForUsageStatisticsSql() { | ||
numberOfScannedKey = new AtomicLong(); | ||
numberOfAffectedEntities = new AtomicLong(); | ||
numberOfUpdatedEntities = new AtomicLong(); | ||
|
||
entitiesSavingBuffer = new ArrayList<>(); | ||
|
||
String connectionUrl = ClientProperties.SCRIPT_API_URL; | ||
String username = ClientProperties.SCRIPT_API_NAME; | ||
String password = ClientProperties.SCRIPT_API_PASSWORD; | ||
|
||
HibernateUtil.buildSessionFactory(connectionUrl, username, password); | ||
} | ||
|
||
public static void main(String[] args) { | ||
new PatchDataMigrationForUsageStatisticsSql().doOperationRemotely(); | ||
} | ||
|
||
/** | ||
* Returns the log prefix. | ||
*/ | ||
protected String getLogPrefix() { | ||
return String.format("Usage Statistics Patch Migration:"); | ||
} | ||
|
||
private boolean isPreview() { | ||
return false; | ||
} | ||
|
||
/** | ||
* Returns whether the account has been migrated. | ||
*/ | ||
protected boolean isMigrationNeeded(teammates.storage.entity.UsageStatistics entity) { | ||
return true; | ||
} | ||
|
||
/** | ||
* Returns the filter query. | ||
*/ | ||
protected Query<teammates.storage.entity.UsageStatistics> getFilterQuery() { | ||
return ofy().load().type(teammates.storage.entity.UsageStatistics.class); | ||
} | ||
|
||
private void doMigration(teammates.storage.entity.UsageStatistics entity) { | ||
try { | ||
if (!isMigrationNeeded(entity)) { | ||
return; | ||
} | ||
if (!isPreview()) { | ||
migrateEntity(entity); | ||
} | ||
} catch (Exception e) { | ||
logError("Problem migrating usage stats " + entity); | ||
logError(e.getMessage()); | ||
} | ||
} | ||
|
||
/** | ||
* Migrates the entity. In this case, add entity to buffer. | ||
*/ | ||
protected void migrateEntity(teammates.storage.entity.UsageStatistics oldEntity) { | ||
UsageStatistics newEntity = new UsageStatistics( | ||
oldEntity.getStartTime(), | ||
oldEntity.getTimePeriod(), | ||
oldEntity.getNumResponses(), | ||
oldEntity.getNumCourses(), | ||
oldEntity.getNumStudents(), | ||
oldEntity.getNumInstructors(), | ||
oldEntity.getNumAccountRequests(), | ||
oldEntity.getNumEmails(), | ||
oldEntity.getNumSubmissions()); | ||
|
||
entitiesSavingBuffer.add(newEntity); | ||
} | ||
|
||
@Override | ||
protected void doOperation() { | ||
log("Running " + getClass().getSimpleName() + "..."); | ||
log("Preview: " + isPreview()); | ||
|
||
Cursor cursor = readPositionOfCursorFromFile().orElse(null); | ||
if (cursor == null) { | ||
log("Start from the beginning"); | ||
} else { | ||
log("Start from cursor position: " + cursor.toUrlSafe()); | ||
} | ||
|
||
boolean shouldContinue = true; | ||
while (shouldContinue) { | ||
shouldContinue = false; | ||
Query<teammates.storage.entity.UsageStatistics> filterQueryKeys = getFilterQuery().limit(BATCH_SIZE); | ||
if (cursor != null) { | ||
filterQueryKeys = filterQueryKeys.startAt(cursor); | ||
} | ||
QueryResults<teammates.storage.entity.UsageStatistics> iterator; | ||
|
||
iterator = filterQueryKeys.iterator(); | ||
|
||
while (iterator.hasNext()) { | ||
shouldContinue = true; | ||
|
||
doMigration(iterator.next()); | ||
|
||
numberOfScannedKey.incrementAndGet(); | ||
} | ||
|
||
if (shouldContinue) { | ||
cursor = iterator.getCursorAfter(); | ||
flushEntitiesSavingBuffer(); | ||
savePositionOfCursorToFile(cursor); | ||
log(String.format("Cursor Position: %s", cursor.toUrlSafe())); | ||
log(String.format("Number Of Entity Key Scanned: %d", numberOfScannedKey.get())); | ||
log(String.format("Number Of Entity affected: %d", numberOfAffectedEntities.get())); | ||
log(String.format("Number Of Entity updated: %d", numberOfUpdatedEntities.get())); | ||
} | ||
} | ||
|
||
deleteCursorPositionFile(); | ||
log(isPreview() ? "Preview Completed!" : "Migration Completed!"); | ||
log("Total number of entities: " + numberOfScannedKey.get()); | ||
log("Number of affected entities: " + numberOfAffectedEntities.get()); | ||
log("Number of updated entities: " + numberOfUpdatedEntities.get()); | ||
} | ||
|
||
/** | ||
* Flushes the saving buffer by issuing Cloud SQL save request. | ||
*/ | ||
private void flushEntitiesSavingBuffer() { | ||
if (!entitiesSavingBuffer.isEmpty() && !isPreview()) { | ||
log("Checking usage stats in batch..." + entitiesSavingBuffer.size()); | ||
|
||
// batch query from SQL (migrate if startTime in the datastore but not in sql) | ||
List<Instant> instantList = entitiesSavingBuffer.stream().map(entity -> entity.getStartTime()) | ||
.collect(Collectors.toList()); | ||
|
||
HibernateUtil.beginTransaction(); | ||
CriteriaBuilder cb = HibernateUtil.getCriteriaBuilder(); | ||
CriteriaQuery<teammates.storage.sqlentity.UsageStatistics> cr = cb.createQuery( | ||
teammates.storage.sqlentity.UsageStatistics.class); | ||
Root<teammates.storage.sqlentity.UsageStatistics> root = cr | ||
.from(teammates.storage.sqlentity.UsageStatistics.class); | ||
|
||
cr.select(root).where(root.get("startTime").in(instantList)); | ||
TypedQuery<teammates.storage.sqlentity.UsageStatistics> query = HibernateUtil.createQuery(cr); | ||
List<teammates.storage.sqlentity.UsageStatistics> sqlEntitiesFound = query.getResultList(); | ||
if (sqlEntitiesFound.size() != entitiesSavingBuffer.size()) { | ||
Set<Instant> foundInstants = sqlEntitiesFound.stream().map(entity -> entity.getStartTime()) | ||
.collect(Collectors.toSet()); | ||
for (teammates.storage.sqlentity.UsageStatistics entity : entitiesSavingBuffer) { | ||
if (foundInstants.contains(entity.getStartTime())) { | ||
continue; | ||
} | ||
// entity is not found in SQL | ||
log("Migrating missing usage stats: it's start time is: " + entity.getStartTime().toString()); | ||
numberOfAffectedEntities.incrementAndGet(); | ||
numberOfUpdatedEntities.incrementAndGet(); | ||
HibernateUtil.persist(entity); | ||
} | ||
} | ||
|
||
HibernateUtil.flushSession(); | ||
HibernateUtil.clearSession(); | ||
HibernateUtil.commitTransaction(); | ||
|
||
} | ||
entitiesSavingBuffer.clear(); | ||
} | ||
|
||
/** | ||
* Saves the cursor position to a file so it can be used in the next run. | ||
*/ | ||
private void savePositionOfCursorToFile(Cursor cursor) { | ||
try { | ||
FileHelper.saveFile( | ||
BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor", cursor.toUrlSafe()); | ||
} catch (IOException e) { | ||
logError("Fail to save cursor position " + e.getMessage()); | ||
} | ||
} | ||
|
||
/** | ||
* Reads the cursor position from the saved file. | ||
* | ||
* @return cursor if the file can be properly decoded. | ||
*/ | ||
private Optional<Cursor> readPositionOfCursorFromFile() { | ||
try { | ||
String cursorPosition = FileHelper.readFile(BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor"); | ||
return Optional.of(Cursor.fromUrlSafe(cursorPosition)); | ||
} catch (IOException | IllegalArgumentException e) { | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
/** | ||
* Deletes the cursor position file. | ||
*/ | ||
private void deleteCursorPositionFile() { | ||
FileHelper.deleteFile(BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor"); | ||
} | ||
|
||
/** | ||
* Logs a comment. | ||
*/ | ||
protected void log(String logLine) { | ||
System.out.println(String.format("%s %s", getLogPrefix(), logLine)); | ||
|
||
Path logPath = Paths.get(BASE_LOG_URI + this.getClass().getSimpleName() + ".log"); | ||
try (OutputStream logFile = Files.newOutputStream(logPath, | ||
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) { | ||
logFile.write((logLine + System.lineSeparator()).getBytes(Const.ENCODING)); | ||
} catch (Exception e) { | ||
System.err.println("Error writing log line: " + logLine); | ||
System.err.println(e.getMessage()); | ||
} | ||
} | ||
|
||
/** | ||
* Logs an error and persists it to the disk. | ||
*/ | ||
protected void logError(String logLine) { | ||
System.err.println(logLine); | ||
|
||
log("[ERROR]" + logLine); | ||
} | ||
|
||
} |