Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update code #23

Merged
merged 5 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
include: package:pedantic/analysis_options.yaml
linter:
rules:
- avoid_null_checks_in_equality_operators
- avoid_returning_null_for_future
- avoid_returning_null_for_void
- prefer_conditional_assignment
- prefer_const_constructors
- prefer_null_aware_operators
- prefer_void_to_null
24 changes: 10 additions & 14 deletions example/hello/receive.dart
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main() {
void main() async {
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
exit(0);
});
ProcessSignal.sigint.watch().listen((_) async {
await client.close();
exit(0);
});

client
.channel()
.then((Channel channel) => channel.queue("hello"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
print(" [*] Waiting for messages. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(" [x] Received ${message.payloadAsString}");
});
Channel channel = await client.channel();
Queue queue = await channel.queue("hello");
Consumer consumer = await queue.consume();
print(" [*] Waiting for messages. To exit, press CTRL+C");
consumer.listen((message) {
print(" [x] Received ${message.payloadAsString}");
});
}
15 changes: 6 additions & 9 deletions example/hello/send.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import "package:dart_amqp/dart_amqp.dart";

void main() {
void main() async {
Client client = Client();
client
.channel()
.then((Channel channel) => channel.queue("hello"))
.then((Queue queue) {
queue.publish("Hello World!");
print(" [x] Sent 'Hello World!'");
return client.close();
});
Channel channel = await client.channel();
Queue queue = await channel.queue("hello");
queue.publish("Hello World!");
print(" [x] Sent 'Hello World!'");
await client.close();
}
20 changes: 9 additions & 11 deletions example/pubsub/emit_log.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> args) {
void main(List<String> args) async {
Client client = Client();
client
.channel()
.then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT))
.then((Exchange exchange) {
String message = args.join(' ');
// We dont care about the routing key as our exchange type is FANOUT
exchange.publish(message, null);
print(" [x] Sent ${message}");
return client.close();
});
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("logs", ExchangeType.FANOUT);

String message = args.join(' ');
// We dont care about the routing key as our exchange type is FANOUT
exchange.publish(message, null);
print(" [x] Sent ${message}");
await client.close();
}
26 changes: 11 additions & 15 deletions example/pubsub/receive_logs.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main() {
void main() async {
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
exit(0);
});
ProcessSignal.sigint.watch().listen((_) async {
await client.close();
exit(0);
});

client
.channel()
.then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(null))
.then((Consumer consumer) {
print(
" [*] Waiting for logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(" [x] ${message.payloadAsString}");
});
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("logs", ExchangeType.FANOUT);
Consumer consumer = await exchange.bindPrivateQueueConsumer(null);
print(
" [*] Waiting for logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((message) {
print(" [x] ${message.payloadAsString}");
});
}
24 changes: 11 additions & 13 deletions example/routing/emit_log_direct.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> args) {
if (args.length < 2 || ["info", "warning", "error"].indexOf(args[0]) == -1) {
void main(List<String> args) async {
if (args.length < 2 || !["info", "warning", "error"].contains(args[0])) {
print("""
Error: invalid arguments. Please invoke as:

Expand All @@ -18,15 +18,13 @@ void main(List<String> args) {
String severity = args.first;

Client client = Client();
client
.channel()
.then((Channel channel) =>
channel.exchange("direct_logs", ExchangeType.DIRECT))
.then((Exchange exchange) {
String message = args.sublist(1).join(' ');
// Use 'severity' as our routing key
exchange.publish(message, severity);
print(" [x] Sent [${severity}] ${message}");
return client.close();
});
Channel channel = await client.channel();
Exchange exchange =
await channel.exchange("direct_logs", ExchangeType.DIRECT);

String message = args.sublist(1).join(' ');
// Use 'severity' as our routing key
exchange.publish(message, severity);
print(" [x] Sent [${severity}] ${message}");
await client.close();
}
33 changes: 14 additions & 19 deletions example/routing/receive_logs_direct.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> args) {
if (args.isEmpty ||
!args.every(
(String arg) => ["info", "warning", "error"].indexOf(arg) != -1)) {
void main(List<String> args) async {
if (args.isEmpty || !args.every(["info", "warning", "error"].contains)) {
print("""
Error: invalid arguments. Please invoke as:

Expand All @@ -23,23 +21,20 @@ void main(List<String> args) {
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
exit(0);
});
ProcessSignal.sigint.watch().listen((_) async {
await client.close();
exit(0);
});

client
.channel()
.then((Channel channel) =>
channel.exchange("direct_logs", ExchangeType.DIRECT))
.then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args))
.then((Consumer consumer) {
Channel channel = await client.channel();
Exchange exchange =
await channel.exchange("direct_logs", ExchangeType.DIRECT);
Consumer consumer = await exchange.bindPrivateQueueConsumer(args);

print(
" [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((message) {
print(
" [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C");
consumer.listen((AmqpMessage message) {
print(
" [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
});
" [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}");
});
}
62 changes: 28 additions & 34 deletions example/rpc/rpc_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,18 @@ class FibonacciRpcClient {
String _replyQueueName;

FibonacciRpcClient() : client = Client() {
client
.channel()
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue rpcQueue) {
_serverQueue = rpcQueue;

// Allocate a private queue for server responses
return rpcQueue.channel.privateQueue();
})
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
_replyQueueName = consumer.queue.name;
consumer.listen(handleResponse);
connected.complete();
});
_init();
}

