Skip to content

Commit

Permalink
EA-2560: Entity migration fixes (#75)
Browse files Browse the repository at this point in the history
* EA-2560: Fix issues encountered during entity migration
  • Loading branch information
ikattey authored May 27, 2021
1 parent 5662427 commit 688dc54
Show file tree
Hide file tree
Showing 23 changed files with 280 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public JobExplorer getJobExplorer() throws Exception {


public void clearRepository() {
logger.debug("Clearing Batch JobRepository...");
logger.info("Clearing Batch JobRepository...");

sequenceRepository.drop();
// casting is safe here, as we know from the constructor what these instances are
Expand All @@ -94,6 +94,6 @@ public void clearRepository() {
((JobInstanceRepository) mongoJobInstanceDao).drop();
((StepExecutionRepository) mongoStepExecutionDao).drop();

logger.debug("Batch JobRepository cleared");
logger.info("Batch JobRepository cleared");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class EntityManagementConfiguration {
@Value("${batch.step.executor.queueSize: 50}")
private int batchStepExecutorQueueSize;

@Value("${batch.step.throttleLimit: 10}")
private int batchStepThrottleLimit;

@Value("${batch.computeMetrics: false}")
private boolean batchComputeMetrics;

Expand Down Expand Up @@ -174,4 +177,8 @@ public boolean isAuthEnabled() {
public String getEnrichmentsMigrationPassword() {
return enrichmentsMigrationPassword;
}

public int getBatchStepThrottleLimit() {
return batchStepThrottleLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ private EntityRecordFields() {

public static final String ENTITY_SAME_AS = "entity.sameAs";
public static final String ENTITY_EXACT_MATCH = "entity.exactMatch";
public static final String ENTITY_TYPE = "entity.type";

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
@dev.morphia.annotations.Entity("EntityRecord")
@Indexes({
@Index(fields = {@Field(ENTITY_EXACT_MATCH)}),
@Index(fields = {@Field(ENTITY_SAME_AS)})
@Index(fields = {@Field(ENTITY_SAME_AS)}),

// temporary index for migration
@Index(fields = @Field(ENTITY_TYPE))
})
@EntityListeners(EntityRecordWatcher.class)
public class EntityRecord {
Expand All @@ -44,6 +47,7 @@ public class EntityRecord {
private Date created;

@JsonIgnore
@Indexed
private Date modified;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ private Map normalizeMapField(Field field, Map fieldValue, Entity entity)
} else if (isMultipleValueStringMap(field)) {
return normalizeMultipleValueMap((Map<String, List<String>>) fieldValue);
} else {
logger.debug("normalization not supported for maps of type: {}", field.getGenericType());
if(logger.isTraceEnabled()) {
logger.trace("normalization not supported for maps of type: {}", field.getGenericType());
}
return fieldValue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ public void retrieveWithContentNegotiationInMozillaShouldBeSuccessful() throws E
String entityId = registeredEntityNode.get("id").asText();
String defaultMozillaAcceptHeader = "ext/html,application/xhtml+xml,application/xml;q=0.9,*/*";
String requestPath = getEntityRequestPath(entityId);
logger.debug("Retrieving entity record /{} with accept header: ", requestPath, defaultMozillaAcceptHeader);
ResultActions resultActions = mockMvc.perform(get(BASE_SERVICE_URL + "/" + requestPath)
.param(WebEntityConstants.QUERY_PARAM_PROFILE, "external")
.accept(defaultMozillaAcceptHeader));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
package eu.europeana.entitymanagement.batch;

import static eu.europeana.entitymanagement.batch.BatchUtils.JOB_RETRY_FAILED_ENTITIES;
import static eu.europeana.entitymanagement.batch.BatchUtils.JOB_UPDATE_ALL_ENTITIES;
import static eu.europeana.entitymanagement.batch.BatchUtils.JOB_UPDATE_METRICS_SPECIFIC_ENTITIES;
import static eu.europeana.entitymanagement.batch.BatchUtils.JOB_UPDATE_SPECIFIC_ENTITIES;
import static eu.europeana.entitymanagement.batch.BatchUtils.STEP_RETRY_FAILED_ENTITIES;
import static eu.europeana.entitymanagement.batch.BatchUtils.STEP_UPDATE_ENTITY;
import static eu.europeana.entitymanagement.common.config.AppConfigConstants.BEAN_JSON_MAPPER;
import static eu.europeana.entitymanagement.common.config.AppConfigConstants.BEAN_STEP_EXECUTOR;
import static eu.europeana.entitymanagement.common.config.AppConfigConstants.SYNC_TASK_EXECUTOR;
import static eu.europeana.entitymanagement.definitions.EntityRecordFields.ENTITY_ID;
import static eu.europeana.entitymanagement.definitions.EntityRecordFields.ENTITY_MODIFIED;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.morphia.query.experimental.filters.Filters;
import eu.europeana.entitymanagement.batch.errorhandling.EntitySkipPolicy;
import eu.europeana.entitymanagement.batch.errorhandling.FailedTaskService;
import eu.europeana.entitymanagement.batch.errorhandling.FailedTaskUtils;
import eu.europeana.entitymanagement.batch.listener.EntityUpdateListener;
Expand All @@ -26,9 +15,9 @@
import eu.europeana.entitymanagement.batch.writer.EntityRecordDatabaseWriter;
import eu.europeana.entitymanagement.common.config.EntityManagementConfiguration;
import eu.europeana.entitymanagement.definitions.model.EntityRecord;
import eu.europeana.entitymanagement.exception.EntityMismatchException;
import eu.europeana.entitymanagement.exception.MetisNotKnownException;
import eu.europeana.entitymanagement.web.service.EntityRecordService;
import java.util.Arrays;
import java.util.Date;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.core.ItemProcessListener;
Expand All @@ -50,6 +39,13 @@
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Date;

import static eu.europeana.entitymanagement.batch.BatchUtils.*;
import static eu.europeana.entitymanagement.common.config.AppConfigConstants.*;
import static eu.europeana.entitymanagement.definitions.EntityRecordFields.*;

@Component
public class BatchEntityUpdateConfig {

Expand Down Expand Up @@ -83,24 +79,26 @@ public class BatchEntityUpdateConfig {

private final int chunkSize;

private final int throttleLimit;

//TODO: Too many dependencies. Split up into multiple classes
@Autowired
public BatchEntityUpdateConfig(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
StepBuilderFactory stepBuilderFactory,
@Qualifier(SPECIFIC_ITEM_ENTITYRECORD_READER) ItemReader<EntityRecord> specificItemReader,
@Qualifier(ALL_ITEM_ENTITYRECORD_READER) ItemReader<EntityRecord> allEntityRecordReader,
@Qualifier(FAILED_TASK_READER) ItemReader<EntityRecord> failedTaskReader,
EntityDereferenceProcessor dereferenceProcessor,
EntityUpdateProcessor entityUpdateProcessor,
@Qualifier(FAILED_TASK_READER) ItemReader<EntityRecord> failedTaskReader,
EntityDereferenceProcessor dereferenceProcessor,
EntityUpdateProcessor entityUpdateProcessor,
EntityMetricsProcessor entityMetricsProcessor,
EntityRecordDatabaseWriter dbWriter,
EntityRecordService entityRecordService,
FailedTaskService failedTaskService,
EntityUpdateListener entityUpdateListener,
@Qualifier(BEAN_STEP_EXECUTOR) TaskExecutor stepThreadPoolExecutor,
@Qualifier(SYNC_TASK_EXECUTOR) TaskExecutor synchronousTaskExecutor,
@Qualifier(BEAN_JSON_MAPPER) ObjectMapper mapper,
EntityManagementConfiguration emConfig) {
EntityRecordDatabaseWriter dbWriter,
EntityRecordService entityRecordService,
FailedTaskService failedTaskService,
EntityUpdateListener entityUpdateListener,
@Qualifier(BEAN_STEP_EXECUTOR) TaskExecutor stepThreadPoolExecutor,
@Qualifier(SYNC_TASK_EXECUTOR) TaskExecutor synchronousTaskExecutor,
@Qualifier(BEAN_JSON_MAPPER) ObjectMapper mapper,
EntityManagementConfiguration emConfig) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.specificItemReader = specificItemReader;
Expand All @@ -117,6 +115,7 @@ public BatchEntityUpdateConfig(JobBuilderFactory jobBuilderFactory,
this.synchronousTaskExecutor = synchronousTaskExecutor;
this.mapper = mapper;
this.chunkSize = emConfig.getBatchChunkSize();
this.throttleLimit = emConfig.getBatchStepThrottleLimit();
}

/**
Expand All @@ -138,7 +137,9 @@ private EntityRecordDatabaseReader specificEntityRecordReader(@Value("#{jobParam
@StepScope
private SynchronizedItemStreamReader<EntityRecord> allEntityRecordReader(@Value("#{jobParameters[currentStartTime]}") Date currentStartTime) {
EntityRecordDatabaseReader reader = new EntityRecordDatabaseReader(entityRecordService, chunkSize,
Filters.lte(ENTITY_MODIFIED, currentStartTime));
Filters.lte(ENTITY_MODIFIED, currentStartTime),
// temp filter during migration. Only fetch records without a consolidated entity
Filters.exists(ENTITY_TYPE).not());

return threadSafeReader(reader);
}
Expand Down Expand Up @@ -192,8 +193,13 @@ private Step updateEntityStep(boolean singleEntity){
(ItemProcessListener<? super EntityRecord, ? super EntityRecord>) entityUpdateListener)
.reader(singleEntity ? specificItemReader : allEntityRecordReader)
.processor(compositeUpdateProcessor())
.faultTolerant()
.skipPolicy(new EntitySkipPolicy())
.skip(EntityMismatchException.class)
.skip(MetisNotKnownException.class)
.writer(dbWriter)
.taskExecutor(singleEntity ? synchronousTaskExecutor : stepThreadPoolExecutor)
.throttleLimit(throttleLimit)
.build();
}

Expand All @@ -205,7 +211,12 @@ private Step retryFailedEntitiesStep() {
.writer(dbWriter)
.listener(
(ItemProcessListener<? super EntityRecord, ? super EntityRecord>) entityUpdateListener)
.faultTolerant()
.skipPolicy(new EntitySkipPolicy())
.skip(EntityMismatchException.class)
.skip(MetisNotKnownException.class)
.taskExecutor(stepThreadPoolExecutor)
.throttleLimit(throttleLimit)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package eu.europeana.entitymanagement.batch.errorhandling;

import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.lang.NonNull;

public class EntitySkipPolicy implements SkipPolicy {

public boolean shouldSkip(@NonNull final Throwable t, final int skipCount) throws SkipLimitExceededException {
// do not fail job because of errors, since we can tackle them later
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void persistFailure(String entityId, Exception e) {
createUpdateFailure(entityId, Instant.now(), e.getMessage(),
ExceptionUtils.getStackTrace(e)));

logger.debug("Persisted update failure to db. entityId={} matched={}, modified={}", entityId,
logger.info("Persisted update failure to db. entityId={} matched={}, modified={}", entityId,
result.getMatchedCount(), result.getModifiedCount());
}

Expand All @@ -59,7 +59,7 @@ public void persistFailureBulk(List<? extends EntityRecord> entityRecords, Excep
Collectors.toList());

BulkWriteResult writeResult = failureRepository.upsertBulk(failures);
logger.debug("Persisted update failures to db: matched={}, modified={}, inserted={}",
logger.info("Persisted update failures to db: matched={}, modified={}, inserted={}",
writeResult.getMatchedCount(), writeResult.getModifiedCount(),
writeResult.getInsertedCount());
}
Expand All @@ -72,7 +72,7 @@ public void persistFailureBulk(List<? extends EntityRecord> entityRecords, Excep
public void removeFailures(List<String> entityIds) {
long removeCount = failureRepository.removeFailures(entityIds);
if(removeCount > 0) {
logger.debug("Removed update failures from db: count={}", removeCount);
logger.info("Removed update failures from db: count={}", removeCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

import eu.europeana.entitymanagement.batch.errorhandling.FailedTaskService;
import eu.europeana.entitymanagement.definitions.model.EntityRecord;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.core.listener.ItemListenerSupport;
Expand All @@ -19,48 +24,66 @@
@Component
public class EntityUpdateListener extends ItemListenerSupport<EntityRecord, EntityRecord> {

private static final Logger logger = LogManager.getLogger(EntityUpdateListener.class);
private static final Logger logger = LogManager.getLogger(EntityUpdateListener.class);

private final FailedTaskService failedTaskService;

private final FailedTaskService failedTaskService;
private Instant startRead, startProcess, startWrite;
private long readDuration, processDuration, writeDuration;

@Autowired
@Override
public void beforeRead() {
if(logger.isDebugEnabled()) {
startRead = Instant.now();
}
}

@Autowired
public EntityUpdateListener(
FailedTaskService failedTaskService) {
this.failedTaskService = failedTaskService;
}

@Override
public void beforeRead() {
logger.trace("beforeRead");
}

@Override
public void afterRead(EntityRecord item) {
logger.debug("afterRead: entityId={}", item.getEntityId());
if(logger.isDebugEnabled()) {
readDuration = Duration.between(startRead, Instant.now()).toMillis();
logger.debug("afterRead: entityId={}; duration={}ms", item.getEntityId(), readDuration);
}
}


@Override
public void beforeProcess(EntityRecord item) {
logger.debug("beforeProcess: entityId={}", item.getEntityId());
if(logger.isDebugEnabled()) {
startProcess = Instant.now();
}
}

@Override
public void afterProcess(EntityRecord item, EntityRecord result) {
logger.debug("afterProcess: entityId={}", item.getEntityId());
if(logger.isDebugEnabled()) {
processDuration = Duration.between(startProcess, Instant.now()).toMillis();
logger.debug("afterProcess: entityId={}; processDuration={}ms", item.getEntityId(), processDuration);
}
}

@Override
public void beforeWrite(@NonNull List<? extends EntityRecord> entityRecords) {
String[] entityIds = getEntityIds(entityRecords);
logger.debug("beforeWrite: entityIds={}", Arrays.toString(entityIds));
startWrite = Instant.now();
}


@Override
public void afterWrite(@NonNull List<? extends EntityRecord> entityRecords) {
String[] entityIds = getEntityIds(entityRecords);
logger.debug("afterWrite: entityIds={}, count={}", Arrays.toString(entityIds), entityIds.length);
if(entityRecords.isEmpty()){
return;
}
String[] entityIds = getEntityIds(entityRecords);
if(logger.isDebugEnabled()) {
writeDuration = Duration.between(startWrite, Instant.now()).toMillis();
logger.info("afterWrite: entityIds={}, count={}; writeDuration={}ms", Arrays.toString(entityIds), entityIds.length, writeDuration);
}

// Remove entries from the FailedTask collection if exists
failedTaskService.removeFailures(Arrays.asList(entityIds));
Expand Down
Loading

0 comments on commit 688dc54

Please sign in to comment.