Skip to content

Commit

Permalink
fix: fix rest and sql error and log (#2620) (#2634)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored Feb 21, 2024
1 parent c3627a1 commit 7ad5783
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
13 changes: 13 additions & 0 deletions extensions/sources/sql/ext/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sql
import (
"database/sql"
"fmt"
"strings"
"time"

driver2 "github.com/lf-edge/ekuiper/extensions/sqldatabase/driver"
Expand Down Expand Up @@ -98,11 +99,19 @@ func (m *sqlsource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple,
rows, err := m.db.Query(query)
if err != nil {
logger.Errorf("sql source meet error, try to reconnection, err:%v, query:%v", err, query)
if !isConnectionError(err) {
consumer <- &xsql.ErrorSourceTuple{
Error: err,
}
continue
}
err2 := m.Reconnect()
if err2 != nil {
consumer <- &xsql.ErrorSourceTuple{
Error: fmt.Errorf("reconnect failed, reconnect err:%v", err2),
}
} else {
logger.Info("sql source reconnect successfully")
}
continue
}
Expand Down Expand Up @@ -171,3 +180,7 @@ func (m *sqlsource) Reconnect() error {
func GetSource() api.Source {
return &sqlsource{}
}

func isConnectionError(err error) bool {
return strings.Contains(err.Error(), "connection refused")
}
5 changes: 4 additions & 1 deletion internal/io/http/rest_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ func (ms *RestSink) collectWithUrl(ctx api.StreamContext, item interface{}, desU

resp, err := ms.sendWithUrl(ctx, decodedData, item, desUrl)
if err != nil {
originErr := err
logger.Errorf("rest sink meet error:%v", originErr.Error())
e := err.Error()
if urlErr, ok := err.(*url.Error); ok {
// consider timeout and temporary error as recoverable
if urlErr.Timeout() || urlErr.Temporary() {
e = errorx.IOErr
}
}
return fmt.Errorf(`%s: rest sink fails to send out the data: method=%s path="%s" request_body="%s"`,
return fmt.Errorf(`%s: rest sink fails to send out the data:err=%s method=%s path="%s" request_body="%s"`,
originErr.Error(),
e,
ms.config.Method,
ms.config.Url,
Expand Down
1 change: 0 additions & 1 deletion internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,6 @@ func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
}
// Update to db after validation
_, err = ruleProcessor.ExecUpdate(k, v)

if err != nil {
ruleSetRsp.Rules[k] = err.Error()
continue
Expand Down

0 comments on commit 7ad5783

Please sign in to comment.