Skip to content

Commit

Permalink
feat(enginenetx): add policy based on stats (#1312)
Browse files Browse the repository at this point in the history
This policy first checks the stats and sorts the entries for which we
have the highest nonzero success rate. Then, we use an underlying policy
to produce additional suitable tactics for the HTTPS dialer.

There is a check preventing the same tactic from being emitted twice.
This means that, if we emit a tactic looking into the stats, we're not
going to emit the same exact tactic again when using our beacons policy
generator.

Also, because we try using stable sorting, and assuming some tactics
work, we would most likely continue using the tactics that work, until
something bad happens and we need to try and see whether other tactics
work.

While there, rename statstracker{,_test}.go to statsmanager{,_test}.go
because the type inside it is called "state manager". The state tracker
is an abstract concept only used by the https dialer code.

Part of ooni/probe#2531
  • Loading branch information
bassosimone authored Sep 26, 2023
1 parent 619d536 commit 4e1abe0
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 0 deletions.
File renamed without changes.
File renamed without changes.
95 changes: 95 additions & 0 deletions internal/enginenetx/statspolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package enginenetx

//
// Scheduling policy based on stats that fallbacks to
// another policy after it has produced all the working
// tactics we can produce given the current stats.
//

import (
"context"
"sort"
)

// statsPolicy is a policy that schedules tactics already known
// to work based on statistics and defers to a fallback policy
// once it has generated all the tactics known to work.
//
// The zero value of this struct is invalid; please, make sure you
// fill all the fields marked as MANDATORY.
type statsPolicy struct {
// Fallback is the MANDATORY fallback policy.
Fallback httpsDialerPolicy

// Stats is the MANDATORY stats manager.
Stats *statsManager
}

var _ httpsDialerPolicy = &statsPolicy{}

// LookupTactics implements HTTPSDialerPolicy.
func (p *statsPolicy) LookupTactics(ctx context.Context, domain string, port string) <-chan *httpsDialerTactic {
out := make(chan *httpsDialerTactic)

go func() {
index := 0
defer close(out)

// make sure we don't emit two equal policy in a single run
uniq := make(map[string]int)

// function that emits a given tactic unless we already emitted it
maybeEmitTactic := func(t *httpsDialerTactic) {
key := t.tacticSummaryKey()
if uniq[key] > 0 {
return
}
uniq[key]++
t.InitialDelay = happyEyeballsDelay(index)
index += 1
out <- t
}

// give priority to what we know from stats
for _, t := range p.statsLookupTactics(domain, port) {
maybeEmitTactic(t)
}

// fallback to the secondary policy
for t := range p.Fallback.LookupTactics(ctx, domain, port) {
maybeEmitTactic(t)
}
}()

return out
}

func (p *statsPolicy) statsLookupTactics(domain string, port string) (out []*httpsDialerTactic) {
tactics := p.Stats.LookupTactics(domain, port)

successRate := func(t *statsTactic) (rate float64) {
if t.CountStarted > 0 {
rate = float64(t.CountSuccess) / float64(t.CountStarted)
}
return
}

sort.SliceStable(tactics, func(i, j int) bool {
// Implementation note: the function should implement the "less" semantics
// but we want descending sort, so we're using a "more" semantics
//
// TODO(bassosimone): should we also consider the number of samples
// we have and how recent a sample is?
return successRate(tactics[i]) > successRate(tactics[j])
})

for _, t := range tactics {
// make sure we only include samples with 1+ successes; we don't want this policy
// to return what we already know it's not working and it will be the purpose of the
// fallback policy to generate new tactics to test
if t.CountSuccess > 0 {
out = append(out, t.Tactic)
}
}
return
}
225 changes: 225 additions & 0 deletions internal/enginenetx/statspolicy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package enginenetx

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/apex/log"
"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/kvstore"
"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/netemx"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

func TestStatsPolicyWorkingAsIntended(t *testing.T) {
// prepare the content of the stats
twentyMinutesAgo := time.Now().Add(-20 * time.Minute)

const beaconAddress = netemx.AddressApiOONIIo

expectTacticsStats := []*statsTactic{{
CountStarted: 5,
CountTCPConnectError: 0,
CountTCPConnectInterrupt: 0,
CountTLSHandshakeError: 0,
CountTLSHandshakeInterrupt: 0,
CountTLSVerificationError: 0,
CountSuccess: 5,
HistoTCPConnectError: map[string]int64{},
HistoTLSHandshakeError: map[string]int64{},
HistoTLSVerificationError: map[string]int64{},
LastUpdated: twentyMinutesAgo,
Tactic: &httpsDialerTactic{
Address: beaconAddress,
InitialDelay: 0,
Port: "443",
SNI: "www.repubblica.it",
VerifyHostname: "api.ooni.io",
},
}, {
CountStarted: 3,
CountTCPConnectError: 0,
CountTCPConnectInterrupt: 0,
CountTLSHandshakeError: 1,
CountTLSHandshakeInterrupt: 0,
CountTLSVerificationError: 0,
CountSuccess: 2,
HistoTCPConnectError: map[string]int64{},
HistoTLSHandshakeError: map[string]int64{},
HistoTLSVerificationError: map[string]int64{},
LastUpdated: twentyMinutesAgo,
Tactic: &httpsDialerTactic{
Address: beaconAddress,
InitialDelay: 0,
Port: "443",
SNI: "www.kernel.org",
VerifyHostname: "api.ooni.io",
},
}, {
CountStarted: 3,
CountTCPConnectError: 0,
CountTCPConnectInterrupt: 0,
CountTLSHandshakeError: 3,
CountTLSHandshakeInterrupt: 0,
CountTLSVerificationError: 0,
CountSuccess: 0,
HistoTCPConnectError: map[string]int64{},
HistoTLSHandshakeError: map[string]int64{},
HistoTLSVerificationError: map[string]int64{},
LastUpdated: twentyMinutesAgo,
Tactic: &httpsDialerTactic{
Address: beaconAddress,
InitialDelay: 0,
Port: "443",
SNI: "theconversation.com",
VerifyHostname: "api.ooni.io",
},
}}

// createStatsManager creates a stats manager given some baseline stats
createStatsManager := func(domainEndpoint string, tactics ...*statsTactic) *statsManager {
container := &statsContainer{
DomainEndpoints: map[string]*statsDomainEndpoint{
domainEndpoint: {
Tactics: map[string]*statsTactic{},
},
},
Version: statsContainerVersion,
}

for _, tx := range tactics {
container.DomainEndpoints[domainEndpoint].Tactics[tx.Tactic.tacticSummaryKey()] = tx
}

kvStore := &kvstore.Memory{}
if err := kvStore.Set(statsKey, runtimex.Try1(json.Marshal(container))); err != nil {
t.Fatal(err)
}

return newStatsManager(kvStore, log.Log)
}

t.Run("when we have unique statistics", func(t *testing.T) {
// create stats manager
stats := createStatsManager("api.ooni.io:443", expectTacticsStats...)

// create the composed policy
policy := &statsPolicy{
Fallback: &dnsPolicy{
Logger: log.Log,
Resolver: &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
switch domain {
case "api.ooni.io":
return []string{beaconAddress}, nil
default:
return nil, netxlite.ErrOODNSNoSuchHost
}
},
},
},
Stats: stats,
}

// obtain the tactics from the saved stats
var tactics []*httpsDialerTactic
for entry := range policy.LookupTactics(context.Background(), "api.ooni.io", "443") {
tactics = append(tactics, entry)
}

// compute the list of results we expect to see from the stats data
var expect []*httpsDialerTactic
idx := 0
for _, entry := range expectTacticsStats {
if entry.CountSuccess <= 0 {
continue // we SHOULD NOT include entries that systematically failed
}
t := entry.Tactic.Clone()
t.InitialDelay = happyEyeballsDelay(idx)
expect = append(expect, t)
idx++
}

// extend the expected list to include DNS results
expect = append(expect, &httpsDialerTactic{
Address: beaconAddress,
InitialDelay: 2 * time.Second,
Port: "443",
SNI: "api.ooni.io",
VerifyHostname: "api.ooni.io",
})

// perform the actual comparison
if diff := cmp.Diff(expect, tactics); diff != "" {
t.Fatal(diff)
}
})

