Skip to content
This repository has been archived by the owner on Dec 8, 2023. It is now read-only.

Commit

Permalink
WIP using runtime sync
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Aug 31, 2021
1 parent 3fa2821 commit beb7109
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 142 deletions.
61 changes: 46 additions & 15 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,42 @@ import (
const version = "v0.0.1"

type Amqp struct {
modules.InstanceCore

Version string
Connection *amqpDriver.Connection
Queue *Queue
Exchange *Exchange
Queue *Queue
Exchange *Exchange
}

type AmqpRoot struct {
Queue *Queue
Exchange *Exchange
}

func (amqp *AmqpRoot) NewModuleInstance(core modules.InstanceCore) modules.Instance {
return &Amqp{
InstanceCore: core,
Version: version,
Queue: amqp.Queue,
Exchange: amqp.Exchange,
}
}

// GetExports returns the exports of the metrics module
func (amqp *Amqp) GetExports() modules.Exports {
return modules.Exports{
Named: map[string]interface{}{
"start": amqp.Start,
"listen": amqp.Listen,
"publish": amqp.Publish,
"version": amqp.Version,
},
}
}

var _ modules.IsModuleV2 = &AmqpRoot{}

type AmqpOptions struct {
ConnectionUrl string
}
Expand Down Expand Up @@ -57,6 +87,8 @@ func (amqp *Amqp) Start(options AmqpOptions) error {
}

func (amqp *Amqp) Publish(options PublishOptions) error {
amqp.YieldRuntime()
defer amqp.GetRuntime()
ch, err := amqp.Connection.Channel()
if err != nil {
return err
Expand All @@ -76,6 +108,8 @@ func (amqp *Amqp) Publish(options PublishOptions) error {
}

func (amqp *Amqp) Listen(options ListenOptions) error {
amqp.YieldRuntime()
defer amqp.GetRuntime()
ch, err := amqp.Connection.Channel()
if err != nil {
return err
Expand All @@ -97,23 +131,20 @@ func (amqp *Amqp) Listen(options ListenOptions) error {

go func() {
for d := range msgs {
options.Listener(string(d.Body))
func() { // so we can use a defer in the loop
_, ret := amqp.GetRuntimeWithReturn()
defer ret()
options.Listener(string(d.Body))
}()
}
}()
return nil
}

func init() {

queue := Queue{}
exchange := Exchange{}
generalAmqp := Amqp{
Version: version,
Queue: &queue,
Exchange: &exchange,
}

modules.Register("k6/x/amqp", &generalAmqp)
modules.Register("k6/x/amqp/queue", &queue)
modules.Register("k6/x/amqp/exchange", &exchange)
queue := &Queue{}
exchange := &Exchange{}
modules.Register("k6/x/amqp", &AmqpRoot{Queue: queue, Exchange: exchange})
modules.Register("k6/x/amqp/queue", queue)
modules.Register("k6/x/amqp/exchange", exchange)
}
71 changes: 37 additions & 34 deletions examples/test.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,47 @@
import { sleep } from "k6";
import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
console.log("K6 amqp extension enabled, version: " + Amqp.version)
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})
console.log("Connection opened: " + url)

const queueName = 'K6 general'

Queue.declare({
name: queueName,
// durable: false,
// delete_when_unused: false,
// exclusive: false,
// no_wait: false,
// args: null
})
console.log("K6 amqp extension enabled, version: " + Amqp.version)
const url = "amqp://guest:guest@localhost:5672/"
Amqp.start({
connection_url: url
})
console.log("Connection opened: " + url)

console.log(queueName + " queue is ready")
const queueName = 'K6 general'

Amqp.publish({
queue_name: queueName,
body: "Ping from k6"
// exchange: '',
// mandatory: false,
// immediate: false,
})
Queue.declare({
name: queueName,
// durable: false,
// delete_when_unused: false,
// exclusive: false,
// no_wait: false,
// args: null
})

console.log(queueName + " queue is ready")

const listener = function(data) { console.log('received data: ' + data) }
Amqp.listen({
queue_name: queueName,
listener: listener,
auto_ack: true,
// consumer: '',
// exclusive: false,
Amqp.publish({
queue_name: queueName,
body: "Ping from k6 vu: " + __VU + ", iter:"+ __ITER,
// exchange: '',
// mandatory: false,
// immediate: false,
})

const listener = function(data) { console.log(`received data by VU ${__VU}, ITER ${__ITER}: ${data} `)}
Amqp.listen({
queue_name: queueName,
listener: listener,
auto_ack: true,
// consumer: '',
// exclusive: false,
// no_local: false,
// no_wait: false,
// args: null
})
// args: null
})

sleep(2);
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ go 1.15

require (
github.com/streadway/amqp v1.0.0
go.k6.io/k6 v0.33.0
go.k6.io/k6 v0.33.1-0.20210831133630-0a56f9b16ec8
)
Loading

0 comments on commit beb7109

Please sign in to comment.