diff --git a/internal/enginenetx/statstracker.go b/internal/enginenetx/statsmanager.go
similarity index 100%
rename from internal/enginenetx/statstracker.go
rename to internal/enginenetx/statsmanager.go
diff --git a/internal/enginenetx/statstracker_test.go b/internal/enginenetx/statsmanager_test.go
similarity index 100%
rename from internal/enginenetx/statstracker_test.go
rename to internal/enginenetx/statsmanager_test.go
diff --git a/internal/enginenetx/statspolicy.go b/internal/enginenetx/statspolicy.go
new file mode 100644
index 0000000000..c57f74c29e
--- /dev/null
+++ b/internal/enginenetx/statspolicy.go
@@ -0,0 +1,125 @@
+package enginenetx
+
+//
+// Scheduling policy based on stats that falls back to
+// another policy after it has produced all the working
+// tactics we can produce given the current stats.
+//
+
+import (
+	"context"
+	"sort"
+)
+
+// statsPolicy is a policy that schedules tactics already known
+// to work based on statistics and defers to a fallback policy
+// once it has generated all the tactics known to work.
+//
+// The zero value of this struct is invalid; please, make sure you
+// fill all the fields marked as MANDATORY.
+type statsPolicy struct {
+	// Fallback is the MANDATORY fallback policy.
+	Fallback httpsDialerPolicy
+
+	// Stats is the MANDATORY stats manager.
+	Stats *statsManager
+}
+
+var _ httpsDialerPolicy = &statsPolicy{}
+
+// LookupTactics implements HTTPSDialerPolicy.
+//
+// The returned channel first yields the tactics that the stats say have
+// worked at least once, ordered by decreasing success rate, and then the
+// tactics produced by the fallback policy. Each distinct tactic (according
+// to its summary key) is emitted at most once and its InitialDelay is
+// overwritten with happyEyeballsDelay of its position in the output. The
+// channel is closed once both sources are exhausted.
+func (p *statsPolicy) LookupTactics(ctx context.Context, domain string, port string) <-chan *httpsDialerTactic {
+	out := make(chan *httpsDialerTactic)
+
+	go func() {
+		defer close(out) // tell the reader when we are done
+		index := 0
+
+		// make sure we don't emit two equal tactics in a single run
+		uniq := make(map[string]struct{})
+
+		// function that emits a given tactic unless we already emitted it
+		maybeEmitTactic := func(t *httpsDialerTactic) {
+			// gracefully skip nil entries rather than panicking below
+			if t == nil {
+				return
+			}
+
+			key := t.tacticSummaryKey()
+			if _, seen := uniq[key]; seen {
+				return
+			}
+			uniq[key] = struct{}{}
+
+			t.InitialDelay = happyEyeballsDelay(index)
+			index++
+
+			// do not block forever when the reader has gone away: once the
+			// context is done we drop the tactic instead of waiting for a reader
+			select {
+			case out <- t:
+			case <-ctx.Done():
+			}
+		}
+
+		// give priority to what we know from stats
+		for _, t := range p.statsLookupTactics(domain, port) {
+			maybeEmitTactic(t)
+		}
+
+		// fall back to the secondary policy; we keep ranging until its
+		// channel is closed, so a canceled context only drops tactics and
+		// does not leave the fallback's producer blocked on a send to us
+		for t := range p.Fallback.LookupTactics(ctx, domain, port) {
+			maybeEmitTactic(t)
+		}
+	}()
+
+	return out
+}
+
+// statsLookupTactics returns the tactics recorded in the stats for the
+// given domain and port that succeeded at least once, sorted by
+// decreasing success rate (CountSuccess / CountStarted).
+func (p *statsPolicy) statsLookupTactics(domain string, port string) (out []*httpsDialerTactic) {
+	// only keep entries with 1+ successes; we don't want this policy to
+	// return what we already know is not working and it will be the purpose
+	// of the fallback policy to generate new tactics to test
+	//
+	// we also skip nil entries and entries without a tactic, which would
+	// otherwise cause a nil pointer dereference while sorting or emitting
+	var working []*statsTactic
+	for _, t := range p.Stats.LookupTactics(domain, port) {
+		if t != nil && t.Tactic != nil && t.CountSuccess > 0 {
+			working = append(working, t)
+		}
+	}
+
+	successRate := func(t *statsTactic) (rate float64) {
+		if t.CountStarted > 0 {
+			rate = float64(t.CountSuccess) / float64(t.CountStarted)
+		}
+		return
+	}
+
+	sort.SliceStable(working, func(i, j int) bool {
+		// Implementation note: the function should implement the "less" semantics
+		// but we want descending sort, so we're using a "more" semantics
+		//
+		// TODO(bassosimone): should we also consider the number of samples
+		// we have and how recent a sample is?
+		return successRate(working[i]) > successRate(working[j])
+	})
+
+	for _, t := range working {
+		out = append(out, t.Tactic)
+	}
+	return
+}
diff --git a/internal/enginenetx/statspolicy_test.go b/internal/enginenetx/statspolicy_test.go
new file mode 100644
index 0000000000..5031ae1a96
--- /dev/null
+++ b/internal/enginenetx/statspolicy_test.go
@@ -0,0 +1,225 @@
+package enginenetx
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/apex/log"
+	"github.com/google/go-cmp/cmp"
+	"github.com/ooni/probe-cli/v3/internal/kvstore"
+	"github.com/ooni/probe-cli/v3/internal/mocks"
+	"github.com/ooni/probe-cli/v3/internal/netemx"
+	"github.com/ooni/probe-cli/v3/internal/netxlite"
+	"github.com/ooni/probe-cli/v3/internal/runtimex"
+)
+
+func TestStatsPolicyWorkingAsIntended(t *testing.T) {
+	// prepare the content of the stats
+	twentyMinutesAgo := time.Now().Add(-20 * time.Minute)
+
+	const beaconAddress = netemx.AddressApiOONIIo
+
+	expectTacticsStats := []*statsTactic{{
+		CountStarted:               5,
+		CountTCPConnectError:       0,
+		CountTCPConnectInterrupt:   0,
+		CountTLSHandshakeError:     0,
+		CountTLSHandshakeInterrupt: 0,
+		CountTLSVerificationError:  0,
+		CountSuccess:               5,
+		HistoTCPConnectError:       map[string]int64{},
+		HistoTLSHandshakeError:     map[string]int64{},
+		HistoTLSVerificationError:  map[string]int64{},
+		LastUpdated:                twentyMinutesAgo,
+		Tactic: &httpsDialerTactic{
+			Address:        beaconAddress,
+			InitialDelay:   0,
+			Port:           "443",
+			SNI:            "www.repubblica.it",
+			VerifyHostname: "api.ooni.io",
+		},
+	}, {
+		CountStarted:               3,
+		CountTCPConnectError:       0,
+		CountTCPConnectInterrupt:   0,
+		CountTLSHandshakeError:     1,
+		CountTLSHandshakeInterrupt: 0,
+		CountTLSVerificationError:  0,
+		CountSuccess:               2,
+		HistoTCPConnectError:       map[string]int64{},
+		HistoTLSHandshakeError:     map[string]int64{},
+		HistoTLSVerificationError:  map[string]int64{},
+		LastUpdated:                twentyMinutesAgo,
+		Tactic: &httpsDialerTactic{
+			Address:        beaconAddress,
+			InitialDelay:   0,
+			Port:           "443",
+			SNI:            "www.kernel.org",
+			VerifyHostname: "api.ooni.io",
+		},
+	}, {
+		CountStarted:               3,
+		CountTCPConnectError:       0,
+		CountTCPConnectInterrupt:   0,
+		CountTLSHandshakeError:     3,
+		CountTLSHandshakeInterrupt: 0,
+		CountTLSVerificationError:  0,
+		CountSuccess:               0,
+		HistoTCPConnectError:       map[string]int64{},
+		HistoTLSHandshakeError:     map[string]int64{},
+		HistoTLSVerificationError:  map[string]int64{},
+		LastUpdated:                twentyMinutesAgo,
+		Tactic: &httpsDialerTactic{
+			Address:        beaconAddress,
+			InitialDelay:   0,
+			Port:           "443",
+			SNI:            "theconversation.com",
+			VerifyHostname: "api.ooni.io",
+		},
+	}}
+
+	// createStatsManager creates a stats manager given some baseline stats
+	createStatsManager := func(domainEndpoint string, tactics ...*statsTactic) *statsManager {
+		container := &statsContainer{
+			DomainEndpoints: map[string]*statsDomainEndpoint{
+				domainEndpoint: {
+					Tactics: map[string]*statsTactic{},
+				},
+			},
+			Version: statsContainerVersion,
+		}
+
+		for _, tx := range tactics {
+			container.DomainEndpoints[domainEndpoint].Tactics[tx.Tactic.tacticSummaryKey()] = tx
+		}
+
+		kvStore := &kvstore.Memory{}
+		if err := kvStore.Set(statsKey, runtimex.Try1(json.Marshal(container))); err != nil {
+			t.Fatal(err)
+		}
+
+		return newStatsManager(kvStore, log.Log)
+	}
+
+	t.Run("when we have unique statistics", func(t *testing.T) {
+		// create stats manager
+		stats := createStatsManager("api.ooni.io:443", expectTacticsStats...)
+
+		// create the composed policy
+		policy := &statsPolicy{
+			Fallback: &dnsPolicy{
+				Logger: log.Log,
+				Resolver: &mocks.Resolver{
+					MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
+						switch domain {
+						case "api.ooni.io":
+							return []string{beaconAddress}, nil
+						default:
+							return nil, netxlite.ErrOODNSNoSuchHost
+						}
+					},
+				},
+			},
+			Stats: stats,
+		}
+
+		// obtain the tactics from the saved stats
+		var tactics []*httpsDialerTactic
+		for entry := range policy.LookupTactics(context.Background(), "api.ooni.io", "443") {
+			tactics = append(tactics, entry)
+		}
+
+		// compute the list of results we expect to see from the stats data
+		var expect []*httpsDialerTactic
+		idx := 0
+		for _, entry := range expectTacticsStats {
+			if entry.CountSuccess <= 0 {
+				continue // we SHOULD NOT include entries that systematically failed
+			}
+			t := entry.Tactic.Clone()
+			t.InitialDelay = happyEyeballsDelay(idx)
+			expect = append(expect, t)
+			idx++
+		}
+
+		// extend the expected list to include DNS results
+		expect = append(expect, &httpsDialerTactic{
+			Address:        beaconAddress,
+			InitialDelay:   2 * time.Second,
+			Port:           "443",
+			SNI:            "api.ooni.io",
+			VerifyHostname: "api.ooni.io",
+		})
+
+		// perform the actual comparison
+		if diff := cmp.Diff(expect, tactics); diff != "" {
+			t.Fatal(diff)
+		}
+	})
+
+	t.Run("when we have duplicates", func(t *testing.T) {
+		// add each entry twice to create obvious duplicates
+		statsWithDupes := []*statsTactic{}
+		for _, entry := range expectTacticsStats {
+			statsWithDupes = append(statsWithDupes, entry.Clone())
+			statsWithDupes = append(statsWithDupes, entry.Clone())
+		}
+
+		// create stats manager
+		stats := createStatsManager("api.ooni.io:443", statsWithDupes...)
+
+		// create the composed policy
+		policy := &statsPolicy{
+			Fallback: &dnsPolicy{
+				Logger: log.Log,
+				Resolver: &mocks.Resolver{
+					MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
+						switch domain {
+						case "api.ooni.io":
+							// Twice so we try to cause duplicate entries also with the DNS policy
+							return []string{beaconAddress, beaconAddress}, nil
+						default:
+							return nil, netxlite.ErrOODNSNoSuchHost
+						}
+					},
+				},
+			},
+			Stats: stats,
+		}
+
+		// obtain the tactics from the saved stats
+		var tactics []*httpsDialerTactic
+		for entry := range policy.LookupTactics(context.Background(), "api.ooni.io", "443") {
+			tactics = append(tactics, entry)
+		}
+
+		// compute the list of results we expect to see from the stats data
+		var expect []*httpsDialerTactic
+		idx := 0
+		for _, entry := range expectTacticsStats {
+			if entry.CountSuccess <= 0 {
+				continue // we SHOULD NOT include entries that systematically failed
+			}
+			t := entry.Tactic.Clone()
+			t.InitialDelay = happyEyeballsDelay(idx)
+			expect = append(expect, t)
+			idx++
+		}
+
+		// extend the expected list to include DNS results
+		expect = append(expect, &httpsDialerTactic{
+			Address:        beaconAddress,
+			InitialDelay:   2 * time.Second,
+			Port:           "443",
+			SNI:            "api.ooni.io",
+			VerifyHostname: "api.ooni.io",
+		})
+
+		// perform the actual comparison
+		if diff := cmp.Diff(expect, tactics); diff != "" {
+			t.Fatal(diff)
+		}
+	})
+}