Skip to content

Commit

Permalink
[#9052] Improve AlarmJob performance
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Jul 29, 2022
1 parent a66193c commit 691cf0e
Show file tree
Hide file tree
Showing 27 changed files with 586 additions and 457 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,144 @@
package com.navercorp.pinpoint.batch.alarm;

import com.navercorp.pinpoint.batch.alarm.checker.AlarmChecker;
import com.navercorp.pinpoint.batch.alarm.collector.DataCollector;
import com.navercorp.pinpoint.batch.alarm.vo.AppAlarmChecker;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.web.alarm.CheckerCategory;
import com.navercorp.pinpoint.web.alarm.DataCollectorCategory;
import com.navercorp.pinpoint.web.alarm.vo.Rule;
import com.navercorp.pinpoint.web.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.web.service.AgentInfoService;
import com.navercorp.pinpoint.web.service.AlarmService;
import com.navercorp.pinpoint.web.vo.Application;
import org.springframework.batch.item.ItemProcessor;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @author minwoo.jung
*/
public class AlarmProcessor implements ItemProcessor<AlarmChecker<?>, AlarmChecker<?>> {

public AlarmChecker<?> process(AlarmChecker<?> checker) {
checker.check();
return checker;
public class AlarmProcessor implements ItemProcessor<Application, AppAlarmChecker> {

private static final long activeDuration = TimeUnit.MINUTES.toMillis(5);

private final AlarmService alarmService;

private final DataCollectorFactory dataCollectorFactory;

private final ApplicationIndexDao applicationIndexDao;

private final AgentInfoService agentInfoService;

public AlarmProcessor(
DataCollectorFactory dataCollectorFactory,
AlarmService alarmService,
ApplicationIndexDao applicationIndexDao,
AgentInfoService agentInfoService
) {
this.dataCollectorFactory = Objects.requireNonNull(dataCollectorFactory, "dataCollectorFactory");
this.alarmService = Objects.requireNonNull(alarmService, "alarmService");
this.applicationIndexDao = Objects.requireNonNull(applicationIndexDao, "applicationIndexDao");
this.agentInfoService = Objects.requireNonNull(agentInfoService, "agentInfoService");
}

public AppAlarmChecker process(@Nonnull Application application) {
List<AlarmChecker<?>> checkers = getAlarmCheckers(application);
if (CollectionUtils.isEmpty(checkers)) {
return null;
}

AppAlarmChecker appChecker = new AppAlarmChecker(checkers);
appChecker.check();

return appChecker;
}

private List<AlarmChecker<?>> getAlarmCheckers(Application application) {
List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
List<AlarmChecker<?>> checkers = new ArrayList<>(rules.size());

long now = System.currentTimeMillis();
List<String> agentIds = prepareActiveAgentIds(application, rules, now);

RuleTransformer transformer = new RuleTransformer(application, agentIds, now, dataCollectorFactory);
for (Rule rule: rules) {
checkers.add(transformer.apply(rule));
}

return checkers;
}

@Nullable
private List<String> prepareActiveAgentIds(Application application, List<Rule> rules, long now) {
Range activeRange = Range.between(now - activeDuration, now);
List<String> agentIds = null;
if (isRequireAgentList(rules)) {
agentIds = fetchActiveAgents(application.getName(), activeRange);
}
return agentIds;
}

private static boolean isRequireAgentList(List<Rule> rules) {
return rules.stream()
.anyMatch(rule ->
CheckerCategory.getValue(rule.getCheckerName())
.getDataCollectorCategory()
.isRequireAgentList()
);
}

private List<String> fetchActiveAgents(String applicationId, Range activeRange) {
return applicationIndexDao.selectAgentIds(applicationId)
.stream()
.filter(id -> agentInfoService.isActiveAgent(id, activeRange))
.collect(Collectors.toUnmodifiableList());
}

private static class RuleTransformer implements Function<Rule, AlarmChecker<?>> {

private static final CheckerRegistry checkerRegistry = CheckerRegistry.newCheckerRegistry();

private final long timeSlotEndTime;
private final Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();

private final Application application;
private final List<String> agentIds;
private final DataCollectorFactory dataCollectorFactory;

public RuleTransformer(
Application application,
List<String> agentIds,
long timeSlotEndTime,
DataCollectorFactory dataCollectorFactory
) {
this.application = application;
this.agentIds = agentIds;
this.timeSlotEndTime = timeSlotEndTime;
this.dataCollectorFactory = dataCollectorFactory;
}

@Override
public AlarmChecker<?> apply(Rule rule) {
CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());

DataCollector collector = collectorMap.computeIfAbsent(
checkerCategory.getDataCollectorCategory(),
k -> dataCollectorFactory.createDataCollector(
checkerCategory, application, agentIds, timeSlotEndTime
)
);

return checkerRegistry
.getCheckerFactory(checkerCategory)
.createChecker(collector, rule);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@

package com.navercorp.pinpoint.batch.alarm;

import com.navercorp.pinpoint.batch.alarm.checker.AlarmChecker;
import com.navercorp.pinpoint.batch.alarm.collector.DataCollector;
import com.navercorp.pinpoint.web.alarm.CheckerCategory;
import com.navercorp.pinpoint.web.alarm.DataCollectorCategory;
import com.navercorp.pinpoint.web.alarm.vo.Rule;
import com.navercorp.pinpoint.web.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.web.service.AlarmService;
import com.navercorp.pinpoint.web.vo.Application;
Expand All @@ -29,65 +24,52 @@
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* @author minwoo.jung
*/
public class AlarmReader implements ItemReader<AlarmChecker<?>>, StepExecutionListener {

private final DataCollectorFactory dataCollectorFactory;

public class AlarmReader implements ItemReader<Application>, StepExecutionListener {

private final ApplicationIndexDao applicationIndexDao;

private final AlarmService alarmService;

private final Queue<AlarmChecker<?>> checkers = new ConcurrentLinkedDeque<>();

private final CheckerRegistry checkerRegistry = CheckerRegistry.newCheckerRegistry();
private Queue<Application> applicationQueue;

public AlarmReader(DataCollectorFactory dataCollectorFactory, ApplicationIndexDao applicationIndexDao, AlarmService alarmService) {
this.dataCollectorFactory = Objects.requireNonNull(dataCollectorFactory, "dataCollectorFactory");
public AlarmReader(ApplicationIndexDao applicationIndexDao, AlarmService alarmService) {
this.applicationIndexDao = Objects.requireNonNull(applicationIndexDao, "applicationIndexDao");
this.alarmService = Objects.requireNonNull(alarmService, "alarmService");
}

public AlarmChecker<?> read() {
return checkers.poll();
public Application read() {
return applicationQueue.poll();
}

@Override
public void beforeStep(StepExecution stepExecution) {
List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();

for (Application application : applicationList) {
addChecker(application);
}
public void beforeStep(@Nonnull StepExecution stepExecution) {
this.applicationQueue = new ConcurrentLinkedQueue<>(fetchApplications());
}

private void addChecker(Application application) {
List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
long timeSlotEndTime = System.currentTimeMillis();
Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();

for (Rule rule : rules) {
CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
AlarmCheckerFactory factory = checkerRegistry.getCheckerFactory(checkerCategory);
DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
if (collector == null) {
collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
collectorMap.put(collector.getDataCollectorCategory(), collector);
private List<Application> fetchApplications() {
List<Application> applications = applicationIndexDao.selectAllApplicationNames();
List<String> validApplicationIds = alarmService.selectApplicationId();

List<Application> validApplications = new ArrayList<>(applications.size());
for (Application application: applications) {
if (validApplicationIds.contains(application.getName())) {
validApplications.add(application);
}

AlarmChecker<?> checker = factory.createChecker(collector, rule);
checkers.add(checker);
}

return validApplications;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
public ExitStatus afterStep(@Nonnull StepExecution stepExecution) {
return null;
}
}
Loading

0 comments on commit 691cf0e

Please sign in to comment.