Skip to content

Commit

Permalink
Merge branch 'main' into daniel/doc/add_validate_to_source_verifier
Browse files Browse the repository at this point in the history
  • Loading branch information
danvixent authored Oct 22, 2024
2 parents 51d2e71 + 91d6601 commit 6c8e386
Show file tree
Hide file tree
Showing 35 changed files with 633 additions and 211 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v6
with:
version: latest
only-new-issues: true
Expand Down
51 changes: 31 additions & 20 deletions api/handlers/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"encoding/json"
"fmt"

"github.com/frain-dev/convoy/internal/pkg/fflag"
"github.com/frain-dev/convoy/pkg/circuit_breaker"

Check failure on line 9 in api/handlers/endpoint.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest, 15, 6.2.6)

other declaration of circuit_breaker

Check failure on line 9 in api/handlers/endpoint.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest, 15, 6.2.6)

other declaration of circuit_breaker
"github.com/frain-dev/convoy/pkg/msgpack"

Check failure on line 10 in api/handlers/endpoint.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest, 15, 6.2.6)

other declaration of msgpack

Check failure on line 10 in api/handlers/endpoint.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest, 15, 6.2.6)

other declaration of msgpack
"net/http"
"time"

Expand Down Expand Up @@ -212,29 +216,31 @@ func (h *Handler) GetEndpoints(w http.ResponseWriter, r *http.Request) {
return
}

// fetch keys from redis and mutate endpoints slice
keys := make([]string, len(endpoints))
for i := 0; i < len(endpoints); i++ {
keys[i] = fmt.Sprintf("breaker:%s", endpoints[i].UID)
}
if h.A.FFlag.CanAccessFeature(fflag.CircuitBreaker) && h.A.Licenser.CircuitBreaking() && len(endpoints) > 0 {
// fetch keys from redis and mutate endpoints slice
keys := make([]string, len(endpoints))
for i := 0; i < len(endpoints); i++ {
keys[i] = fmt.Sprintf("breaker:%s", endpoints[i].UID)
}

cbs, err := h.A.Redis.MGet(r.Context(), keys...).Result()
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}
cbs, err := h.A.Redis.MGet(r.Context(), keys...).Result()
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

