diff --git a/Makefile b/Makefile index 7cd68c621a..3215faa2de 100644 --- a/Makefile +++ b/Makefile @@ -363,6 +363,15 @@ dist/calico-felix/calico-felix: bin/calico-felix mkdir -p dist/calico-felix/ cp bin/calico-felix dist/calico-felix/calico-felix +# Cross-compile Felix for Windows +bin/calico-felix.exe: $(FELIX_GO_FILES) vendor/.up-to-date + @echo Building felix for Windows... + mkdir -p bin + $(DOCKER_GO_BUILD) \ + sh -c 'GOOS=windows go build -v -o $@ -v $(LDFLAGS) "github.com/projectcalico/felix" && \ + ( ldd $@ 2>&1 | grep -q "Not a valid dynamic program" || \ + ( echo "Error: $@ was not statically linked"; false ) )' + # Install or update the tools used by the build .PHONY: update-tools update-tools: @@ -449,12 +458,12 @@ static-checks: .PHONY: ut-no-cover ut-no-cover: vendor/.up-to-date $(FELIX_GO_FILES) @echo Running Go UTs without coverage. - $(DOCKER_GO_BUILD) ginkgo -r -skipPackage fv,k8sfv $(GINKGO_OPTIONS) + $(DOCKER_GO_BUILD) ginkgo -r -skipPackage fv,k8sfv,windows $(GINKGO_OPTIONS) .PHONY: ut-watch ut-watch: vendor/.up-to-date $(FELIX_GO_FILES) @echo Watching go UTs for changes... - $(DOCKER_GO_BUILD) ginkgo watch -r -skipPackage fv,k8sfv $(GINKGO_OPTIONS) + $(DOCKER_GO_BUILD) ginkgo watch -r -skipPackage fv,k8sfv,windows $(GINKGO_OPTIONS) # Launch a browser with Go coverage stats for the whole project. .PHONY: cover-browser diff --git a/dataplane/driver.go b/dataplane/driver.go new file mode 100644 index 0000000000..089b0d20be --- /dev/null +++ b/dataplane/driver.go @@ -0,0 +1,123 @@ +// +build !windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataplane + +import ( + "net" + "os/exec" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/config" + "github.com/projectcalico/felix/dataplane/external" + "github.com/projectcalico/felix/dataplane/linux" + "github.com/projectcalico/felix/ifacemonitor" + "github.com/projectcalico/felix/ipsets" + "github.com/projectcalico/felix/logutils" + "github.com/projectcalico/felix/rules" + "github.com/projectcalico/libcalico-go/lib/health" +) + +func StartDataplaneDriver(configParams *config.Config, healthAggregator *health.HealthAggregator) (DataplaneDriver, *exec.Cmd) { + if configParams.UseInternalDataplaneDriver { + log.Info("Using internal (linux) dataplane driver.") + // Dedicated mark bits for accept and pass actions. These are long lived bits + // that we use for communicating between chains. + markAccept := configParams.NextIptablesMark() + markPass := configParams.NextIptablesMark() + // Short-lived mark bits for local calculations within a chain. + markScratch0 := configParams.NextIptablesMark() + markScratch1 := configParams.NextIptablesMark() + log.WithFields(log.Fields{ + "acceptMark": markAccept, + "passMark": markPass, + "scratch0Mark": markScratch0, + "scratch1Mark": markScratch1, + }).Info("Calculated iptables mark bits") + dpConfig := intdataplane.Config{ + IfaceMonitorConfig: ifacemonitor.Config{ + InterfaceExcludes: configParams.InterfaceExcludes(), + }, + RulesConfig: rules.Config{ + WorkloadIfacePrefixes: configParams.InterfacePrefixes(), + + IPSetConfigV4: ipsets.NewIPVersionConfig( + ipsets.IPFamilyV4, + rules.IPSetNamePrefix, + rules.AllHistoricIPSetNamePrefixes, + rules.LegacyV4IPSetNames, + ), + IPSetConfigV6: ipsets.NewIPVersionConfig( + ipsets.IPFamilyV6, + rules.IPSetNamePrefix, + rules.AllHistoricIPSetNamePrefixes, + nil, + ), + + OpenStackSpecialCasesEnabled: configParams.OpenstackActive(), + OpenStackMetadataIP: net.ParseIP(configParams.MetadataAddr), + OpenStackMetadataPort: uint16(configParams.MetadataPort), + + IptablesMarkAccept: markAccept, + IptablesMarkPass: markPass, + IptablesMarkScratch0: markScratch0, + IptablesMarkScratch1: markScratch1, + + IPIPEnabled: configParams.IpInIpEnabled, + IPIPTunnelAddress: configParams.IpInIpTunnelAddr, + + IptablesLogPrefix: configParams.LogPrefix, + EndpointToHostAction: configParams.DefaultEndpointToHostAction, + IptablesFilterAllowAction: configParams.IptablesFilterAllowAction, + IptablesMangleAllowAction: configParams.IptablesMangleAllowAction, + + FailsafeInboundHostPorts: configParams.FailsafeInboundHostPorts, + FailsafeOutboundHostPorts: configParams.FailsafeOutboundHostPorts, + + DisableConntrackInvalid: configParams.DisableConntrackInvalidCheck, + }, + IPIPMTU: configParams.IpInIpMtu, + IptablesRefreshInterval: configParams.IptablesRefreshInterval, + RouteRefreshInterval: configParams.RouteRefreshInterval, + IPSetsRefreshInterval: configParams.IpsetsRefreshInterval, + IptablesPostWriteCheckInterval: configParams.IptablesPostWriteCheckIntervalSecs, + IptablesInsertMode: configParams.ChainInsertMode, + IptablesLockFilePath: configParams.IptablesLockFilePath, + IptablesLockTimeout: configParams.IptablesLockTimeoutSecs, + IptablesLockProbeInterval: configParams.IptablesLockProbeIntervalMillis, + MaxIPSetSize: configParams.MaxIpsetSize, + IgnoreLooseRPF: configParams.IgnoreLooseRPF, + IPv6Enabled: configParams.Ipv6Support, + StatusReportingInterval: configParams.ReportingIntervalSecs, + + NetlinkTimeout: configParams.NetlinkTimeoutSecs, + + PostInSyncCallback: func() { logutils.DumpHeapMemoryProfile(configParams) }, + HealthAggregator: healthAggregator, + DebugSimulateDataplaneHangAfter: configParams.DebugSimulateDataplaneHangAfter, + } + intDP := intdataplane.NewIntDataplaneDriver(dpConfig) + intDP.Start() + + return intDP, nil + } else { + log.WithField("driver", configParams.DataplaneDriver).Info( + "Using external dataplane driver.") + + return extdataplane.StartExtDataplaneDriver(configParams.DataplaneDriver) + } +} diff --git a/dataplane/driver_defs.go b/dataplane/driver_defs.go new file mode 100644 index 0000000000..c48f26561e --- /dev/null +++ b/dataplane/driver_defs.go @@ -0,0 +1,20 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataplane + +type DataplaneDriver interface { + SendMessage(msg interface{}) error + RecvMessage() (msg interface{}, err error) +} diff --git a/dataplane/driver_windows.go b/dataplane/driver_windows.go new file mode 100644 index 0000000000..72ea89ffe4 --- /dev/null +++ b/dataplane/driver_windows.go @@ -0,0 +1,39 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataplane + +import ( + "os/exec" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/config" + "github.com/projectcalico/felix/dataplane/windows" + "github.com/projectcalico/libcalico-go/lib/health" +) + +func StartDataplaneDriver(configParams *config.Config, healthAggregator *health.HealthAggregator) (DataplaneDriver, *exec.Cmd) { + log.Info("Using Windows dataplane driver.") + + dpConfig := windataplane.Config{ + IPv6Enabled: configParams.Ipv6Support, + HealthAggregator: healthAggregator, + } + + winDP := windataplane.NewWinDataplaneDriver(dpConfig) + winDP.Start() + + return winDP, nil +} diff --git a/extdataplane/ext_dataplane.go b/dataplane/external/ext_dataplane.go similarity index 100% rename from extdataplane/ext_dataplane.go rename to dataplane/external/ext_dataplane.go diff --git a/intdataplane/endpoint_mgr.go b/dataplane/linux/endpoint_mgr.go similarity index 100% rename from intdataplane/endpoint_mgr.go rename to dataplane/linux/endpoint_mgr.go diff --git a/intdataplane/endpoint_mgr_test.go b/dataplane/linux/endpoint_mgr_test.go similarity index 100% rename from intdataplane/endpoint_mgr_test.go rename to dataplane/linux/endpoint_mgr_test.go diff --git a/intdataplane/floating_ip_mgr.go b/dataplane/linux/floating_ip_mgr.go similarity index 100% rename from intdataplane/floating_ip_mgr.go rename to dataplane/linux/floating_ip_mgr.go diff --git a/intdataplane/floating_ip_mgr_test.go b/dataplane/linux/floating_ip_mgr_test.go similarity index 100% rename from intdataplane/floating_ip_mgr_test.go rename to dataplane/linux/floating_ip_mgr_test.go diff --git a/intdataplane/int_dataplane.go b/dataplane/linux/int_dataplane.go similarity index 96% rename from intdataplane/int_dataplane.go rename to dataplane/linux/int_dataplane.go index f8357deaf6..fbdeab8108 100644 --- a/intdataplane/int_dataplane.go +++ b/dataplane/linux/int_dataplane.go @@ -15,7 +15,6 @@ package intdataplane import ( - "fmt" "io/ioutil" "os" "reflect" @@ -571,7 +570,7 @@ func (d *InternalDataplane) loopUpdatingDataplane() { datastoreInSync := false processMsgFromCalcGraph := func(msg interface{}) { - log.WithField("msg", msgStringer{msg: msg}).Infof( + log.WithField("msg", proto.MsgStringer{Msg: msg}).Infof( "Received %T update from calculation graph", msg) d.recordMsgStat(msg) for _, mgr := range d.allManagers { @@ -943,42 +942,6 @@ type iptablesTable interface { RemoveChainByName(name string) } -// msgStringer wraps an API message to customise how we stringify it. For example, it truncates -// the lists of members in the (potentially very large) IPSetsUpdate messages. -type msgStringer struct { - msg interface{} -} - -func (m msgStringer) String() string { - if log.GetLevel() < log.DebugLevel && m.msg != nil { - const truncateAt = 10 - switch msg := m.msg.(type) { - case *proto.IPSetUpdate: - if len(msg.Members) < truncateAt { - return fmt.Sprintf("%v", msg) - } - return fmt.Sprintf("id:%#v members(%d):%#v(truncated)", - msg.Id, len(msg.Members), msg.Members[:truncateAt]) - case *proto.IPSetDeltaUpdate: - if len(msg.AddedMembers) < truncateAt && len(msg.RemovedMembers) < truncateAt { - return fmt.Sprintf("%v", msg) - } - addedNum := truncateAt - removedNum := truncateAt - if len(msg.AddedMembers) < addedNum { - addedNum = len(msg.AddedMembers) - } - if len(msg.RemovedMembers) < removedNum { - removedNum = len(msg.RemovedMembers) - } - return fmt.Sprintf("id:%#v addedMembers(%d):%#v(truncated) removedMembers(%d):%#v(truncated)", - msg.Id, len(msg.AddedMembers), msg.AddedMembers[:addedNum], - len(msg.RemovedMembers), msg.RemovedMembers[:removedNum]) - } - } - return fmt.Sprintf("%v", m.msg) -} - func (d *InternalDataplane) reportHealth() { if d.config.HealthAggregator != nil { d.config.HealthAggregator.Report( diff --git a/intdataplane/int_dataplane_test.go b/dataplane/linux/int_dataplane_test.go similarity index 98% rename from intdataplane/int_dataplane_test.go rename to dataplane/linux/int_dataplane_test.go index 0c7c73fb77..60ded9111f 100644 --- a/intdataplane/int_dataplane_test.go +++ b/dataplane/linux/int_dataplane_test.go @@ -21,8 +21,8 @@ import ( . "github.com/onsi/gomega" "github.com/projectcalico/felix/config" + "github.com/projectcalico/felix/dataplane/linux" "github.com/projectcalico/felix/ifacemonitor" - "github.com/projectcalico/felix/intdataplane" "github.com/projectcalico/felix/ipsets" "github.com/projectcalico/felix/rules" "github.com/projectcalico/libcalico-go/lib/health" diff --git a/intdataplane/intdataplane_fv_suite_test.go b/dataplane/linux/intdataplane_fv_suite_test.go similarity index 100% rename from intdataplane/intdataplane_fv_suite_test.go rename to dataplane/linux/intdataplane_fv_suite_test.go diff --git a/intdataplane/intdataplane_ut_suite_test.go b/dataplane/linux/intdataplane_ut_suite_test.go similarity index 100% rename from intdataplane/intdataplane_ut_suite_test.go rename to dataplane/linux/intdataplane_ut_suite_test.go diff --git a/intdataplane/ipip_mgr.go b/dataplane/linux/ipip_mgr.go similarity index 100% rename from intdataplane/ipip_mgr.go rename to dataplane/linux/ipip_mgr.go diff --git a/intdataplane/ipip_mgr_netlink.go b/dataplane/linux/ipip_mgr_netlink.go similarity index 100% rename from intdataplane/ipip_mgr_netlink.go rename to dataplane/linux/ipip_mgr_netlink.go diff --git a/intdataplane/ipip_mgr_test.go b/dataplane/linux/ipip_mgr_test.go similarity index 100% rename from intdataplane/ipip_mgr_test.go rename to dataplane/linux/ipip_mgr_test.go diff --git a/intdataplane/ipsets_mgr.go b/dataplane/linux/ipsets_mgr.go similarity index 100% rename from intdataplane/ipsets_mgr.go rename to dataplane/linux/ipsets_mgr.go diff --git a/intdataplane/ipsets_mgr_test.go b/dataplane/linux/ipsets_mgr_test.go similarity index 100% rename from intdataplane/ipsets_mgr_test.go rename to dataplane/linux/ipsets_mgr_test.go diff --git a/intdataplane/masq_mgr.go b/dataplane/linux/masq_mgr.go similarity index 100% rename from intdataplane/masq_mgr.go rename to dataplane/linux/masq_mgr.go diff --git a/intdataplane/masq_mgr_test.go b/dataplane/linux/masq_mgr_test.go similarity index 100% rename from intdataplane/masq_mgr_test.go rename to dataplane/linux/masq_mgr_test.go diff --git a/intdataplane/mock_ipsets_for_test.go b/dataplane/linux/mock_ipsets_for_test.go similarity index 100% rename from intdataplane/mock_ipsets_for_test.go rename to dataplane/linux/mock_ipsets_for_test.go diff --git a/intdataplane/mock_iptables_for_test.go b/dataplane/linux/mock_iptables_for_test.go similarity index 100% rename from intdataplane/mock_iptables_for_test.go rename to dataplane/linux/mock_iptables_for_test.go diff --git a/intdataplane/policy_mgr.go b/dataplane/linux/policy_mgr.go similarity index 100% rename from intdataplane/policy_mgr.go rename to dataplane/linux/policy_mgr.go diff --git a/intdataplane/policy_mgr_test.go b/dataplane/linux/policy_mgr_test.go similarity index 100% rename from intdataplane/policy_mgr_test.go rename to dataplane/linux/policy_mgr_test.go diff --git a/intdataplane/status_combiner.go b/dataplane/linux/status_combiner.go similarity index 100% rename from intdataplane/status_combiner.go rename to dataplane/linux/status_combiner.go diff --git a/intdataplane/status_combiner_test.go b/dataplane/linux/status_combiner_test.go similarity index 100% rename from intdataplane/status_combiner_test.go rename to dataplane/linux/status_combiner_test.go diff --git a/dataplane/windows/endpoint_mgr.go b/dataplane/windows/endpoint_mgr.go new file mode 100644 index 0000000000..bdc0c94aac --- /dev/null +++ b/dataplane/windows/endpoint_mgr.go @@ -0,0 +1,304 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package windataplane + +import ( + "errors" + "os" + "strings" + "time" + + hns "github.com/Microsoft/hcsshim" + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/dataplane/windows/policysets" + "github.com/projectcalico/felix/proto" +) + +const ( + // cacheTimeout specifies the time after which our hns endpoint id cache + // will be considered stale and need to be resync'd with the dataplane. + cacheTimeout = time.Duration(10 * time.Minute) + // suffix to use for IPv4 addresses. + ipv4AddrSuffix = "/32" + // envNetworkName specifies the environment variable which should be read + // to obtain the name of the hns network for which we will be managing + // endpoint policies. + envNetworkName = "KUBE_NETWORK" + // the default hns network name to use if the envNetworkName environment + // variable does not resolve to a value + defaultNetworkName = "l2bridge" +) + +var ( + ErrorUnknownEndpoint = errors.New("Endpoint could not be found") + ErrorUpdateFailed = errors.New("Endpoint update failed") +) + +// endpointManager processes WorkloadEndpoint* updates from the datastore. Updates are +// stored and pended for processing during CompleteDeferredWork. endpointManager is also +// responsible for orchestrating a refresh of all impacted endpoints after a IPSet update. +type endpointManager struct { + // the name of the hns network for which we will be managing endpoint policies. + hnsNetworkName string + // the policysets dataplane to be used when looking up endpoint policies/profiles. + policysetsDataplane policysets.PolicySetsDataplane + // pendingWlEpUpdates stores any pending updates to be performed per endpoint. + pendingWlEpUpdates map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint + // activeWlEndpoints stores the active/current state that was applied per endpoint + activeWlEndpoints map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint + // addressToEndpointId serves as a hns endpoint id cache. It enables us to lookup the hns + // endpoint id for a given endpoint ip address. + addressToEndpointId map[string]string + // lastCacheUpdate records the last time that the addressToEndpointId map was refreshed. + lastCacheUpdate time.Time +} + +func newEndpointManager(policysets policysets.PolicySetsDataplane) *endpointManager { + var networkName string + if os.Getenv(envNetworkName) != "" { + networkName = os.Getenv(envNetworkName) + log.WithField("NetworkName", networkName).Info("Setting hns network name from environment variable") + } else { + networkName = defaultNetworkName + log.WithField("NetworkName", networkName).Info("No Network Name environment variable was found, using default name") + } + + return &endpointManager{ + hnsNetworkName: networkName, + policysetsDataplane: policysets, + addressToEndpointId: make(map[string]string), + activeWlEndpoints: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{}, + pendingWlEpUpdates: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{}, + } +} + +// OnUpdate is called by the main dataplane driver loop during the first phase. It processes +// specific types of updates from the datastore. +func (m *endpointManager) OnUpdate(msg interface{}) { + switch msg := msg.(type) { + case *proto.WorkloadEndpointUpdate: + log.WithField("workloadEndpointId", msg.Id).Info("Processing WorkloadEndpointUpdate") + m.pendingWlEpUpdates[*msg.Id] = msg.Endpoint + case *proto.WorkloadEndpointRemove: + log.WithField("workloadEndpointId", msg.Id).Info("Processing WorkloadEndpointRemove") + m.pendingWlEpUpdates[*msg.Id] = nil + case *proto.IPSetUpdate: + log.WithField("ipSetId", msg.Id).Info("Processing IPSetUpdate") + m.ProcessIpSetUpdate(msg.Id) + case *proto.IPSetDeltaUpdate: + log.WithField("ipSetId", msg.Id).Info("Processing IPSetDeltaUpdate") + m.ProcessIpSetUpdate(msg.Id) + } +} + +// RefreshHnsEndpointCache refreshes the hns endpoint id cache if enough time has passed since the +// last refresh or if a forceRefresh is requested (may happen if the endpointManager determines that +// a required endpoint id is not present in the cache). +func (m *endpointManager) RefreshHnsEndpointCache(forceRefresh bool) error { + if !forceRefresh && (time.Since(m.lastCacheUpdate) < cacheTimeout) { + return nil + } + + log.Info("Refreshing the endpoint cache") + endpoints, err := hns.HNSListEndpointRequest() + if err != nil { + log.Infof("Failed to obtain HNS endpoints: %v", err) + return err + } + + log.Debug("Clearing the endpoint cache") + m.addressToEndpointId = make(map[string]string) + + for _, endpoint := range endpoints { + if strings.ToLower(endpoint.VirtualNetworkName) == strings.ToLower(m.hnsNetworkName) { + ip := endpoint.IPAddress.String() + ipv4AddrSuffix + log.WithFields(log.Fields{"IPAddress": ip, "EndpointId": endpoint.Id}).Debug("Adding HNS Endpoint Id entry to cache") + m.addressToEndpointId[ip] = endpoint.Id + } + } + + log.Infof("Cache refresh is complete. %v endpoints were cached", len(m.addressToEndpointId)) + m.lastCacheUpdate = time.Now() + + return nil +} + +// ProcessIpSetUpdate is called when a IPSet has changed. The ipSetsManager will have already updated +// the IPSet itself, but the endpointManager is responsible for requesting all impacted policy sets +// to be updated and for marking all impacted endpoints as pending so that updated policies can be +// pushed to them. +func (m *endpointManager) ProcessIpSetUpdate(ipSetId string) { + log.WithField("ipSetId", ipSetId).Debug("Requesting PolicySetsDataplane to process the IP set update") + updatedPolicies := m.policysetsDataplane.ProcessIpSetUpdate(ipSetId) + if updatedPolicies == nil { + return + } + + log.Debugf("Checking if any active endpoint policies need to be refreshed") + for endpointId, workload := range m.activeWlEndpoints { + if _, present := m.pendingWlEpUpdates[endpointId]; present { + // skip this endpoint as it is already marked as pending update + continue + } + + var activePolicyNames []string + if len(workload.Tiers) > 0 { + activePolicyNames = append(activePolicyNames, workload.Tiers[0].IngressPolicies...) + activePolicyNames = append(activePolicyNames, workload.Tiers[0].EgressPolicies...) + } else { + if len(workload.ProfileIds) > 0 { + activePolicyNames = workload.ProfileIds + } + } + + Policies: + for _, policyName := range activePolicyNames { + for _, updatedPolicy := range updatedPolicies { + if policyName == updatedPolicy { + log.WithFields(log.Fields{"policyName": policyName, "endpointId": endpointId}).Info("Endpoint is being marked for policy refresh") + m.pendingWlEpUpdates[endpointId] = workload + break Policies + } + } + } + } +} + +// CompleteDeferredWork will apply all pending updates by gathering the rules to be updated per +// endpoint and communicating them to hns. Note that CompleteDeferredWork is called during the +// second phase of the main dataplane driver loop, so all IPSet/Policy/Profile/Workload updates +// have already been processed by the various managers and we should now have a complete picture +// of the policy/rules to be applied for each pending endpoint. +func (m *endpointManager) CompleteDeferredWork() error { + if len(m.pendingWlEpUpdates) > 0 { + m.RefreshHnsEndpointCache(false) + } + + // Loop through each pending update + for id, workload := range m.pendingWlEpUpdates { + logCxt := log.WithField("id", id) + + var policyNames []string + var endpointId string + + // A non-nil workload indicates this is a pending add or update operation + if workload != nil { + for _, ip := range workload.Ipv4Nets { + var err error + logCxt.WithField("ip", ip).Debug("Resolving workload ip to hns endpoint Id") + endpointId, err = m.getHnsEndpointId(ip) + if err == nil && endpointId != "" { + // Resolution was successful + break + } + } + if endpointId == "" { + // Failed to find the associated hns endpoint id + return ErrorUnknownEndpoint + } + + logCxt.Info("Processing endpoint add/update") + + if len(workload.Tiers) > 0 { + logCxt.Debug("Workload tiers are present - Policies will be applied") + policyNames = append(policyNames, workload.Tiers[0].IngressPolicies...) + policyNames = append(policyNames, workload.Tiers[0].EgressPolicies...) + } else { + if len(workload.ProfileIds) > 0 { + logCxt.Debug("Workload tiers are not present - Profiles will be applied") + policyNames = workload.ProfileIds + } + } + + err := m.applyRules(id, endpointId, policyNames) + if err != nil { + // Failed to apply, this will be rescheduled and retried + return err + } + + m.activeWlEndpoints[id] = workload + delete(m.pendingWlEpUpdates, id) + } else { + // For now, we don't need to do anything. As the endpoint is being removed, HNS will automatically + // handle the removal of any associated policies from the dataplane for us + logCxt.Info("Processing endpoint removal") + delete(m.activeWlEndpoints, id) + delete(m.pendingWlEpUpdates, id) + } + } + + return nil +} + +// applyRules gathers all of the rules for the specified policies and sends them to hns +// as an endpoint policy update (this actually applies the rules to the dataplane). +func (m *endpointManager) applyRules(workloadId proto.WorkloadEndpointID, endpointId string, policyNames []string) error { + logCxt := log.WithFields(log.Fields{"id": workloadId, "endpointId": endpointId}) + logCxt.WithField("policies", policyNames).Info("Applying endpoint rules") + + var rules []*hns.ACLPolicy + if len(policyNames) > 0 { + rules = m.policysetsDataplane.GetPolicySetRules(policyNames) + if log.GetLevel() >= log.DebugLevel { + for _, rule := range rules { + logCxt.WithField("rule", rule).Debug("Complete set of rules to be applied") + } + } + } else { + logCxt.Info("No policies/profiles were specified, all rules will be removed from this endpoint") + } + + logCxt.Debug("Sending request to hns to apply the rules") + + endpoint := &hns.HNSEndpoint{} + endpoint.Id = endpointId + + if err := endpoint.ApplyACLPolicy(rules...); err != nil { + logCxt.WithError(err).Warning("Failed to apply rules. This operation will be retried.") + return ErrorUpdateFailed + } + + return nil +} + +// getHnsEndpointId retrieves the hns endpoint id for the given ip address. First, a cache lookup +// is performed. If no entry is found in the cache, then we will attempt to refresh the cache. If +// the id is still not found, we fail and let the caller implement any needed retry/backoff logic. +func (m *endpointManager) getHnsEndpointId(ip string) (string, error) { + allowRefresh := true + for { + // First check the endpoint cache + id, ok := m.addressToEndpointId[ip] + if ok { + log.WithFields(log.Fields{"ip": ip, "id": id}).Info("Resolved hns endpoint id") + return id, nil + } + + if allowRefresh { + // No cached entry was found, force refresh the cache and check again + log.WithField("ip", ip).Debug("Cache miss, requesting a cache refresh") + allowRefresh = false + m.RefreshHnsEndpointCache(true) + continue + } + break + } + + log.WithField("ip", ip).Info("Could not resolve hns endpoint id") + return "", ErrorUnknownEndpoint +} diff --git a/dataplane/windows/ipsets/ipset_defs.go b/dataplane/windows/ipsets/ipset_defs.go new file mode 100644 index 0000000000..62f1434b2d --- /dev/null +++ b/dataplane/windows/ipsets/ipset_defs.go @@ -0,0 +1,88 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipsets + +import ( + "github.com/projectcalico/libcalico-go/lib/set" +) + +// IPSetMetadata contains the metadata for a particular IP set, such as its name and type. +type IPSetMetadata struct { + SetID string + Type IPSetType +} + +// IPSetsDataplane is interface for managing a plane of ipSet objects. +type IPSetsDataplane interface { + AddOrReplaceIPSet(setMetadata IPSetMetadata, members []string) + AddMembers(setID string, newMembers []string) + RemoveMembers(setID string, removedMembers []string) + RemoveIPSet(setID string) +} + +// IPSetType constants for the different kinds of IP set. +type IPSetType string + +const ( + IPSetTypeHashIP IPSetType = "hash:ip" + IPSetTypeHashNet IPSetType = "hash:net" +) + +func (t IPSetType) SetType() string { + return string(t) +} + +func (t IPSetType) IsValid() bool { + switch t { + case IPSetTypeHashIP, IPSetTypeHashNet: + return true + } + return false +} + +// IPFamily constants to represent the IP family being managed by this IPSet +type IPFamily string + +const ( + IPFamilyV4 = IPFamily("inet") + IPFamilyV6 = IPFamily("inet6") +) + +func (f IPFamily) IsValid() bool { + switch f { + case IPFamilyV4, IPFamilyV6: + return true + } + return false +} + +// ipSet holds the state for a particular IP set. +type ipSet struct { + IPSetMetadata + Members set.Set +} + +// IPVersionConfig wraps up the metadata for a particular IP version. +type IPVersionConfig struct { + Family IPFamily +} + +func NewIPVersionConfig(family IPFamily) *IPVersionConfig { + return &IPVersionConfig{ + Family: family, + } +} diff --git a/dataplane/windows/ipsets/ipsets.go b/dataplane/windows/ipsets/ipsets.go new file mode 100644 index 0000000000..f4a13db485 --- /dev/null +++ b/dataplane/windows/ipsets/ipsets.go @@ -0,0 +1,141 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipsets + +import ( + "strings" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/libcalico-go/lib/set" +) + +// IPSets manages a whole plane of IP sets, i.e. all the IPv4 sets, or all the IPv6 IP sets. +type IPSets struct { + IPVersionConfig *IPVersionConfig + ipSetIDToIPSet map[string]*ipSet + logCxt *log.Entry +} + +func NewIPSets(ipVersionConfig *IPVersionConfig) *IPSets { + return &IPSets{ + IPVersionConfig: ipVersionConfig, + ipSetIDToIPSet: map[string]*ipSet{}, + logCxt: log.WithFields(log.Fields{ + "family": ipVersionConfig.Family, + }), + } +} + +// AddOrReplaceIPSet is responsible for the creation (or replacement) of an IP set in the store +func (s *IPSets) AddOrReplaceIPSet(setMetadata IPSetMetadata, members []string) { + s.logCxt.WithFields(log.Fields{ + "setID": setMetadata.SetID, + "setType": setMetadata.Type, + }).Info("Creating IP set") + filteredMembers := s.filterMembers(members) + + // Create the IP set struct and stores it by id + setID := setMetadata.SetID + ipSet := &ipSet{ + IPSetMetadata: setMetadata, + Members: filteredMembers, + } + s.ipSetIDToIPSet[setID] = ipSet +} + +// RemoveIPSet is responsible for the removal of an IP set from the store +func (s *IPSets) RemoveIPSet(setID string) { + s.logCxt.WithField("setID", setID).Info("Removing IP set") + delete(s.ipSetIDToIPSet, setID) +} + +// AddMembers adds a range of new members to an existing IP set in the store +func (s *IPSets) AddMembers(setID string, newMembers []string) { + if len(newMembers) == 0 { + return + } + + ipSet := s.ipSetIDToIPSet[setID] + filteredMembers := s.filterMembers(newMembers) + if filteredMembers.Len() == 0 { + return + } + s.logCxt.WithFields(log.Fields{ + "setID": setID, + "filteredMembers": filteredMembers, + }).Info("Adding new members to IP set") + filteredMembers.Iter(func(m interface{}) error { + ipSet.Members.Add(m) + return nil + }) +} + +// RemoveMembers removes a range of members from an existing IP set in the store +func (s *IPSets) RemoveMembers(setID string, removedMembers []string) { + if len(removedMembers) == 0 { + return + } + + ipSet := s.ipSetIDToIPSet[setID] + filteredMembers := s.filterMembers(removedMembers) + if filteredMembers.Len() == 0 { + return + } + s.logCxt.WithFields(log.Fields{ + "setID": setID, + "filteredMembers": filteredMembers, + }).Info("Removing members from IP set") + + filteredMembers.Iter(func(m interface{}) error { + ipSet.Members.Discard(m) + return nil + }) +} + +// GetIPSetMembers returns all of the members for a given IP set +func (s *IPSets) GetIPSetMembers(setID string) []string { + var retVal []string + + ipSet := s.ipSetIDToIPSet[setID] + if ipSet == nil { + return nil + } + + ipSet.Members.Iter(func(item interface{}) error { + member := item.(string) + retVal = append(retVal, member) + return nil + }) + + return retVal +} + +// filterMembers filters out any members which are not of the correct +// ip family for the IPSet +func (s *IPSets) filterMembers(members []string) set.Set { + filtered := set.New() + wantIPV6 := s.IPVersionConfig.Family == IPFamilyV6 + for _, member := range members { + isIPV6 := strings.Index(member, ":") >= 0 + if wantIPV6 != isIPV6 { + continue + } + filtered.Add(member) + } + return filtered +} diff --git a/dataplane/windows/ipsets_mgr.go b/dataplane/windows/ipsets_mgr.go new file mode 100644 index 0000000000..0171fab044 --- /dev/null +++ b/dataplane/windows/ipsets_mgr.go @@ -0,0 +1,62 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package windataplane + +import ( + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/dataplane/windows/ipsets" + "github.com/projectcalico/felix/proto" +) + +// ipSetsManager simply passes through IP set updates from the datastore to the ipsets.IPSets +// dataplane layer. +type ipSetsManager struct { + ipsetsDataplane ipsets.IPSetsDataplane +} + +func newIPSetsManager(ipsets ipsets.IPSetsDataplane) *ipSetsManager { + return &ipSetsManager{ + ipsetsDataplane: ipsets, + } +} + +// OnUpdate is called by the main dataplane driver loop during the first phase. It processes +// specific types of updates from the datastore. +func (m *ipSetsManager) OnUpdate(msg interface{}) { + switch msg := msg.(type) { + case *proto.IPSetDeltaUpdate: + log.WithField("ipSetId", msg.Id).Info("Processing IPSetDeltaUpdate") + m.ipsetsDataplane.AddMembers(msg.Id, msg.AddedMembers) + m.ipsetsDataplane.RemoveMembers(msg.Id, msg.RemovedMembers) + case *proto.IPSetUpdate: + log.WithField("ipSetId", msg.Id).Info("Processing IPSetUpdate") + metadata := ipsets.IPSetMetadata{ + Type: ipsets.IPSetTypeHashIP, + SetID: msg.Id, + } + m.ipsetsDataplane.AddOrReplaceIPSet(metadata, msg.Members) + case *proto.IPSetRemove: + log.WithField("ipSetId", msg.Id).Info("Processing IPSetRemove") + m.ipsetsDataplane.RemoveIPSet(msg.Id) + } +} + +func (m *ipSetsManager) CompleteDeferredWork() error { + // Nothing to do, we don't defer any work. + return nil +} diff --git a/dataplane/windows/policy_mgr.go b/dataplane/windows/policy_mgr.go new file mode 100644 index 0000000000..ee3e596e02 --- /dev/null +++ b/dataplane/windows/policy_mgr.go @@ -0,0 +1,60 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package windataplane + +import ( + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/dataplane/windows/policysets" + "github.com/projectcalico/felix/proto" +) + +// policyManager simply passes through Policy and Profile updates from the datastore to the +// PolicySets dataplane layer. +type policyManager struct { + policysetsDataplane policysets.PolicySetsDataplane +} + +func newPolicyManager(policysets policysets.PolicySetsDataplane) *policyManager { + return &policyManager{ + policysetsDataplane: policysets, + } +} + +// OnUpdate is called by the main dataplane driver loop during the first phase. It processes +// specific types of updates from the datastore. +func (m *policyManager) OnUpdate(msg interface{}) { + switch msg := msg.(type) { + case *proto.ActivePolicyUpdate: + log.WithField("policyID", msg.Id).Info("Processing ActivePolicyUpdate") + m.policysetsDataplane.AddOrReplacePolicySet(policysets.PolicyNamePrefix+msg.Id.Name, msg.Policy) + case *proto.ActivePolicyRemove: + log.WithField("policyID", msg.Id).Info("Processing ActivePolicyRemove") + m.policysetsDataplane.RemovePolicySet(policysets.PolicyNamePrefix + msg.Id.Name) + case *proto.ActiveProfileUpdate: + log.WithField("profileId", msg.Id).Info("Processing ActiveProfileUpdate") + m.policysetsDataplane.AddOrReplacePolicySet(policysets.ProfileNamePrefix+msg.Id.Name, msg.Profile) + case *proto.ActiveProfileRemove: + log.WithField("profileId", msg.Id).Info("Processing ActiveProfileRemove") + m.policysetsDataplane.RemovePolicySet(policysets.ProfileNamePrefix + msg.Id.Name) + } +} + +func (m *policyManager) CompleteDeferredWork() error { + // Nothing to do, we don't defer any work. + return nil +} diff --git a/dataplane/windows/policysets/policyset_defs.go b/dataplane/windows/policysets/policyset_defs.go new file mode 100644 index 0000000000..b1dc520bd8 --- /dev/null +++ b/dataplane/windows/policysets/policyset_defs.go @@ -0,0 +1,87 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policysets + +import ( + "errors" + + hns "github.com/Microsoft/hcsshim" + + "github.com/projectcalico/libcalico-go/lib/set" +) + +const ( + // the ip family of this policy set, currently set to V4. + // V6 will be added once dataplane support is available. + ipVersion uint8 = 4 + // default dataplane rule priority for any rules generated + // from a Policy set. + rulePriority uint16 = 1000 + // prefix to use for all policy names + PolicyNamePrefix string = "policy-" + // prefix to use for all profile names + ProfileNamePrefix string = "profile-" +) + +var ( + SkipRule = errors.New("Rule skipped") + MissingSet = errors.New("Missing IPSet") +) + +// PolicySetType constants for the different kinds of Policy set. +type PolicySetType string + +const ( + PolicySetTypePolicy PolicySetType = "policy" + PolicySetTypeProfile PolicySetType = "profile" +) + +func (t PolicySetType) SetType() string { + return string(t) +} + +// PolicySetMetadata contains the metadata for a particular Policy set, such as its name and type. +type PolicySetMetadata struct { + SetId string + Type PolicySetType +} + +// PolicySetsDataplane is a interface for managing a plane of policySet objects +type PolicySetsDataplane interface { + AddOrReplacePolicySet(setId string, policy interface{}) + RemovePolicySet(setId string) + NewRule(isInbound bool, priority uint16) *hns.ACLPolicy + GetPolicySetRules(setIds []string) (rules []*hns.ACLPolicy) + ProcessIpSetUpdate(ipSetId string) []string +} + +// policySet holds the state for a particular Policy set. +type policySet struct { + // metadata for the Policy set. + PolicySetMetadata + // the original policy received from the datatore, which could be + // either a Profile or a Policy. + Policy interface{} + // Each member of the Policy set is a hns ACLRule computed from the + // Policy. When this Policy set needs to be applied, this set of + // rules is what will be sent to hns for enforcement. + Members set.Set + // The set of IP set ids which are referenced by this Policy set. We + // maintain this to make it easier to look up which Policy sets are + // impacted (in need of recomputation) after a IP set update occurs. + IpSetIds set.Set +} diff --git a/dataplane/windows/policysets/policysets.go b/dataplane/windows/policysets/policysets.go new file mode 100644 index 0000000000..f081897cd1 --- /dev/null +++ b/dataplane/windows/policysets/policysets.go @@ -0,0 +1,561 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policysets + +import ( + "strings" + + hns "github.com/Microsoft/hcsshim" + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/dataplane/windows/ipsets" + "github.com/projectcalico/felix/proto" + "github.com/projectcalico/libcalico-go/lib/set" +) + +// PolicySets manages a whole plane of policies/profiles +type PolicySets struct { + policySetIdToPolicySet map[string]*policySet + + IpSets []*ipsets.IPSets + + resyncRequired bool +} + +func NewPolicySets(ipsets []*ipsets.IPSets) *PolicySets { + return &PolicySets{ + policySetIdToPolicySet: map[string]*policySet{}, + IpSets: ipsets, + } +} + +// AddOrReplacePolicySet is responsible for the creation (or replacement) of a Policy set +// and it is capable of processing either Profiles or Policies from the datastore. +func (s *PolicySets) AddOrReplacePolicySet(setId string, policy interface{}) { + log.WithField("setID", setId).Info("Processing add/replace of Policy set") + + // Process the policy/profile from the datastore and convert it into + // equivalent rules which can be communicated to hns for enforcement in the + // dataplane. We compute these rules up front and cache them to avoid needing + // to recompute them each time the policy is applied to an endpoint. We also + // keep track of any IP sets which were referenced by the policy/profile so that + // we can easily tell which Policy sets are impacted when a IP set is modified. + var rules []*hns.ACLPolicy + var policyIpSetIds set.Set + + setMetadata := PolicySetMetadata{ + SetId: setId, + } + + switch p := policy.(type) { + case *proto.Policy: + // Incoming datastore object is a Policy + rules = s.convertPolicyToRules(setId, p.InboundRules, p.OutboundRules) + policyIpSetIds = getReferencedIpSetIds(p.InboundRules, p.OutboundRules) + setMetadata.Type = PolicySetTypePolicy + case *proto.Profile: + // Incoming datastore object is a Profile + rules = s.convertPolicyToRules(setId, p.InboundRules, p.OutboundRules) + policyIpSetIds = getReferencedIpSetIds(p.InboundRules, p.OutboundRules) + setMetadata.Type = PolicySetTypeProfile + } + + // Create the struct and store it off + policySet := &policySet{ + PolicySetMetadata: setMetadata, + Policy: policy, + Members: set.FromArray(rules), + IpSetIds: policyIpSetIds, + } + s.policySetIdToPolicySet[setMetadata.SetId] = policySet +} + +// RemovePolicySet is responsible for the removal of a Policy set +func (s *PolicySets) RemovePolicySet(setId string) { + log.WithField("setId", setId).Info("Processing removal of Policy set") + delete(s.policySetIdToPolicySet, setId) +} + +// GetPolicySetRules receives a list of Policy set ids and it computes the complete +// set of resultant hns rules which are needed to enforce all of the Policy sets. As +// the Policy sets are processed, we increment a priority number and assign it to each rule +// from the current set. By incremening the rule priority for each set, we ensure that all of +// the sets will be enforced and considered by the dataplane in the order intended by felix. +// Once all rules are gathered, we add a final pair of rules to default deny any traffic which +// has not matched any rules from any Policy sets. +func (s *PolicySets) GetPolicySetRules(setIds []string) (rules []*hns.ACLPolicy) { + // Rules from the first set will receive the default rule priority + currentPriority := rulePriority + + for _, setId := range setIds { + log.WithField("setId", setId).Debug("Gathering rules for policy set") + + policySet := s.policySetIdToPolicySet[setId] + if policySet == nil { + log.WithField("setId", setId).Error("Unable to find Policy set, this set will be skipped") + continue + } + + policySet.Members.Iter(func(item interface{}) error { + member := item.(*hns.ACLPolicy) + member.Priority = currentPriority + rules = append(rules, member) + return nil + }) + + // Increment the priority so that rules from the next set will be 'weaker' priority + // and therefore considered only after this current set of rules has failed to match. + currentPriority += 1 + } + + // Apply a default block rule for each direction at the end of the policy + rules = append(rules, s.NewRule(true, currentPriority), s.NewRule(false, currentPriority)) + + // Finally, for RS3 only, add default allow rules with a host-scope to allow traffic through + // the host windows firewall + rules = append(rules, s.NewHostRule(true), s.NewHostRule(false)) + + return +} + +// ProcessIpSetUpdate locates any Policy set(s) which reference the provided IP set, and causes +// those Policy sets to be recomputed (to ensure any rule address conditions are using the latest +// addres values from the IP set). A list of the Policy sets which were found and recomputed are +// is returned to the caller. +func (s *PolicySets) ProcessIpSetUpdate(ipSetId string) []string { + log.WithField("IPSetId", ipSetId).Info("IP set has changed, looking for associated policies") + stalePolicies := s.getPoliciesByIpSetId(ipSetId) + if len(stalePolicies) == 0 { + return nil + } + + log.WithFields(log.Fields{"IPSetId": ipSetId, "Policies": stalePolicies}).Info("Associated policies need to be refreshed") + for _, policyId := range stalePolicies { + policySet := s.policySetIdToPolicySet[policyId] + if policySet == nil { + log.WithFields(log.Fields{"IPSetId": ipSetId, "Policy": policyId}).Error("Unable to find Policy set, this set will be skipped") + continue + } + + s.AddOrReplacePolicySet(policySet.PolicySetMetadata.SetId, policySet.Policy) + } + + // Return the policies which were recalculated as a result of this update + return stalePolicies +} + +// getPoliciesByIpSetId locates any Policy set(s) which reference the provided IP set +func (s *PolicySets) getPoliciesByIpSetId(ipSetId string) (policies []string) { + for policySetId, policySet := range s.policySetIdToPolicySet { + policySet.IpSetIds.Iter(func(item interface{}) error { + id := item.(string) + if id == ipSetId { + policies = append(policies, policySetId) + } + return nil + }) + } + return +} + +// getReferencedIpSetIds returns all of the IP sets which are referenced by the provided +// inbound and outbound proto Rules. +func getReferencedIpSetIds(inboundRules []*proto.Rule, outboundRules []*proto.Rule) set.Set { + var rules []*proto.Rule + rules = append(rules, inboundRules...) + rules = append(rules, outboundRules...) + + ipSetIds := set.New() + for _, rule := range rules { + ipSetIds.AddAll(rule.SrcIpSetIds) + ipSetIds.AddAll(rule.DstIpSetIds) + } + + return ipSetIds +} + +// convertPolicyToRules converts the provided inbound and outbound proto rules into hns rules. +func (s *PolicySets) convertPolicyToRules(policyId string, inboundRules []*proto.Rule, outboundRules []*proto.Rule) (hnsRules []*hns.ACLPolicy) { + log.WithField("policyId", policyId).Debug("ConvertPolicyToRules") + + inbound := s.protoRulesToHnsRules(policyId, inboundRules, true) + hnsRules = append(hnsRules, inbound...) + + outbound := s.protoRulesToHnsRules(policyId, outboundRules, false) + hnsRules = append(hnsRules, outbound...) + + for _, rule := range hnsRules { + log.WithFields(log.Fields{"policyId": policyId, "rule": rule}).Debug("ConvertPolicyToRules final rule output") + } + + return +} + +// protoRulesToHnsRules converts a set of proto rules into hns rules. +func (s *PolicySets) protoRulesToHnsRules(policyId string, protoRules []*proto.Rule, isInbound bool) (rules []*hns.ACLPolicy) { + log.WithField("policyId", policyId).Debug("protoRulesToHnsRules") + + for _, protoRule := range protoRules { + hnsRules, err := s.protoRuleToHnsRules(policyId, protoRule, isInbound) + if err != nil { + switch err { + case SkipRule: + log.WithField("rule", protoRule).Info("Rule was skipped") + default: + log.WithField("rule", protoRule).Infof("Rule could not be converted, error: %v", err) + } + continue + } + rules = append(rules, hnsRules...) + } + + return +} + +// protoRuleToHnsRules converts a proto rule into equivalent hns rules (one or more resultant rules). For Windows RS3, +// there are a few limitations to be aware of: +// +// The following types of rules are not supported in this release and will be logged+skipped: +// Rules with: Negative match criteria, Actions other than 'allow' or 'deny', Port ranges, and ICMP type/codes. +// +func (s *PolicySets) protoRuleToHnsRules(policyId string, pRule *proto.Rule, isInbound bool) ([]*hns.ACLPolicy, error) { + log.WithField("policyId", policyId).Debug("protoRuleToHnsRules") + + // Check IpVersion + if pRule.IpVersion != 0 && pRule.IpVersion != proto.IPVersion(ipVersion) { + log.WithField("rule", pRule).Info("Skipping rule because it is for an unsupported IP version.") + return nil, SkipRule + } + + // Skip rules with negative match criteria, these are not supported in this version + if ruleHasNegativeMatches(pRule) { + log.WithField("rule", pRule).Info("Skipping rule because it contains negative matches (currently unsupported).") + return nil, SkipRule + } + + // Skip rules with port ranges, only a single port is supported in this version + if portsContainRanges(pRule.SrcPorts) || portsContainRanges(pRule.DstPorts) { + log.WithField("rule", pRule).Info("Skipping rule because it contains port ranges (currently unsupported).") + return nil, SkipRule + } + + // Skip rules with ICMP type/codes, these are not supported + if pRule.Icmp != nil { + log.WithField("rule", pRule).Info("Skipping rule because it contains ICMP type or code (currently unsupported).") + return nil, SkipRule + } + + // Skip rules with name port ipsets + if len(pRule.SrcNamedPortIpSetIds) > 0 || len(pRule.DstNamedPortIpSetIds) > 0 { + log.WithField("rule", pRule).Info("Skipping rule because it contains named port ipsets (currently unsupported).") + return nil, SkipRule + } + + // Filter the Src and Dst CIDRs to only the IP version that we're rendering + var filteredAll bool + ruleCopy := *pRule + + ruleCopy.SrcNet, filteredAll = filterNets(pRule.SrcNet, ipVersion) + if filteredAll { + return nil, SkipRule + } + + ruleCopy.NotSrcNet, filteredAll = filterNets(pRule.NotSrcNet, ipVersion) + if filteredAll { + return nil, SkipRule + } + + ruleCopy.DstNet, filteredAll = filterNets(pRule.DstNet, ipVersion) + if filteredAll { + return nil, SkipRule + } + + ruleCopy.NotDstNet, filteredAll = filterNets(pRule.NotDstNet, ipVersion) + if filteredAll { + return nil, SkipRule + } + + // Log with the rule details for context + logCxt := log.WithField("rule", ruleCopy) + + // Start with a new empty hns aclPolicy (rule) + var aclPolicies []*hns.ACLPolicy + aclPolicy := s.NewRule(isInbound, rulePriority) + + // + // Action + // + switch strings.ToLower(ruleCopy.Action) { + case "", "allow": + aclPolicy.Action = hns.Allow + case "deny": + aclPolicy.Action = hns.Block + case "next-tier", "pass", "log": + logCxt.WithField("action", ruleCopy.Action).Info("This rule action is not supported, rule will be skipped") + return nil, SkipRule + default: + logCxt.WithField("action", ruleCopy.Action).Panic("Unknown rule action") + } + + // + // Source ports + // + if len(ruleCopy.SrcPorts) > 0 { + // Windows RS3 limitation, single port + ports := uint16(ruleCopy.SrcPorts[0].First) + + if isInbound { + aclPolicy.RemotePort = ports + logCxt.WithField("RemotePort", aclPolicy.RemotePort).Debug("Adding Source Ports as RemotePort condition") + } else { + aclPolicy.LocalPort = ports + logCxt.WithField("LocalPort", aclPolicy.LocalPort).Debug("Adding Source Ports as LocalPort condition") + } + } + + // + // Destination Ports + // + if len(ruleCopy.DstPorts) > 0 { + // Windows RS3 limitation, single port (start port) + ports := uint16(ruleCopy.DstPorts[0].First) + + if isInbound { + aclPolicy.LocalPort = ports + logCxt.WithField("LocalPort", aclPolicy.LocalPort).Debug("Adding Destination Ports as LocalPort condition") + } else { + aclPolicy.RemotePort = ports + logCxt.WithField("RemotePort", aclPolicy.RemotePort).Debug("Adding Destination Ports as RemotePort condition") + } + } + + // + // Protocol + // + if ruleCopy.Protocol != nil { + switch p := ruleCopy.Protocol.NumberOrName.(type) { + case *proto.Protocol_Name: + logCxt.WithField("protoName", p.Name).Debug("Adding Protocol Name condition") + aclPolicy.Protocol = protocolNameToNumber(p.Name) + case *proto.Protocol_Number: + logCxt.WithField("protoNum", p.Number).Debug("Adding Protocol number condition") + aclPolicy.Protocol = uint16(p.Number) + } + } + + // + // Source Neworks and IPSets + // + localAddresses := []string{""} // ensures slice always has at least one value + remoteAddresses := []string{""} + + srcAddresses := ruleCopy.SrcNet + + if len(ruleCopy.SrcIpSetIds) > 0 { + ipsetAddresses, err := s.getIPSetAddresses(ruleCopy.SrcIpSetIds) + if err != nil { + logCxt.Info("SrcIpSetIds could not be resolved, rule will be skipped") + return nil, SkipRule + } + srcAddresses = append(srcAddresses, ipsetAddresses...) + } + + if len(srcAddresses) > 0 { + if isInbound { + remoteAddresses = srcAddresses + logCxt.WithField("RemoteAddress", remoteAddresses).Debug("Adding Source Networks/IPsets as RemoteAddress conditions") + } else { + localAddresses = srcAddresses + logCxt.WithField("LocalAddress", localAddresses).Debug("Adding Source Networks/IPsets as LocalAddress conditions") + } + } + + // + // Destination Networks and IPSets + // + dstAddresses := ruleCopy.DstNet + + if len(ruleCopy.DstIpSetIds) > 0 { + ipsetAddresses, err := s.getIPSetAddresses(ruleCopy.DstIpSetIds) + if err != nil { + logCxt.Info("DstIpSetIds could not be resolved, rule will be skipped") + return nil, SkipRule + } + dstAddresses = append(dstAddresses, ipsetAddresses...) + } + + if len(dstAddresses) > 0 { + if isInbound { + localAddresses = dstAddresses + logCxt.WithField("LocalAddress", localAddresses).Debug("Adding Destination Networks/IPsets as LocalAddress condition") + } else { + remoteAddresses = dstAddresses + logCxt.WithField("RemoteAddress", remoteAddresses).Debug("Adding Destination Networks/IPsets as RemoteAddress condition") + } + } + + // For Windows RS3 only, there is a dataplane restriction of a single address/cidr per + // source or destination condition. The behavior below will be removed in + // the next iteration, but for now we have to break up the source and destination + // ip address combinations and represent them using multiple rules + for _, localAddr := range localAddresses { + for _, remoteAddr := range remoteAddresses { + newPolicy := *aclPolicy + newPolicy.LocalAddresses = localAddr + newPolicy.RemoteAddresses = remoteAddr + // Add this rule to the rules being returned + aclPolicies = append(aclPolicies, &newPolicy) + } + } + + return aclPolicies, nil +} + +func ruleHasNegativeMatches(pRule *proto.Rule) bool { + if len(pRule.NotSrcNet) > 0 || len(pRule.NotDstNet) > 0 { + return true + } + if len(pRule.NotSrcPorts) > 0 || len(pRule.NotDstPorts) > 0 { + return true + } + if len(pRule.NotSrcIpSetIds) > 0 || len(pRule.NotDstIpSetIds) > 0 { + return true + } + if len(pRule.NotSrcNamedPortIpSetIds) > 0 || len(pRule.NotDstNamedPortIpSetIds) > 0 { + return true + } + if pRule.NotProtocol != nil { + return true + } + if pRule.NotIcmp != nil { + return true + } + return false +} + +func portsContainRanges(ports []*proto.PortRange) bool { + if len(ports) > 1 { + return true + } + + for _, portRange := range ports { + if portRange.First != portRange.Last { + return true + } + } + return false +} + +// getIPSetAddresses retrieves all of the ip addresses (members) referenced by the provided +// IP sets. +func (s *PolicySets) getIPSetAddresses(setIds []string) ([]string, error) { + var addresses []string + var found bool + + for _, ipsetId := range setIds { + found = false + for _, ipSets := range s.IpSets { + ipSet := ipSets.GetIPSetMembers(ipsetId) + if ipSet == nil { + continue + } + addresses = append(addresses, ipSet...) + found = true + break + } + + if !found { + log.WithField("ipsetId", ipsetId).Info("IPSet could not be found") + return nil, MissingSet + } + } + + return addresses, nil +} + +// protocolNameToNumber converts a protocol name to its numeric representation (returned as string) +func protocolNameToNumber(protocolName string) uint16 { + switch strings.ToLower(protocolName) { + case "tcp": + return 6 + case "udp": + return 17 + case "icmp": + return 1 + case "icmpv6": + return 58 + case "sctp": + return 132 + case "udplite": + return 136 + default: + return 256 // any (as understood by hns) + } +} + +// NewRule returns a new hns switch rule object instantiated with default values. +func (s *PolicySets) NewRule(isInbound bool, priority uint16) *hns.ACLPolicy { + direction := hns.Out + if isInbound { + direction = hns.In + } + + return &hns.ACLPolicy{ + Type: hns.ACL, + RuleType: hns.Switch, + Action: hns.Block, + Direction: direction, + Protocol: 256, // Any, only required for RS3 + Priority: priority, + } +} + +// NewHostRule returns a new hns rule object scoped to the host. This is only +// temporarily required for compatibility with RS3. +func (s *PolicySets) NewHostRule(isInbound bool) *hns.ACLPolicy { + direction := hns.Out + if isInbound { + direction = hns.In + } + + return &hns.ACLPolicy{ + Type: hns.ACL, + RuleType: hns.Host, + Action: hns.Allow, + Direction: direction, + Priority: 100, + Protocol: 256, // Any + } +} + +// filterNets filters out any addresses which are not of the requested ipVersion. +func filterNets(mixedCIDRs []string, ipVersion uint8) (filtered []string, filteredAll bool) { + if len(mixedCIDRs) == 0 { + return nil, false + } + wantV6 := ipVersion == 6 + filteredAll = true + for _, net := range mixedCIDRs { + isV6 := strings.Contains(net, ":") + if isV6 != wantV6 { + continue + } + filtered = append(filtered, net) + filteredAll = false + } + return +} diff --git a/dataplane/windows/win_dataplane.go b/dataplane/windows/win_dataplane.go new file mode 100644 index 0000000000..4852297832 --- /dev/null +++ b/dataplane/windows/win_dataplane.go @@ -0,0 +1,345 @@ +//+build windows + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package windataplane + +import ( + "time" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/dataplane/windows/ipsets" + "github.com/projectcalico/felix/dataplane/windows/policysets" + "github.com/projectcalico/felix/jitter" + "github.com/projectcalico/felix/proto" + "github.com/projectcalico/felix/throttle" + "github.com/projectcalico/libcalico-go/lib/health" +) + +const ( + // msgPeekLimit is the maximum number of messages we'll try to grab from the to-dataplane + // channel before we apply the changes. Higher values allow us to batch up more work on + // the channel for greater throughput when we're under load (at cost of higher latency). + msgPeekLimit = 100 + + // After a failure to apply dataplane updates, we will delay for this amount of time + // before rescheduling another attempt to apply the pending updates. + reschedDelay = time.Duration(5) * time.Second +) + +var ( + processStartTime time.Time +) + +func init() { + processStartTime = time.Now() +} + +type Config struct { + IPv6Enabled bool + HealthAggregator *health.HealthAggregator +} + +// winDataplane implements an in-process Felix dataplane driver capable of applying network policy +// dataplane updates via the Host Network Service (HNS) on Windows. It communicates with the +// datastore-facing part of Felix via the Send/RecvMessage methods, which operate on the +// protobuf-defined API objects. +// +// Architecture +// +// The Windows dataplane driver is organised around a main event loop, which handles +// update events from the datastore and dataplane. +// +// Each pass around the main loop has two phases. In the first phase, updates are fanned +// out to "manager" objects, which calculate the changes that are needed. In the second phase, +// the set of pending changes are communicated to the HNS service so that they will be immediately +// applied to the dataplane. The second phase is skipped until the datastore is in sync; this +// ensures that the first update to the dataplane applies a consistent snapshot. +// +// Several optimizations and improvements are forthcoming. At this time, the Windows dataplane does +// not have a native concept similar to IP sets, which means that IP set information needs to be +// cached in the driver along with associated Policies/Profiles. As datastore updates are received, +// we refer back to the caches to recalculate the sets of rules which need to be sent to HNS. As the +// HNS API surface is enhanced, we may be able to optimize and remove some or all of these caches. +// +// Requirements on the API +// +// The dataplane does not do consistency checks on the incoming data. It expects to be told about +// dependent resources before they are needed and for their lifetime to exceed that of the resources +// that depend on them. For example, it is important the the datastore layer send an IP set create +// event before it sends a rule that references that IP set. +type WindowsDataplane struct { + // the channel which we receive messages from felix + toDataplane chan interface{} + // the channel used to send messages from the dataplane to felix + fromDataplane chan interface{} + // stores all of the managers which will be processing the various updates from felix. + allManagers []Manager + // each IPSets manages a whole "plane" of IP sets, i.e. all the IPv4 sets, or all the IPv6 + // IP sets. + ipSets []*ipsets.IPSets + // PolicySets manages all of the policies and profiles which have been communicated to the + // dataplane driver + policySets *policysets.PolicySets + // dataplaneNeedsSync is set if the dataplane is dirty in some way, i.e. we need to + // call apply(). + dataplaneNeedsSync bool + // doneFirstApply is set after we finish the first update to the dataplane. It indicates + // that the dataplane should now be in sync. + doneFirstApply bool + // the reschedule timer/channel enable us to force the dataplane driver to attempt to + // apply any pending updates to the dataplane. This is only enabled and used if a previous + // apply operation has failed and needs to be retried. + reschedTimer *time.Timer + reschedC <-chan time.Time + // a simple throttle to control how frequently the driver is allowed to apply updates + // to the dataplane. + applyThrottle *throttle.Throttle + // config provides a way for felix to provide some additional configuration options + // to the dataplane driver. This isn't really used currently, but will be in the future. + config Config +} + +const ( + healthName = "win_dataplane" + healthInterval = 10 * time.Second +) + +// Interface for Managers. Each Manager is responsible for processing updates from felix and +// for applying any necessary updates to the dataplane. +type Manager interface { + // OnUpdate is called for each protobuf message from the datastore. May either directly + // send updates to the IPSets and PolicySets objects (which will queue the updates + // until the main loop instructs them to act) or (for efficiency) may wait until + // a call to CompleteDeferredWork() to flush updates to the dataplane. + OnUpdate(protoBufMsg interface{}) + // Called to allow for any batched work to be completed. + CompleteDeferredWork() error +} + +// Registers a new Manager with the driver. +func (d *WindowsDataplane) RegisterManager(mgr Manager) { + d.allManagers = append(d.allManagers, mgr) +} + +// NewWinDataplaneDriver creates and initializes a new dataplane driver using the provided +// configuration. +func NewWinDataplaneDriver(config Config) *WindowsDataplane { + log.WithField("config", config).Info("Creating Windows dataplane driver.") + + ipSetsConfigV4 := ipsets.NewIPVersionConfig( + ipsets.IPFamilyV4, + ) + + ipSetsV4 := ipsets.NewIPSets(ipSetsConfigV4) + + dp := &WindowsDataplane{ + toDataplane: make(chan interface{}, msgPeekLimit), + fromDataplane: make(chan interface{}, 100), + config: config, + applyThrottle: throttle.New(10), + } + + dp.applyThrottle.Refill() // Allow the first apply() immediately. + + dp.ipSets = append(dp.ipSets, ipSetsV4) + dp.policySets = policysets.NewPolicySets(dp.ipSets) + + dp.RegisterManager(newIPSetsManager(ipSetsV4)) + dp.RegisterManager(newPolicyManager(dp.policySets)) + dp.RegisterManager(newEndpointManager(dp.policySets)) + + // Register that we will report liveness and readiness. + if config.HealthAggregator != nil { + log.Info("Registering to report health.") + config.HealthAggregator.RegisterReporter( + healthName, + &health.HealthReport{Live: true, Ready: true}, + healthInterval*2, + ) + } + + return dp +} + +// Starts the driver. +func (d *WindowsDataplane) Start() { + go d.loopUpdatingDataplane() +} + +// Called by someone to put a message into our channel so that the loop will pick it up +// and process it. +func (d *WindowsDataplane) SendMessage(msg interface{}) error { + log.Debugf("WindowsDataPlane->SendMessage to felix: %T", msg) + + d.toDataplane <- msg + return nil +} + +// Called by Felix.go so that it can receive a channel to listen for message being +// sent by this dataplane driver. +func (d *WindowsDataplane) RecvMessage() (interface{}, error) { + log.Debug("WindowsDataPlane->RecvMessage was invoked") + + return <-d.fromDataplane, nil +} + +// The main loop which is responsible for picking up any updates and providing them +// to the managers for processing. After managers have had a chance to process the updates +// the loop will call Apply() to actually apply changes to the dataplane. +func (d *WindowsDataplane) loopUpdatingDataplane() { + log.Debug("Started windows dataplane driver loop") + + healthTicks := time.NewTicker(healthInterval).C + d.reportHealth() + + // Fill the apply throttle leaky bucket. + throttleC := jitter.NewTicker(100*time.Millisecond, 10*time.Millisecond).C + beingThrottled := false + + datastoreInSync := false + + // function to pass messages to the managers for processing + processMsgFromCalcGraph := func(msg interface{}) { + log.WithField("msg", proto.MsgStringer{Msg: msg}).Infof( + "Received %T update from calculation graph", msg) + for _, mgr := range d.allManagers { + mgr.OnUpdate(msg) + } + switch msg.(type) { + case *proto.InSync: + log.WithField("timeSinceStart", time.Since(processStartTime)).Info( + "Datastore in sync, flushing the dataplane for the first time...") + datastoreInSync = true + } + } + + for { + select { + case msg := <-d.toDataplane: + // Process the message we received, then opportunistically process any other + // pending messages. + batchSize := 1 + processMsgFromCalcGraph(msg) + msgLoop1: + for i := 0; i < msgPeekLimit; i++ { + select { + case msg := <-d.toDataplane: + processMsgFromCalcGraph(msg) + batchSize++ + default: + // Channel blocked so we must be caught up. + break msgLoop1 + } + } + d.dataplaneNeedsSync = true + case <-throttleC: + d.applyThrottle.Refill() + case <-healthTicks: + d.reportHealth() + case <-d.reschedC: + log.Debug("Reschedule kick received") + d.dataplaneNeedsSync = true + d.reschedC = nil + } + + if datastoreInSync && d.dataplaneNeedsSync { + // Dataplane is out-of-sync, check if we're throttled. + if d.applyThrottle.Admit() { + if beingThrottled && d.applyThrottle.WouldAdmit() { + log.Info("Dataplane updates no longer throttled") + beingThrottled = false + } + log.Info("Applying dataplane updates") + applyStart := time.Now() + + // Actually apply the changes to the dataplane. + d.apply() + + applyTime := time.Since(applyStart) + log.WithField("msecToApply", applyTime.Seconds()*1000.0).Info( + "Finished applying updates to dataplane.") + + if !d.doneFirstApply { + log.WithField( + "secsSinceStart", time.Since(processStartTime).Seconds(), + ).Info("Completed first update to dataplane.") + d.doneFirstApply = true + } + + d.reportHealth() + } else { + if !beingThrottled { + log.Info("Dataplane updates throttled") + beingThrottled = true + } + } + } + } +} + +// Applies any pending changes to the dataplane by giving each of the managers a chance to +// complete their deffered work. If the operation fails, then this will also set up a +// rescheduling kick so that the apply can be reattempted. +func (d *WindowsDataplane) apply() { + // Unset the needs-sync flag, a rescheduling kick will reset it later if something failed + d.dataplaneNeedsSync = false + + // Allow each of the managers to complete any deferred work. + scheduleRetry := false + for _, mgr := range d.allManagers { + err := mgr.CompleteDeferredWork() + if err != nil { + // schedule a retry + log.WithError(err).Warning("CompleteDeferredWork returned an error - scheduling a retry") + scheduleRetry = true + } + } + + // Set up any needed rescheduling kick. + if d.reschedC != nil { + // We have an active rescheduling timer, stop it so we can restart it with a + // different timeout below if it is still needed. + if !d.reschedTimer.Stop() { + // Timer had already popped, drain its channel. + <-d.reschedC + } + // Nil out our copy of the channel to record that the timer is inactive. + d.reschedC = nil + } + + if scheduleRetry { + if d.reschedTimer == nil { + // First time, create the timer. + d.reschedTimer = time.NewTimer(reschedDelay) + } else { + // Have an existing timer, reset it. + d.reschedTimer.Reset(reschedDelay) + } + + d.reschedC = d.reschedTimer.C + } +} + +// Invoked periodically to report health (liveness/readiness) +func (d *WindowsDataplane) reportHealth() { + if d.config.HealthAggregator != nil { + d.config.HealthAggregator.Report( + healthName, + &health.HealthReport{Live: true, Ready: d.doneFirstApply}, + ) + } +} diff --git a/dataplane/windows/win_dataplane_test.go b/dataplane/windows/win_dataplane_test.go new file mode 100644 index 0000000000..3cb8b8e738 --- /dev/null +++ b/dataplane/windows/win_dataplane_test.go @@ -0,0 +1,41 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package windataplane_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/projectcalico/felix/config" + "github.com/projectcalico/felix/dataplane/windows" +) + +var _ = Describe("Constructor test", func() { + var configParams *config.Config + var dpConfig windataplane.Config + + JustBeforeEach(func() { + configParams = config.New() + + dpConfig := windataplane.Config{ + IPv6Enabled: configParams.Ipv6Support, + } + }) + + It("should be constructable", func() { + var dp = windataplane.NewWinDataplaneDriver(dpConfig) + Expect(dp).ToNot(BeNil()) + }) +}) diff --git a/dataplane/windows/windataplane_ut_suite_test.go b/dataplane/windows/windataplane_ut_suite_test.go new file mode 100644 index 0000000000..8a0a03477e --- /dev/null +++ b/dataplane/windows/windataplane_ut_suite_test.go @@ -0,0 +1,27 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package windataplane_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestWindataplane(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Windataplane Suite") +} diff --git a/felix.go b/felix.go index e453455721..e102d03023 100644 --- a/felix.go +++ b/felix.go @@ -19,15 +19,12 @@ import ( "errors" "fmt" "math/rand" - "net" "net/http" "os" "os/exec" "os/signal" "runtime" "runtime/debug" - "runtime/pprof" - "strings" "syscall" "time" @@ -43,13 +40,9 @@ import ( "github.com/projectcalico/felix/calc" "github.com/projectcalico/felix/config" _ "github.com/projectcalico/felix/config" - "github.com/projectcalico/felix/extdataplane" - "github.com/projectcalico/felix/ifacemonitor" - "github.com/projectcalico/felix/intdataplane" - "github.com/projectcalico/felix/ipsets" + dp "github.com/projectcalico/felix/dataplane" "github.com/projectcalico/felix/logutils" "github.com/projectcalico/felix/proto" - "github.com/projectcalico/felix/rules" "github.com/projectcalico/felix/statusrep" "github.com/projectcalico/felix/usagerep" apiv3 "github.com/projectcalico/libcalico-go/lib/apis/v3" @@ -251,94 +244,10 @@ configRetry: // Start up the dataplane driver. This may be the internal go-based driver or an external // one. - var dpDriver dataplaneDriver + var dpDriver dp.DataplaneDriver var dpDriverCmd *exec.Cmd - if configParams.UseInternalDataplaneDriver { - log.Info("Using internal dataplane driver.") - // Dedicated mark bits for accept and pass actions. These are long lived bits - // that we use for communicating between chains. - markAccept := configParams.NextIptablesMark() - markPass := configParams.NextIptablesMark() - // Short-lived mark bits for local calculations within a chain. - markScratch0 := configParams.NextIptablesMark() - markScratch1 := configParams.NextIptablesMark() - log.WithFields(log.Fields{ - "acceptMark": markAccept, - "passMark": markPass, - "scratch0Mark": markScratch0, - "scratch1Mark": markScratch1, - }).Info("Calculated iptables mark bits") - dpConfig := intdataplane.Config{ - IfaceMonitorConfig: ifacemonitor.Config{ - InterfaceExcludes: configParams.InterfaceExcludes(), - }, - RulesConfig: rules.Config{ - WorkloadIfacePrefixes: configParams.InterfacePrefixes(), - - IPSetConfigV4: ipsets.NewIPVersionConfig( - ipsets.IPFamilyV4, - rules.IPSetNamePrefix, - rules.AllHistoricIPSetNamePrefixes, - rules.LegacyV4IPSetNames, - ), - IPSetConfigV6: ipsets.NewIPVersionConfig( - ipsets.IPFamilyV6, - rules.IPSetNamePrefix, - rules.AllHistoricIPSetNamePrefixes, - nil, - ), - - OpenStackSpecialCasesEnabled: configParams.OpenstackActive(), - OpenStackMetadataIP: net.ParseIP(configParams.MetadataAddr), - OpenStackMetadataPort: uint16(configParams.MetadataPort), - - IptablesMarkAccept: markAccept, - IptablesMarkPass: markPass, - IptablesMarkScratch0: markScratch0, - IptablesMarkScratch1: markScratch1, - - IPIPEnabled: configParams.IpInIpEnabled, - IPIPTunnelAddress: configParams.IpInIpTunnelAddr, - - IptablesLogPrefix: configParams.LogPrefix, - EndpointToHostAction: configParams.DefaultEndpointToHostAction, - IptablesFilterAllowAction: configParams.IptablesFilterAllowAction, - IptablesMangleAllowAction: configParams.IptablesMangleAllowAction, - - FailsafeInboundHostPorts: configParams.FailsafeInboundHostPorts, - FailsafeOutboundHostPorts: configParams.FailsafeOutboundHostPorts, - - DisableConntrackInvalid: configParams.DisableConntrackInvalidCheck, - }, - IPIPMTU: configParams.IpInIpMtu, - IptablesRefreshInterval: configParams.IptablesRefreshInterval, - RouteRefreshInterval: configParams.RouteRefreshInterval, - IPSetsRefreshInterval: configParams.IpsetsRefreshInterval, - IptablesPostWriteCheckInterval: configParams.IptablesPostWriteCheckIntervalSecs, - IptablesInsertMode: configParams.ChainInsertMode, - IptablesLockFilePath: configParams.IptablesLockFilePath, - IptablesLockTimeout: configParams.IptablesLockTimeoutSecs, - IptablesLockProbeInterval: configParams.IptablesLockProbeIntervalMillis, - MaxIPSetSize: configParams.MaxIpsetSize, - IgnoreLooseRPF: configParams.IgnoreLooseRPF, - IPv6Enabled: configParams.Ipv6Support, - StatusReportingInterval: configParams.ReportingIntervalSecs, - - NetlinkTimeout: configParams.NetlinkTimeoutSecs, - - PostInSyncCallback: func() { dumpHeapMemoryProfile(configParams) }, - HealthAggregator: healthAggregator, - - DebugSimulateDataplaneHangAfter: configParams.DebugSimulateDataplaneHangAfter, - } - intDP := intdataplane.NewIntDataplaneDriver(dpConfig) - intDP.Start() - dpDriver = intDP - } else { - log.WithField("driver", configParams.DataplaneDriver).Info( - "Using external dataplane driver.") - dpDriver, dpDriverCmd = extdataplane.StartExtDataplaneDriver(configParams.DataplaneDriver) - } + + dpDriver, dpDriverCmd = dp.StartDataplaneDriver(configParams, healthAggregator) // Initialise the glue logic that connects the calculation graph to/from the dataplane driver. log.Info("Connect to the dataplane driver.") @@ -510,56 +419,13 @@ configRetry: } // On receipt of SIGUSR1, write out heap profile. - usr1SignalChan := make(chan os.Signal, 1) - signal.Notify(usr1SignalChan, syscall.SIGUSR1) - go func() { - for { - <-usr1SignalChan - dumpHeapMemoryProfile(configParams) - } - }() + logutils.DumpHeapMemoryOnSignal(configParams) // Now monitor the worker process and our worker threads and shut // down the process gracefully if they fail. monitorAndManageShutdown(failureReportChan, dpDriverCmd, stopSignalChans) } -func dumpHeapMemoryProfile(configParams *config.Config) { - // If a memory profile file name is configured, dump a heap memory profile. If the - // configured filename includes "", that will be replaced with a stamp indicating - // the current time. - memProfFileName := configParams.DebugMemoryProfilePath - if memProfFileName != "" { - logCxt := log.WithField("file", memProfFileName) - logCxt.Info("Asked to create a memory profile.") - - // If the configured file name includes "", replace that with the current - // time. - if strings.Contains(memProfFileName, "") { - timestamp := time.Now().Format("2006-01-02-15:04:05") - memProfFileName = strings.Replace(memProfFileName, "", timestamp, 1) - logCxt = log.WithField("file", memProfFileName) - } - - // Open a file with that name. - memProfFile, err := os.Create(memProfFileName) - if err != nil { - logCxt.WithError(err).Fatal("Could not create memory profile file") - memProfFile = nil - } else { - defer memProfFile.Close() - logCxt.Info("Writing memory profile...") - // The initial resync uses a lot of scratch space so now is - // a good time to force a GC and return any RAM that we can. - debug.FreeOSMemory() - if err := pprof.WriteHeapProfile(memProfFile); err != nil { - logCxt.WithError(err).Fatal("Could not write memory profile") - } - logCxt.Info("Finished writing memory profile") - } - } -} - func servePrometheusMetrics(configParams *config.Config) { for { log.WithField("port", configParams.PrometheusMetricsPort).Info("Starting prometheus metrics endpoint") @@ -811,11 +677,6 @@ func getAndMergeConfig( return nil } -type dataplaneDriver interface { - SendMessage(msg interface{}) error - RecvMessage() (msg interface{}, err error) -} - type DataplaneConnector struct { config *config.Config configUpdChan chan<- map[string]string @@ -823,7 +684,7 @@ type DataplaneConnector struct { StatusUpdatesFromDataplane chan interface{} InSync chan bool failureReportChan chan<- string - dataplane dataplaneDriver + dataplane dp.DataplaneDriver datastore bapi.Client statusReporter *statusrep.EndpointStatusReporter @@ -839,7 +700,7 @@ type Startable interface { func newConnector(configParams *config.Config, configUpdChan chan<- map[string]string, datastore bapi.Client, - dataplane dataplaneDriver, + dataplane dp.DataplaneDriver, failureReportChan chan<- string, ) *DataplaneConnector { felixConn := &DataplaneConnector{ diff --git a/glide.lock b/glide.lock index f987ac3bff..7bf7f9067b 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 3cff5d16d4998d98f9fb30058c9b50079ee82aef8512b07ce8789c4a03089c0f -updated: 2017-12-03T21:13:51.17571926Z +hash: 36f2fdbe37825d8fc41f1a270562185240b4109cf699d09ea12cced9d2c5d9ec +updated: 2017-12-04T22:16:27.221069031Z imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -139,6 +139,10 @@ imports: version: c12348ce28de40eed0136aa2b644d0ee0650e56c subpackages: - pbutil +- name: github.com/Microsoft/go-winio + version: 78439966b38d69bf38227fbf57ac8a6fee70f69a +- name: github.com/Microsoft/hcsshim + version: 34a629f78a5d50f7de07727e41a948685c45e026 - name: github.com/mipearson/rfw version: 6f0a6f3266ba1058df9ef0c94cda1cecd2e62852 - name: github.com/onsi/ginkgo @@ -243,13 +247,13 @@ imports: subpackages: - go - name: github.com/prometheus/common - version: 49fee292b27bfff7f354ee0f64e1bc4850462edf + version: 2e54d0b93cba2fd133edc32211dcc32c06ef72ca subpackages: - expfmt - internal/bitbucket.org/ww/goautoneg - model - name: github.com/prometheus/procfs - version: a1dba9ce8baed984a2495b658c82687f8157b98f + version: a6e9df898b1336106c743392c48ee0b71f5c4efa subpackages: - xfs - name: github.com/PuerkitoBio/purell @@ -271,7 +275,7 @@ imports: - name: github.com/whyrusleeping/go-logging version: 0457bb6b88fc1973573aaf6b5145d8d3ae972390 - name: golang.org/x/crypto - version: 9419663f5a44be8b34ca85f08abc5fe1be11f8a3 + version: 94eea52f7b742c7cbe0b03b22f0c4c8631ece122 subpackages: - ssh/terminal - name: golang.org/x/net diff --git a/glide.yaml b/glide.yaml index 334c283da7..cc79849371 100644 --- a/glide.yaml +++ b/glide.yaml @@ -59,3 +59,7 @@ import: version: 2e7ea655c10e4d4d73365f0f073b81b39cb08ee1 subpackages: - net +- package: github.com/Microsoft/go-winio + version: 78439966b38d69bf38227fbf57ac8a6fee70f69a +- package: github.com/Microsoft/hcsshim + version: 34a629f78a5d50f7de07727e41a948685c45e026 diff --git a/logutils/dump.go b/logutils/dump.go new file mode 100644 index 0000000000..4b317f6c1d --- /dev/null +++ b/logutils/dump.go @@ -0,0 +1,79 @@ +// +build !windows + +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logutils + +import ( + "os" + "os/signal" + "runtime/debug" + "runtime/pprof" + "strings" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/config" +) + +func DumpHeapMemoryProfile(configParams *config.Config) { + // If a memory profile file name is configured, dump a heap memory profile. If the + // configured filename includes "", that will be replaced with a stamp indicating + // the current time. + memProfFileName := configParams.DebugMemoryProfilePath + if memProfFileName != "" { + logCxt := log.WithField("file", memProfFileName) + logCxt.Info("Asked to create a memory profile.") + + // If the configured file name includes "", replace that with the current + // time. + if strings.Contains(memProfFileName, "") { + timestamp := time.Now().Format("2006-01-02-15:04:05") + memProfFileName = strings.Replace(memProfFileName, "", timestamp, 1) + logCxt = log.WithField("file", memProfFileName) + } + + // Open a file with that name. + memProfFile, err := os.Create(memProfFileName) + if err != nil { + logCxt.WithError(err).Fatal("Could not create memory profile file") + memProfFile = nil + } else { + defer memProfFile.Close() + logCxt.Info("Writing memory profile...") + // The initial resync uses a lot of scratch space so now is + // a good time to force a GC and return any RAM that we can. + debug.FreeOSMemory() + if err := pprof.WriteHeapProfile(memProfFile); err != nil { + logCxt.WithError(err).Fatal("Could not write memory profile") + } + logCxt.Info("Finished writing memory profile") + } + } +} + +func DumpHeapMemoryOnSignal(configParams *config.Config) { + // On receipt of SIGUSR1, write out heap profile. + usr1SignalChan := make(chan os.Signal, 1) + signal.Notify(usr1SignalChan, syscall.SIGUSR1) + go func() { + for { + <-usr1SignalChan + DumpHeapMemoryProfile(configParams) + } + }() +} diff --git a/logutils/logutils.go b/logutils/logutils.go index 54f7f598ec..3d864a4c85 100644 --- a/logutils/logutils.go +++ b/logutils/logutils.go @@ -15,12 +15,8 @@ package logutils import ( - "io" - "log/syslog" "os" - "path" - "github.com/mipearson/rfw" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -107,58 +103,27 @@ func ConfigureLogging(configParams *config.Config) { // Screen target. var dests []*logutils.Destination if configParams.LogSeverityScreen != "" { - screenDest := logutils.NewStreamDestination( - logLevelScreen, - os.Stderr, - make(chan logutils.QueuedLog, logQueueSize), - configParams.DebugDisableLogDropping, - counterLogErrors, - ) - dests = append(dests, screenDest) + dests = append(dests, getScreenDestination(configParams, logLevelScreen)) } // File target. We record any errors so we can log them out below after finishing set-up // of the logger. var fileDirErr, fileOpenErr error if configParams.LogSeverityFile != "" && configParams.LogFilePath != "" { - fileDirErr = os.MkdirAll(path.Dir(configParams.LogFilePath), 0755) - var rotAwareFile io.Writer - rotAwareFile, fileOpenErr = rfw.Open(configParams.LogFilePath, 0644) - if fileDirErr == nil && fileOpenErr == nil { - fileDest := logutils.NewStreamDestination( - logLevelFile, - rotAwareFile, - make(chan logutils.QueuedLog, logQueueSize), - configParams.DebugDisableLogDropping, - counterLogErrors, - ) - dests = append(dests, fileDest) + var destination *logutils.Destination + destination, fileDirErr, fileOpenErr = getFileDestination(configParams, logLevelFile) + if fileDirErr == nil && fileOpenErr == nil && destination != nil { + dests = append(dests, destination) } } // Syslog target. Again, we record the error if we fail to connect to syslog. var sysErr error if configParams.LogSeveritySys != "" { - // Set net/addr to "" so we connect to the system syslog server rather - // than a remote one. - net := "" - addr := "" - // The priority parameter is a combination of facility and default - // severity. We want to log with the standard LOG_USER facility; the - // severity is actually irrelevant because the hook always overrides - // it. - priority := syslog.LOG_USER | syslog.LOG_INFO - tag := "calico-felix" - w, sysErr := syslog.Dial(net, addr, priority, tag) - if sysErr == nil { - syslogDest := logutils.NewSyslogDestination( - logLevelSyslog, - w, - make(chan logutils.QueuedLog, logQueueSize), - configParams.DebugDisableLogDropping, - counterLogErrors, - ) - dests = append(dests, syslogDest) + var destination *logutils.Destination + destination, sysErr = getSyslogDestination(configParams, logLevelSyslog) + if sysErr == nil && destination != nil { + dests = append(dests, destination) } } @@ -192,3 +157,13 @@ func ConfigureLogging(configParams *config.Config) { "parameter LogSeveritySys=none or configure a local syslog service.") } } + +func getScreenDestination(configParams *config.Config, logLevel log.Level) *logutils.Destination { + return logutils.NewStreamDestination( + logLevel, + os.Stderr, + make(chan logutils.QueuedLog, logQueueSize), + configParams.DebugDisableLogDropping, + counterLogErrors, + ) +} diff --git a/logutils/logutils_linux.go b/logutils/logutils_linux.go new file mode 100644 index 0000000000..cbb6c54829 --- /dev/null +++ b/logutils/logutils_linux.go @@ -0,0 +1,69 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logutils + +import ( + "io" + "log/syslog" + "os" + "path" + + "github.com/mipearson/rfw" + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/config" + "github.com/projectcalico/libcalico-go/lib/logutils" +) + +func getFileDestination(configParams *config.Config, logLevel log.Level) (fileDest *logutils.Destination, fileDirErr error, fileOpenErr error) { + fileDirErr = os.MkdirAll(path.Dir(configParams.LogFilePath), 0755) + var rotAwareFile io.Writer + rotAwareFile, fileOpenErr = rfw.Open(configParams.LogFilePath, 0644) + if fileDirErr == nil && fileOpenErr == nil { + fileDest = logutils.NewStreamDestination( + logLevel, + rotAwareFile, + make(chan logutils.QueuedLog, logQueueSize), + configParams.DebugDisableLogDropping, + counterLogErrors, + ) + } + return +} + +func getSyslogDestination(configParams *config.Config, logLevel log.Level) (*logutils.Destination, error) { + // Set net/addr to "" so we connect to the system syslog server rather + // than a remote one. + net := "" + addr := "" + // The priority parameter is a combination of facility and default + // severity. We want to log with the standard LOG_USER facility; the + // severity is actually irrelevant because the hook always overrides + // it. + priority := syslog.LOG_USER | syslog.LOG_INFO + tag := "calico-felix" + w, sysErr := syslog.Dial(net, addr, priority, tag) + if sysErr == nil { + syslogDest := logutils.NewSyslogDestination( + logLevel, + w, + make(chan logutils.QueuedLog, logQueueSize), + configParams.DebugDisableLogDropping, + counterLogErrors, + ) + return syslogDest, sysErr + } + return nil, sysErr +} diff --git a/logutils/logutils_windows.go b/logutils/logutils_windows.go new file mode 100644 index 0000000000..f93bbd3e35 --- /dev/null +++ b/logutils/logutils_windows.go @@ -0,0 +1,72 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logutils + +import ( + "io" + "os" + "path" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/felix/config" + "github.com/projectcalico/libcalico-go/lib/logutils" +) + +// File destination for Windows +func getFileDestination(configParams *config.Config, logLevel log.Level) (fileDest *logutils.Destination, fileDirErr error, fileOpenErr error) { + fileDirErr = os.MkdirAll(path.Dir(configParams.LogFilePath), 0755) + var logFile io.Writer + logFile, fileOpenErr = openLogFile(configParams.LogFilePath, 0644) + if fileDirErr == nil && fileOpenErr == nil { + fileDest = logutils.NewStreamDestination( + logLevel, + logFile, + make(chan logutils.QueuedLog, logQueueSize), + configParams.DebugDisableLogDropping, + counterLogErrors, + ) + } + return +} + +// Stub, syslog destination is not used on Windows +func getSyslogDestination(configParams *config.Config, logLevel log.Level) (*logutils.Destination, error) { + return nil, nil +} + +// Stub, this func is not used on Windows +func DumpHeapMemoryOnSignal(configParams *config.Config) { + return +} + +// A simple io.Writer for logging to file +type FileWriter struct { + file *os.File +} + +func (f *FileWriter) Write(p []byte) (int, error) { + return f.file.Write(p) +} + +func openLogFile(path string, mode os.FileMode) (*FileWriter, error) { + var w FileWriter + var err error + w.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, mode) + if err != nil { + return nil, err + } + return &w, err +} diff --git a/proto/msgstringer.go b/proto/msgstringer.go new file mode 100644 index 0000000000..f4837bdadd --- /dev/null +++ b/proto/msgstringer.go @@ -0,0 +1,57 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proto + +import ( + "fmt" + + log "github.com/sirupsen/logrus" +) + +// msgStringer wraps an API message to customise how we stringify it. For example, it truncates +// the lists of members in the (potentially very large) IPSetsUpdate messages. +type MsgStringer struct { + Msg interface{} +} + +func (m MsgStringer) String() string { + if log.GetLevel() < log.DebugLevel && m.Msg != nil { + const truncateAt = 10 + switch msg := m.Msg.(type) { + case *IPSetUpdate: + if len(msg.Members) < truncateAt { + return fmt.Sprintf("%v", msg) + } + return fmt.Sprintf("id:%#v members(%d):%#v(truncated)", + msg.Id, len(msg.Members), msg.Members[:truncateAt]) + case *IPSetDeltaUpdate: + if len(msg.AddedMembers) < truncateAt && len(msg.RemovedMembers) < truncateAt { + return fmt.Sprintf("%v", msg) + } + addedNum := truncateAt + removedNum := truncateAt + if len(msg.AddedMembers) < addedNum { + addedNum = len(msg.AddedMembers) + } + if len(msg.RemovedMembers) < removedNum { + removedNum = len(msg.RemovedMembers) + } + return fmt.Sprintf("id:%#v addedMembers(%d):%#v(truncated) removedMembers(%d):%#v(truncated)", + msg.Id, len(msg.AddedMembers), msg.AddedMembers[:addedNum], + len(msg.RemovedMembers), msg.RemovedMembers[:removedNum]) + } + } + return fmt.Sprintf("%v", m.Msg) +} diff --git a/utils/run-coverage b/utils/run-coverage index e2461daf9c..2ac1356681 100755 --- a/utils/run-coverage +++ b/utils/run-coverage @@ -8,7 +8,7 @@ find . -name "*.coverprofile" -type f -delete echo "Calculating packages to cover..." go_dirs=$(find -type f -name '*.go' | \ - grep -vE '/vendor/|\./proto/|.glide|/k8sfv/|/fv/' | \ + grep -vE '/vendor/|\./proto/|.glide|/k8sfv/|/fv/|/dataplane/windows/' | \ xargs -n 1 dirname | \ sort | uniq | \ tr '\n' ',' | \ @@ -21,7 +21,7 @@ test_pkgs=$(go list -f '{{ if .TestGoFiles | or .XTestGoFiles }}{{ .ImportPath } test ! -z "$test_pkgs" echo "Packages with tests: $test_pkgs" -ginkgo -cover -covermode=count -coverpkg=${go_dirs} -r -skipPackage fv,k8sfv +ginkgo -cover -covermode=count -coverpkg=${go_dirs} -r -skipPackage fv,k8sfv,windows gocovmerge $(find . -name '*.coverprofile') > combined.coverprofile # Print the coverage. We use sed to remove the verbose prefix and trim down