Skip to content

Commit

Permalink
Add allow and block IP lists (#2169)
Browse files Browse the repository at this point in the history
* feat: add ip allow and block lists to dispatcher; update dispatcher config

* feat: added more validation

* chore: fix lint errors

* feat: put ip rules behind a feature flag and license; add flags for ip rules; update tests

* chore: update allow-list rules for testcon tests

* chore: update allow-list rules for testcon tests

* feat: use default transport when the feature is disabled.

* feat: update tests
  • Loading branch information
jirevwe authored Oct 22, 2024
1 parent 52acc9e commit 91d6601
Show file tree
Hide file tree
Showing 29 changed files with 535 additions and 86 deletions.
48 changes: 28 additions & 20 deletions api/handlers/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/frain-dev/convoy/internal/pkg/fflag"
"github.com/frain-dev/convoy/pkg/circuit_breaker"
"github.com/frain-dev/convoy/pkg/msgpack"
"net/http"
Expand Down Expand Up @@ -211,29 +212,31 @@ func (h *Handler) GetEndpoints(w http.ResponseWriter, r *http.Request) {
return
}

// fetch keys from redis and mutate endpoints slice
keys := make([]string, len(endpoints))
for i := 0; i < len(endpoints); i++ {
keys[i] = fmt.Sprintf("breaker:%s", endpoints[i].UID)
}
if h.A.FFlag.CanAccessFeature(fflag.CircuitBreaker) && h.A.Licenser.CircuitBreaking() && len(endpoints) > 0 {
// fetch keys from redis and mutate endpoints slice
keys := make([]string, len(endpoints))
for i := 0; i < len(endpoints); i++ {
keys[i] = fmt.Sprintf("breaker:%s", endpoints[i].UID)
}

cbs, err := h.A.Redis.MGet(r.Context(), keys...).Result()
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}
cbs, err := h.A.Redis.MGet(r.Context(), keys...).Result()
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

for i := 0; i < len(cbs); i++ {
if cbs[i] != nil {
str, ok := cbs[i].(string)
if ok {
var c circuit_breaker.CircuitBreaker
asBytes := []byte(str)
innerErr := msgpack.DecodeMsgPack(asBytes, &c)
if innerErr != nil {
continue
for i := 0; i < len(cbs); i++ {
if cbs[i] != nil {
str, ok := cbs[i].(string)
if ok {
var c circuit_breaker.CircuitBreaker
asBytes := []byte(str)
innerErr := msgpack.DecodeMsgPack(asBytes, &c)
if innerErr != nil {
continue
}
endpoints[i].FailureRate = c.FailureRate
}
endpoints[i].FailureRate = c.FailureRate
}
}
}
Expand Down Expand Up @@ -505,6 +508,11 @@ func (h *Handler) PauseEndpoint(w http.ResponseWriter, r *http.Request) {
// @Security ApiKeyAuth
// @Router /v1/projects/{projectID}/endpoints/{endpointID}/activate [post]
func (h *Handler) ActivateEndpoint(w http.ResponseWriter, r *http.Request) {
if !h.A.Licenser.CircuitBreaking() || !h.A.FFlag.CanAccessFeature(fflag.CircuitBreaker) {
_ = render.Render(w, r, util.NewErrorResponse("feature not enabled", http.StatusBadRequest))
return
}

project, err := h.retrieveProject(r)
if err != nil {
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
Expand Down
2 changes: 2 additions & 0 deletions api/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/frain-dev/convoy/internal/pkg/fflag"
"io"
"math/rand"
"net/http"
Expand Down Expand Up @@ -137,6 +138,7 @@ func buildServer() *ApplicationHandler {
Redis: rd.Client(),
Logger: logger,
Cache: noopCache,
FFlag: fflag.NewFFlag([]string{string(fflag.Prometheus), string(fflag.FullTextSearch)}),
Rate: r,
Licenser: noopLicenser.NewLicenser(),
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func startServerComponent(_ context.Context, a *cli.App) error {
lo.WithError(err).Fatal("failed to initialize realm chain")
}

flag := fflag.NewFFlag(&cfg)
flag := fflag.NewFFlag(cfg.EnableFeatureFlag)

lvl, err := log.ParseLevel(cfg.Logger.Level)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ff/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func AddFeatureFlagsCommand() *cobra.Command {
log.WithError(err).Fatal("Error fetching the config.")
}

f := fflag2.NewFFlag(&cfg)
f := fflag2.NewFFlag(cfg.EnableFeatureFlag)
return f.ListFeatures()
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {},
Expand Down
22 changes: 20 additions & 2 deletions cmd/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,31 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
c.RetentionPolicy.IsRetentionPolicyEnabled = retentionPolicyEnabled
}

// Feature flags
// CONVOY_ENABLE_FEATURE_FLAG
fflag, err := cmd.Flags().GetStringSlice("enable-feature-flag")
if err != nil {
return nil, err
}
c.EnableFeatureFlag = fflag

// CONVOY_DISPATCHER_BLOCK_LIST
ipBlockList, err := cmd.Flags().GetStringSlice("ip-block-list")
if err != nil {
return nil, err
}
if len(ipBlockList) > 0 {
c.Dispatcher.BlockList = ipBlockList
}

// CONVOY_DISPATCHER_ALLOW_LIST
ipAllowList, err := cmd.Flags().GetStringSlice("ip-allow-list")
if err != nil {
return nil, err
}
if len(ipAllowList) > 0 {
c.Dispatcher.AllowList = ipAllowList
}

// tracing
tracingProvider, err := cmd.Flags().GetString("tracer-type")
if err != nil {
Expand Down Expand Up @@ -585,7 +603,7 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {

}

flag := fflag2.NewFFlag(c)
flag := fflag2.NewFFlag(c.EnableFeatureFlag)
c.Metrics = config.MetricsConfiguration{
IsEnabled: false,
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func main() {
var dbDatabase string

var fflag []string
var ipAllowList []string
var ipBLockList []string
var enableProfiling bool

var redisPort int
Expand Down Expand Up @@ -105,6 +107,9 @@ func main() {

// misc
c.Flags().StringSliceVar(&fflag, "enable-feature-flag", []string{}, "List of feature flags to enable e.g. \"full-text-search,prometheus\"")
c.Flags().StringSliceVar(&ipAllowList, "ip-allow-list", []string{}, "List of IPs CIDRs to allow e.g. \" 0.0.0.0/0,127.0.0.0/8\"")
c.Flags().StringSliceVar(&ipBLockList, "ip-block-list", []string{}, "List of IPs CIDRs to block e.g. \" 0.0.0.0/0,127.0.0.0/8\"")

c.Flags().IntVar(&instanceIngestRate, "instance-ingest-rate", 0, "Instance ingest Rate")
c.Flags().IntVar(&apiRateLimit, "api-rate-limit", 0, "API rate limit")

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func startConvoyServer(a *cli.App) error {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag := fflag.NewFFlag(&cfg)
flag := fflag.NewFFlag(cfg.EnableFeatureFlag)

if cfg.Server.HTTP.Port <= 0 {
return errors.New("please provide the HTTP port in the convoy.json file")
Expand Down
12 changes: 10 additions & 2 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,26 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte

go memorystore.DefaultStore.Sync(ctx, interval)

featureFlag := fflag.NewFFlag(cfg.EnableFeatureFlag)
newTelemetry := telemetry.NewTelemetry(lo, configuration,
telemetry.OptionTracker(counter),
telemetry.OptionBackend(pb),
telemetry.OptionBackend(mb))

dispatcher, err := net.NewDispatcher(cfg.Server.HTTP.HttpProxy, a.Licenser, false)
dispatcher, err := net.NewDispatcher(
a.Licenser,
featureFlag,
net.LoggerOption(lo),
net.ProxyOption(cfg.Server.HTTP.HttpProxy),
net.AllowListOption(cfg.Dispatcher.AllowList),
net.BlockListOption(cfg.Dispatcher.BlockList),
net.InsecureSkipVerifyOption(cfg.Dispatcher.InsecureSkipVerify),
)
if err != nil {
lo.WithError(err).Fatal("Failed to create new net dispatcher")
return err
}

featureFlag := fflag.NewFFlag(&cfg)
var circuitBreakerManager *cb.CircuitBreakerManager

if featureFlag.CanAccessFeature(fflag.CircuitBreaker) {
Expand Down
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ var DefaultConfiguration = Configuration{
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
InstanceIngestRate: 25,
ApiRateLimit: 25,
WorkerExecutionMode: DefaultExecutionMode,
Expand Down Expand Up @@ -388,6 +393,13 @@ type Configuration struct {
WorkerExecutionMode ExecutionMode `json:"worker_execution_mode" envconfig:"CONVOY_WORKER_EXECUTION_MODE"`
MaxRetrySeconds uint64 `json:"max_retry_seconds,omitempty" envconfig:"CONVOY_MAX_RETRY_SECONDS"`
LicenseKey string `json:"license_key" envconfig:"CONVOY_LICENSE_KEY"`
Dispatcher DispatcherConfiguration `json:"dispatcher"`
}

type DispatcherConfiguration struct {
InsecureSkipVerify bool `json:"insecure_skip_verify" envconfig:"CONVOY_DISPATCHER_INSECURE_SKIP_VERIFY"`
AllowList []string `json:"allow_list" envconfig:"CONVOY_DISPATCHER_ALLOW_LIST"`
BlockList []string `json:"block_list" envconfig:"CONVOY_DISPATCHER_BLOCK_LIST"`
}

type PyroscopeConfiguration struct {
Expand Down
15 changes: 15 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func TestLoadConfig(t *testing.T) {
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
WorkerExecutionMode: DefaultExecutionMode,
InstanceIngestRate: 25,
ApiRateLimit: 25,
Expand Down Expand Up @@ -265,6 +270,11 @@ func TestLoadConfig(t *testing.T) {
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
InstanceIngestRate: 25,
ApiRateLimit: 25,
WorkerExecutionMode: DefaultExecutionMode,
Expand Down Expand Up @@ -351,6 +361,11 @@ func TestLoadConfig(t *testing.T) {
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
InstanceIngestRate: 25,
ApiRateLimit: 25,
WorkerExecutionMode: DefaultExecutionMode,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ require (
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 // indirect
github.com/stealthrocket/netjail v0.1.2 // indirect
github.com/theupdateframework/notary v0.7.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,8 @@ github.com/spf13/viper v0.0.0-20150530192845-be5ff3e4840c/go.mod h1:A8kyI5cUJhb8
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/stealthrocket/netjail v0.1.2 h1:nOgFLer7XrkYcn8cJk5kI9aUFRkV7LC/8VjmJ2GjBQU=
github.com/stealthrocket/netjail v0.1.2/go.mod h1:LmslfwZTxTchb7koch3C/MNvEzF111G9HwZQrT23No4=
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
Expand Down
11 changes: 7 additions & 4 deletions internal/pkg/fflag/fflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package fflag
import (
"errors"
"fmt"
"github.com/frain-dev/convoy/config"
"os"
"sort"
"text/tabwriter"
Expand All @@ -18,9 +17,10 @@ type (
)

const (
IpRules FeatureFlagKey = "ip-rules"
Prometheus FeatureFlagKey = "prometheus"
FullTextSearch FeatureFlagKey = "full-text-search"
CircuitBreaker FeatureFlagKey = "circuit-breaker"
FullTextSearch FeatureFlagKey = "full-text-search"
)

type (
Expand All @@ -33,6 +33,7 @@ const (
)

var DefaultFeaturesState = map[FeatureFlagKey]FeatureFlagState{
IpRules: disabled,
Prometheus: disabled,
FullTextSearch: disabled,
CircuitBreaker: disabled,
Expand All @@ -42,13 +43,15 @@ type FFlag struct {
Features map[FeatureFlagKey]FeatureFlagState
}

func NewFFlag(c *config.Configuration) *FFlag {
func NewFFlag(enableFeatureFlags []string) *FFlag {
f := &FFlag{
Features: clone(DefaultFeaturesState),
}

for _, flag := range c.EnableFeatureFlag {
for _, flag := range enableFeatureFlags {
switch flag {
case string(IpRules):
f.Features[IpRules] = enabled
case string(Prometheus):
f.Features[Prometheus] = enabled
case string(FullTextSearch):
Expand Down
Loading

0 comments on commit 91d6601

Please sign in to comment.