About the workflow: create temp table -> load data -> drop original table -> rename temp table to original name #807

Closed
welyss opened this issue Aug 22, 2019 · 5 comments

welyss commented Aug 22, 2019

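The workflow create temp table -> load data -> drop original table -> rename temp table to original name occasionally hangs in our production environment. After investigating, the cause appears to be this: while the temp table's rows are being synced, Otter looks up the temp table's metadata on the source database, but by then the table has already been renamed to the original name, so the lookup by the temp table name fails.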
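A simplified model of the failure described above: row events carry only a table name, and that name is resolved against the source's current catalog when the event is processed, not when it was logged. The class and method names below are hypothetical and an in-memory map stands in for the source database; this is an illustration, not Otter code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of the race (hypothetical names, not Otter code):
 * row events only carry a table name, and the pipeline resolves that name
 * against the source's *current* catalog when it processes the event.
 */
class LateLookupRace {

    /** Looks up a table in the current catalog and fails if it is gone. */
    static List<String> findTable(Map<String, List<String>> catalog, String table) {
        List<String> columns = catalog.get(table);
        if (columns == null) {
            throw new IllegalStateException("table [" + table + "] not found");
        }
        return columns;
    }

    /** Replays create -> insert -> rename on the source, then lets the pipeline catch up. */
    static List<String> run() {
        Map<String, List<String>> catalog = new HashMap<>();
        List<String> pendingEvents = new ArrayList<>(); // table name of each queued row event
        List<String> errors = new ArrayList<>();

        // Source: CREATE TABLE tmp_windtest, then two INSERTs land in the binlog
        catalog.put("tmp_windtest", Arrays.asList("id", "fname", "lname", "store_id"));
        pendingEvents.add("tmp_windtest");
        pendingEvents.add("tmp_windtest");

        // Source: ALTER TABLE tmp_windtest RENAME windtest, before the pipeline catches up
        catalog.put("windtest", catalog.remove("tmp_windtest"));

        // Pipeline: processes the queued events and only now looks the table up
        for (String table : pendingEvents) {
            try {
                findTable(catalog, table);
            } catch (IllegalStateException e) {
                errors.add(e.getMessage());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Prints "table [tmp_windtest] not found" once per queued event
        for (String error : run()) {
            System.out.println(error);
        }
    }
}
```

Both queued events fail even though the table existed when they were logged, because the lookup happens after the rename.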

===============

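Looking at canal's solution, it seems tsdb can be used to keep table metadata. Does otter currently support using it? Is tsdb an ordinary database? In the related XML files it seems to need a dedicated data source, but in the otter configuration UI I only found an on/off switch and no data source settings. Is there a way to solve this?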
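The idea behind a tsdb-style store can be sketched as a schema history keyed by binlog position: each DDL produces a new snapshot, and metadata is resolved as of the event's position instead of from the live database. This is a hypothetical in-memory illustration, not canal's tsdb implementation (which persists its history); the class and method names are made up, positions are assumed to be non-negative and strictly increasing, and metadata is reduced to a column list.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical position-versioned schema store (not canal's implementation):
 * each DDL creates a new snapshot keyed by the binlog position where it was
 * applied, and lookups resolve a table "as of" an event's position.
 * Positions are assumed to be non-negative and strictly increasing.
 */
class VersionedSchemaStore {
    // binlog position -> schema snapshot (table name -> columns) valid from that position
    private final TreeMap<Long, Map<String, List<String>>> snapshots = new TreeMap<>();

    VersionedSchemaStore() {
        snapshots.put(0L, new HashMap<String, List<String>>()); // empty schema at the start
    }

    /** Copy of the latest snapshot at or before the given position. */
    private Map<String, List<String>> copyAt(long position) {
        return new HashMap<>(snapshots.floorEntry(position).getValue());
    }

    /** Records a CREATE TABLE applied at the given position. */
    void applyCreate(long position, String table, List<String> columns) {
        Map<String, List<String>> next = copyAt(position);
        next.put(table, columns);
        snapshots.put(position, next);
    }

    /** Records an ALTER TABLE ... RENAME applied at the given position. */
    void applyRename(long position, String from, String to) {
        Map<String, List<String>> next = copyAt(position);
        List<String> columns = next.remove(from);
        if (columns != null) {
            next.put(to, columns);
        }
        snapshots.put(position, next);
    }

    /** Table metadata as it was at the given position, or null if the table did not exist. */
    List<String> findTable(long position, String table) {
        return snapshots.floorEntry(position).getValue().get(table);
    }

    public static void main(String[] args) {
        VersionedSchemaStore store = new VersionedSchemaStore();
        List<String> columns = Arrays.asList("id", "fname", "lname", "store_id");
        store.applyCreate(100L, "tmp_windtest", columns);    // CREATE TABLE tmp_windtest
        store.applyRename(130L, "tmp_windtest", "windtest"); // ALTER TABLE ... RENAME windtest
        // An INSERT logged at position 110 still resolves after the rename:
        System.out.println(store.findTable(110L, "tmp_windtest")); // [id, fname, lname, store_id]
        System.out.println(store.findTable(140L, "tmp_windtest")); // null
    }
}
```

Copying the whole snapshot on every DDL keeps lookups trivial but costs memory; a production store would probably keep a DDL log plus periodic snapshots instead.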

===============

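PS: I found that both the select node and the load node hit similar errors. If tsdb is used, do the select node and the load node each maintain their own tsdb, or does only the select node maintain it, with the load node obtaining the metadata from the select node some other way?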

Member

agapple commented Aug 28, 2019

It is mainly maintained by the select node. Otter supports enabling canal's tsdb.

agapple closed this as completed Aug 28, 2019

Author

welyss commented Aug 28, 2019

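Over the past two days I investigated further and found that all three stages (E, T and L) of SETL look up table metadata. Testing the whole pipeline, the findTable calls in all three related classes can fail, because by the time they query the database the table has already been dropped. Our project depends on a third-party database whose operations we cannot control, so I had to find another workaround.
The idea is similar to canal's: after capturing a CREATE TABLE DDL, parse the SQL into table metadata and put it into a cache (canal appears to persist it instead). That way, at least while the service is running, lookups can still find metadata that would otherwise be missing.
While making this change, however, I found that the ETL process is asynchronous. In testing, there is a fairly high chance that INSERT and UPDATE events call findTable before the CREATE TABLE DDL has been processed, so the table is not found and the task is interrupted. Changing that would touch the architecture and is not easy, so I had to work around it again. The approach I finally got working is:
1. After capturing a CREATE TABLE DDL, parse the SQL and load the metadata into the cache.
2. If findTable fails because asynchronous processing has reordered events, catch the exception, have the thread wait N seconds, and call findTable again. If it still fails after M attempts, fall back to the original logic and interrupt the task.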
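The two-step workaround above can be sketched as follows. This is a hypothetical illustration under these assumptions: table metadata is reduced to a column list, liveLookup stands in for the existing findTable call and throws when the table is missing, and maxAttempts and waitMillis correspond to M and N in the steps. None of these names come from Otter.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * Hypothetical sketch of the two-step workaround (not Otter's API):
 * 1. CREATE TABLE DDL is parsed into a cache as soon as it is captured;
 * 2. a lookup that misses waits and retries up to maxAttempts times before
 *    failing the same way the original findTable would.
 */
class RetryingTableLookup {
    private final Map<String, List<String>> ddlCache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> liveLookup; // stands in for findTable; throws if missing
    private final int maxAttempts;  // "M" in the steps above
    private final long waitMillis;  // "N seconds" in the steps above, in milliseconds

    RetryingTableLookup(Function<String, List<String>> liveLookup, int maxAttempts, long waitMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.liveLookup = liveLookup;
        this.maxAttempts = maxAttempts;
        this.waitMillis = waitMillis;
    }

    /** Step 1: called by the DDL handler after parsing a CREATE TABLE statement. */
    void onCreateTable(String table, List<String> columns) {
        ddlCache.put(table, columns);
    }

    /** Step 2: cache first, then the live lookup, retrying while the DDL may still be in flight. */
    List<String> findTable(String table) {
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<String> cached = ddlCache.get(table);
            if (cached != null) {
                return cached;
            }
            try {
                return liveLookup.apply(table);
            } catch (RuntimeException e) {
                lastError = e;
                if (attempt < maxAttempts) {
                    pause();
                }
            }
        }
        throw lastError; // give up: same outcome as the original code (task interrupted)
    }

    private void pause() {
        try {
            TimeUnit.MILLISECONDS.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting for table metadata", e);
        }
    }
}
```

A bounded retry only narrows the window: if the DDL has not reached the cache within roughly (M - 1) x N seconds, the lookup still fails as before, and the waiting worker thread is blocked for that time.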

With these code changes my own tests pass, but the changes have not been deployed to production yet.

Member

agapple commented Aug 28, 2019

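The back-end ETL can be configured to skip looking up the table structure, provided the source and target tables have identical structures.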

Author

welyss commented Aug 29, 2019

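I see. Our source and target databases have identical structures. How exactly should this be configured? I traced through the source code; here is my understanding, which may not be correct.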

===============================================================

The DatabaseExtractor.java class in the E (extract) stage:

            if (isRow && !flag) {
                // check once up front to avoid contention once work enters the thread pool
                // for views, this is checked again later
                flag = checkNeedDbForRowMode(pipeline, eventData);
            }
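  • It looks like setting, in the channel settings, [Sync mode: row-based sync] + [Enable data consistency: yes] + [Sync consistency: database lookup] lets this check be skipped. Does it only work with the database-lookup consistency mode?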

===============================================================

The RowDataTransformer.java class in the T (transform) stage:

        TableInfoHolder tableHolder = null;
        if (useTableTransform || enableCompatibleMissColumn) {// decide whether the table meta
                                                              // must be looked up; for homogeneous databases it is unnecessary
            // fetch the table info from the target database
            DbDialect dbDialect = dbDialectFactory.getDbDialect(dataMediaPair.getPipelineId(),
                (DbMediaSource) dataMedia.getSource());

            Table table = findTableRetry(dbDialect, result.getSchemaName(), result.getTableName(), 3);
            tableHolder = new TableInfoHolder(table, useTableTransform, enableCompatibleMissColumn);
        }
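  • This branch runs if either [Skip rows not found on lookup] or [Enable table type conversion] is turned on in the pipeline settings. So both need to be turned off here?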
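The guard quoted above can be checked exhaustively. The sketch below copies its condition into a standalone method (the class and method names are hypothetical) and prints all four flag combinations; the target table lookup is skipped only when both flags are false. Which pipeline option each flag corresponds to is taken from the question, not verified here.

```java
/**
 * Copies the condition from the RowDataTransformer guard quoted above:
 * the target table metadata is looked up when at least one flag is on.
 */
class TransformLookupGuard {

    static boolean needsTableLookup(boolean useTableTransform, boolean enableCompatibleMissColumn) {
        return useTableTransform || enableCompatibleMissColumn;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean transform : values) {
            for (boolean missColumn : values) {
                System.out.printf("useTableTransform=%-5b enableCompatibleMissColumn=%-5b -> lookup=%b%n",
                        transform, missColumn, needsTableLookup(transform, missColumn));
            }
        }
    }
}
```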

===============================================================

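I ran version v4.2.17 with these settings:
[Sync mode: row-based sync]
[Enable data consistency: yes]
[Sync consistency: database lookup]
[Skip rows not found on lookup: off]
[Enable table type conversion: off]
and got this error: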

pid:1 nid:1 exception:setl:com.alibaba.otter.node.etl.extract.exceptions.ExtractException: java.util.concurrent.ExecutionException: com.google.common.collect.ComputationException: org.apache.commons.lang.exception.NestableRuntimeException: find table [test.tmp_windtest] error 
Caused by: java.util.concurrent.ExecutionException: com.google.common.collect.ComputationException: org.apache.commons.lang.exception.NestableRuntimeException: find table [test.tmp_windtest] error 
at java.util.concurrent.FutureTask.report(Unknown Source) 
at java.util.concurrent.FutureTask.get(Unknown Source) 
at com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor.extract(DatabaseExtractor.java:160) 
at com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor.extract(DatabaseExtractor.java:81) 
at com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor$$FastClassByCGLIB$$f79e22c6.invoke() 
at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:191) 
at org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:618) 
at com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor$$EnhancerByCGLIB$$de7bc45b.extract() 
at com.alibaba.otter.node.etl.extract.extractor.OtterExtractorFactory.extract(OtterExtractorFactory.java:50) 
at com.alibaba.otter.node.etl.extract.ExtractTask$1.run(ExtractTask.java:79) 
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
at java.lang.Thread.run(Unknown Source) 
Caused by: com.google.common.collect.ComputationException: org.apache.commons.lang.exception.NestableRuntimeException: find table [test.tmp_windtest] error 
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889) 
at com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect.findTable(AbstractDbDialect.java:109) 
at com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect.findTable(AbstractDbDialect.java:113) 
at com.alibaba.otter.node.etl.extract.extractor.DatabaseExtractor$DatabaseExtractWorker.run(DatabaseExtractor.java:305) 
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
at java.util.concurrent.FutureTask.run(Unknown Source) 
... 5 more 
Caused by: org.apache.commons.lang.exception.NestableRuntimeException: find table [test.tmp_windtest] error 
at com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect$2.apply(AbstractDbDialect.java:188) 
at com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect$2.apply(AbstractDbDialect.java:172) 
at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356) 
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182) 
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151) 
at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67) 
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885) 
... 10 more 
Caused by: org.apache.commons.lang.exception.NestableRuntimeException: no found table [test.tmp_windtest] , pls check 
at com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect$2.apply(AbstractDbDialect.java:182) 
... 16 more

The SQL statements executed were:

CREATE TABLE `tmp_windtest` (
  `id` int(11) NOT NULL auto_increment,
  `fname` varchar(30) DEFAULT NULL,
  `lname` varchar(30) DEFAULT NULL,
  `store_id` bigint DEFAULT NULL,
  primary key(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tmp_windtest values(1, 'a', 'a', 1);
insert into tmp_windtest values(2, 'b', 'b', 2);
insert into tmp_windtest values(3, 'a', 'a', 1);
insert into tmp_windtest values(4, 'b', 'b', 2);
insert into tmp_windtest values(5, 'a', 'a', 1);
insert into tmp_windtest values(6, 'b', 'b', 2);

alter table `tmp_windtest` rename `windtest`;


CREATE TABLE `tmp_windtest1` (
  `id` int(11) NOT NULL auto_increment,
  `fname` varchar(30) DEFAULT NULL,
  `lname` varchar(30) DEFAULT NULL,
  `store_id` bigint DEFAULT NULL,
  primary key(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tmp_windtest1 values(1, 'a', 'a', 1);
insert into tmp_windtest1 values(2, 'b', 'b', 2);

alter table `tmp_windtest1` rename `windtest1`;


CREATE TABLE `tmp_windtest2` (
  `id` int(11) NOT NULL auto_increment,
  `fname` varchar(30) DEFAULT NULL,
  `lname` varchar(30) DEFAULT NULL,
  `store_id` bigint DEFAULT NULL,
  primary key(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tmp_windtest2 values(1, 'a', 'a', 1);
insert into tmp_windtest2 values(2, 'b', 'b', 2);

alter table `tmp_windtest2` rename `windtest2`;

CREATE TABLE `tmp_windtest3` (
  `id` int(11) NOT NULL auto_increment,
  `fname` varchar(30) DEFAULT NULL,
  `lname` varchar(30) DEFAULT NULL,
  `store_id` bigint DEFAULT NULL,
  primary key(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tmp_windtest3 values(1, 'a', 'a', 1);
insert into tmp_windtest3 values(2, 'b', 'b', 2);

alter table `tmp_windtest3` rename `windtest3`;

CREATE TABLE `tmp_windtest4` (
  `id` int(11) NOT NULL auto_increment,
  `fname` varchar(30) DEFAULT NULL,
  `lname` varchar(30) DEFAULT NULL,
  `store_id` bigint DEFAULT NULL,
  primary key(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tmp_windtest4 values(1, 'a', 'a', 1);
insert into tmp_windtest4 values(2, 'b', 'b', 2);

alter table `tmp_windtest4` rename `windtest4`;


The error seems to come from this code in the run method of the DatabaseExtractWorker class in DatabaseExtractor.java:

                // fetch the table info
                DataMedia dataMedia = ConfigHelper.findDataMedia(pipeline, eventData.getTableId());
                DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
                    (DbMediaSource) dataMedia.getSource());
                Table table = dbDialect.findTable(eventData.getSchemaName(), eventData.getTableName());

The error is thrown by the findTable call here. Did I miss a setting?

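PS: Earlier, the findTable call in the DbLoadAction.java class in the L stage also seemed to fail, and I don't see a condition there that allows skipping it. The L stage looks up table information on the target database, so in theory, if execution were sequential, the table should always be found. Separately, a piece of code in this class seems to have incorrect logic, and I'm not sure whether it affects this: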

                Boolean result = dbDialect.getJdbcTemplate().execute(new StatementCallback<Boolean>() {

                    public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessException {
                        Boolean result = true;
                        if (dbDialect instanceof MysqlDialect && StringUtils.isNotEmpty(data.getDdlSchemaName())) {
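                            // for MySQL, switch to the schema the DDL was executed in on the source before running it
                            // result &= stmt.execute("use " +
                            // data.getDdlSchemaName());

                            // quote the schema name so that names that are keywords, such as "Order", do not cause errors that break sync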
                            result &= stmt.execute("use `" + data.getDdlSchemaName() + "`");
                        }
                        result &= stmt.execute(data.getSql());
                        return result;
                    }
                });
                if (result) {
                    context.getProcessedDatas().add(data); // record as a successfully processed SQL
                } else {
                    context.getFailedDatas().add(data);
                }
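  • stmt.execute returns true only when the first result is a ResultSet; when the result is an update count or there is no result, it returns false. DDL statements normally produce no result set, so result is still false even when the statement succeeds (the preceding "use" statement also returns false), and every successfully executed DDL falls through to the else branch and is recorded as failed.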
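A minimal sketch of success detection that does not depend on the return value of Statement.execute. Assumptions: SqlExecutor is a hypothetical interface with the same shape as java.sql.Statement#execute(String), so stmt::execute can be passed in, and an SQLException is mapped to false only to keep the example self-contained; inside Spring's JdbcTemplate the exception would normally propagate as a DataAccessException instead.

```java
import java.sql.SQLException;

/**
 * Success detection that ignores Statement.execute's return value.
 * execute(...) returns true only if the first result is a ResultSet, so
 * DDL and USE return false even when they succeed. Here success means
 * "no SQLException was thrown".
 */
class DdlOutcome {

    /** Hypothetical: same shape as java.sql.Statement#execute(String), so stmt::execute fits. */
    interface SqlExecutor {
        boolean execute(String sql) throws SQLException;
    }

    /** Returns true if the optional USE and the DDL both ran without throwing. */
    static boolean runDdl(SqlExecutor stmt, String schema, String ddl) {
        try {
            if (schema != null && !schema.isEmpty()) {
                stmt.execute("use `" + schema + "`"); // return value ignored on purpose
            }
            stmt.execute(ddl); // false is the normal result for DDL, not a failure
            return true;
        } catch (SQLException e) {
            return false; // simplified: the real callback would let the exception propagate
        }
    }
}
```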

Member

agapple commented Oct 8, 2019

Set sync consistency to the binlog-based option.
