Skip to content

Commit

Permalink
[pinpoint-apm#9086] Lightened agentCountJob
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Aug 8, 2022
1 parent 796bd8d commit d8862a6
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 NAVER Corp.
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,37 +13,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.batch.job;

import com.navercorp.pinpoint.web.util.DateTimeUtils;
import com.navercorp.pinpoint.web.vo.AgentCountStatistics;
import com.navercorp.pinpoint.web.vo.ApplicationAgentList;
import com.navercorp.pinpoint.web.vo.ApplicationAgentsList;
import com.navercorp.pinpoint.batch.common.BatchConfiguration;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.web.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.web.service.AgentInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.item.ItemProcessor;

import java.util.List;
import javax.annotation.Nonnull;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* @author Taejin Koo
* @author youngjin.kim2
*/
public class AgentCountProcessor implements ItemProcessor<ApplicationAgentsList, AgentCountStatistics> {
public class AgentCountProcessor implements ItemProcessor<String, Integer> {

@Override
public AgentCountStatistics process(ApplicationAgentsList item) throws Exception {
if (item == null) {
return null;
}

int agentCount = getAgentCount(item.getApplicationAgentLists());
AgentCountStatistics agentCountStatistics = new AgentCountStatistics(agentCount, DateTimeUtils.timestampToStartOfDay(System.currentTimeMillis()));
return agentCountStatistics;
private final Logger logger = LogManager.getLogger(this.getClass());

private final ApplicationIndexDao applicationIndexDao;
private final AgentInfoService agentInfoService;
private final long duration;

public AgentCountProcessor(
ApplicationIndexDao applicationIndexDao,
AgentInfoService agentInfoService,
BatchConfiguration batchConfiguration
) {
this.applicationIndexDao = Objects.requireNonNull(applicationIndexDao, "applicationIndexDao");
this.agentInfoService = Objects.requireNonNull(agentInfoService, "agentInfoService");

long durationDays = batchConfiguration.getCleanupInactiveAgentsDurationDays();
this.duration = TimeUnit.DAYS.toMillis(durationDays);
}

private int getAgentCount(List<ApplicationAgentList> applicationAgentLists) {
return applicationAgentLists.stream()
.mapToInt(applicationAgentList -> applicationAgentList.getAgentStatusAndLinks().size())
.sum();
@Override
public Integer process(@Nonnull String applicationName) {
long localCount = applicationIndexDao.selectAgentIds(applicationName)
.stream()
.filter(this::isActive)
.count();
logger.info("Application {} has {} agents", applicationName, localCount);
return Math.toIntExact(localCount);
}

private boolean isActive(String agentId) {
long now = System.currentTimeMillis();
Range range = Range.between(now - duration, now);
return agentInfoService.isActiveAgent(agentId, range);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,50 @@

package com.navercorp.pinpoint.batch.job;

import com.navercorp.pinpoint.web.service.AgentInfoService;
import com.navercorp.pinpoint.web.vo.AgentInfoFilter;
import com.navercorp.pinpoint.web.vo.ApplicationAgentsList;
import com.navercorp.pinpoint.web.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.web.vo.Application;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import java.util.LinkedList;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

/**
* @author Taejin Koo
* @author youngjin.kim2
*/
public class AgentCountReader implements ItemReader<ApplicationAgentsList>, StepExecutionListener {
public class AgentCountReader implements ItemReader<String>, StepExecutionListener {

private final AgentInfoService agentInfoService;
private final ApplicationIndexDao applicationIndexDao;

private final Queue<ApplicationAgentsList> queue = new LinkedList<>();
private Queue<String> applicationNameQueue;

public AgentCountReader(AgentInfoService agentInfoService) {
this.agentInfoService = Objects.requireNonNull(agentInfoService, "agentInfoService");
public AgentCountReader(ApplicationIndexDao applicationIndexDao) {
this.applicationIndexDao = Objects.requireNonNull(applicationIndexDao, "applicationIndexDao");
}

@Override
public void beforeStep(StepExecution stepExecution) {
long timestamp = System.currentTimeMillis();
ApplicationAgentsList applicationAgentList = agentInfoService.getAllApplicationAgentsList(AgentInfoFilter::accept, timestamp);
queue.add(applicationAgentList);
public void beforeStep(@Nonnull StepExecution stepExecution) {
List<String> applicationNames = applicationIndexDao.selectAllApplicationNames()
.stream()
.map(Application::getName)
.collect(Collectors.toUnmodifiableList());
this.applicationNameQueue = new ConcurrentLinkedQueue<>(applicationNames);
}

@Override
public ApplicationAgentsList read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return queue.poll();
public ExitStatus afterStep(@Nonnull StepExecution stepExecution) {
return null;
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
public String read() {
return applicationNameQueue.poll();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,61 @@
package com.navercorp.pinpoint.batch.job;

import com.navercorp.pinpoint.web.dao.AgentStatisticsDao;
import com.navercorp.pinpoint.web.util.DateTimeUtils;
import com.navercorp.pinpoint.web.vo.AgentCountStatistics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author Taejin Koo
* @author youngjin.kim2
*/
public class AgentCountWriter implements ItemWriter<AgentCountStatistics> {
public class AgentCountWriter implements ItemWriter<Integer>, StepExecutionListener {

private final Logger logger = LogManager.getLogger(this.getClass());
private final AgentStatisticsDao agentStatisticsDao;
private final AtomicInteger count = new AtomicInteger(0);
private final long timestamp = DateTimeUtils.timestampToStartOfDay(System.currentTimeMillis());

public AgentCountWriter(AgentStatisticsDao agentStatisticsDao) {
this.agentStatisticsDao = Objects.requireNonNull(agentStatisticsDao, "agentStatisticsDao");
}

@Override
public void write(List<? extends AgentCountStatistics> items) throws Exception {
if (items.size() == 1) {
AgentCountStatistics agentCountStatistics = items.get(0);
if (agentCountStatistics == null || agentCountStatistics.getAgentCount() < 0) {
throw new JobExecutionException("Bad parameter");
}
boolean success = agentStatisticsDao.insertAgentCount(agentCountStatistics);
if (!success) {
throw new JobExecutionException("insert AgentCount failed.");
}
} else {
throw new JobExecutionException("Bad parameter");
public void beforeStep(@Nonnull StepExecution stepExecution) {}

@Override
public ExitStatus afterStep(@Nonnull StepExecution stepExecution) {
try {
writeCount(count.get());
} catch (JobExecutionException e) {
logger.error("Failed to store agentCount in DB (agentCount: {})", count.get(), e);
return ExitStatus.FAILED;
}
return ExitStatus.COMPLETED;
}

@Override
public void write(List<? extends Integer> items) {
count.getAndAdd(items.stream().mapToInt(el -> el).sum());
}

private void writeCount(int count) throws JobExecutionException {
logger.info("{} agents are alive", count);

AgentCountStatistics statistics = new AgentCountStatistics(count, timestamp);

if (!agentStatisticsDao.insertAgentCount(statistics)) {
throw new JobExecutionException("Failed to insert AgentCountStatistics: " + statistics);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
Expand All @@ -26,10 +25,15 @@

<batch:step id="agentCountStep">
<batch:tasklet>
<batch:chunk reader="agentCountReader" processor="agentCountProcessor" writer="agentCountWriter"
commit-interval="1" retry-limit="2">
<batch:chunk
reader="agentCountReader"
processor="agentCountProcessor"
writer="agentCountWriter"
commit-interval="10000"
retry-limit="10"
>
<batch:retryable-exception-classes>
<batch:include class="org.springframework.batch.core.JobExecutionException"/>
<batch:include class="java.lang.Exception"/>
</batch:retryable-exception-classes>
</batch:chunk>
</batch:tasklet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ public List<AgentCountStatistics> selectAgentCount(@RequestParam("from") long fr

return agentCountStatisticsList;
}

@GetMapping(value = "/selectLatestAgentCount")
public List<AgentCountStatistics> selectLatestAgentCount(@RequestParam(value = "size", defaultValue = "1") int size) {
return agentStatisticsService.selectLatestAgentCount(size);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public interface AgentStatisticsDao {

List<AgentCountStatistics> selectAgentCount(Range range);

List<AgentCountStatistics> selectLatestAgentCount(Integer size);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class MemoryAgentStatisticsDao implements AgentStatisticsDao {

private static final Comparator<Long> REVERSE = Collections.reverseOrder(Long::compare);

private final Map<Long, Integer> agentCountPerTime = new TreeMap<>(REVERSE);
private final TreeMap<Long, Integer> agentCountPerTime = new TreeMap<>(REVERSE);

@Override
public boolean insertAgentCount(AgentCountStatistics agentCountStatistics) {
Expand Down Expand Up @@ -66,6 +66,24 @@ public List<AgentCountStatistics> selectAgentCount(Range range) {
return result;
}

@Override
public List<AgentCountStatistics> selectLatestAgentCount(Integer size) {
if (agentCountPerTime.isEmpty()) {
return null;
}

List<AgentCountStatistics> result = new ArrayList<>();

for (Long timestamp: agentCountPerTime.descendingKeySet()) {
Integer agentCount = agentCountPerTime.get(timestamp);
result.add(new AgentCountStatistics(agentCount, timestamp));

if (result.size() >= size) {
break;
}
}

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ public List<AgentCountStatistics> selectAgentCount(Range range) {
return sqlSessionTemplate.selectList(NAMESPACE + "selectAgentCount", range);
}

@Override
public List<AgentCountStatistics> selectLatestAgentCount(Integer size) {
return sqlSessionTemplate.selectList(NAMESPACE + "selectLatestAgentCount", size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public interface AgentStatisticsService {

List<AgentCountStatistics> selectAgentCount(Range range);

List<AgentCountStatistics> selectLatestAgentCount(int size);

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ public List<AgentCountStatistics> selectAgentCount(Range range) {
return agentStatisticsDao.selectAgentCount(range);
}

@Override
public List<AgentCountStatistics> selectLatestAgentCount(int size) {
return agentStatisticsDao.selectLatestAgentCount(size);
}

}
7 changes: 7 additions & 0 deletions web/src/main/resources/mapper/AgentStaticsMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@
BETWEEN #{fromDateTime} AND #{toDateTime}
</select>

<select id="selectLatestAgentCount" parameterType="java.lang.Integer" resultType="AgentCountStatistics">
SELECT *
FROM agent_statistics
ORDER BY date_val DESC
LIMIT #{value}
</select>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public void simpleTest() {
range = Range.between(currentTime, currentTime + 100);
agentCountStatisticses = dao.selectAgentCount(range);
Assertions.assertEquals(0, agentCountStatisticses.size());

agentCountStatisticses = dao.selectLatestAgentCount(10);
Assertions.assertEquals(10, agentCountStatisticses.size());
}

}

0 comments on commit d8862a6

Please sign in to comment.