Skip to content

Commit

Permalink
chore(plc4go): add more wait groups
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 10, 2025
1 parent 8b3977b commit 385255d
Show file tree
Hide file tree
Showing 71 changed files with 442 additions and 93 deletions.
2 changes: 2 additions & 0 deletions plc4go/internal/ads/Browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func (m *Connection) Browse(ctx context.Context, browseRequest apiModel.PlcBrows

func (m *Connection) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
result := make(chan apiModel.PlcBrowseRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcBrowseRequestResult(browseRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down
12 changes: 11 additions & 1 deletion plc4go/internal/ads/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"runtime/debug"
"strconv"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -56,6 +57,8 @@ type Connection struct {

subscriptions map[uint32]apiModel.PlcSubscriptionHandle

wg sync.WaitGroup // use to track spawned go routines

passLogToModel bool
log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
Expand Down Expand Up @@ -116,8 +119,9 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo

// Reset the driver context (Actually this should not be required, but just to be on the safe side)
m.driverContext.clear()

m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down Expand Up @@ -174,7 +178,9 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
// Start the worker for handling incoming messages
// (Messages that are not responses to outgoing messages)
defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel()
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -231,6 +237,10 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
}).
Build()
if err != nil {
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, err)
return
}
subscriptionResultChan := versionChangeRequest.Execute()
subscriptionRequestResult := <-subscriptionResultChan
if subscriptionRequestResult.GetErr() != nil {
Expand Down
12 changes: 12 additions & 0 deletions plc4go/internal/ads/Interactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (

func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model.AdsReadDeviceInfoResponse, error) {
responseChannel := make(chan model.AdsReadDeviceInfoResponse, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -75,7 +77,9 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model

func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32) (model.AdsReadResponse, error) {
responseChannel := make(chan model.AdsReadResponse, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -119,7 +123,9 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3

func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, data []byte) (model.AdsWriteResponse, error) {
responseChannel := make(chan model.AdsWriteResponse, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -163,7 +169,9 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint

func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, readLength uint32, items []model.AdsMultiRequestItem, writeData []byte) (model.AdsReadWriteResponse, error) {
responseChannel := make(chan model.AdsReadWriteResponse, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -207,7 +215,9 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup

func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32, transmissionMode model.AdsTransMode, maxDelay uint32, cycleTime uint32) (model.AdsAddDeviceNotificationResponse, error) {
responseChannel := make(chan model.AdsAddDeviceNotificationResponse, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -251,7 +261,9 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,

func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Context, notificationHandle uint32) (model.AdsDeleteDeviceNotificationResponse, error) {
responseChannel := make(chan model.AdsDeleteDeviceNotificationResponse, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down
4 changes: 4 additions & 0 deletions plc4go/internal/ads/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down Expand Up @@ -95,7 +97,9 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
return
}

m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down
4 changes: 4 additions & 0 deletions plc4go/internal/ads/Subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel

// Create a new result-channel, which completes as soon as all sub-result-channels have returned
globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().
Expand Down Expand Up @@ -132,7 +134,9 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel

func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
responseChan := make(chan apiModel.PlcSubscriptionRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down
4 changes: 4 additions & 0 deletions plc4go/internal/ads/Writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
m.log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down Expand Up @@ -104,7 +106,9 @@ func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcW
}
data := io.GetBytes()

m.wg.Add(1)
go func() {
defer m.wg.Done()
defer func() {
if err := recover(); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down
13 changes: 12 additions & 1 deletion plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"net"
"net/url"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -51,6 +52,8 @@ type ApplicationLayerMessageCodec struct {
localAddress *net.UDPAddr `stringer:"true"`
remoteAddress *net.UDPAddr `stringer:"true"`

wg sync.WaitGroup // use to track spawned go routines

log zerolog.Logger
}

Expand Down Expand Up @@ -125,8 +128,12 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
if err != nil {
return errors.Wrap(err, "error creating IOCB")
}
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.wg.Add(1)
go func() {
defer m.wg.Done()
if err := m.bipSimpleApplication.RequestIO(iocb); err != nil {
m.log.Debug().Err(err).Msg("errored")
}
Expand Down Expand Up @@ -159,10 +166,14 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message
if err != nil {
return errors.Wrap(err, "error creating IOCB")
}
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.wg.Add(1)
go func() {
defer m.wg.Done()
if err := m.bipSimpleApplication.RequestIO(iocb); err != nil {

m.log.Error().Err(err).Msg("errored")
}
}()
iocb.Wait()
Expand Down
6 changes: 6 additions & 0 deletions plc4go/internal/bacnetip/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer

wg sync.WaitGroup // use to track spawned go routines

log zerolog.Logger
_options []options.WithOption // Used to pass them downstream
}
Expand Down Expand Up @@ -89,14 +91,18 @@ func (c *Connection) GetTracer() tracer.Tracer {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
connectionConnectResult := <-c.DefaultConnection.ConnectWithContext(ctx)
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer func() {
if err := recover(); err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
Expand Down
7 changes: 7 additions & 0 deletions plc4go/internal/bacnetip/Discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/IBM/netaddr"
Expand All @@ -43,6 +44,8 @@ import (
type Discoverer struct {
messageCodec spi.MessageCodec

wg sync.WaitGroup // use to track spawned go routines

passLogToModel bool
log zerolog.Logger
}
Expand Down Expand Up @@ -168,7 +171,9 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
return
}
blockingReadChan := make(chan bool)
d.wg.Add(1)
go func() {
defer d.wg.Done()
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.unicastConnection.ReadFrom(buf)
if err != nil {
Expand Down Expand Up @@ -208,7 +213,9 @@ func (d *Discoverer) broadcastAndDiscover(ctx context.Context, communicationChan
return
}
blockingReadChan := make(chan bool)
d.wg.Add(1)
go func() {
defer d.wg.Done()
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.broadcastConnection.ReadFrom(buf)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions plc4go/internal/bacnetip/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package bacnetip
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -45,6 +46,8 @@ type Reader struct {
maxSegmentsAccepted readWriteModel.MaxSegmentsAccepted
maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted

wg sync.WaitGroup // use to track spawned go routines

log zerolog.Logger
}

Expand All @@ -66,7 +69,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// TODO: handle ctx
m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
if len(readRequest.GetTagNames()) == 0 {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("at least one field required"))
return
Expand Down
5 changes: 5 additions & 0 deletions plc4go/internal/bacnetip/Subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package bacnetip

import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand All @@ -35,6 +36,8 @@ type Subscriber struct {
connection *Connection
consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer

wg sync.WaitGroup // use to track spawned go routines

log zerolog.Logger `ignore:"true"`
_options []options.WithOption // Used to pass them downstream
}
Expand All @@ -52,7 +55,9 @@ func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subs

func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
m.wg.Add(1)
go func() {
defer m.wg.Done()
internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)

// Add this subscriber to the connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *NameValue) Decode(arg Arg) error {
// look for the context encoded character string
tag := tagList.Peek()
if tag == nil || (tag.GetTagClass() != readWriteModel.TagClass_CONTEXT_SPECIFIC_TAGS) || (tag.GetTagNumber() != 0) {
return MissingRequiredParameter{RejectException{Exception: Exception{Message: fmt.Sprintf("%s is a missing required element of %p", s.name, s)}}}
return MissingRequiredParameter{RejectException: RejectException{Exception: Exception{Message: fmt.Sprintf("%s is a missing required element of %p", s.name, s)}}}
}

// pop it off and save the value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func NewUDPMultiplexer(localLog zerolog.Logger, address any, noBroadcast bool, o
if address == nil {
address, _ := NewAddress(NoArgs)
u.address = address
u.addrTuple = &AddressTuple[string, uint16]{"", 47808}
u.addrBroadcastTuple = &AddressTuple[string, uint16]{"255.255.255.255", 47808}
u.addrTuple = &AddressTuple[string, uint16]{Right: 47808}
u.addrBroadcastTuple = &AddressTuple[string, uint16]{Left: "255.255.255.255", Right: 47808}
} else {
// allow the address to be cast
if caddress, ok := address.(*Address); ok {
Expand All @@ -86,7 +86,7 @@ func NewUDPMultiplexer(localLog zerolog.Logger, address any, noBroadcast bool, o
noBroadcast = true
} else if u.addrTuple == u.addrBroadcastTuple {
// old school broadcast address
u.addrBroadcastTuple = &AddressTuple[string, uint16]{"255.255.255.255", u.addrTuple.Right}
u.addrBroadcastTuple = &AddressTuple[string, uint16]{Left: "255.255.255.255", Right: u.addrTuple.Right}
} else {
specialBroadcast = true
}
Expand Down
Loading

0 comments on commit 385255d

Please sign in to comment.