t.Run("when we have duplicates", func(t *testing.T) {
// add each entry twice to create obvious duplicates
statsWithDupes := []*statsTactic{}
for _, entry := range expectTacticsStats {
statsWithDupes = append(statsWithDupes, entry.Clone())
statsWithDupes = append(statsWithDupes, entry.Clone())
}

// create stats manager
stats := createStatsManager("api.ooni.io:443", statsWithDupes...)

// create the composed policy
policy := &statsPolicy{
Fallback: &dnsPolicy{
Logger: log.Log,
Resolver: &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
switch domain {
case "api.ooni.io":
// Twice so we try to cause duplicate entries also with the DNS policy
return []string{beaconAddress, beaconAddress}, nil
default:
return nil, netxlite.ErrOODNSNoSuchHost
}
},
},
},
Stats: stats,
}

// obtain the tactics from the saved stats
var tactics []*httpsDialerTactic
for entry := range policy.LookupTactics(context.Background(), "api.ooni.io", "443") {
tactics = append(tactics, entry)
}

// compute the list of results we expect to see from the stats data
var expect []*httpsDialerTactic
idx := 0
for _, entry := range expectTacticsStats {
if entry.CountSuccess <= 0 {
continue // we SHOULD NOT include entries that systematically failed
}
t := entry.Tactic.Clone()
t.InitialDelay = happyEyeballsDelay(idx)
expect = append(expect, t)
idx++
}

// extend the expected list to include DNS results
expect = append(expect, &httpsDialerTactic{
Address: beaconAddress,
InitialDelay: 2 * time.Second,
Port: "443",
SNI: "api.ooni.io",
VerifyHostname: "api.ooni.io",
})

// perform the actual comparison
if diff := cmp.Diff(expect, tactics); diff != "" {
t.Fatal(diff)
}
})
}

0 comments on commit 4e1abe0

Please sign in to comment.