for i := 0; i < len(cbs); i++ {
if cbs[i] != nil {
str, ok := cbs[i].(string)
if ok {
var c circuit_breaker.CircuitBreaker
asBytes := []byte(str)
innerErr := msgpack.DecodeMsgPack(asBytes, &c)
if innerErr != nil {
continue
for i := 0; i < len(cbs); i++ {
if cbs[i] != nil {
str, ok := cbs[i].(string)
if ok {
var c circuit_breaker.CircuitBreaker
asBytes := []byte(str)
innerErr := msgpack.DecodeMsgPack(asBytes, &c)
if innerErr != nil {
continue
}
endpoints[i].FailureRate = c.FailureRate
}
endpoints[i].FailureRate = c.FailureRate
}
}
}
Expand Down Expand Up @@ -506,6 +512,11 @@ func (h *Handler) PauseEndpoint(w http.ResponseWriter, r *http.Request) {
// @Security ApiKeyAuth
// @Router /v1/projects/{projectID}/endpoints/{endpointID}/activate [post]
func (h *Handler) ActivateEndpoint(w http.ResponseWriter, r *http.Request) {
if !h.A.Licenser.CircuitBreaking() || !h.A.FFlag.CanAccessFeature(fflag.CircuitBreaker) {
_ = render.Render(w, r, util.NewErrorResponse("feature not enabled", http.StatusBadRequest))
return
}

project, err := h.retrieveProject(r)
if err != nil {
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
Expand Down
2 changes: 2 additions & 0 deletions api/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/frain-dev/convoy/internal/pkg/fflag"
"io"
"math/rand"
"net/http"
Expand Down Expand Up @@ -137,6 +138,7 @@ func buildServer() *ApplicationHandler {
Redis: rd.Client(),
Logger: logger,
Cache: noopCache,
FFlag: fflag.NewFFlag([]string{string(fflag.Prometheus), string(fflag.FullTextSearch)}),
Rate: r,
Licenser: noopLicenser.NewLicenser(),
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func startServerComponent(_ context.Context, a *cli.App) error {
lo.WithError(err).Fatal("failed to initialize realm chain")
}

flag := fflag.NewFFlag(&cfg)
flag := fflag.NewFFlag(cfg.EnableFeatureFlag)

lvl, err := log.ParseLevel(cfg.Logger.Level)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ff/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func AddFeatureFlagsCommand() *cobra.Command {
log.WithError(err).Fatal("Error fetching the config.")
}

f := fflag2.NewFFlag(&cfg)
f := fflag2.NewFFlag(cfg.EnableFeatureFlag)
return f.ListFeatures()
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {},
Expand Down
52 changes: 35 additions & 17 deletions cmd/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/frain-dev/convoy/internal/pkg/limiter"

"github.com/frain-dev/convoy/util"
"github.com/grafana/pyroscope-go"
pyro "github.com/grafana/pyroscope-go"

fflag2 "github.com/frain-dev/convoy/internal/pkg/fflag"

Expand Down Expand Up @@ -272,34 +272,34 @@ func PostRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args
}

func enableProfiling(cfg config.Configuration, cmd *cobra.Command) error {
_, err := pyroscope.Start(pyroscope.Config{
_, err := pyro.Start(pyro.Config{
ApplicationName: cfg.Pyroscope.ProfileID,
Tags: map[string]string{
"cmd": cmd.Use,
},
// replace this with the address of pyroscope server
// replace this with the address of pyro server
ServerAddress: cfg.Pyroscope.URL,

// you can disable logging by setting this to nil
// Logger: pyroscope.StandardLogger,
// Logger: pyro.StandardLogger,
UploadRate: time.Second * 5,

// optionally, if authentication is enabled, specify the API key:
BasicAuthUser: cfg.Pyroscope.Username,
BasicAuthPassword: cfg.Pyroscope.Password,

// but you can select the ones you want to use:
ProfileTypes: []pyroscope.ProfileType{
pyroscope.ProfileCPU,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileInuseSpace,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileGoroutines,
pyroscope.ProfileMutexCount,
pyroscope.ProfileMutexDuration,
pyroscope.ProfileBlockCount,
pyroscope.ProfileBlockDuration,
ProfileTypes: []pyro.ProfileType{
pyro.ProfileCPU,
pyro.ProfileInuseObjects,
pyro.ProfileAllocObjects,
pyro.ProfileInuseSpace,
pyro.ProfileAllocSpace,
pyro.ProfileGoroutines,
pyro.ProfileMutexCount,
pyro.ProfileMutexDuration,
pyro.ProfileBlockCount,
pyro.ProfileBlockDuration,
},
})
return err
Expand Down Expand Up @@ -519,13 +519,31 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
c.RetentionPolicy.IsRetentionPolicyEnabled = retentionPolicyEnabled
}

// Feature flags
// CONVOY_ENABLE_FEATURE_FLAG
fflag, err := cmd.Flags().GetStringSlice("enable-feature-flag")
if err != nil {
return nil, err
}
c.EnableFeatureFlag = fflag

// CONVOY_DISPATCHER_BLOCK_LIST
ipBlockList, err := cmd.Flags().GetStringSlice("ip-block-list")
if err != nil {
return nil, err
}
if len(ipBlockList) > 0 {
c.Dispatcher.BlockList = ipBlockList
}

// CONVOY_DISPATCHER_ALLOW_LIST
ipAllowList, err := cmd.Flags().GetStringSlice("ip-allow-list")
if err != nil {
return nil, err
}
if len(ipAllowList) > 0 {
c.Dispatcher.AllowList = ipAllowList
}

// tracing
tracingProvider, err := cmd.Flags().GetString("tracer-type")
if err != nil {
Expand Down Expand Up @@ -585,7 +603,7 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {

}

flag := fflag2.NewFFlag(c)
flag := fflag2.NewFFlag(c.EnableFeatureFlag)
c.Metrics = config.MetricsConfiguration{
IsEnabled: false,
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func main() {
var dbDatabase string

var fflag []string
var ipAllowList []string
var ipBLockList []string
var enableProfiling bool

var redisPort int
Expand Down Expand Up @@ -105,6 +107,9 @@ func main() {

// misc
c.Flags().StringSliceVar(&fflag, "enable-feature-flag", []string{}, "List of feature flags to enable e.g. \"full-text-search,prometheus\"")
c.Flags().StringSliceVar(&ipAllowList, "ip-allow-list", []string{}, "List of IPs CIDRs to allow e.g. \" 0.0.0.0/0,127.0.0.0/8\"")
c.Flags().StringSliceVar(&ipBLockList, "ip-block-list", []string{}, "List of IPs CIDRs to block e.g. \" 0.0.0.0/0,127.0.0.0/8\"")

c.Flags().IntVar(&instanceIngestRate, "instance-ingest-rate", 0, "Instance ingest Rate")
c.Flags().IntVar(&apiRateLimit, "api-rate-limit", 0, "API rate limit")

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func startConvoyServer(a *cli.App) error {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag := fflag.NewFFlag(&cfg)
flag := fflag.NewFFlag(cfg.EnableFeatureFlag)

if cfg.Server.HTTP.Port <= 0 {
return errors.New("please provide the HTTP port in the convoy.json file")
Expand Down
12 changes: 10 additions & 2 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,26 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte

go memorystore.DefaultStore.Sync(ctx, interval)

featureFlag := fflag.NewFFlag(cfg.EnableFeatureFlag)
newTelemetry := telemetry.NewTelemetry(lo, configuration,
telemetry.OptionTracker(counter),
telemetry.OptionBackend(pb),
telemetry.OptionBackend(mb))

dispatcher, err := net.NewDispatcher(cfg.Server.HTTP.HttpProxy, a.Licenser, false)
dispatcher, err := net.NewDispatcher(
a.Licenser,
featureFlag,
net.LoggerOption(lo),
net.ProxyOption(cfg.Server.HTTP.HttpProxy),
net.AllowListOption(cfg.Dispatcher.AllowList),
net.BlockListOption(cfg.Dispatcher.BlockList),
net.InsecureSkipVerifyOption(cfg.Dispatcher.InsecureSkipVerify),
)
if err != nil {
lo.WithError(err).Fatal("Failed to create new net dispatcher")
return err
}

featureFlag := fflag.NewFFlag(&cfg)
var circuitBreakerManager *cb.CircuitBreakerManager

if featureFlag.CanAccessFeature(fflag.CircuitBreaker) {
Expand Down
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ var DefaultConfiguration = Configuration{
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
InstanceIngestRate: 25,
ApiRateLimit: 25,
WorkerExecutionMode: DefaultExecutionMode,
Expand Down Expand Up @@ -388,6 +393,13 @@ type Configuration struct {
WorkerExecutionMode ExecutionMode `json:"worker_execution_mode" envconfig:"CONVOY_WORKER_EXECUTION_MODE"`
MaxRetrySeconds uint64 `json:"max_retry_seconds,omitempty" envconfig:"CONVOY_MAX_RETRY_SECONDS"`
LicenseKey string `json:"license_key" envconfig:"CONVOY_LICENSE_KEY"`
Dispatcher DispatcherConfiguration `json:"dispatcher"`
}

type DispatcherConfiguration struct {
InsecureSkipVerify bool `json:"insecure_skip_verify" envconfig:"CONVOY_DISPATCHER_INSECURE_SKIP_VERIFY"`
AllowList []string `json:"allow_list" envconfig:"CONVOY_DISPATCHER_ALLOW_LIST"`
BlockList []string `json:"block_list" envconfig:"CONVOY_DISPATCHER_BLOCK_LIST"`
}

type PyroscopeConfiguration struct {
Expand Down
15 changes: 15 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func TestLoadConfig(t *testing.T) {
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
WorkerExecutionMode: DefaultExecutionMode,
InstanceIngestRate: 25,
ApiRateLimit: 25,
Expand Down Expand Up @@ -265,6 +270,11 @@ func TestLoadConfig(t *testing.T) {
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
InstanceIngestRate: 25,
ApiRateLimit: 25,
WorkerExecutionMode: DefaultExecutionMode,
Expand Down Expand Up @@ -351,6 +361,11 @@ func TestLoadConfig(t *testing.T) {
SampleTime: 5,
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
InstanceIngestRate: 25,
ApiRateLimit: 25,
WorkerExecutionMode: DefaultExecutionMode,
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ require (
github.com/go-redsync/redsync/v4 v4.8.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/gorilla/websocket v1.5.0
github.com/grafana/pyroscope-go v1.1.1
github.com/grafana/pyroscope-go v1.1.2
github.com/hibiken/asynq v0.24.1
github.com/hibiken/asynq/x v0.0.0-20221219051101-0b8cfad70341
github.com/jackc/pgx/v5 v5.6.0
github.com/jarcoal/httpmock v1.3.1
github.com/jaswdr/faker v1.10.2
Expand Down Expand Up @@ -147,7 +146,7 @@ require (
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down Expand Up @@ -206,6 +205,7 @@ require (
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 // indirect
github.com/stealthrocket/netjail v0.1.2 // indirect
github.com/theupdateframework/notary v0.7.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
Expand Down
Loading

0 comments on commit 6c8e386

Please sign in to comment.