Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add patch data migration script for usage stats #12970

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package teammates.client.scripts.sql;

// CHECKSTYLE.OFF:ImportOrder
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.google.cloud.datastore.Cursor;
import com.google.cloud.datastore.QueryResults;
import com.googlecode.objectify.cmd.Query;

import jakarta.persistence.TypedQuery;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.Root;

import teammates.client.connector.DatastoreClient;
import teammates.client.util.ClientProperties;
import teammates.common.util.Const;
import teammates.common.util.HibernateUtil;
import teammates.storage.sqlentity.UsageStatistics;
import teammates.test.FileHelper;

// CHECKSTYLE.ON:ImportOrder
/**
* Patch Migration Script for Usage Statistics
*
* Batch select datastore usage statistics, and then batch select SQL entities
* with the same start timestamps.
* Compare the size of the two list, if is not equal, find the missing one in
* SQL and migrate it.
*/
@SuppressWarnings("PMD")
public class PatchDataMigrationForUsageStatisticsSql extends DatastoreClient {
// the folder where the cursor position and console output is saved as a file
private static final String BASE_LOG_URI = "src/client/java/teammates/client/scripts/log/";

// 100 is the optimal batch size as there won't be too much time interval
// between read and save (if transaction is not used)
// cannot set number greater than 300
// see
// https://stackoverflow.com/questions/41499505/objectify-queries-setting-limit-above-300-does-not-work
private static final int BATCH_SIZE = 100;

// Creates the folder that will contain the stored log.
static {
new File(BASE_LOG_URI).mkdir();
}

AtomicLong numberOfScannedKey;
AtomicLong numberOfAffectedEntities;
AtomicLong numberOfUpdatedEntities;

private List<teammates.storage.sqlentity.UsageStatistics> entitiesSavingBuffer;

private PatchDataMigrationForUsageStatisticsSql() {
numberOfScannedKey = new AtomicLong();
numberOfAffectedEntities = new AtomicLong();
numberOfUpdatedEntities = new AtomicLong();

entitiesSavingBuffer = new ArrayList<>();

String connectionUrl = ClientProperties.SCRIPT_API_URL;
String username = ClientProperties.SCRIPT_API_NAME;
String password = ClientProperties.SCRIPT_API_PASSWORD;

HibernateUtil.buildSessionFactory(connectionUrl, username, password);
}

public static void main(String[] args) {
new PatchDataMigrationForUsageStatisticsSql().doOperationRemotely();
}

/**
* Returns the log prefix.
*/
protected String getLogPrefix() {
return String.format("Usage Statistics Patch Migration:");
}

private boolean isPreview() {
return false;
}

/**
* Returns whether the account has been migrated.
*/
protected boolean isMigrationNeeded(teammates.storage.entity.UsageStatistics entity) {
return true;
}

/**
* Returns the filter query.
*/
protected Query<teammates.storage.entity.UsageStatistics> getFilterQuery() {
return ofy().load().type(teammates.storage.entity.UsageStatistics.class);
}

private void doMigration(teammates.storage.entity.UsageStatistics entity) {
try {
if (!isMigrationNeeded(entity)) {
return;
}
if (!isPreview()) {
migrateEntity(entity);
}
} catch (Exception e) {
logError("Problem migrating usage stats " + entity);
logError(e.getMessage());
}
}

/**
* Migrates the entity. In this case, add entity to buffer.
*/
protected void migrateEntity(teammates.storage.entity.UsageStatistics oldEntity) {
UsageStatistics newEntity = new UsageStatistics(
oldEntity.getStartTime(),
oldEntity.getTimePeriod(),
oldEntity.getNumResponses(),
oldEntity.getNumCourses(),
oldEntity.getNumStudents(),
oldEntity.getNumInstructors(),
oldEntity.getNumAccountRequests(),
oldEntity.getNumEmails(),
oldEntity.getNumSubmissions());

entitiesSavingBuffer.add(newEntity);
}

@Override
protected void doOperation() {
log("Running " + getClass().getSimpleName() + "...");
log("Preview: " + isPreview());

Cursor cursor = readPositionOfCursorFromFile().orElse(null);
if (cursor == null) {
log("Start from the beginning");
} else {
log("Start from cursor position: " + cursor.toUrlSafe());
}

boolean shouldContinue = true;
while (shouldContinue) {
shouldContinue = false;
Query<teammates.storage.entity.UsageStatistics> filterQueryKeys = getFilterQuery().limit(BATCH_SIZE);
if (cursor != null) {
filterQueryKeys = filterQueryKeys.startAt(cursor);
}
QueryResults<teammates.storage.entity.UsageStatistics> iterator;

iterator = filterQueryKeys.iterator();

while (iterator.hasNext()) {
shouldContinue = true;

doMigration(iterator.next());

numberOfScannedKey.incrementAndGet();
}

if (shouldContinue) {
cursor = iterator.getCursorAfter();
flushEntitiesSavingBuffer();
savePositionOfCursorToFile(cursor);
log(String.format("Cursor Position: %s", cursor.toUrlSafe()));
log(String.format("Number Of Entity Key Scanned: %d", numberOfScannedKey.get()));
log(String.format("Number Of Entity affected: %d", numberOfAffectedEntities.get()));
log(String.format("Number Of Entity updated: %d", numberOfUpdatedEntities.get()));
}
}

deleteCursorPositionFile();
log(isPreview() ? "Preview Completed!" : "Migration Completed!");
log("Total number of entities: " + numberOfScannedKey.get());
log("Number of affected entities: " + numberOfAffectedEntities.get());
log("Number of updated entities: " + numberOfUpdatedEntities.get());
}

/**
* Flushes the saving buffer by issuing Cloud SQL save request.
*/
private void flushEntitiesSavingBuffer() {
if (!entitiesSavingBuffer.isEmpty() && !isPreview()) {
log("Checking usage stats in batch..." + entitiesSavingBuffer.size());

// batch query from SQL (migrate if startTime in the datastore but not in sql)
List<Instant> instantList = entitiesSavingBuffer.stream().map(entity -> entity.getStartTime())
.collect(Collectors.toList());

HibernateUtil.beginTransaction();
CriteriaBuilder cb = HibernateUtil.getCriteriaBuilder();
CriteriaQuery<teammates.storage.sqlentity.UsageStatistics> cr = cb.createQuery(
teammates.storage.sqlentity.UsageStatistics.class);
Root<teammates.storage.sqlentity.UsageStatistics> root = cr
.from(teammates.storage.sqlentity.UsageStatistics.class);

cr.select(root).where(root.get("startTime").in(instantList));
TypedQuery<teammates.storage.sqlentity.UsageStatistics> query = HibernateUtil.createQuery(cr);
List<teammates.storage.sqlentity.UsageStatistics> sqlEntitiesFound = query.getResultList();
if (sqlEntitiesFound.size() != entitiesSavingBuffer.size()) {
Set<Instant> foundInstants = sqlEntitiesFound.stream().map(entity -> entity.getStartTime())
.collect(Collectors.toSet());
for (teammates.storage.sqlentity.UsageStatistics entity : entitiesSavingBuffer) {
if (foundInstants.contains(entity.getStartTime())) {
continue;
}
// entity is not found in SQL
log("Migrating missing usage stats: it's start time is: " + entity.getStartTime().toString());
numberOfAffectedEntities.incrementAndGet();
numberOfUpdatedEntities.incrementAndGet();
HibernateUtil.persist(entity);
}
}

HibernateUtil.flushSession();
HibernateUtil.clearSession();
HibernateUtil.commitTransaction();

}
entitiesSavingBuffer.clear();
}

/**
* Saves the cursor position to a file so it can be used in the next run.
*/
private void savePositionOfCursorToFile(Cursor cursor) {
try {
FileHelper.saveFile(
BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor", cursor.toUrlSafe());
} catch (IOException e) {
logError("Fail to save cursor position " + e.getMessage());
}
}

/**
* Reads the cursor position from the saved file.
*
* @return cursor if the file can be properly decoded.
*/
private Optional<Cursor> readPositionOfCursorFromFile() {
try {
String cursorPosition = FileHelper.readFile(BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor");
return Optional.of(Cursor.fromUrlSafe(cursorPosition));
} catch (IOException | IllegalArgumentException e) {
return Optional.empty();
}
}

/**
* Deletes the cursor position file.
*/
private void deleteCursorPositionFile() {
FileHelper.deleteFile(BASE_LOG_URI + this.getClass().getSimpleName() + ".cursor");
}

/**
* Logs a comment.
*/
protected void log(String logLine) {
System.out.println(String.format("%s %s", getLogPrefix(), logLine));

Path logPath = Paths.get(BASE_LOG_URI + this.getClass().getSimpleName() + ".log");
try (OutputStream logFile = Files.newOutputStream(logPath,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
logFile.write((logLine + System.lineSeparator()).getBytes(Const.ENCODING));
} catch (Exception e) {
System.err.println("Error writing log line: " + logLine);
System.err.println(e.getMessage());
}
}

/**
* Logs an error and persists it to the disk.
*/
protected void logError(String logLine) {
System.err.println(logLine);

log("[ERROR]" + logLine);
}

}