Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#9052] Improve AlarmJob performance #9077

Merged
merged 1 commit into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,144 @@
package com.navercorp.pinpoint.batch.alarm;

import com.navercorp.pinpoint.batch.alarm.checker.AlarmChecker;
import com.navercorp.pinpoint.batch.alarm.collector.DataCollector;
import com.navercorp.pinpoint.batch.alarm.vo.AppAlarmChecker;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.web.alarm.CheckerCategory;
import com.navercorp.pinpoint.web.alarm.DataCollectorCategory;
import com.navercorp.pinpoint.web.alarm.vo.Rule;
import com.navercorp.pinpoint.web.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.web.service.AgentInfoService;
import com.navercorp.pinpoint.web.service.AlarmService;
import com.navercorp.pinpoint.web.vo.Application;
import org.springframework.batch.item.ItemProcessor;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @author minwoo.jung
*/
public class AlarmProcessor implements ItemProcessor<AlarmChecker<?>, AlarmChecker<?>> {

public AlarmChecker<?> process(AlarmChecker<?> checker) {
checker.check();
return checker;
public class AlarmProcessor implements ItemProcessor<Application, AppAlarmChecker> {

private static final long activeDuration = TimeUnit.MINUTES.toMillis(5);

private final AlarmService alarmService;

private final DataCollectorFactory dataCollectorFactory;

private final ApplicationIndexDao applicationIndexDao;

private final AgentInfoService agentInfoService;

public AlarmProcessor(
DataCollectorFactory dataCollectorFactory,
AlarmService alarmService,
ApplicationIndexDao applicationIndexDao,
AgentInfoService agentInfoService
) {
this.dataCollectorFactory = Objects.requireNonNull(dataCollectorFactory, "dataCollectorFactory");
this.alarmService = Objects.requireNonNull(alarmService, "alarmService");
this.applicationIndexDao = Objects.requireNonNull(applicationIndexDao, "applicationIndexDao");
this.agentInfoService = Objects.requireNonNull(agentInfoService, "agentInfoService");
}

public AppAlarmChecker process(@Nonnull Application application) {
List<AlarmChecker<?>> checkers = getAlarmCheckers(application);
if (CollectionUtils.isEmpty(checkers)) {
return null;
}

AppAlarmChecker appChecker = new AppAlarmChecker(checkers);
appChecker.check();

return appChecker;
}

private List<AlarmChecker<?>> getAlarmCheckers(Application application) {
List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
List<AlarmChecker<?>> checkers = new ArrayList<>(rules.size());

long now = System.currentTimeMillis();
List<String> agentIds = prepareActiveAgentIds(application, rules, now);

RuleTransformer transformer = new RuleTransformer(application, agentIds, now, dataCollectorFactory);
for (Rule rule: rules) {
checkers.add(transformer.apply(rule));
}

return checkers;
}

@Nullable
private List<String> prepareActiveAgentIds(Application application, List<Rule> rules, long now) {
Range activeRange = Range.between(now - activeDuration, now);
List<String> agentIds = null;
if (isRequireAgentList(rules)) {
agentIds = fetchActiveAgents(application.getName(), activeRange);
}
return agentIds;
}

private static boolean isRequireAgentList(List<Rule> rules) {
return rules.stream()
.anyMatch(rule ->
CheckerCategory.getValue(rule.getCheckerName())
.getDataCollectorCategory()
.isRequireAgentList()
);
}

private List<String> fetchActiveAgents(String applicationId, Range activeRange) {
return applicationIndexDao.selectAgentIds(applicationId)
.stream()
.filter(id -> agentInfoService.isActiveAgent(id, activeRange))
.collect(Collectors.toUnmodifiableList());
}

private static class RuleTransformer implements Function<Rule, AlarmChecker<?>> {

private static final CheckerRegistry checkerRegistry = CheckerRegistry.newCheckerRegistry();

private final long timeSlotEndTime;
private final Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();

private final Application application;
private final List<String> agentIds;
private final DataCollectorFactory dataCollectorFactory;

public RuleTransformer(
Application application,
List<String> agentIds,
long timeSlotEndTime,
DataCollectorFactory dataCollectorFactory
) {
this.application = application;
this.agentIds = agentIds;
this.timeSlotEndTime = timeSlotEndTime;
this.dataCollectorFactory = dataCollectorFactory;
}

@Override
public AlarmChecker<?> apply(Rule rule) {
CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());

DataCollector collector = collectorMap.computeIfAbsent(
checkerCategory.getDataCollectorCategory(),
k -> dataCollectorFactory.createDataCollector(
checkerCategory, application, agentIds, timeSlotEndTime
)
);

return checkerRegistry
.getCheckerFactory(checkerCategory)
.createChecker(collector, rule);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@

package com.navercorp.pinpoint.batch.alarm;

import com.navercorp.pinpoint.batch.alarm.checker.AlarmChecker;
import com.navercorp.pinpoint.batch.alarm.collector.DataCollector;
import com.navercorp.pinpoint.web.alarm.CheckerCategory;
import com.navercorp.pinpoint.web.alarm.DataCollectorCategory;
import com.navercorp.pinpoint.web.alarm.vo.Rule;
import com.navercorp.pinpoint.web.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.web.service.AlarmService;
import com.navercorp.pinpoint.web.vo.Application;
Expand All @@ -29,65 +24,52 @@
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* @author minwoo.jung
*/
public class AlarmReader implements ItemReader<AlarmChecker<?>>, StepExecutionListener {

private final DataCollectorFactory dataCollectorFactory;

public class AlarmReader implements ItemReader<Application>, StepExecutionListener {

private final ApplicationIndexDao applicationIndexDao;

private final AlarmService alarmService;

private final Queue<AlarmChecker<?>> checkers = new ConcurrentLinkedDeque<>();

private final CheckerRegistry checkerRegistry = CheckerRegistry.newCheckerRegistry();
private Queue<Application> applicationQueue;

public AlarmReader(DataCollectorFactory dataCollectorFactory, ApplicationIndexDao applicationIndexDao, AlarmService alarmService) {
this.dataCollectorFactory = Objects.requireNonNull(dataCollectorFactory, "dataCollectorFactory");
public AlarmReader(ApplicationIndexDao applicationIndexDao, AlarmService alarmService) {
this.applicationIndexDao = Objects.requireNonNull(applicationIndexDao, "applicationIndexDao");
this.alarmService = Objects.requireNonNull(alarmService, "alarmService");
}

public AlarmChecker<?> read() {
return checkers.poll();
public Application read() {
return applicationQueue.poll();
}

@Override
public void beforeStep(StepExecution stepExecution) {
List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();

for (Application application : applicationList) {
addChecker(application);
}
public void beforeStep(@Nonnull StepExecution stepExecution) {
this.applicationQueue = new ConcurrentLinkedQueue<>(fetchApplications());
}

private void addChecker(Application application) {
List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
long timeSlotEndTime = System.currentTimeMillis();
Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();

for (Rule rule : rules) {
CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
AlarmCheckerFactory factory = checkerRegistry.getCheckerFactory(checkerCategory);
DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
if (collector == null) {
collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
collectorMap.put(collector.getDataCollectorCategory(), collector);
private List<Application> fetchApplications() {
List<Application> applications = applicationIndexDao.selectAllApplicationNames();
List<String> validApplicationIds = alarmService.selectApplicationId();

List<Application> validApplications = new ArrayList<>(applications.size());
for (Application application: applications) {
if (validApplicationIds.contains(application.getName())) {
validApplications.add(application);
}

AlarmChecker<?> checker = factory.createChecker(collector, rule);
checkers.add(checker);
}

return validApplications;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
public ExitStatus afterStep(@Nonnull StepExecution stepExecution) {
return null;
}
}
Loading