-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdo.nu
96 lines (78 loc) · 2.45 KB
/
do.nu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# This file is for my own development workflow. It is not intended to be part of the demonstration content of the rest
# of the repository.
export def "start-kafka" [] {
    # Run the project's Kafka start script from the repository root ($env.DO_DIR).
    # The `cd` is scoped to this command, so the caller's working directory is untouched.
    cd $env.DO_DIR
    ^./scripts/start-kafka.sh
}
# Run the project's `scripts/create-topics.sh` script from the repository root ($env.DO_DIR).
export def "create-topics" [] {
    # The `cd` is scoped to this command, so the caller's working directory is untouched.
    cd $env.DO_DIR
    ^./scripts/create-topics.sh
}
# Completion candidates for the `compute` parameter of `run`.
def compute_options [] {
    [
        "in-process-compute"
        "remote-compute"
    ]
}
# Completion candidates for the `algorithm` parameter of `run`.
def algorithm_options [] {
    [
        "sequential"
        "concurrent-across-partitions-within-same-poll"
        "concurrent-across-partitions"
        "concurrent-across-keys"
        "concurrent-across-keys-with-coroutines"
    ]
}
# Build the `runner` distribution with Gradle, then launch the installed runner executable.
#
# Both steps run from the repository root ($env.DO_DIR). If the Gradle build exits with a
# non-zero status the runner is NOT started, so a stale binary is never executed by accident.
def build-and-run [
    ...args: string  # Arguments forwarded verbatim to the runner executable.
] {
    cd $env.DO_DIR
    ./gradlew runner:installDist --quiet

    # Some Nushell releases keep going after an external command fails. Check the exit status
    # explicitly so a failed build cannot fall through to launching a previously installed runner.
    if $env.LAST_EXIT_CODE != 0 {
        error make { msg: $"Gradle build failed with exit code ($env.LAST_EXIT_CODE); not starting the runner." }
    }

    ./runner/build/install/runner/bin/runner ...$args
}
# Build the runner and launch it with `standalone` followed by a "<compute>:<algorithm>" argument.
export def "run" [
    compute: string@compute_options      # Compute option; completions come from `compute_options`.
    algorithm: string@algorithm_options  # Algorithm option; completions come from `algorithm_options`.
] {
    # The two selections are joined with a colon into a single runner argument.
    let selection = $"($compute):($algorithm)"
    build-and-run standalone $selection
}
# Completion candidates for the `case` parameter of `test`.
def test_options [] {
    [
        "one-message"
        "multi-message"
        "all-in-one"
    ]
}
# Build the runner and launch it with a single "test-<case>" argument.
export def "test" [
    case: string@test_options  # Test case name; completions come from `test_options`.
] {
    let runner_arg = $"test-($case)"
    build-and-run $runner_arg
}
# Run the project's `scripts/stop-kafka.sh` script from the repository root ($env.DO_DIR).
export def "stop-kafka" [] {
    # The `cd` is scoped to this command, so the caller's working directory is untouched.
    cd $env.DO_DIR
    ^./scripts/stop-kafka.sh
}
# Run Kafka's `GetOffsetShell` tool against the broker at localhost:9092.
export def "topic-offsets" [] {
    cd $env.DO_DIR
    # `--bootstrap-server` replaces the deprecated `--broker-list` option and matches the flag
    # used by every other Kafka command in this file.
    kafka-run-class org.apache.kafka.tools.GetOffsetShell --bootstrap-server localhost:9092
}
# Once per second, print the current time followed by the `kafka-consumer-groups --list --state`
# output for the broker at localhost:9092. Loops forever; interrupt it (e.g. Ctrl-C) to stop.
export def "watch-all-consumer-groups" [] {
    let broker = "localhost:9092"
    loop {
        sleep 1sec
        # Timestamp (HH:MM:SS) on its own line ahead of each listing.
        print (date now | format date %T)
        kafka-consumer-groups --bootstrap-server $broker --list --state
        # Blank line to separate successive listings.
        print ""
    }
}
# Once per second, print the current time followed by the `kafka-consumer-groups --describe`
# output for one consumer group on the broker at localhost:9092. Loops forever; interrupt it
# (e.g. Ctrl-C) to stop.
export def "watch-consumer-group" [
    group: string = "app"  # Consumer group to describe. Defaults to "app", the previously hard-coded group.
] {
    while true {
        sleep 1sec
        # The timestamp is printed without a trailing newline; the tool's output follows directly.
        date now | format date %T | print --no-newline $in
        # This command shows the offsets.
        kafka-consumer-groups --bootstrap-server localhost:9092 --group $group --describe
        # Two blank lines to separate successive snapshots.
        print ""
        print ""
    }
}
# Run `stop-kafka`, `start-kafka` and `create-topics`, in that order.
export def "reset-kafka" [] {
    # Order matters: stop first, then start, then create the topics.
    stop-kafka
    start-kafka
    create-topics
}
# Pipe `kcat` output for the `input` topic (broker localhost:9092) through `jq --unbuffered`.
export def "observe-input-topic" [] {
    # The kcat-to-jq pipeline runs inside Bash rather than as a Nushell pipeline because Nushell
    # appeared to add extra buffering between the two processes.
    let shell_pipeline = 'kcat -CJu -o end -b localhost:9092 -t input | jq --unbuffered'
    bash -c $shell_pipeline
}
# Pipe `kcat` output for the `output` topic (broker localhost:9092) through `jq --unbuffered`.
export def "observe-output-topic" [] {
    # The whole kcat-to-jq pipeline is handed to Bash as a single command string.
    let shell_pipeline = 'kcat -CJu -o end -b localhost:9092 -t output | jq --unbuffered'
    bash -c $shell_pipeline
}
# Run `kafka-topics --describe` against the broker at localhost:9092.
export def "describe-topics" [] {
    let broker = "localhost:9092"
    kafka-topics --bootstrap-server $broker --describe
}
# Completion candidates for the `load_option` parameter of `load`.
def load_options [] {
    [
        "batch"
        "batch-uneven"
        "steady"
        "all-in-one"
        "scratch"
    ]
}
# Build the runner and launch it with a single "load-<load_option>" argument.
export def load [
    load_option: string@load_options  # Load scenario name; completions come from `load_options`.
] {
    let runner_arg = $"load-($load_option)"
    build-and-run $runner_arg
}