Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ovn-failover #2865

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions pkg/routeagent_driver/handlers/ovn/gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
Expand Down Expand Up @@ -68,11 +67,15 @@ func (h *GatewayRouteHandler) GetNetworkPlugins() []string {

func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
if h.State().IsOnGateway() {
_, err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
gwr := h.newGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), GatewayResourceInterface(h.smClient, endpoint.Namespace),
gwr, util.Replace(gwr))
if err != nil {
return errors.Wrapf(err, "error processing the remote endpoint creation for %q", endpoint.Name)
}

logger.Infof("GatewayRoute %s from remote endpoint %s: %s", result, endpoint.Name, resource.ToJSON(gwr))
}

return nil
Expand All @@ -81,9 +84,11 @@ func (h *GatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpo
func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
if h.State().IsOnGateway() {
if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
endpoint.Spec.ClusterID, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
aswinsuryan marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoint.Name)
}

logger.Infof("GatewayRoute %s deleted for remote endpoint %s", endpoint.Spec.ClusterID, endpoint.Name)
}

return nil
Expand All @@ -92,6 +97,14 @@ func (h *GatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpo
func (h *GatewayRouteHandler) TransitionToGateway() error {
endpoints := h.State().GetRemoteEndpoints()
for i := range endpoints {
// Handle upgrades from versions older than 0.16.3, which named the GatewayRoute after the endpoint rather
// than the cluster ID: delete any stale GatewayRoute still stored under the endpoint name. This can be
// removed once the 0.16 version is no longer supported.
if err := h.smClient.SubmarinerV1().GatewayRoutes(endpoints[i].Namespace).Delete(context.TODO(),
endpoints[i].Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting gatewayRoute %q", endpoints[i].Name)
}

gwr := h.newGatewayRoute(&endpoints[i])

result, err := util.CreateOrUpdate(context.TODO(), GatewayResourceInterface(h.smClient, endpoints[i].Namespace),
Expand All @@ -100,7 +113,7 @@ func (h *GatewayRouteHandler) TransitionToGateway() error {
return errors.Wrapf(err, "error creating/updating GatewayRoute")
}

logger.V(log.TRACE).Infof("GatewayRoute %s: %#v", result, gwr)
logger.Infof("GatewayRoute %s from remote endpoint %s: %s", result, endpoints[i].Name, resource.ToJSON(gwr))
}

return nil
Expand All @@ -109,7 +122,7 @@ func (h *GatewayRouteHandler) TransitionToGateway() error {
func (h *GatewayRouteHandler) newGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.GatewayRoute {
return &submarinerv1.GatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
Name: endpoint.Spec.ClusterID,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ = Describe("GatewayRouteHandler", func() {
})

awaitGatewayRoute := func(ep *submarinerv1.Endpoint) {
gwRoute := test.AwaitResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), ep.Name)
gwRoute := test.AwaitResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), ep.Spec.ClusterID)
Expect(gwRoute.RoutePolicySpec.RemoteCIDRs).To(Equal(ep.Spec.Subnets))
Expect(gwRoute.RoutePolicySpec.NextHops).To(Equal([]string{t.mgmntIntfIP}))
}
Expand All @@ -53,7 +53,7 @@ var _ = Describe("GatewayRouteHandler", func() {
awaitGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})

Context("and the GatewayRoute operations initially fail", func() {
Expand All @@ -69,15 +69,15 @@ var _ = Describe("GatewayRouteHandler", func() {
awaitGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})
})
})

Context("on transition to gateway", func() {
It("should create GatewayRoutes for all remote Endpoints", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster1", "host", "192.0.4.0/24"))
test.EnsureNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.GatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)

localEndpoint := t.CreateLocalHostEndpoint()
awaitGatewayRoute(endpoint)
Expand Down
28 changes: 20 additions & 8 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/resource"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
Expand Down Expand Up @@ -85,13 +84,16 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En
return nil
}

_, err := h.smClient.SubmarinerV1().
NonGatewayRoutes(endpoint.Namespace).Create(context.TODO(),
h.newNonGatewayRoute(endpoint), metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
ngwr := h.newNonGatewayRoute(endpoint)

result, err := util.CreateOrUpdate(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoint.Namespace),
ngwr, util.Replace(ngwr))
if err != nil {
return errors.Wrapf(err, "error processing the remote endpoint create event for %q", endpoint.Name)
}

logger.Infof("NonGatewayRoute %s from remote endpoint %s: %s", result, endpoint.Name, resource.ToJSON(ngwr))

return nil
}

