diff --git a/.golangci.yml b/.golangci.yml index 9a70a9f0a0..362a1d63fb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -253,7 +253,3 @@ issues: linters: - gocyclo text: "processEvent" - - path: pkg/networkservice/common/serialize/common.go - linters: - - scopelint - text: "Using the variable on range scope `shouldRetry` in function literal" diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index e8f782fc2a..f7c67a5be7 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -60,9 +60,9 @@ func NewClient(ctx context.Context, name string, onHeal *networkservice.NetworkS rv = chain.NewNetworkServiceClient( append( append([]networkservice.NetworkServiceClient{ - serialize.NewClient(), authorize.NewClient(), updatepath.NewClient(name), + serialize.NewClient(), heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), onHeal), refresh.NewClient(ctx), }, additionalFunctionality...), diff --git a/pkg/networkservice/chains/endpoint/server.go b/pkg/networkservice/chains/endpoint/server.go index 377fd0c265..5be75d12ad 100644 --- a/pkg/networkservice/chains/endpoint/server.go +++ b/pkg/networkservice/chains/endpoint/server.go @@ -64,9 +64,9 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw rv.NetworkServiceServer = chain.NewNamedNetworkServiceServer( name, append([]networkservice.NetworkServiceServer{ - serialize.NewServer(), authzServer, updatepath.NewServer(name), + serialize.NewServer(), // `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so // chain elements before the `timeout` in chain shouldn't make any updates to the Close context and // shouldn't be closed on Connection Close. diff --git a/pkg/networkservice/common/serialize/README.md b/pkg/networkservice/common/serialize/README.md index a2ae97e7b3..a570227848 100644 --- a/pkg/networkservice/common/serialize/README.md +++ b/pkg/networkservice/common/serialize/README.md @@ -4,25 +4,29 @@ # Implementation -## serializeServer, serializeClient +## serializer -serialize chain elements keep [serialize.executorMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/gen.go#L25) -mapping incoming `Connection.ID` to a [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go). -New executors are created on Request. Close deletes existing executor for the request. +`serializer` maps incoming `Connection.ID` to a [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go), +request count and state. New mappings are created on Request. If request count and state are equal 0, mapping becomes +free and can be cleaned up. Request count is incremented with each Request before requesting the next chain element and +decrements after. All request count modifications are performed under the `serializer.executor`. State is being changed +in Request from 0 to random, or in Close from not 0 to 0. If state is equal 0, connection is closed so all incoming +Close events should not be processed. To make possible a new chain element firing asynchronously with `Request`, `Close` events, serialize chain elements wraps -per-connection executor with [request executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L35), -[close executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L50) and inserts them into the `Request` -context. Such thing is not performed for the `Close` context because the executor will already be cancelled by the time -it becomes free. +per-connection executor with [request executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L36), +[close executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L40) and inserts them into the `Request` +context. Such a thing is not being performed for the `Close` context because the executor will already be cancelled by +the time it becomes free. For the generated events state should be equal the original state for the Request/CloseExecutor. Correct `Request` firing chain element example: ```go func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { executor := serialize.RequestExecutor(ctx) go func() { - executor.AsyncExec(func() { + executor.AsyncExec(func() error { _, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request) + return nil }) }() @@ -40,25 +44,18 @@ func (s *closeServer) Request(ctx context.Context, request *networkservice.Netwo executor := serialize.CloseExecutor(ctx) go func() { - executor.AsyncExec(func() { + executor.AsyncExec(func() error { _, _ = next.Server(ctx).Close(context.TODO(), conn) + return errors.New() // I don't want to close the chain. + }) + }() + go func() { + executor.AsyncExec(func() error { + _, _ = next.Server(ctx).Close(context.TODO(), conn) + return nil // I want to close the chain. }) }() return conn, err } ``` - -# race condition case - -There is a possible case for the race condition: -``` - 1. -> close : locking `executor` - 2. -> request-1 : waiting on `executor` - 3. close -> : unlocking `executor`, removing it from `executors` - 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec` - 5. -request-1-> : locking `executor`, trying to store it into `executors` -``` -at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored in `executors`. It means -that `request-2` and all subsequent events will be executed in parallel with `request-1`. If such case happens, `request-1` -fails with _race condition_ error. \ No newline at end of file diff --git a/pkg/networkservice/common/serialize/client.go b/pkg/networkservice/common/serialize/client.go index 2fa1be1993..2a4686adfa 100644 --- a/pkg/networkservice/common/serialize/client.go +++ b/pkg/networkservice/common/serialize/client.go @@ -28,23 +28,27 @@ import ( ) type serializeClient struct { - executors executorMap + serializer } // NewClient returns a new serialize client chain element func NewClient() networkservice.NetworkServiceClient { - return new(serializeClient) + return &serializeClient{ + serializer{ + executors: map[string]*executor{}, + }, + } } func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { connID := request.GetConnection().GetId() - return requestConnection(ctx, &c.executors, connID, func(requestCtx context.Context) (*networkservice.Connection, error) { + return c.requestConnection(ctx, connID, func(requestCtx context.Context) (*networkservice.Connection, error) { return next.Client(ctx).Request(requestCtx, request, opts...) }) } func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) { - return closeConnection(ctx, &c.executors, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) { + return c.closeConnection(ctx, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) { return next.Client(ctx).Close(closeCtx, conn, opts...) }) } diff --git a/pkg/networkservice/common/serialize/common.go b/pkg/networkservice/common/serialize/common.go deleted file mode 100644 index 49a493e071..0000000000 --- a/pkg/networkservice/common/serialize/common.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package serialize provides chain elements to make Request, Close processing in chain serial -package serialize - -import ( - "context" - - "github.com/edwarnicke/serialize" - "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/tools/log" -) - -func requestConnection( - ctx context.Context, - executors *executorMap, - connID string, - requestConn func(context.Context) (*networkservice.Connection, error), -) (conn *networkservice.Connection, err error) { - // If we create an executor, we should be the first one who uses it, or else we can get a double Close issue: - // 1. close -> : closing the Connection - // 2. request -> : creating `newExecutor`, storing into `executors` - // 3. close -> : locking `newExecutor`, checking `newExecutor == executors[connID]` - looks like - // a retry Close case (but it isn't), double closing the Connection - newExecutor := new(serialize.Executor) - <-newExecutor.AsyncExec(func() { - for shouldRetry := true; shouldRetry; { - shouldRetry = false - - executor, loaded := executors.LoadOrStore(connID, newExecutor) - // We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements - // can generate new Request, Close events and insert them into the chain in a serial way. - requestExecutor := newRequestExecutor(executor, connID, executors) - closeExecutor := newCloseExecutor(executor, connID, executors) - if loaded { - <-executor.AsyncExec(func() { - // Executor has been possibly removed at this moment, we need to store it back. - exec, _ := executors.LoadOrStore(connID, executor) - if exec != executor { - // It can happen in such situation: - // 1. -> close : locking `executor` - // 2. -> request-1 : waiting on `executor` - // 3. close -> : unlocking `executor`, removing it from `executors` - // 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec` - // 5. -request-1-> : locking `executor`, trying to store it into `executors` - // at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored - // in `executors`. It means that `request-2` and all subsequent events will be executed in parallel - // with `request-1`. - shouldRetry = true - return - } - if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { - executors.Delete(connID) - } - }) - } else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { - executors.Delete(connID) - } - } - }) - - return conn, err -} - -func closeConnection( - ctx context.Context, - executors *executorMap, - connID string, - closeConn func(context.Context) (*empty.Empty, error), -) (_ *empty.Empty, err error) { - logEntry := log.Entry(ctx).WithField("serialize", "close") - - executor, ok := executors.Load(connID) - if ok { - <-executor.AsyncExec(func() { - var exec *serialize.Executor - if exec, ok = executors.Load(connID); ok && exec == executor { - // We don't set `requestExecutor`, `closeExecutor` into the close context because they should be - // canceled by the time anything can be executed with them. - _, err = closeConn(ctx) - executors.Delete(connID) - } else { - ok = false - } - }) - } - if !ok { - logEntry.Warnf("connection has been already closed: %v", connID) - return &empty.Empty{}, nil - } - - return &empty.Empty{}, err -} diff --git a/pkg/networkservice/common/serialize/executor.go b/pkg/networkservice/common/serialize/executor.go index f736bb4b2c..f32de11eb2 100644 --- a/pkg/networkservice/common/serialize/executor.go +++ b/pkg/networkservice/common/serialize/executor.go @@ -17,42 +17,52 @@ package serialize import ( - "github.com/edwarnicke/serialize" + "sync/atomic" + "github.com/pkg/errors" ) // Executor is same as serialize.Executor except that it returns error channel type Executor interface { - AsyncExec(f func()) <-chan error + AsyncExec(f func() error) <-chan error } -type executorFunc func(f func()) <-chan error +type executorFunc func(f func() error) <-chan error -func (ef executorFunc) AsyncExec(f func()) <-chan error { +func (ef executorFunc) AsyncExec(f func() error) <-chan error { return ef(f) } -func newRequestExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor { - return executorFunc(func(f func()) <-chan error { +func newRequestExecutor(exec *executor, id string) Executor { + return newExecutor(exec, exec.state, id) +} + +func newCloseExecutor(exec *executor, id string, clean func()) Executor { + state := exec.state + return executorFunc(func(f func() error) <-chan error { + executor := newExecutor(exec, state, id) + return executor.AsyncExec(func() error { + if err := f(); err != nil { + return err + } + atomic.StoreUint32(&exec.state, 0) + clean() + return nil + }) + }) +} + +func newExecutor(exec *executor, state uint32, id string) Executor { + return executorFunc(func(f func() error) <-chan error { errCh := make(chan error, 1) - executor.AsyncExec(func() { - if exec, ok := executors.Load(id); !ok || exec != executor { + exec.executor.AsyncExec(func() { + if exec.state != state { errCh <- errors.Errorf("connection is already closed: %v", id) return } - f() + errCh <- f() close(errCh) }) return errCh }) } - -func newCloseExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor { - return executorFunc(func(f func()) <-chan error { - exec := newRequestExecutor(executor, id, executors) - return exec.AsyncExec(func() { - f() - executors.Delete(id) - }) - }) -} diff --git a/pkg/networkservice/common/serialize/executor_map.gen.go b/pkg/networkservice/common/serialize/executor_map.gen.go deleted file mode 100644 index b0003262c0..0000000000 --- a/pkg/networkservice/common/serialize/executor_map.gen.go +++ /dev/null @@ -1,75 +0,0 @@ -// Code generated by "-output executor_map.gen.go -type executorMap -output executor_map.gen.go -type executorMap"; DO NOT EDIT. -package serialize - -import ( - "sync" // Used by sync.Map. - - "github.com/edwarnicke/serialize" -) - -// Generate code that will fail if the constants change value. -func _() { - // An "cannot convert executorMap literal (type executorMap) to type sync.Map" compiler error signifies that the base type have changed. - // Re-run the go-syncmap command to generate them again. - _ = (sync.Map)(executorMap{}) -} - -var _nil_executorMap_serialize_Executor_value = func() (val *serialize.Executor) { return }() - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. -func (m *executorMap) Load(key string) (*serialize.Executor, bool) { - value, ok := (*sync.Map)(m).Load(key) - if value == nil { - return _nil_executorMap_serialize_Executor_value, ok - } - return value.(*serialize.Executor), ok -} - -// Store sets the value for a key. -func (m *executorMap) Store(key string, value *serialize.Executor) { - (*sync.Map)(m).Store(key, value) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. -func (m *executorMap) LoadOrStore(key string, value *serialize.Executor) (*serialize.Executor, bool) { - actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) - if actual == nil { - return _nil_executorMap_serialize_Executor_value, loaded - } - return actual.(*serialize.Executor), loaded -} - -// LoadAndDelete deletes the value for a key, returning the previous value if any. -// The loaded result reports whether the key was present. -func (m *executorMap) LoadAndDelete(key string) (value *serialize.Executor, loaded bool) { - actual, loaded := (*sync.Map)(m).LoadAndDelete(key) - if actual == nil { - return _nil_executorMap_serialize_Executor_value, loaded - } - return actual.(*serialize.Executor), loaded -} - -// Delete deletes the value for a key. -func (m *executorMap) Delete(key string) { - (*sync.Map)(m).Delete(key) -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. -// -// Range does not necessarily correspond to any consistent snapshot of the Map's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. -func (m *executorMap) Range(f func(key string, value *serialize.Executor) bool) { - (*sync.Map)(m).Range(func(key, value interface{}) bool { - return f(key.(string), value.(*serialize.Executor)) - }) -} diff --git a/pkg/networkservice/common/serialize/gen.go b/pkg/networkservice/common/serialize/gen.go deleted file mode 100644 index 4e35dde62d..0000000000 --- a/pkg/networkservice/common/serialize/gen.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package serialize - -import ( - "sync" -) - -//go:generate go-syncmap -output executor_map.gen.go -type executorMap - -type executorMap sync.Map diff --git a/pkg/networkservice/common/serialize/serializer.go b/pkg/networkservice/common/serialize/serializer.go new file mode 100644 index 0000000000..8d7b886bb8 --- /dev/null +++ b/pkg/networkservice/common/serialize/serializer.go @@ -0,0 +1,132 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package serialize provides chain elements to make Request, Close processing in chain serial +package serialize + +import ( + "context" + "sync/atomic" + + "github.com/edwarnicke/serialize" + "github.com/golang/protobuf/ptypes/empty" + "github.com/google/uuid" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type serializer struct { + executors map[string]*executor + executor serialize.Executor +} + +type executor struct { + executor serialize.Executor + requestCount int + // state can be accessed in several ways: + // * executor is locked + requestCount is incremented : read, write without atomic + // * executor is locked : read without atomic, write with atomic + // * requestCount == 0 : read with atomic + state uint32 +} + +func (s *serializer) requestConnection( + ctx context.Context, + connID string, + requestConn func(context.Context) (*networkservice.Connection, error), +) (conn *networkservice.Connection, err error) { + var requestCh <-chan struct{} + var exec *executor + <-s.executor.AsyncExec(func() { + var ok bool + if exec, ok = s.executors[connID]; !ok { + exec = new(executor) + s.executors[connID] = exec + } + exec.requestCount++ + + requestCh = exec.executor.AsyncExec(func() { + for exec.state == 0 { + exec.state = uuid.New().ID() + } + conn, err = requestConn(withExecutors(ctx, + newRequestExecutor(exec, connID), + newCloseExecutor(exec, connID, func() { + s.executor.AsyncExec(func() { + s.clean(connID) + }) + }))) + if err != nil { + exec.state = 0 + } + }) + }) + + <-requestCh + + s.executor.AsyncExec(func() { + exec.requestCount-- + s.clean(connID) + }) + + return conn, err +} + +func (s *serializer) closeConnection( + ctx context.Context, + connID string, + closeConn func(context.Context) (*empty.Empty, error), +) (_ *empty.Empty, err error) { + logEntry := log.Entry(ctx).WithField("serializer", "closeConnection") + + var closeCh <-chan struct{} + var exec *executor + <-s.executor.AsyncExec(func() { + var ok bool + if exec, ok = s.executors[connID]; !ok { + logEntry.Warnf("connection has been already closed: %v", connID) + return + } + + closeCh = exec.executor.AsyncExec(func() { + if exec.state == 0 { + logEntry.Warnf("connection has been already closed: %v", connID) + return + } + _, err = closeConn(ctx) + atomic.StoreUint32(&exec.state, 0) + }) + }) + + if exec != nil { + <-closeCh + + s.executor.AsyncExec(func() { + s.clean(connID) + }) + } + + return &empty.Empty{}, err +} + +func (s *serializer) clean(connID string) { + exec, ok := s.executors[connID] + if ok && exec.requestCount == 0 && atomic.LoadUint32(&exec.state) == 0 { + delete(s.executors, connID) + } +} diff --git a/pkg/networkservice/common/serialize/server.go b/pkg/networkservice/common/serialize/server.go index 008ae86f2e..cea54081ab 100644 --- a/pkg/networkservice/common/serialize/server.go +++ b/pkg/networkservice/common/serialize/server.go @@ -20,6 +20,7 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" + "github.com/sirupsen/logrus" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -27,23 +28,29 @@ import ( ) type serializeServer struct { - executors executorMap + serializer } // NewServer returns a new serialize server chain element func NewServer() networkservice.NetworkServiceServer { - return new(serializeServer) + return &serializeServer{ + serializer{ + executors: map[string]*executor{}, + }, + } } func (s *serializeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) { connID := request.GetConnection().GetId() - return requestConnection(ctx, &s.executors, connID, func(requestCtx context.Context) (*networkservice.Connection, error) { + logrus.Infof("REQUEST: %v", connID) + return s.requestConnection(ctx, connID, func(requestCtx context.Context) (*networkservice.Connection, error) { return next.Server(ctx).Request(requestCtx, request) }) } func (s *serializeServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) { - return closeConnection(ctx, &s.executors, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) { + logrus.Infof("CLOSE: %v", conn.GetId()) + return s.closeConnection(ctx, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) { return next.Server(ctx).Close(closeCtx, conn) }) } diff --git a/pkg/networkservice/common/serialize/server_test.go b/pkg/networkservice/common/serialize/server_test.go index 4152fa4e54..530865d579 100644 --- a/pkg/networkservice/common/serialize/server_test.go +++ b/pkg/networkservice/common/serialize/server_test.go @@ -77,8 +77,9 @@ type requestServer struct{} func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { executor := serialize.RequestExecutor(ctx) go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request) + executor.AsyncExec(func() error { + _, err := next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request) + return err }) }() @@ -99,8 +100,9 @@ func (s *closeServer) Request(ctx context.Context, request *networkservice.Netwo executor := serialize.CloseExecutor(ctx) go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Close(context.TODO(), conn) + executor.AsyncExec(func() error { + _, err = next.Server(ctx).Close(context.TODO(), conn) + return err }) }() diff --git a/pkg/networkservice/common/timeout/README.md b/pkg/networkservice/common/timeout/README.md index 12ea486a51..43beac7540 100644 --- a/pkg/networkservice/common/timeout/README.md +++ b/pkg/networkservice/common/timeout/README.md @@ -18,9 +18,9 @@ Example of correct timeoutServer usage in [endpoint.NewServer](https://github.co ```go rv.NetworkServiceServer = chain.NewNetworkServiceServer( append([]networkservice.NetworkServiceServer{ - serialize.NewServer(ctx) // <-- should be before the timeoutServer authzServer, // <-- shouldn't be closed, don't set anything to the context updatepath.NewServer(name), // <-- same + serialize.NewServer(ctx) // <-- should be before the timeoutServer timeout.NewServer(ctx), // <-- timeoutServer monitor.NewServer(ctx, &rv.MonitorConnectionServer), // <-- should be closed updatetoken.NewServer(tokenGenerator), // <-- should be closed diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go index 03acbfec19..4ecdf6a5f9 100644 --- a/pkg/networkservice/common/timeout/server.go +++ b/pkg/networkservice/common/timeout/server.go @@ -100,37 +100,26 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co timerPtr := new(*time.Timer) *timerPtr = time.AfterFunc(time.Until(expireTime), func() { - if err := <-executor.AsyncExec(func() { - if timer, ok := t.timers.Load(conn.GetId()); !ok || timer != *timerPtr { - logEntry.Warnf("timer has been already stopped: %v", conn.GetId()) - return + if err := <-executor.AsyncExec(func() error { + if timer, _ := t.timers.Load(conn.GetId()); timer != *timerPtr { + return errors.Errorf("timer has been already stopped: %v", conn.GetId()) } - if err := t.close(t.ctx, conn, next.Server(ctx)); err != nil { + t.timers.Delete(conn.GetId()) + if _, err := next.Server(ctx).Close(t.ctx, conn); err != nil { logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err) } + return nil }); err != nil { - logEntry.Warnf("connection has been already closed: %v", conn.GetId()) + logEntry.Warn(err) } }) return *timerPtr, nil } -func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) { - return &empty.Empty{}, t.close(ctx, conn, next.Server(ctx)) -} - -func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connection, nextServer networkservice.NetworkServiceServer) error { - logEntry := log.Entry(ctx).WithField("timeoutServer", "close") - - timer, ok := t.timers.LoadAndDelete(conn.GetId()) - if !ok { - logEntry.Warnf("connection has been already closed: %v", conn.GetId()) - return nil - } +func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + timer, _ := t.timers.LoadAndDelete(conn.GetId()) timer.Stop() - _, err := nextServer.Close(ctx, conn) - - return err + return next.Server(ctx).Close(ctx, conn) }