
Merge commit '72938fbe6aabbbca39975a5cd4a6e40a920eb9c9' into v2.3
* commit '72938fbe6aabbbca39975a5cd4a6e40a920eb9c9':
  v2.2.0
  Fix a bug where the input mongo batch connection might not be closed after a full sync
  input kafka: fix a bug where, with multiple goroutines configured to handle data conversion, the worker goroutine numbering went wrong due to a pointer issue
  Fix a bug where editing a data source did not update its connection example, description and related information #248
  Fix a bug where adding table data synchronization in batch failed #249
  After adding a full-sync task, the jump to the full-sync task list now opens in a new window #250
  input kafka: with multiple partitions, the number of goroutines handling data conversion can now be configured to improve performance; data from one partition is only ever processed by one goroutine, so with a single partition configuring more than one has no effect; the default is a single goroutine (see the routing sketch after this list)
  input mongo: support full, incremental, and full-then-incremental data synchronization modes
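
The "one partition, one goroutine" routing mentioned above comes down to a stable hash-based assignment. Below is a minimal, simplified sketch of the rule implemented in input/kafka/input_msg_consume.go later in this commit; the worker count and topic name are arbitrary example values, not configuration from the repository.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// Each kafka message is routed to worker CRC32("<topic>_<partition>") % workerCount,
// so every message from a given partition always lands on the same worker goroutine.
func main() {
	workerCount := 3
	for _, partition := range []int32{0, 1, 2, 0} {
		key := fmt.Sprintf("%s_%d", "topic1", partition)
		worker := int(crc32.ChecksumIEEE([]byte(key))) % workerCount
		fmt.Printf("partition %d -> worker %d\n", partition, worker)
	}
	// partition 0 appears twice and prints the same worker index both times.
}
```

Because the key depends only on topic and partition, changing the worker count may move a partition to a different worker, but a single partition is never split across two workers within one run.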
jc3wish committed Oct 6, 2023
2 parents 50a72c2 + 72938fb commit 5ff3894
Showing 24 changed files with 918 additions and 36 deletions.
7 changes: 4 additions & 3 deletions README.EN.MD
@@ -44,6 +44,7 @@ Your star is the biggest support for Bifrost!!!
| kafka | NO | YES | debezium | debezium for mysql data |
| Mongo | NO | YES | oplog | |
| kafka | NO | YES | customer json | customer json to bifrost struct |
| Mock | NO | YES | MySQL Binlog | used for output plugin development |

---

@@ -117,11 +118,11 @@ After compiling, the corresponding platform name folder will be created in the t
##### Binary

```
wget https://github.com/brokercap/Bifrost/releases/download/v2.1.1-beta/bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz
wget https://github.com/brokercap/Bifrost/releases/download/v2.2.0-beta/bifrost_v2.2.0-beta_Linux-amd64-bin.tar.gz
tar -zxvf bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz
tar -zxvf bifrost_v2.2.0-beta_Linux-amd64-bin.tar.gz
cd bifrost_v2.1.1-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
cd bifrost_v2.2.0-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
```

11 changes: 6 additions & 5 deletions README.MD
@@ -41,6 +41,7 @@
| kafka | NO | YES | debezium | debezium writes the data into kafka; Bifrost consumes the kafka data and converts it automatically |
| Mongo | NO | YES | oplog | |
| kafka | NO | YES | customer json | Supports converting custom JSON-format data into the Bifrost format |
| Mock | NO | YES | MySQL Binlog | When developing an OutPut plugin, there is no longer any need to deploy MySQL or another source for development-stage testing |

---

@@ -120,11 +121,11 @@ make install prefix=./target
##### Binary installation
`````sh

wget https://github.com/brokercap/Bifrost/releases/download/v2.1.1-beta/bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz
wget https://github.com/brokercap/Bifrost/releases/download/v2.2.0-beta/bifrost_v2.2.0-beta_Linux-amd64-bin.tar.gz

tar -zxvf bifrost_v2.1.1-beta_Linux-amd64-bin.tar.gz
tar -zxvf bifrost_v2.2.0-beta_Linux-amd64-bin.tar.gz

cd bifrost_v2.1.1-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*
cd bifrost_v2.2.0-beta_Linux-amd64-bin/bin && chmod a+x ./Bifrost*

`````

Expand Down Expand Up @@ -213,7 +214,7 @@ tls_key_file=./etc/server.key
tls_crt_file=./etc/server.crt

# Whether to enable the file queue feature true|false
file_queue_usable=true
file_queue_usable=false

# Sampling window for deciding whether to enable the file queue, in milliseconds
file_queue_usable_count_time_diff=5000
@@ -231,7 +232,7 @@ meta_storage_path=127.0.0.1:6379
cluster_name=bifrostTestClusterName

# Whether to enable the file queue feature true|false
file_queue_usable=true
file_queue_usable=false

# Sampling window for deciding whether to enable the file queue, in milliseconds
file_queue_usable_count_time_diff=5000
3 changes: 2 additions & 1 deletion admin/view/template/db.detail.history.add.html
@@ -212,7 +212,8 @@ <h3 class="modal-title">Add New History</h3>
var callback = function (data) {
if(data.status) {
alert("success,请点击开始按钮进行开启 初始化操作")
window.location.href = "/history/index?DbName=" + DbName + "&SchemaName=" + SchemaName + "&TableName=" + TableName;
newWindowsurl = "/history/index?DbName=" + DbName + "&SchemaName=" + SchemaName + "&TableName=" + TableName;
window.open(newWindowsurl);
}else{
alert(data.msg);
}
4 changes: 3 additions & 1 deletion admin/view/template/db.detail.html
@@ -67,7 +67,7 @@ <h5>Table ToServer List
&nbsp;
<button class="btn-sm btn-primary" id="historyAddBtn" type="button" style="margin-top: -8px" title="点击后可配置读取数据表的数据进行全量数据初始化">刷全量数据</button>
&nbsp;
<a href="#" id="tableHistoryListBtn">
<a href="#" id="tableHistoryListBtn" target="_blank">
<button class="btn-sm btn-primary" id="" type="button" style="margin-top: -8px">查看全量任务列表</button>
</a>
</span>
@@ -664,6 +664,8 @@ <h3 class="modal-title" id="showLikeTable_Title"></h3>
var DbName = $("#DbName").val();
var url = "/db/table/list";
setSchemaName(SchemaName);
$("#tableToServerListContair").attr("DbName",DbName);
$("#tableToServerListContair").attr("schema",SchemaName);
var showTableList = function(data){
$("#TableListContair").html("");
$.each(data,function(index,v) {
1 change: 1 addition & 0 deletions admin/view/template/db.list.html
@@ -484,6 +484,7 @@ <h5 id="opContairTitle">Add new DB</h5>
isNewDB = false;
location.hash = "#newOrUpdateDB";
$("#update_toserver_contair").show();
DoInputTypeChange();
}
);

13 changes: 13 additions & 0 deletions changelog.txt
@@ -1,3 +1,16 @@
v2.2.0-beta 2023-10-06
1. input kafka: with multiple partitions, the number of goroutines handling data conversion can now be configured to improve performance. Data from one partition is only ever processed by one goroutine, so with a single partition configuring more than one has no effect; the default is a single goroutine
2. input mongo: support full, incremental, and full-then-incremental data synchronization modes
3. Fix a bug where adding table data synchronization in batch failed
   https://github.com/brokercap/Bifrost/issues/249
4. Fix a bug where editing a data source did not update its connection example, description and related information
   https://github.com/brokercap/Bifrost/issues/248
5. After adding a full-sync task, the jump to the full-sync task list now opens in a new window
   https://github.com/brokercap/Bifrost/issues/250
6. Support input mock, making it easier to test while developing an output plugin
7. input plugin development: support fetching the list of databases and tables to be synchronized
8. Bristol mysql driver: conn.Exec and conn.Query now support requests with bound parameters

v2.1.1-beta 2023-09-17
1. Fix a bug where the field mapping in a sync configuration was displayed as undefined
https://github.com/brokercap/Bifrost/issues/237
2 changes: 1 addition & 1 deletion config/version.go
@@ -16,4 +16,4 @@ limitations under the License.

package config

const VERSION = "v2.1.1-beta"
const VERSION = "v2.2.0-beta"
2 changes: 1 addition & 1 deletion etc/Bifrost.ini
@@ -29,7 +29,7 @@ tls_key_file=./etc/server.key
tls_crt_file=./etc/server.crt

# Whether to enable the file queue feature true|false
file_queue_usable=true
file_queue_usable=false

# Sampling window for deciding whether to enable the file queue, in milliseconds
file_queue_usable_count_time_diff=5000
13 changes: 11 additions & 2 deletions input/kafka/config.go
@@ -22,16 +22,20 @@ import (
"fmt"
"github.com/Shopify/sarama"
"io/ioutil"
"strconv"
"strings"
)

type ParamMap map[string]string

type Config struct {
BrokerServerList []string
GroupId string
Topics []string
ParamConfig *sarama.Config
SkipSerializeErr bool
ParamMap map[string]string
ParamMap ParamMap
CosumerCount int // number of goroutines that process the data after it is read from the kafka connection; defaults to 1
}

type TLSConfig struct {
@@ -158,7 +162,12 @@ func getKafkaConnectConfig(config map[string]string) (kafkaConnectConfig *Config
if _, ok := config["skip.serialize.err"]; ok {
kafkaConnectConfig.SkipSerializeErr = true
}
if _, ok := config["consumer.count"]; ok {
kafkaConnectConfig.CosumerCount, _ = strconv.Atoi(config["consumer.count"])
}
if kafkaConnectConfig.CosumerCount <= 0 {
kafkaConnectConfig.CosumerCount = DefaultConsumerCount
}

return kafkaConnectConfig, err

}
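
For context on the hunk above, here is a usage-style sketch of the new consumer.count parameter, written as package-internal code in the same style as the config_test.go added below. ParseDSN, getKafkaConnectConfig, Config.CosumerCount and DefaultConsumerCount all come from this commit; the DSN value is only an example.

```go
package kafka

// consumerCountFromDSN shows how consumer.count flows from the input DSN into the
// config: a missing or non-positive value falls back to DefaultConsumerCount (1).
func consumerCountFromDSN() int {
	url := "127.0.0.1:9092,192.168.1.10/topic1,topic2?consumer.count=3"
	c, err := getKafkaConnectConfig(ParseDSN(url))
	if err != nil {
		return DefaultConsumerCount
	}
	return c.CosumerCount // 3 for the DSN above
}
```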
38 changes: 38 additions & 0 deletions input/kafka/config_test.go
@@ -0,0 +1,38 @@
package kafka

import (
"github.com/Shopify/sarama"
"github.com/smartystreets/goconvey/convey"
"testing"
)

func Test_getKafkaConnectConfig(t *testing.T) {
convey.Convey("normal", t, func() {
url := "127.0.0.1:9092,192.168.1.10/topic1,topic2?from.beginning=true&version=2.7.1&consumer.count=3&skip.serialize.err=true"
configMap := ParseDSN(url)
c, err := getKafkaConnectConfig(configMap)
version, _ := sarama.ParseKafkaVersion("2.7.1")
convey.So(err, convey.ShouldBeNil)
convey.So(c.CosumerCount, convey.ShouldEqual, 3)
convey.So(c.SkipSerializeErr, convey.ShouldEqual, true)
convey.So(len(c.Topics), convey.ShouldEqual, 2)
convey.So(c.Topics[0], convey.ShouldEqual, "topic1")
convey.So(c.Topics[1], convey.ShouldEqual, "topic2")
convey.So(c.ParamConfig.Version.String(), convey.ShouldEqual, version.String())
convey.So(c.ParamConfig.Consumer.Offsets.Initial, convey.ShouldEqual, sarama.OffsetOldest)
})

convey.Convey("normal default", t, func() {
url := "127.0.0.1:9092,192.168.1.10/topic1,topic2"
configMap := ParseDSN(url)
c, err := getKafkaConnectConfig(configMap)
convey.So(err, convey.ShouldBeNil)
convey.So(c.CosumerCount, convey.ShouldEqual, 1)
convey.So(c.SkipSerializeErr, convey.ShouldEqual, false)
convey.So(len(c.Topics), convey.ShouldEqual, 2)
convey.So(c.Topics[0], convey.ShouldEqual, "topic1")
convey.So(c.Topics[1], convey.ShouldEqual, "topic2")
convey.So(c.ParamConfig.Version.String(), convey.ShouldEqual, defaultKafkaVersion)
convey.So(c.ParamConfig.Consumer.Offsets.Initial, convey.ShouldEqual, sarama.OffsetNewest)
})
}
2 changes: 2 additions & 0 deletions input/kafka/const.go
@@ -25,3 +25,5 @@ const partitionTableNamePrefix = "partition_"
const DefaultBinlogFileName = "bifrost.000001"

const DefaultBinlogPosition = 0

const DefaultConsumerCount = 1
3 changes: 3 additions & 0 deletions input/kafka/input.go
@@ -63,6 +63,8 @@ type InputKafka struct {
consumeClaimCtx context.Context
consumeClaimCancle context.CancelFunc

inputCosumeList []chan *sarama.ConsumerMessage

topics map[string]map[string]bool

positionMap map[string]map[int32]int64
@@ -123,6 +125,7 @@ func (c *InputKafka) Start(ch chan *inputDriver.PluginStatus) error {

func (c *InputKafka) Start0() error {
c.kafkaGroupCtx, c.kafkaGroupCancel = context.WithCancel(context.Background())
c.InitInputCosume(c.config.CosumerCount)
timeout := 2 * time.Second
var timer *time.Timer
for {
87 changes: 87 additions & 0 deletions input/kafka/input_msg_consume.go
@@ -0,0 +1,87 @@
/*
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
"context"
"fmt"
"github.com/Shopify/sarama"
"hash/crc32"
"log"
"sync"
)

// An input can run multiple goroutines to process the deserialized data, while guaranteeing that data from any single partition is only ever handled by one goroutine

func (c *InputKafka) InitInputCosume(workerCount int) *sync.WaitGroup {
ws := &sync.WaitGroup{}
ws.Add(workerCount)
for i := 0; i < workerCount; i++ {
ch := make(chan *sarama.ConsumerMessage, 5000)
c.inputCosumeList = append(c.inputCosumeList, ch)
go func(workerId int) {
defer ws.Done()
c.InputCosume(c.kafkaGroupCtx, workerId, ch)
}(i)
}
return ws
}

// This method is mainly used in unit tests; no production code path currently uses it.
// If the channels are closed midway, asynchronous goroutines may fail when writing to them.
func (c *InputKafka) CloseInputCosume() {
if len(c.inputCosumeList) == 0 {
return
}
for i := range c.inputCosumeList {
close(c.inputCosumeList[i])
}
}

func (c *InputKafka) SendToInputConsume(kafkaMsg *sarama.ConsumerMessage) {
crc32Int := c.CRC32KafkaMsgTopicAndPartition(kafkaMsg)
i := crc32Int % len(c.inputCosumeList)
select {
case c.inputCosumeList[i] <- kafkaMsg:
break
case <-c.kafkaGroupCtx.Done():
break
}
}

func (c *InputKafka) CRC32KafkaMsgTopicAndPartition(kafkaMsg *sarama.ConsumerMessage) int {
key := fmt.Sprintf("%s_%d", kafkaMsg.Topic, kafkaMsg.Partition)
crc32Int := int(crc32.ChecksumIEEE([]byte(key)))
return crc32Int
}

func (c *InputKafka) InputCosume(ctx context.Context, workerId int, ch chan *sarama.ConsumerMessage) {
log.Printf("[INFO] output[%s] InputCosume workeId:%d starting\n", "kafka", workerId)
defer log.Printf("[INFO] output[%s] InputCosume workeId:%d end\n", "kafka", workerId)
for {
select {
case <-ctx.Done():
return
case kafkaMsg := <-ch:
if kafkaMsg == nil {
return
}
c.ToChildCallback(kafkaMsg)
break
}
}
}
78 changes: 78 additions & 0 deletions input/kafka/input_msg_consume_test.go
@@ -0,0 +1,78 @@
package kafka

import (
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/smartystreets/goconvey/convey"
"testing"
"time"
)

func TestInputCosume(t *testing.T) {
convey.Convey("normal", t, func() {
var dataList []*sarama.ConsumerMessage
var callbackFunc = func(message *sarama.ConsumerMessage) error {
dataList = append(dataList, message)
return nil
}
c := NewInputKafka()
c.kafkaGroupCtx, c.kafkaGroupCancel = context.WithCancel(context.Background())
c.childCallBack = callbackFunc
ws := c.InitInputCosume(5)
for i := 0; i < 20; i++ {
kafkaMsg := &sarama.ConsumerMessage{
Topic: fmt.Sprintf("topic_%d", i%5),
Partition: 0,
}
c.SendToInputConsume(kafkaMsg)
}
c.CloseInputCosume()
ws.Wait()
convey.So(len(dataList), convey.ShouldEqual, 20)
})
}

func TestInputKafka_CloseInputCosume(t *testing.T) {
c := NewInputKafka()
ws := c.InitInputCosume(0)
c.CloseInputCosume()
ws.Wait()
}

func TestInputKafka_CRC32KafkaMsgTopicAndPartition(t *testing.T) {
convey.Convey("normal", t, func() {
c := NewInputKafka()
kafkaMsg1 := &sarama.ConsumerMessage{
Topic: fmt.Sprintf("topic_%d", 1),
Partition: 0,
}
kafkaMsg1CRC32 := c.CRC32KafkaMsgTopicAndPartition(kafkaMsg1)
kafkaMsg2 := &sarama.ConsumerMessage{
Topic: fmt.Sprintf("topic_%d", 2),
Partition: 0,
}
kafkaMsg2CRC32 := c.CRC32KafkaMsgTopicAndPartition(kafkaMsg2)
convey.So(kafkaMsg1CRC32, convey.ShouldNotEqual, kafkaMsg2CRC32)
})
}

func TestInputKafka_SendToInputConsume(t *testing.T) {
convey.Convey("send chan lock,and consume cancle", t, func() {
c := NewInputKafka()
c.kafkaGroupCtx, c.kafkaGroupCancel = context.WithCancel(context.Background())
c.inputCosumeList = make([]chan *sarama.ConsumerMessage, 0)
c.inputCosumeList = append(c.inputCosumeList, make(chan *sarama.ConsumerMessage, 1))
go func() {
<-time.After(1 * time.Second)
c.kafkaGroupCancel()
}()
for i := 0; i < 3; i++ {
kafkaMsg := &sarama.ConsumerMessage{
Topic: fmt.Sprintf("topic_%d", i%5),
Partition: 0,
}
c.SendToInputConsume(kafkaMsg)
}
})
}
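
The last test above exercises a small but important design point: SendToInputConsume selects on both the worker channel and kafkaGroupCtx.Done(), so a full worker channel can never block shutdown once the consumer-group context is cancelled. Below is a generic, self-contained sketch of that pattern; the names are illustrative and not taken from the Bifrost code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendOrCancel tries to hand a message to a bounded worker channel, but gives up
// as soon as ctx is cancelled, so the producer never blocks forever on a full channel.
func sendOrCancel(ctx context.Context, ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan string, 1) // bounded buffer, no consumer in this sketch
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()
	fmt.Println(sendOrCancel(ctx, ch, "a")) // true: buffer has room
	fmt.Println(sendOrCancel(ctx, ch, "b")) // false: buffer full, returns once ctx is cancelled
}
```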