Expand All @@ -101,10 +103,12 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En
}

if err := h.smClient.SubmarinerV1().NonGatewayRoutes(endpoint.Namespace).Delete(context.TODO(),
endpoint.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
endpoint.Spec.ClusterID, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoint.Name)
}

logger.Infof("NonGatewayRoute %s deleted for remote endpoint %s", endpoint.Spec.ClusterID, endpoint.Name)

return nil
}

Expand All @@ -115,6 +119,14 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {

endpoints := h.State().GetRemoteEndpoints()
for i := range endpoints {
// Handle upgrades from versions older than 0.16.3, which named the NonGatewayRoute after the endpoint rather
// than the cluster ID: delete any stale NonGatewayRoute still stored under the endpoint name. This can be
// removed once the 0.16 version is no longer supported.
if err := h.smClient.SubmarinerV1().NonGatewayRoutes(endpoints[i].Namespace).Delete(context.TODO(),
endpoints[i].Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting nonGatewayRoute %q", endpoints[i].Name)
}

ngwr := h.newNonGatewayRoute(&endpoints[i])

result, err := util.CreateOrUpdate(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoints[i].Namespace),
Expand All @@ -123,7 +135,7 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
return errors.Wrapf(err, "error creating/updating NonGatewayRoute")
}

logger.V(log.TRACE).Infof("NonGatewayRoute %s: %#v", result, ngwr)
logger.Infof("NonGatewayRoute %s from remote endpoint %s: %s", result, endpoints[i].Name, resource.ToJSON(ngwr))
}

return nil
Expand All @@ -132,7 +144,7 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: endpoint.Name,
Name: endpoint.Spec.ClusterID,
Namespace: endpoint.Namespace,
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
})

awaitNonGatewayRoute := func(ep *submarinerv1.Endpoint) {
nonGWRoute := test.AwaitResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), ep.Name)
nonGWRoute := test.AwaitResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), ep.Spec.ClusterID)
Expect(nonGWRoute.RoutePolicySpec.RemoteCIDRs).To(Equal(ep.Spec.Subnets))
Expect(nonGWRoute.RoutePolicySpec.NextHops).To(Equal([]string{t.transitSwitchIP}))
}
Expand All @@ -53,7 +53,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
awaitNonGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})

Context("and the NonGatewayRoute operations initially fail", func() {
Expand All @@ -69,7 +69,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
awaitNonGatewayRoute(endpoint)

t.DeleteEndpoint(endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.AwaitNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})
})

Expand All @@ -80,7 +80,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {

It("should not create a NonGatewayRoute", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24"))
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)

t.submClient.Fake.ClearActions()
t.DeleteEndpoint(endpoint.Name)
Expand All @@ -92,7 +92,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
Context("on transition to gateway", func() {
It("should create NonGatewayRoutes for all remote Endpoints", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24"))
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)

localEndpoint := t.CreateLocalHostEndpoint()
awaitNonGatewayRoute(endpoint)
Expand All @@ -112,7 +112,7 @@ var _ = Describe("NonGatewayRouteHandler", func() {
It("should not create any NonGatewayRoutes", func() {
endpoint := t.CreateEndpoint(testing.NewEndpoint("remote-cluster", "host", "193.0.4.0/24"))
t.CreateLocalHostEndpoint()
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Name)
test.EnsureNoResource(ovn.NonGatewayResourceInterface(t.submClient, testing.Namespace), endpoint.Spec.ClusterID)
})
})
})
Expand Down
Loading