Future<void> _init() async {
Channel channel = await client.channel();
_serverQueue = await channel.queue("rpc_queue");
// Allocate a private queue for server responses
Queue queue = await _serverQueue.channel.privateQueue();
Consumer consumer = await queue.consume();
_replyQueueName = consumer.queue.name;
consumer.listen(handleResponse);
connected.complete();
}

void handleResponse(AmqpMessage message) {
Expand All @@ -39,22 +36,22 @@ class FibonacciRpcClient {
.complete(int.parse(message.payloadAsString));
}

Future<int> call(int n) {
Future<int> call(int n) async {
// Make sure we are connected before sending the request
return connected.future.then((_) {
String uuid = "${_nextCorrelationId++}";
Completer<int> completer = Completer<int>();
await connected.future;

String uuid = "${_nextCorrelationId++}";
Completer<int> completer = Completer<int>();

MessageProperties properties = MessageProperties()
..replyTo = _replyQueueName
..corellationId = uuid;
MessageProperties properties = MessageProperties()
..replyTo = _replyQueueName
..corellationId = uuid;

_pendingOperations[uuid] = completer;
_pendingOperations[uuid] = completer;

_serverQueue.publish({"n": n}, properties: properties);
_serverQueue.publish({"n": n}, properties: properties);

return completer.future;
});
return completer.future;
}

Future close() {
Expand All @@ -67,17 +64,14 @@ class FibonacciRpcClient {
}
}

main(List<String> args) {
main(List<String> args) async {
FibonacciRpcClient client = FibonacciRpcClient();

int n = args.isEmpty ? 30 : num.parse(args[0]);

// Make 10 parallel calls and get fib(1) to fib(10)
client
.call(n)
.then((int res) {
print(" [x] fib(${n}) = ${res}");
})
.then((_) => client.close())
.then((_) => exit(0));
int res = await client.call(n);
print(" [x] fib(${n}) = ${res}");
await client.close();
exit(0);
}
30 changes: 13 additions & 17 deletions example/rpc/rpc_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,23 @@ int fib(int n) {
return fib(n - 1) + fib(n - 2);
}

void main(List<String> args) {
void main(List<String> args) async {
Client client = Client();

// Setup a signal handler to cleanly exit if CTRL+C is pressed
ProcessSignal.sigint.watch().listen((_) {
client.close().then((_) {
exit(0);
});
ProcessSignal.sigint.watch().listen((_) async {
await client.close();
exit(0);
});

client
.channel()
.then((Channel channel) => channel.qos(0, 1))
.then((Channel channel) => channel.queue("rpc_queue"))
.then((Queue queue) => queue.consume())
.then((Consumer consumer) {
print(" [x] Awaiting RPC request");
consumer.listen((AmqpMessage message) {
int n = message.payloadAsJson["n"];
print(" [.] fib(${n})");
message.reply(fib(n).toString());
});
Channel channel = await client.channel();
channel = await channel.qos(0, 1);
Queue queue = await channel.queue("rpc_queue");
Consumer consumer = await queue.consume();
print(" [x] Awaiting RPC request");
consumer.listen((message) {
int n = message.payloadAsJson["n"];
print(" [.] fib(${n})");
message.reply(fib(n).toString());
});
}
20 changes: 8 additions & 12 deletions example/topics/emit_log_topic.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import "dart:io";
import "package:dart_amqp/dart_amqp.dart";

void main(List<String> args) {
void main(List<String> args) async {
if (args.length < 2) {
print("""
Error: invalid arguments. Please invoke as:
Expand All @@ -18,15 +18,11 @@ void main(List<String> args) {
String routingKey = args.first;

Client client = Client();
client
.channel()
.then((Channel channel) =>
channel.exchange("topic_logs", ExchangeType.TOPIC))
.then((Exchange exchange) {
String message = args.sublist(1).join(' ');
// Use 'severity' as our routing key
exchange.publish(message, routingKey);
print(" [x] Sent [${routingKey}] ${message}");
return client.close();
});
Channel channel = await client.channel();
Exchange exchange = await channel.exchange("topic_logs", ExchangeType.TOPIC);
String message = args.sublist(1).join(' ');
// Use 'severity' as our routing key
exchange.publish(message, routingKey);
print(" [x] Sent [${routingKey}] ${message}");
await client.close();
}
Loading