Skip to content

Commit

Permalink
Fix race condition in ovn-kubernetes CNI
Browse files Browse the repository at this point in the history
In OVN-Kubernetes, out-of-order events led to stale gatewayroutes and
nongatewayroutes. This led to incorrect flows in the datapath. Now gatewayroutes
and nongatewayroutes use the cluster ID instead of the endpoint name. This ensures
that there is only one resource, which gets updated when a remote endpoint is
created or updated, preventing stale endpoints. The gatewayroutes only care about
remoteCIDRs, so it does not matter if the remote gateway switches.

Signed-off-by: Aswin Suryanarayanan <[email protected]>
  • Loading branch information
aswinsuryan committed Jan 5, 2024
1 parent 6a588b1 commit 40fb429
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 22 deletions.
18 changes: 13 additions & 5 deletions pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,28 @@ func (h *GatewayRouteHandler) GetNetworkPlugins() []string {

func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
if h.State().IsOnGateway() {
_, err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.V(log.TRACE).Infof("Remote endpoint %q created event received on gateway node", endpoint.Name)

gwr := h.newGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), GatewayResourceInterface(h.smClient, endpoint.Namespace),
gwr, util.Replace(gwr))
if err != nil {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}

logger.V(log.TRACE).Infof("GatewayRoute %s: %#v", result, gwr)
}

return nil
}

func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
if h.State().IsOnGateway() {
logger.V(log.TRACE).Infof("Endpoint %q removed event received on gateway node ", endpoint.Name)

if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
endpoint.Spec.ClusterID, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}
}
Expand Down Expand Up @@ -109,7 +117,7 @@ func (h *GatewayRouteHandler) TransitionToGateway() error {
func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
return &submarinerv1.GatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
Name: endpoint.Spec.ClusterID,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
Expand Down
10 changes: 5 additions & 5 deletions pkg/routeagent_driver/handlers/ovn/gateway_route_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import (
"github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn"
)

var _ = Describe("GatewayRouteHandler", func() {
var _ = Describe("x", func() {
t := newTestDriver()

JustBeforeEach(func() {
t.Start(ovn.NewGatewayRouteHandler(t.submClient))
})

awaitGatewayRoute := func(ep *submarinerv1.Endpoint) {
gwRoute := test.AwaitResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), ep.Name)
gwRoute := test.AwaitResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), ep.Spec.ClusterID)
Expect(gwRoute.RoutePolicySpec.RemoteCIDRs).To(Equal(ep.Spec.Subnets))
Expect(gwRoute.RoutePolicySpec.NextHops).To(Equal([]string{t.mgmntIntfIP}))
}
Expand All @@ -53,7 +53,7 @@ var _ = Describe("GatewayRouteHandler", func() {
awaitGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})

Context("and the GatewayRoute operations initially fail", func() {
Expand All @@ -69,15 +69,15 @@ var _ = Describe("GatewayRouteHandler", func() {
awaitGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})
})
})

Context("on transition to gateway", func() {
It("should create GatewayRoutes for all remote Endpoints", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster1", "host", "192.0.4.0/24"))
test.EnsureNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)

localEndpoint := t.CreateLocalHostEndpoint()
awaitGatewayRoute(endpoint)
Expand Down
19 changes: 13 additions & 6 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,18 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En
return nil
}

_, err := h.smClient.SubmarinerV1().
NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newNonGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.V(log.TRACE).Infof("Remote endpoint %q created event received on gateway node", endpoint.Name)

ngwr := h.newNonGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoint.Namespace),
ngwr, util.Replace(ngwr))
if err != nil {
return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name)
}

logger.V(log.TRACE).Infof("NonGatewayRoute %s: %#v", result, ngwr)

return nil
}

Expand All @@ -100,8 +105,10 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En
return nil
}

logger.V(log.TRACE).Infof("Endpoint %q removed event received on gateway node ", endpoint.Name)

if err := h.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
endpoint.Spec.ClusterID, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}

Expand Down Expand Up @@ -132,7 +139,7 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
Name: endpoint.Spec.ClusterID,
Namespace: endpoint.Namespace,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
})

awaitNonGatewayRoute := func(ep *submarinerv1.Endpoint) {
nonGWRoute := test.AwaitResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), ep.Name)
nonGWRoute := test.AwaitResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), ep.Spec.ClusterID)
Expect(nonGWRoute.RoutePolicySpec.RemoteCIDRs).To(Equal(ep.Spec.Subnets))
Expect(nonGWRoute.RoutePolicySpec.NextHops).To(Equal([]string{t.transitSwitchIP}))
}
Expand All @@ -53,7 +53,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
awaitNonGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})

Context("and the NonGatewayRoute operations initially fail", func() {
Expand All @@ -69,7 +69,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
awaitNonGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})
})

Expand All @@ -80,7 +80,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {

It("should not create a NonGatewayRoute", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24"))
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)

t.submClient.Fake.ClearActions()
t.DeleteEndpoint(endpoint.Name)
Expand All @@ -92,7 +92,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
Context("on transition to gateway", func() {
It("should create NonGatewayRoutes for all remote Endpoints", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24"))
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)

localEndpoint := t.CreateLocalHostEndpoint()
awaitNonGatewayRoute(endpoint)
Expand All @@ -112,7 +112,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
It("should not create any NonGatewayRoutes", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24"))
t.CreateLocalHostEndpoint()
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})
})
})
Expand Down

0 comments on commit 40fb429

Please sign in to comment.