Skip to content

Commit

Permalink
Merge pull request #916 from freehan/remove-deprecated-syncer
Browse files Browse the repository at this point in the history
remove batch syncer and move the unit test utils
  • Loading branch information
k8s-ci-robot authored Oct 26, 2019
2 parents 06469b5 + efade12 commit 1707686
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 903 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, neg.NegSyncerType(flags.F.NegSyncerType), flags.F.EnableReadinessReflector, flags.F.EnableCSM, flags.F.CSMServiceNEGSkipNamespaces)
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.EnableCSM, flags.F.CSMServiceNEGSkipNamespaces)

go negController.Run(stopCh)
klog.V(0).Infof("negController started")
Expand Down
2 changes: 0 additions & 2 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ var (
WatchNamespace string
NodePortRanges PortRanges
NegGCPeriod time.Duration
NegSyncerType string
EnableReadinessReflector bool
FinalizerAdd bool
FinalizerRemove bool
Expand Down Expand Up @@ -194,7 +193,6 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.StringVar(&F.LeaderElection.LockObjectName, "lock-object-name", F.LeaderElection.LockObjectName, "Define the name of the lock object.")
flag.DurationVar(&F.NegGCPeriod, "neg-gc-period", 120*time.Second,
`Relist and garbage collect NEGs this often.`)
flag.StringVar(&F.NegSyncerType, "neg-syncer-type", "transaction", "Define the NEG syncer type to use. Valid values are \"batch\" and \"transaction\"")
flag.BoolVar(&F.EnableReadinessReflector, "enable-readiness-reflector", true, "Enable NEG Readiness Reflector")
flag.BoolVar(&F.FinalizerAdd, "enable-finalizer-add",
F.FinalizerAdd, "Enable adding Finalizer to Ingress.")
Expand Down
3 changes: 1 addition & 2 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func NewController(
namer negtypes.NetworkEndpointGroupNamer,
resyncPeriod time.Duration,
gcPeriod time.Duration,
negSyncerType NegSyncerType,
enableReadinessReflector bool,
enableCSM bool,
csmServiceNEGSkipNamespaces []string,
Expand All @@ -111,7 +110,7 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "neg-controller"})

manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType)
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer())
var reflector readiness.Reflector
if enableReadinessReflector {
reflector = readiness.NewReadinessReflector(ctx, manager)
Expand Down
1 change: 0 additions & 1 deletion pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
namer,
1*time.Second,
1*time.Second,
transactionSyncer,
// TODO(freehan): enable readiness reflector for unit tests
false,
true,
Expand Down
58 changes: 16 additions & 42 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ func (k serviceKey) Key() string {

// syncerManager contains all the active syncer goroutines and manage their lifecycle.
type syncerManager struct {
negSyncerType NegSyncerType

namer negtypes.NetworkEndpointGroupNamer
recorder record.EventRecorder
cloud negtypes.NetworkEndpointGroupCloud
Expand All @@ -67,10 +65,8 @@ type syncerManager struct {
reflector readiness.Reflector
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager {
klog.V(2).Infof("NEG controller will use NEG syncer type: %q", negSyncerType)
func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncerManager {
return &syncerManager{
negSyncerType: negSyncerType,
namer: namer,
recorder: recorder,
cloud: cloud,
Expand Down Expand Up @@ -114,44 +110,22 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
errList := []error{}
// Ensure a syncer is running for each port that is being added.
for svcPort, portInfo := range adds {
syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, portInfo)]
syncerKey := getSyncerKey(namespace, name, svcPort, portInfo)
syncer, ok := manager.syncerMap[syncerKey]
if !ok {
syncerKey := negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: svcPort.ServicePort,
TargetPort: portInfo.TargetPort,
Subset: portInfo.Subset,
SubsetLabels: portInfo.SubsetLabels,
}

if manager.negSyncerType == transactionSyncer {
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
portInfo.NegName,
manager.recorder,
manager.cloud,
manager.zoneGetter,
manager.podLister,
manager.serviceLister,
manager.endpointLister,
manager.reflector,
)
} else {
// Use batch syncer by default
syncer = negsyncer.NewBatchSyncer(
syncerKey,
portInfo.NegName,
manager.recorder,
manager.cloud,
manager.zoneGetter,
manager.serviceLister,
manager.endpointLister,
manager.podLister,
)
}

manager.syncerMap[getSyncerKey(namespace, name, svcPort, portInfo)] = syncer
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
portInfo.NegName,
manager.recorder,
manager.cloud,
manager.zoneGetter,
manager.podLister,
manager.serviceLister,
manager.endpointLister,
manager.reflector,
)

manager.syncerMap[syncerKey] = syncer
}

if syncer.IsStopped() {
Expand Down
1 change: 0 additions & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
context.PodInformer.GetIndexer(),
context.ServiceInformer.GetIndexer(),
context.EndpointInformer.GetIndexer(),
transactionSyncer,
)
manager.reflector = readiness.NewReadinessReflector(context, manager)
return manager
Expand Down
Loading

0 comments on commit 1707686

Please sign in to comment.