Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修改了otter的google的jar包依赖,添加了clickhouse的支持 #452

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,9 @@ otter之前开源的一个子项目,开源链接地址:<a href="http://githu
<li>阿里巴巴去Oracle数据迁移同步工具(目标支持MySQL/DRDS):<a href="http://github.com/alibaba/yugong">http://github.com/alibaba/yugong</a></li>
</ol>

<p> </p>
<h1>
<a name="%E9%97%AE%E9%A2%98%E5%8F%8D%E9%A6%88" class="anchor" href="#%E9%97%AE%E9%A2%98%E5%8F%8D%E9%A6%88"><span class="octicon octicon-link"></span></a>问题反馈</h1>
<h3>
<a name="%E6%B3%A8%E6%84%8Fcanalotter-qq%E8%AE%A8%E8%AE%BA%E7%BE%A4%E5%B7%B2%E7%BB%8F%E5%BB%BA%E7%AB%8B%E7%BE%A4%E5%8F%B7161559791-%E6%AC%A2%E8%BF%8E%E5%8A%A0%E5%85%A5%E8%BF%9B%E8%A1%8C%E6%8A%80%E6%9C%AF%E8%AE%A8%E8%AE%BA" class="anchor" href="#%E6%B3%A8%E6%84%8Fcanalotter-qq%E8%AE%A8%E8%AE%BA%E7%BE%A4%E5%B7%B2%E7%BB%8F%E5%BB%BA%E7%AB%8B%E7%BE%A4%E5%8F%B7161559791-%E6%AC%A2%E8%BF%8E%E5%8A%A0%E5%85%A5%E8%BF%9B%E8%A1%8C%E6%8A%80%E6%9C%AF%E8%AE%A8%E8%AE%BA"><span class="octicon octicon-link"></span></a>注意:canal&amp;otter QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。</h3>

<p>1. <span>qq交流群: 161559791</span></p>
<p><span>2. </span><span>邮件交流: [email protected]</span></p>
<p><span>3. </span><span>新浪微博: agapple0002</span></p>
<p><span>4. </span><span>报告issue:</span><a href="https://github.com/alibaba/otter/issues">issues</a></p>
<p> </p>
<pre>
【招聘】阿里巴巴中间件团队招聘JAVA高级工程师
岗位主要为技术型内容(非业务部门),阿里中间件整个体系对于未来想在技术上有所沉淀的同学还是非常有帮助的
工作地点:杭州、北京均可. ps. 阿里待遇向来都是不错的,有意者可以QQ、微博私聊.
具体招聘内容:https://job.alibaba.com/zhaopin/position_detail.htm?positionId=32666
这个是基于阿里的otter修改的,因otter的不少包太老,特别是guava ,老的不行,本版本修改了google包的依赖
添加了clickhouse的支持,postsql在完善中
这个版本在公司里已经平稳运行一年,请放心使用
</pre>
6 changes: 6 additions & 0 deletions manager/biz/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
<artifactId>ojdbc14</artifactId>
</dependency>

<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.24</version>
</dependency>

