Skip to content

Commit

Permalink
fix ReadFromConnection for
Browse files Browse the repository at this point in the history
redis/tcpclient and fluentd
  • Loading branch information
dmachard committed Nov 4, 2023
1 parent d8372d8 commit 9560d81
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 83 deletions.
48 changes: 24 additions & 24 deletions loggers/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type FluentdClient struct {
doneProcess chan bool
stopRun chan bool
doneRun chan bool
stopReceive chan bool
doneReceive chan bool
stopRead chan bool
doneRead chan bool
inputChan chan dnsutils.DnsMessage
outputChan chan dnsutils.DnsMessage
config *dnsutils.Config
Expand All @@ -41,8 +41,8 @@ func NewFluentdClient(config *dnsutils.Config, logger *logger.Logger, name strin
doneProcess: make(chan bool),
stopRun: make(chan bool),
doneRun: make(chan bool),
stopReceive: make(chan bool),
doneReceive: make(chan bool),
stopRead: make(chan bool),
doneRead: make(chan bool),
inputChan: make(chan dnsutils.DnsMessage, config.Loggers.Fluentd.ChannelBufferSize),
outputChan: make(chan dnsutils.DnsMessage, config.Loggers.Fluentd.ChannelBufferSize),
transportReady: make(chan bool),
Expand Down Expand Up @@ -97,6 +97,10 @@ func (o *FluentdClient) Stop() {
o.stopRun <- true
<-o.doneRun

o.LogInfo("stopping to read...")
o.stopRead <- true
<-o.doneRead

o.LogInfo("stopping to process...")
o.stopProcess <- true
<-o.doneProcess
Expand All @@ -110,31 +114,27 @@ func (o *FluentdClient) Disconnect() {
}

func (o *FluentdClient) ReadFromConnection() {
buf := make([]byte, 4096)
buffer := make([]byte, 4096)

for {
select {
// Stop signal received, exit the goroutine
case <-o.stopReceive:
o.doneReceive <- true
return
default:
_, err := o.transportConn.Read(buf)
go func() {
for {
_, err := o.transportConn.Read(buffer)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
continue
}
// catch EOF error
if errors.Is(err, io.EOF) {
return
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
o.LogInfo("read from connection terminated")
break
}
o.LogError("Error reading from connection: %s", err.Error())
return
o.LogError("Error on reading: %s", err.Error())
}
// We just discard the data to avoid memory leak or blocking situation
// We just discard the data
}
}
}()

// block goroutine until receive true event in stopRead channel
<-o.stopRead
o.doneRead <- true

o.LogInfo("read goroutine terminated")
}

func (o *FluentdClient) ConnectToRemote() {
Expand Down
6 changes: 5 additions & 1 deletion loggers/fluentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Test_FluentdClient(t *testing.T) {
g := NewFluentdClient(cfg, logger.New(false), "test")

// fake msgpack receiver
fakeRcvr, err := net.Listen(tc.transport, ":24224")
fakeRcvr, err := net.Listen(tc.transport, tc.address)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -66,6 +66,10 @@ func Test_FluentdClient(t *testing.T) {
if dm.DNS.Qname != dmRcv.DNS.Qname {
t.Errorf("qname error want %s, got %s", dm.DNS.Qname, dmRcv.DNS.Qname)
}

// stop all
fakeRcvr.Close()
g.Stop()
})
}
}
51 changes: 23 additions & 28 deletions loggers/redispub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type RedisPub struct {
doneProcess chan bool
stopRun chan bool
doneRun chan bool
stopReceive chan bool
doneReceive chan bool
stopRead chan bool
doneRead chan bool
inputChan chan dnsutils.DnsMessage
outputChan chan dnsutils.DnsMessage
config *dnsutils.Config
Expand All @@ -46,8 +46,8 @@ func NewRedisPub(config *dnsutils.Config, logger *logger.Logger, name string) *R
doneProcess: make(chan bool),
stopRun: make(chan bool),
doneRun: make(chan bool),
stopReceive: make(chan bool),
doneReceive: make(chan bool),
stopRead: make(chan bool),
doneRead: make(chan bool),
inputChan: make(chan dnsutils.DnsMessage, config.Loggers.RedisPub.ChannelBufferSize),
outputChan: make(chan dnsutils.DnsMessage, config.Loggers.RedisPub.ChannelBufferSize),
transportReady: make(chan bool),
Expand Down Expand Up @@ -109,13 +109,13 @@ func (o *RedisPub) Stop() {
o.stopRun <- true
<-o.doneRun

o.LogInfo("stopping to receive...")
o.stopRead <- true
<-o.doneRead

o.LogInfo("stopping to process...")
o.stopProcess <- true
<-o.doneProcess

o.LogInfo("stopping to receive...")
o.stopReceive <- true
<-o.doneReceive
}

func (o *RedisPub) Disconnect() {
Expand All @@ -126,32 +126,27 @@ func (o *RedisPub) Disconnect() {
}

func (o *RedisPub) ReadFromConnection() {
buf := make([]byte, 4096)
buffer := make([]byte, 4096)

for {
select {
// Stop signal received, exit the goroutine
case <-o.stopReceive:
o.doneReceive <- true
return
default:
_, err := o.transportConn.Read(buf)
go func() {
for {
_, err := o.transportConn.Read(buffer)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
continue
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
o.LogInfo("read from connection terminated")
break
}
// catch EOF error
if errors.Is(err, io.EOF) {
return
}

o.LogError("Error reading from connection: %s", err.Error())
return
o.LogError("Error on reading: %s", err.Error())
}
// We just discard the data
}
}
}()

// block goroutine until receive true event in stopRead channel
<-o.stopRead
o.doneRead <- true

o.LogInfo("read goroutine terminated")
}

func (o *RedisPub) ConnectToRemote() {
Expand Down
4 changes: 4 additions & 0 deletions loggers/redispub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func Test_RedisPubRun(t *testing.T) {
if !pattern2.MatchString(line) {
t.Errorf("redis error want %s, got: %s", pattern2, line)
}

// stop all
fakeRcvr.Close()
g.Stop()
})
}
}
54 changes: 25 additions & 29 deletions loggers/tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type TcpClient struct {
doneProcess chan bool
stopRun chan bool
doneRun chan bool
stopReceive chan bool
doneReceive chan bool
stopRead chan bool
doneRead chan bool
inputChan chan dnsutils.DnsMessage
outputChan chan dnsutils.DnsMessage
config *dnsutils.Config
Expand All @@ -45,8 +45,8 @@ func NewTcpClient(config *dnsutils.Config, logger *logger.Logger, name string) *
doneProcess: make(chan bool),
stopRun: make(chan bool),
doneRun: make(chan bool),
stopReceive: make(chan bool),
doneReceive: make(chan bool),
stopRead: make(chan bool),
doneRead: make(chan bool),
inputChan: make(chan dnsutils.DnsMessage, config.Loggers.TcpClient.ChannelBufferSize),
outputChan: make(chan dnsutils.DnsMessage, config.Loggers.TcpClient.ChannelBufferSize),
transportReady: make(chan bool),
Expand Down Expand Up @@ -107,13 +107,13 @@ func (o *TcpClient) Stop() {
o.stopRun <- true
<-o.doneRun

o.LogInfo("stopping to read...")
o.stopRead <- true
<-o.doneRead

o.LogInfo("stopping to process...")
o.stopProcess <- true
<-o.doneProcess

o.LogInfo("stopping to receive...")
o.stopReceive <- true
<-o.doneReceive
}

func (o *TcpClient) Disconnect() {
Expand All @@ -124,31 +124,27 @@ func (o *TcpClient) Disconnect() {
}

func (o *TcpClient) ReadFromConnection() {
buf := make([]byte, 4096)
buffer := make([]byte, 4096)

for {
select {
// Stop signal received, exit the goroutine
case <-o.stopReceive:
o.doneReceive <- true
return
default:
_, err := o.transportConn.Read(buf)
go func() {
for {
_, err := o.transportConn.Read(buffer)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
continue
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
o.LogInfo("read from connection terminated")
break
}
// catch EOF error
if errors.Is(err, io.EOF) {
return
}
o.LogError("Error reading from connection: %s", err.Error())
return
o.LogError("Error on reading: %s", err.Error())
}
// We just discard the data to avoid memory leak or blocking situation
// We just discard the data
}
}
}()

// block goroutine until receive true event in stopRead channel
<-o.stopRead
o.doneRead <- true

o.LogInfo("read goroutine terminated")
}

func (o *TcpClient) ConnectToRemote() {
Expand Down Expand Up @@ -351,7 +347,7 @@ PROCESS_LOOP:
bufferDm = append(bufferDm, dm)

// buffer is full ?
if len(bufferDm) >= o.config.Loggers.Fluentd.BufferSize {
if len(bufferDm) >= o.config.Loggers.TcpClient.BufferSize {
o.FlushBuffer(&bufferDm)
}

Expand Down
8 changes: 7 additions & 1 deletion loggers/tcpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func Test_TcpClientRun(t *testing.T) {
cfg.Loggers.TcpClient.FlushInterval = 1
cfg.Loggers.TcpClient.BufferSize = 0
cfg.Loggers.TcpClient.Mode = tc.mode
cfg.Loggers.TcpClient.RemoteAddress = "127.0.0.1"
cfg.Loggers.TcpClient.RemotePort = 9999

g := NewTcpClient(cfg, logger.New(false), "test")

Expand Down Expand Up @@ -73,8 +75,12 @@ func Test_TcpClientRun(t *testing.T) {

pattern := regexp.MustCompile(tc.pattern)
if !pattern.MatchString(line) {
t.Errorf("syslog error want %s, got: %s", tc.pattern, line)
t.Errorf("tcp error want %s, got: %s", tc.pattern, line)
}

// stop all
fakeRcvr.Close()
g.Stop()
})
}
}

0 comments on commit 9560d81

Please sign in to comment.