Skip to content

Commit

Permalink
syncing up to 131e6abf640ed28e857907b6d86a1d4f74725e90
Browse files Browse the repository at this point in the history
Co-authored-by: Bruce Yu <[email protected]>
Co-authored-by: Dan LaMotte <[email protected]>
  • Loading branch information
3 people committed Sep 26, 2024
1 parent e317963 commit b4b2eb5
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 708 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## vNext

- Update signing rotation endpoint for APIs

## v1.15.1

- Updated debian packages to address a few vulnerabilities
Expand Down
36 changes: 1 addition & 35 deletions cmd/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/superblocksteam/agent/internal/metadata"
"github.com/superblocksteam/agent/internal/metrics"
"github.com/superblocksteam/agent/internal/registration"
"github.com/superblocksteam/agent/internal/resigner"
"github.com/superblocksteam/agent/internal/schedule"
signatureReconciler "github.com/superblocksteam/agent/internal/signature/reconciler"
signatureReconcilerServer "github.com/superblocksteam/agent/internal/signature/reconciler/server"
Expand Down Expand Up @@ -95,7 +94,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
)

var (
Expand Down Expand Up @@ -186,10 +184,6 @@ func init() {
pflag.String("kafka.topic.metadata", "metadata.cloud", "Kafka topic for metadata")
pflag.Int("kafka.consumer.workers", 10, "")
pflag.Bool("kafka.enabled", false, "")
pflag.Bool("resigner.enabled", false, "Whether or not to enable the resigner")
pflag.Duration("resigner.flush.max.duration", 10*time.Second, "")
pflag.Int("resigner.flush.max.items", 10000, "")
pflag.Int("resigner.consumer.workers", 10, "Number of workers to use for the resigner")
pflag.Bool("events.cloud.enabled", false, "Whether or not to listen for cloud events")
pflag.String("events.cloud.url", "queue.intake.superblocks.com:8443", "URL to listen on for cloud events")
pflag.Bool("events.cloud.insecure", false, "Whether or not to use an insecure grpc connection for cloud events")
Expand Down Expand Up @@ -672,15 +666,6 @@ func main() {
}
}

// Communication between the resigner and the cloud events listener.
resignerConsumer := &events.Consumer{
Zero: func() protoreflect.ProtoMessage {
return new(securityv1.Resource)
},
Channel: make(chan protoreflect.ProtoMessage, viper.GetInt("resigner.consumer.workers")),
Wait: &sync.WaitGroup{},
}

var eventsCloudRunnable run.Runnable
{
headers := map[string]string{
Expand All @@ -694,9 +679,7 @@ func main() {
viper.GetString("events.cloud.url"),
headers,
&events.Options{
Consumers: map[string]*events.Consumer{
events.EventTypeResign: resignerConsumer,
},
Consumers: map[string]*events.Consumer{},
KeepaliveTime: viper.GetDuration("events.cloud.keepalive.time"),
KeepaliveTimeout: viper.GetDuration("events.cloud.keepalive.timeout"),
Insecure: viper.GetBool("events.cloud.insecure"),
Expand All @@ -705,22 +688,6 @@ func main() {
)
}

var resignerRunnable run.Runnable
if viper.GetBool("resigner.enabled") {
resignerRunnable = resigner.NewListener(
resigner.New(registry, logger),
serverHttpClient,
&resigner.Options{
Logger: logger,
Workers: viper.GetInt("resigner.consumer.workers"),
FlushMaxDuration: viper.GetDuration("resigner.flush.max.duration"),
FlushMaxItems: viper.GetInt("resigner.flush.max.items"),
Queue: resignerConsumer.Channel,
Wait: resignerConsumer.Wait,
},
)
}

var registrator registration.Registrator
{
registrator = registration.New(&registration.Options{
Expand Down Expand Up @@ -1070,7 +1037,6 @@ func main() {

g.Add(viper.GetBool("kafka.enabled"), kafkaConsumerRunnable)
g.Add(viper.GetBool("kafka.enabled"), kafkaProducerRunnable)
g.Add(viper.GetBool("resigner.enabled"), resignerRunnable)
g.Add(viper.GetBool("jobs.enabled"), scheduledJobRunner)
g.Add(viper.GetBool("events.cloud.enabled"), eventsCloudRunnable)

Expand Down
Loading

0 comments on commit b4b2eb5

Please sign in to comment.