Skip to content

Commit

Permalink
feat-cdc:add consumer kafka
Browse files Browse the repository at this point in the history
Signed-off-by: LeeHao <[email protected]>
  • Loading branch information
ForestLH committed Aug 20, 2024
1 parent 152695a commit f4f52bc
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 50 deletions.
5 changes: 3 additions & 2 deletions tools/pika_cdc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ make
## Todo:

Consumer side:
- [x] redis
- [ ] kafka
- [x] **redis**
- [x] **kafka** Create a topic of the same name for each pika's DB
- [ ] **bifrost**
5 changes: 2 additions & 3 deletions tools/pika_cdc/conf/cdc.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# pika_server
pika_server : 127.0.0.1:11221
# For data from one DB of one pika, a separate MQ topic is created,
# and the name of the topic is the dbname of the pika
kafka_servers:
- 127.0.0.1:9092
redis_servers:
- 127.0.0.1:6379
pulsar_servers:
- 127.0.0.1:6650
# will create this topic, if this topic not exist
# and all servers will use same name topic
topic : test
# retry times while send message failed
retries : 0
# retry interval while send message failed(ms)
Expand Down
1 change: 0 additions & 1 deletion tools/pika_cdc/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type PikaCdcConfig struct {
KafkaServers []string `yaml:"kafka_servers"`
RedisServers []string `yaml:"redis_servers"`
PulsarServers []string `yaml:"pulsar_servers"`
Topic string `yaml:"topic"`
Retries int `yaml:"retries"`
RetryInterval int `yaml:"retry_interval"`
ParallelThreadSize int `yaml:"parallel_thread_size"`
Expand Down
6 changes: 4 additions & 2 deletions tools/pika_cdc/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []by
var consumers []Consumer

// kafka
kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries, msgChanns)
consumers = append(consumers, kafka)
for _, k := range config.KafkaServers {
kafka, _ := NewKafka(k, config.Retries, msgChanns)
consumers = append(consumers, kafka)
}

// redis
for _, r := range config.RedisServers {
Expand Down
85 changes: 43 additions & 42 deletions tools/pika_cdc/consumer/kafka.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,56 @@
package consumer

import (
"context"
"github.com/segmentio/kafka-go"
"sync"
)

type Kafka struct {
servers []string
topic string
retries int
conns []*kafka.Conn
wg sync.WaitGroup
msgChanns map[string]chan []byte
stopChan chan bool
once sync.Once
protocol Protocol
servers string
topics []string
retries int
kafkaConns map[string]*kafka.Conn
wg sync.WaitGroup
msgChanns map[string]chan []byte
stopChan chan bool
once sync.Once
protocol KafkaProtocol
}

func (k *Kafka) SendCmdMessage(dbName string, msg []byte) error {
//retries := k.retries
//select {
//case *k.messageChan <- k.protocol.ToConsumer(msg):
// return nil
//case <-time.After(2 * time.Second):
// e := errors.New("send pika cmd timeout and retry send pika cmd")
// logrus.Warn("{}", e)
// retries--
// if retries <= 0 {
// break
// }
//}
k.kafkaConns[dbName].Write(k.protocol.ToConsumer(msg))
return nil
}

func (k *Kafka) Name() string {
return "Kafka"
}

func NewKafka(servers []string, topic string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) {
func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) {
k := &Kafka{}
//k.protocol = &KafkaProtocol{}
//for _, server := range servers {
// conn, err := kafka.DialLeader(context.Background(), "tcp", server, topic, 0)
// if err != nil {
// return k, err
// } else {
// conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// k.conns = append(k.conns, conn)
// }
//}
//k.messageChan = msgChanns
//k.stopChan = make(chan bool)
//k.retries = retries
k.protocol = KafkaProtocol{}
k.kafkaConns = make(map[string]*kafka.Conn)
k.msgChanns = make(map[string]chan []byte)
for dbname, chann := range msgChanns {
conn, err := kafka.DialLeader(context.Background(), "tcp", server, dbname, 0)
if err != nil {
return k, err
} else {
k.kafkaConns[dbname] = conn
}
k.msgChanns[dbname] = chann
}
k.stopChan = make(chan bool)
k.retries = retries
return k, nil
}

func (k *Kafka) close() error {
//k.stopChan <- true
//close(k.stopChan)
//close(*k.messageChan)
//for _, conn := range k.conns {
//for _, conn := range k.kafkaConns {
// err := conn.Close()
// if err != nil {
// logrus.Warn(err)
Expand All @@ -78,12 +69,22 @@ func (k *Kafka) Close() error {
return nil
}
func (k *Kafka) Run() {
//select {
//case msg := <-*k.messageChan:
// k.SendCmdMessage(msg)
//case <-k.stopChan:
// return
//}
var wg sync.WaitGroup
for dbName, chann := range k.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
k.SendCmdMessage(dbName, msg)
case <-k.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
func (k *Kafka) Stop() {
k.stopChan <- true
Expand Down

0 comments on commit f4f52bc

Please sign in to comment.