Skip to content

Commit

Permalink
restructure packetgen job similar to rawnet
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Mar 31, 2022
1 parent 8b0bdac commit 9ef2c3b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/core/packetgen/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn rawConn) Write(packet Packet) (n int, err error) {
type netConnConfig struct {
Protocol string
Address string
ProxyURLs string
ProxyURLs string `mapstructure:"proxy_urls"`
Timeout time.Duration
}

Expand Down
79 changes: 59 additions & 20 deletions src/job/packetgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,54 +37,93 @@ import (
"github.com/Arriven/db1000n/src/utils/templates"
)

type packetgenJobConfig struct {
BasicJobConfig
Packet *templates.MapStruct
Connection packetgen.ConnectionConfig
}

func packetgenJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args config.Args) (data interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var jobConfig struct {
BasicJobConfig
Packet map[string]interface{}
Connection packetgen.ConnectionConfig
}

if err := ParseConfig(&jobConfig, args, *globalConfig); err != nil {
jobConfig, err := parsePacketgenArgs(ctx, logger, globalConfig, args)
if err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

conn, err := packetgen.OpenConnection(jobConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error building raw connection: %w", err)
backoffController := utils.NewBackoffController(utils.NonNilBackoffConfigOrDefault(jobConfig.BackoffConfig, globalConfig.Backoff))

trafficMonitor := metrics.Default.NewWriter(metrics.Traffic, uuid.New().String())
go trafficMonitor.Update(ctx, time.Second)

for jobConfig.Next(ctx) {
err := sendPacket(ctx, logger, jobConfig, trafficMonitor)
if err != nil {
logger.Debug("error sending packet", zap.Error(err), zap.Any("args", args))
utils.Sleep(ctx, backoffController.Increment().GetTimeout())
} else {
backoffController.Reset()
}
}

packetTpl, err := templates.ParseMapStruct(jobConfig.Packet)
return nil, nil
}

func sendPacket(ctx context.Context, logger *zap.Logger, jobConfig *packetgenJobConfig, trafficMonitor *metrics.Writer) error {
conn, err := packetgen.OpenConnection(jobConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error parsing packet: %w", err)
return err
}

trafficMonitor := metrics.Default.NewWriter(metrics.Traffic, uuid.New().String())
go trafficMonitor.Update(ctx, time.Second)

for jobConfig.Next(ctx) {
packetConfigRaw := packetTpl.Execute(logger, ctx)
packetConfigRaw := jobConfig.Packet.Execute(logger, ctx)
logger.Debug("rendered packet config template", zap.Reflect("config", packetConfigRaw))

var packetConfig packetgen.PacketConfig
if err := utils.Decode(packetConfigRaw, &packetConfig); err != nil {
return nil, fmt.Errorf("error parsing packet config: %w", err)
return err
}

packet, err := packetConfig.Build()
if err != nil {
return nil, fmt.Errorf("error building packet: %w", err)
return err
}

n, err := conn.Write(packet)
if err != nil {
return nil, fmt.Errorf("error sending packet: %w", err)
return err
}

trafficMonitor.Add(uint64(n))
}

return nil, nil
return nil
}

func parsePacketgenArgs(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig, args config.Args) (tpl *packetgenJobConfig, err error) {
var jobConfig struct {
BasicJobConfig
Packet map[string]interface{}
Connection packetgen.ConnectionConfig
}

if err = ParseConfig(&jobConfig, args, *globalConfig); err != nil {
return nil, fmt.Errorf("error parsing job config: %w", err)
}

packetTpl, err := templates.ParseMapStruct(jobConfig.Packet)
if err != nil {
return nil, fmt.Errorf("error parsing packet: %w", err)
}

if globalConfig.ProxyURLs != "" {
jobConfig.Connection.Args["proxy_urls"] = templates.ParseAndExecute(logger, globalConfig.ProxyURLs, ctx)
}

return &packetgenJobConfig{
BasicJobConfig: jobConfig.BasicJobConfig,
Packet: packetTpl,
Connection: jobConfig.Connection,
}, nil
}

0 comments on commit 9ef2c3b

Please sign in to comment.