-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.rs
117 lines (100 loc) · 3.38 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{
collections::HashMap,
env,
process::{Command, Stdio},
sync::Arc,
thread::sleep,
};
use opentelemetry::{
global::{self, shutdown_tracer_provider},
trace::TraceError,
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace as sdktrace, Resource};
use tracing::{instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{layer::SubscriberExt, Registry};
#[tokio::main]
async fn main() {
// print the current exec pid
println!("pid: {}", std::process::id());
let tracer = init_tracer().expect("Failed to initialize tracer.");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
global::set_text_map_propagator(TraceContextPropagator::new());
let subscriber = Registry::default().with(telemetry);
// tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
tracing::subscriber::with_default(subscriber, || {
shim_main();
});
shutdown_tracer_provider();
}
/// First demo step: blocks the calling thread for one second, then prints `a1`.
///
/// The call is recorded as a span whose parent is explicitly set to the span
/// that is current at call time.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn a1() {
    let pause = std::time::Duration::from_secs(1);
    sleep(pause);
    println!("a1");
}
/// Second demo step: blocks the calling thread for two seconds, then prints `a2`.
///
/// The call is recorded as a span whose parent is explicitly set to the span
/// that is current at call time.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn a2() {
    let pause = std::time::Duration::from_secs(2);
    sleep(pause);
    println!("a2");
}
/// Dispatches on the first command-line argument.
///
/// * `1` runs [`a1`] and then calls [`spawn`] to start a child copy of this
///   executable in mode `2`.
/// * `2` optionally takes a JSON-encoded string map (the injected trace
///   context) as the second argument, extracts it through the global
///   text-map propagator and sets it as the parent of the current span; it
///   then runs [`a2`] and sleeps for three more seconds.
/// * Anything else prints a usage line.
///
/// A trace context that is not valid JSON is reported on stderr and ignored,
/// so the process still runs, just without a remote parent.
#[instrument]
fn shim_main() {
    let os_args: Vec<String> = env::args().collect();

    match os_args.get(1).map(String::as_str) {
        Some("1") => {
            a1();
            spawn();
        }
        Some("2") => {
            if let Some(trace_context) = os_args.get(2) {
                // The argument comes from the command line, so treat it as
                // untrusted input rather than unwrapping the parse result.
                match serde_json::from_str::<HashMap<String, String>>(trace_context) {
                    Ok(extractor) => {
                        let context = global::get_text_map_propagator(|propagator| {
                            propagator.extract(&extractor)
                        });
                        Span::current().set_parent(context);
                    }
                    Err(err) => {
                        eprintln!("ignoring malformed trace context: {}", err);
                    }
                }
            }
            a2();
            sleep(std::time::Duration::from_secs(3));
        }
        _ => {
            // argv may legitimately be empty on some platforms; avoid indexing.
            let program = os_args.first().map(String::as_str).unwrap_or("<program>");
            println!("Usage: {} <1|2>", program);
        }
    }
}
/// Starts a copy of the current executable in mode `2`, handing it the current
/// span's trace context as a JSON-encoded command-line argument.
///
/// The child inherits the current working directory, has stdin/stdout/stderr
/// redirected to null, and is not waited on.
///
/// # Panics
///
/// Panics if the executable path or working directory cannot be determined,
/// if the context map cannot be serialized, or if the process fails to start.
#[instrument]
fn spawn() {
    let exe_path = env::current_exe().unwrap();
    let working_dir = env::current_dir().unwrap();

    // Propagate the context: take it from the current `tracing` span and let
    // the global propagator write it into a plain string map.
    let mut carrier = HashMap::<String, String>::new();
    let span_context = Span::current().context();
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&span_context, &mut carrier)
    });

    let payload = serde_json::to_string(&carrier).unwrap();
    println!("trace_context: {}", payload);

    Command::new(exe_path)
        .current_dir(working_dir)
        .arg("2")
        .arg(payload)
        .stdout(Stdio::null())
        .stdin(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .unwrap();
}
/// Builds the OTLP tracing pipeline used by this program.
///
/// The exporter uses the tonic transport pointed at `http://localhost:4317`,
/// the resource carries `service.name = "instance3"`, and the pipeline is
/// installed with `install_simple` (not the batch installer).
///
/// # Errors
///
/// Returns the [`TraceError`] produced by the pipeline installation.
fn init_tracer() -> Result<opentelemetry_sdk::trace::Tracer, TraceError> {
    let otlp_exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint("http://localhost:4317");

    let service_resource = Resource::new(vec![KeyValue::new("service.name", "instance3")]);
    let trace_config = sdktrace::config().with_resource(service_resource);

    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(otlp_exporter)
        .with_trace_config(trace_config)
        .install_simple()
}