<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ private DataSource createDataSource(String url, String userName, String password
}
}
// dbcpDs.setValidationQuery("select 1");
} else if (dataMediaType.isClickHouse()) {

dbcpDs.setValidationQuery("select 1");

} else {
logger.error("ERROR ## Unknow database type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.List;
import java.util.Map;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.springframework.beans.factory.InitializingBean;

import com.alibaba.otter.manager.biz.config.channel.ChannelService;
Expand All @@ -45,7 +48,7 @@ public class ArbitrateConfigImpl implements ArbitrateConfig, InitializingBean {
private static final Long DEFAULT_PERIOD = 60 * 1000L;
private Long timeout = DEFAULT_PERIOD;
private RefreshMemoryMirror<Long, Channel> channelCache;
private Map<Long, Long> channelMapping;
private LoadingCache<Long, Long> channelMapping;
private ChannelService channelService;
private NodeService nodeService;
private RefreshMemoryMirror<Long, Node> nodeCache;
Expand All @@ -68,12 +71,12 @@ public Channel findChannel(Long channelId) {
}

public Channel findChannelByPipelineId(Long pipelineId) {
Long channelId = channelMapping.get(pipelineId);
Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
return channelCache.get(channelId);
}

public Pipeline findOppositePipeline(Long pipelineId) {
Long channelId = channelMapping.get(pipelineId);
Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
Channel channel = channelCache.get(channelId);
List<Pipeline> pipelines = channel.getPipelines();
for (Pipeline pipeline : pipelines) {
Expand All @@ -86,7 +89,7 @@ public Pipeline findOppositePipeline(Long pipelineId) {
}

public Pipeline findPipeline(Long pipelineId) {
Long channelId = channelMapping.get(pipelineId);
Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
Channel channel = channelCache.get(channelId);
List<Pipeline> pipelines = channel.getPipelines();
for (Pipeline pipeline : pipelines) {
Expand All @@ -100,6 +103,7 @@ public Pipeline findPipeline(Long pipelineId) {

public void afterPropertiesSet() throws Exception {
// 获取一下nid变量
/* delete by liyc
channelMapping = new MapMaker().makeComputingMap(new Function<Long, Long>() {

public Long apply(Long pipelineId) {
Expand All @@ -115,7 +119,22 @@ public Long apply(Long pipelineId) {

}
});
*/
channelMapping = CacheBuilder.newBuilder().build(new CacheLoader<Long, Long>() {

public Long load(Long pipelineId) {
// 处理下pipline -> channel映射关系不存在的情况
Channel channel = channelService.findByPipelineId(pipelineId);
if (channel == null) {
throw new ConfigException("No Such Channel by pipelineId[" + pipelineId + "]");
}

updateMapping(channel, pipelineId);// 排除下自己
channelCache.put(channel.getId(), channel);// 更新下channelCache
return channel.getId();

}
});
channelCache = new RefreshMemoryMirror<Long, Channel>(timeout, new ComputeFunction<Long, Channel>() {

public Channel apply(Long key, Channel oldValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ private DataMedia doToModel(DataMediaDO dataMediaDo) {
DataMedia dataMedia = null;
try {
DataMediaSource dataMediaSource = dataMediaSourceService.findById(dataMediaDo.getDataMediaSourceId());
if (dataMediaSource.getType().isMysql() || dataMediaSource.getType().isOracle()) {
if (dataMediaSource.getType().isMysql()
|| dataMediaSource.getType().isOracle()
|| dataMediaSource.getType().isClickHouse()) {
dataMedia = JsonUtils.unmarshalFromString(dataMediaDo.getProperties(), DbDataMedia.class);
dataMedia.setSource(dataMediaSource);
} else if (dataMediaSource.getType().isNapoli() || dataMediaSource.getType().isMq()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ private DataMediaSource doToModel(DataMediaSourceDO dataMediaSourceDo) {

DataMediaSource dataMediaSource = new DbMediaSource();
try {
if (dataMediaSourceDo.getType().isMysql() || dataMediaSourceDo.getType().isOracle()) {
if (dataMediaSourceDo.getType().isMysql()
|| dataMediaSourceDo.getType().isOracle()
|| dataMediaSourceDo.getType().isClickHouse()) {
dataMediaSource = JsonUtils.unmarshalFromString(dataMediaSourceDo.getProperties(), DbMediaSource.class);
} else if (dataMediaSourceDo.getType().isNapoli() || dataMediaSourceDo.getType().isMq()) {
dataMediaSource = JsonUtils.unmarshalFromString(dataMediaSourceDo.getProperties(), MqMediaSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.alibaba.otter.manager.biz.monitor.AlarmRecovery;
import com.alibaba.otter.shared.common.model.config.alarm.AlarmRule;
import com.alibaba.otter.shared.common.model.config.alarm.MonitorName;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.MapMaker;

/**
Expand All @@ -34,7 +36,10 @@ public class DefaultAlarmController implements AlarmController {

// seconds
private Long DEFAULT_THRESHOLD = 1800L;
private Map<PoolKey, PoolValue> pool = new MapMaker().expireAfterWrite(1, TimeUnit.HOURS).makeMap();
// private Map<PoolKey, PoolValue> pool = new MapMaker().expireAfterWrite(1, TimeUnit.HOURS).makeMap();
//add by liyc
private Cache<PoolKey, PoolValue> pool = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build();

private AlarmRecovery restartAlarmRecovery;

@Override
Expand All @@ -48,7 +53,7 @@ public AlarmMessage control(AlarmRule rule, String message, AlarmMessage data) {
Long threshold = rule.getIntervalTime() == null ? DEFAULT_THRESHOLD : rule.getIntervalTime();

PoolKey key = new PoolKey(rule, message, data);
PoolValue value = pool.get(key);
PoolValue value = pool.getIfPresent(key); //edit by liyc
boolean needAlarm = true;

Long now = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import com.alibaba.otter.manager.biz.remote.NodeRemoteService;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.google.common.base.Function;
import com.google.common.collect.GenericMapMaker;
import com.google.common.collect.MapEvictionListener;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
//import com.google.common.collect.GenericMapMaker;
//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;

/**
Expand All @@ -47,15 +50,15 @@ public class NodeMBeanServiceImpl implements NodeRemoteService {
private static final String SERVICE_URL = "service:jmx:rmi://{0}/jndi/rmi://{0}:{1}/mbean";
private ObjectName objectName;
private NodeService nodeService;
private Map<Long, MBeanServerConnection> mbeanServers;
private LoadingCache<Long, MBeanServerConnection> mbeanServers;

public NodeMBeanServiceImpl(){
try {
objectName = new ObjectName(MBEAN_NAME);
} catch (Exception e) {
throw new ManagerException(e);
}

/*
GenericMapMaker mapMaker = null;
mapMaker = new MapMaker().expireAfterAccess(5, TimeUnit.MINUTES)
.softValues()
Expand Down Expand Up @@ -94,6 +97,36 @@ public MBeanServerConnection apply(Long nid) {
}

});
*/
mbeanServers= CacheBuilder.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.build(new CacheLoader<Long, MBeanServerConnection>() {
public MBeanServerConnection load(Long nid) {
Node node = nodeService.findById(nid);
String ip = node.getIp();
if (node.getParameters().getUseExternalIp()) {
ip = node.getParameters().getExternalIp();
}

int port = node.getPort().intValue() + 1;
Integer mbeanPort = node.getParameters().getMbeanPort();
if (mbeanPort != null && mbeanPort != 0) {// 做个兼容处理,<=4.2.2版本没有mbeanPort设置
port = mbeanPort;
}

try {
JMXServiceURL serviceURL = new JMXServiceURL(MessageFormat.format(SERVICE_URL,
ip,
String.valueOf(port)));
JMXConnector cntor = JMXConnectorFactory.connect(serviceURL, null);
MBeanServerConnection mbsc = cntor.getMBeanServerConnection();
return mbsc;
} catch (Exception e) {
throw new ManagerException(e);
}
}

});
}

public String getHeapMemoryUsage(Long nid) {
Expand Down Expand Up @@ -127,7 +160,8 @@ public void setProfile(Long nid, boolean profile) {
new Object[] { profile },
new String[] { "java.lang.Boolean" });
} catch (Exception e) {
mbeanServers.remove(nid);
//mbeanServers.remove(nid); edit by liyc
mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
Expand All @@ -139,7 +173,8 @@ public void setThreadPoolSize(Long nid, int size) {
new Object[] { size },
new String[] { "java.lang.Integer" });
} catch (Exception e) {
mbeanServers.remove(nid);
//mbeanServers.remove(nid);edit by liyc
mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
Expand Down Expand Up @@ -200,7 +235,8 @@ private Object getAttribute(Long nid, String attribute) {
try {
return mbeanServers.get(nid).getAttribute(objectName, attribute);
} catch (Exception e) {
mbeanServers.remove(nid);
//mbeanServers.remove(nid);edit by liyc
mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
Expand All @@ -212,7 +248,8 @@ private Object invoke(Long nid, Long pipelineId, String method) {
new Object[] { pipelineId },
new String[] { "java.lang.Long" });
} catch (Exception e) {
mbeanServers.remove(nid);
//mbeanServers.remove(nid);edit by liyc
mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -61,15 +64,16 @@ public class StatsRemoteServiceImpl implements StatsRemoteService {
private ThroughputStatService throughputStatService;
private Long statUnit = 60 * 1000L; //统计周期,默认60秒
private ScheduledThreadPoolExecutor scheduler;
private Map<Long, AvgStat> delayStats;
private Map<Long, Map<ThroughputType, ThroughputStat>> throughputStats;
private LoadingCache<Long, AvgStat> delayStats;
private LoadingCache<Long, Map<ThroughputType, ThroughputStat>> throughputStats;

public StatsRemoteServiceImpl(){
// 注册一下事件处理
CommunicationRegistry.regist(StatisticsEventType.delayCount, this);
CommunicationRegistry.regist(StatisticsEventType.tableStat, this);
CommunicationRegistry.regist(StatisticsEventType.throughputStat, this);

/* delete by liyc
delayStats = new MapMaker().makeComputingMap(new Function<Long, AvgStat>() {

public AvgStat apply(Long pipelineId) {
Expand All @@ -82,7 +86,20 @@ public Map<ThroughputType, ThroughputStat> apply(Long pipelineId) {
return new HashMap<ThroughputType, ThroughputStat>();
}
});
*/
delayStats = CacheBuilder.newBuilder().build(new CacheLoader<Long, AvgStat>() {

public AvgStat load(Long pipelineId) {
return new AvgStat();
}
});

throughputStats = CacheBuilder.newBuilder().build(new CacheLoader<Long, Map<ThroughputType, ThroughputStat>>() {

public Map<ThroughputType, ThroughputStat> load(Long pipelineId) {
return new HashMap<ThroughputType, ThroughputStat>();
}
});
scheduler = new ScheduledThreadPoolExecutor(DEFAULT_POOL, new NamedThreadFactory("Otter-Statistics-Server"),
new ThreadPoolExecutor.CallerRunsPolicy());
if (statUnit > 0) {
Expand Down Expand Up @@ -126,7 +143,7 @@ public void onDelayCount(DelayCountEvent event) {
delayStatService.createDelayStat(stat);
} else {
synchronized (delayStats) {
delayStats.get(count.getPipelineId()).merge(stat);
delayStats.getUnchecked(count.getPipelineId()).merge(stat); //edit by liyc
}
}
}
Expand All @@ -141,7 +158,7 @@ public void onThroughputStat(ThroughputStatEvent event) {
} else {
synchronized (throughputStats) {
for (ThroughputStat stat : event.getStats()) {
Map<ThroughputType, ThroughputStat> data = throughputStats.get(stat.getPipelineId());
Map<ThroughputType, ThroughputStat> data = throughputStats.getUnchecked(stat.getPipelineId()); //edit by liyc
ThroughputStat old = data.get(stat.getType());
if (old != null) {
//执行合并
Expand Down Expand Up @@ -173,7 +190,7 @@ public void onTableStat(TableStatEvent event) {
private void flushDelayStat() {
synchronized (delayStats) {
// 需要做同步,避免delay数据丢失
for (Map.Entry<Long, AvgStat> stat : delayStats.entrySet()) {
for (Map.Entry<Long, AvgStat> stat : delayStats.asMap().entrySet()) { //edit by liyc
if (stat.getValue().count.get() > 0) {
DelayStat delay = new DelayStat();
delay.setPipelineId(stat.getKey());
Expand All @@ -182,19 +199,19 @@ private void flushDelayStat() {
delayStatService.createDelayStat(delay);
}
}
delayStats.clear();
delayStats.cleanUp(); //edit by liyc
}
}

private void flushThroughputStat() {
synchronized (throughputStats) {
Collection<Map<ThroughputType, ThroughputStat>> stats = throughputStats.values();
Collection<Map<ThroughputType, ThroughputStat>> stats = throughputStats.asMap().values(); //edit by liyc
for (Map<ThroughputType, ThroughputStat> stat : stats) {
for (ThroughputStat data : stat.values()) {
throughputStatService.createOrUpdateThroughput(data);
}
}
throughputStats.clear();
throughputStats.cleanUp(); //edit by liyc
}
}

Expand Down
Loading