From 0c3e4f6739eb2cbd52bee448e2de14c3419778e5 Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Mon, 18 Feb 2019 13:47:30 +0000 Subject: [PATCH 01/10] Scheduler return metrics as dict --- src/rust/engine/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index a38af77768b..87ce1e85d1d 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -316,7 +316,9 @@ pub extern "C" fn scheduler_metrics( let values = scheduler .metrics(session) .into_iter() - .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) + .flat_map(|(metric, value)| { + vec![externs::store_utf8(metric), externs::store_i64(value)] + }) .collect::>(); externs::store_dict(&values).into() }) From 3cc083e38d776ec0a5cee094f774eb38dfc8dd51 Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Tue, 19 Feb 2019 14:12:53 +0000 Subject: [PATCH 02/10] Add rust workunits to zipkin trace --- src/python/pants/goal/context.py | 4 ++- src/python/pants/reporting/report.py | 5 +++ src/python/pants/reporting/reporter.py | 4 +++ src/python/pants/reporting/zipkin_reporter.py | 31 +++++++++++++++++-- src/rust/engine/src/lib.rs | 21 ++++++++++++- src/rust/engine/src/scheduler.rs | 10 ++++++ 6 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/python/pants/goal/context.py b/src/python/pants/goal/context.py index d1f6beb29da..8040ef253bc 100644 --- a/src/python/pants/goal/context.py +++ b/src/python/pants/goal/context.py @@ -149,7 +149,9 @@ def executing(self): """A contextmanager that sets metrics in the context of a (v1) engine execution.""" self._set_target_root_count_in_runtracker() yield - self.run_tracker.pantsd_stats.set_scheduler_metrics(self._scheduler.metrics()) + metrics = self._scheduler.metrics() + self.run_tracker.pantsd_stats.set_scheduler_metrics(metrics) + self.run_tracker.report.bulk_record_workunits(metrics) 
self._set_affected_target_count_in_runtracker() def _set_target_root_count_in_runtracker(self): diff --git a/src/python/pants/reporting/report.py b/src/python/pants/reporting/report.py index 520991ac765..f613ead9f99 100644 --- a/src/python/pants/reporting/report.py +++ b/src/python/pants/reporting/report.py @@ -140,3 +140,8 @@ def _notify(self): if len(s) > 0: for reporter in self._reporters.values(): reporter.handle_output(workunit, label, s) + + def bulk_record_workunits(self, metrics): + with self._lock: + for reporter in self._reporters.values(): + reporter.bulk_record_workunits(metrics) diff --git a/src/python/pants/reporting/reporter.py b/src/python/pants/reporting/reporter.py index 5f135952ca5..eda80483da5 100644 --- a/src/python/pants/reporting/reporter.py +++ b/src/python/pants/reporting/reporter.py @@ -47,6 +47,10 @@ def end_workunit(self, workunit): """A workunit has finished.""" pass + def bulk_record_workunits(self, metrics): + """A collection of workunits from Rust (engine) part""" + pass + def handle_log(self, workunit, level, *msg_elements): """Handle a message logged by pants code. 
diff --git a/src/python/pants/reporting/zipkin_reporter.py b/src/python/pants/reporting/zipkin_reporter.py index 5272dfa5ffd..7effbb0c035 100644 --- a/src/python/pants/reporting/zipkin_reporter.py +++ b/src/python/pants/reporting/zipkin_reporter.py @@ -7,7 +7,7 @@ import logging import requests -from py_zipkin import Encoding +from py_zipkin import Encoding, storage from py_zipkin.transport import BaseTransportHandler from py_zipkin.util import generate_random_64bit_string from py_zipkin.zipkin import ZipkinAttrs, create_attrs_for_span, zipkin_span @@ -64,6 +64,7 @@ def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_ self.parent_id = parent_id self.sample_rate = float(sample_rate) self.endpoint = endpoint + self.span_storage = storage.default_span_storage() def start_workunit(self, workunit): """Implementation of Reporter callback.""" @@ -97,18 +98,19 @@ def start_workunit(self, workunit): ) self.trace_id = zipkin_attrs.trace_id - span = zipkin_span( service_name=service_name, span_name=workunit.name, transport_handler=self.handler, encoding=Encoding.V1_THRIFT, - zipkin_attrs=zipkin_attrs + zipkin_attrs=zipkin_attrs, + span_storage=self.span_storage, ) else: span = zipkin_span( service_name=service_name, span_name=workunit.name, + span_storage=self.span_storage, ) self._workunits_to_spans[workunit] = span span.start() @@ -129,3 +131,26 @@ def close(self): endpoint = self.endpoint.replace("/api/v1/spans", "") logger.debug("Zipkin trace may be located at this URL {}/traces/{}".format(endpoint, self.trace_id)) + + def bulk_record_workunits(self, metrics): + """A collection of workunits from Rust (engine) part""" + engine_workunits = metrics["engine_workunits"] + for workunit in engine_workunits: + duration = workunit['end_timestamp'] - workunit['start_timestamp'] + + span = zipkin_span( + service_name="pants", + span_name=workunit['name'], + duration=duration, + span_storage=self.span_storage, + ) + span.zipkin_attrs = ZipkinAttrs( + 
trace_id=self.trace_id, + span_id=workunit['span_id'], + parent_span_id=workunit.get("parent_id", None), + flags='0', # flags: stores flags header. Currently unused + is_sampled=True, + ) + span.start() + span.start_timestamp = workunit['start_timestamp'] + span.stop() diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 87ce1e85d1d..9e174add0a7 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -71,6 +71,7 @@ use fs::{GlobMatching, MemFS, PathStat}; use futures::Future; use hashing::Digest; use log::error; +use crate::scheduler::WorkUnit; // TODO: Consider renaming and making generic for collections of PyResults. #[repr(C)] @@ -313,13 +314,31 @@ pub extern "C" fn scheduler_metrics( ) -> Handle { with_scheduler(scheduler_ptr, |scheduler| { with_session(session_ptr, |session| { - let values = scheduler + let mut values = scheduler .metrics(session) .into_iter() .flat_map(|(metric, value)| { vec![externs::store_utf8(metric), externs::store_i64(value)] }) .collect::>(); + let mut workunits = session.workunits.lock(); + let start_time = std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap().as_secs() as f64; + let test_workunit = WorkUnit { + name: "test_workunit".to_string(), + start_timestamp: start_time, + end_timestamp: start_time + 1.0, + span_id: "2222222222222222".to_string(), + }; + workunits.push(test_workunit); + let workunits = workunits.iter().map(|workunit| { + let values = vec![externs::store_utf8("name"), externs::store_utf8(&workunit.name), + externs::store_utf8("start_timestamp"), externs::store_i64(workunit.start_timestamp as i64), + externs::store_utf8("end_timestamp"), externs::store_i64(workunit.end_timestamp as i64), + externs::store_utf8("span_id"), externs::store_utf8(&workunit.span_id)]; + externs::store_dict(&values) + }).collect::>(); + values.push(externs::store_utf8("engine_workunits")); + values.push(externs::store_tuple(&workunits)); 
externs::store_dict(&values).into() }) }) diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 8e2fc08c97e..1755bdc1eb2 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -33,6 +33,15 @@ pub struct Session { roots: Mutex>, // If enabled, the display that will render the progress of the V2 engine. display: Option>, + // A place to store info about workunits in rust part + pub workunits: Arc>>, +} + +pub struct WorkUnit { + pub name: String, + pub start_timestamp: f64, + pub end_timestamp: f64, + pub span_id: String, } impl Session { @@ -41,6 +50,7 @@ impl Session { preceding_graph_size: scheduler.core.graph.len(), roots: Mutex::new(HashSet::new()), display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new), + workunits: Arc::new(Mutex::new(Vec::new())) } } From 74ca8e359422882a4733b048ae181992277c1158 Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Tue, 19 Feb 2019 15:00:21 +0000 Subject: [PATCH 03/10] Add f64 type to externs --- src/rust/engine/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 9e174add0a7..e233e7010b1 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -332,8 +332,8 @@ pub extern "C" fn scheduler_metrics( workunits.push(test_workunit); let workunits = workunits.iter().map(|workunit| { let values = vec![externs::store_utf8("name"), externs::store_utf8(&workunit.name), - externs::store_utf8("start_timestamp"), externs::store_i64(workunit.start_timestamp as i64), - externs::store_utf8("end_timestamp"), externs::store_i64(workunit.end_timestamp as i64), + externs::store_utf8("start_timestamp"), externs::store_f64(workunit.start_timestamp), + externs::store_utf8("end_timestamp"), externs::store_f64(workunit.end_timestamp), externs::store_utf8("span_id"), externs::store_utf8(&workunit.span_id)]; externs::store_dict(&values) }).collect::>(); 
From bf3bc814f46f8ebe77b7b8bb69211532d20a7db3 Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Wed, 6 Mar 2019 16:03:34 +0000 Subject: [PATCH 04/10] Add session with workunits to the context and flag zipkin_trace_v2 --- src/python/pants/bin/local_pants_runner.py | 8 +-- src/python/pants/engine/native.py | 4 +- src/python/pants/engine/scheduler.py | 6 ++- src/python/pants/init/engine_initializer.py | 4 +- src/python/pants/reporting/reporting.py | 2 + src/python/pants/reporting/zipkin_reporter.py | 7 ++- src/rust/engine/src/context.rs | 7 ++- src/rust/engine/src/lib.rs | 38 +++++++------- src/rust/engine/src/nodes.rs | 20 +++++++ src/rust/engine/src/scheduler.rs | 52 +++++++++++++++---- 10 files changed, 106 insertions(+), 42 deletions(-) diff --git a/src/python/pants/bin/local_pants_runner.py b/src/python/pants/bin/local_pants_runner.py index 8a7df0f4ec3..76a2203458e 100644 --- a/src/python/pants/bin/local_pants_runner.py +++ b/src/python/pants/bin/local_pants_runner.py @@ -40,7 +40,7 @@ def parse_options(args, env, setup_logging=False, options_bootstrapper=None): return options, build_config, options_bootstrapper @staticmethod - def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, global_options): + def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, options): if graph_session: return graph_session @@ -52,7 +52,9 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config, build_config ) - return graph_scheduler_helper.new_session(global_options.v2_ui) + v2_ui = options.for_global_scope().v2_ui + zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2 + return graph_scheduler_helper.new_session(zipkin_trace_v2, v2_ui) @staticmethod def _maybe_init_target_roots(target_roots, graph_session, options, build_root): @@ -106,7 +108,7 @@ def create(cls, exiter, args, env, target_roots=None, daemon_graph_session=None, daemon_graph_session, options_bootstrapper, build_config, - 
global_options + options ) target_roots = cls._maybe_init_target_roots( diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 59b09ca246e..350f97fd72c 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -683,8 +683,8 @@ def new_execution_request(self): self.lib.execution_request_create(), self.lib.execution_request_destroy) - def new_session(self, scheduler, should_render_ui, ui_worker_count): - return self.gc(self.lib.session_create(scheduler, should_render_ui, ui_worker_count), self.lib.session_destroy) + def new_session(self, scheduler, should_record_zipkin_spans, should_render_ui, ui_worker_count): + return self.gc(self.lib.session_create(scheduler, should_record_zipkin_spans, should_render_ui, ui_worker_count), self.lib.session_destroy) def new_scheduler(self, tasks, diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 4ee04a1f221..00fd1e098f1 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -354,9 +354,11 @@ def lease_files_in_graph(self): def garbage_collect_store(self): self._native.lib.garbage_collect_store(self._scheduler) - def new_session(self, v2_ui=False): + def new_session(self, zipkin_trace_v2, v2_ui=False): """Creates a new SchedulerSession for this Scheduler.""" - return SchedulerSession(self, self._native.new_session(self._scheduler, v2_ui, multiprocessing.cpu_count())) + return SchedulerSession(self, self._native.new_session( + self._scheduler, zipkin_trace_v2, v2_ui, multiprocessing.cpu_count()) + ) _PathGlobsAndRootCollection = Collection.of(PathGlobsAndRoot) diff --git a/src/python/pants/init/engine_initializer.py b/src/python/pants/init/engine_initializer.py index 19293d0281f..0ab2342d61b 100644 --- a/src/python/pants/init/engine_initializer.py +++ b/src/python/pants/init/engine_initializer.py @@ -142,8 +142,8 @@ def _tuplify(v): class LegacyGraphScheduler(datatype(['scheduler', 
'symbol_table', 'goal_map'])): """A thin wrapper around a Scheduler configured with @rules for a symbol table.""" - def new_session(self, v2_ui=False): - session = self.scheduler.new_session(v2_ui) + def new_session(self, zipkin_trace_v2, v2_ui=False): + session = self.scheduler.new_session(zipkin_trace_v2, v2_ui) return LegacyGraphSession(session, self.symbol_table, self.goal_map) diff --git a/src/python/pants/reporting/reporting.py b/src/python/pants/reporting/reporting.py index 6d1860ee67c..88ca1a5faf5 100644 --- a/src/python/pants/reporting/reporting.py +++ b/src/python/pants/reporting/reporting.py @@ -62,6 +62,8 @@ def register_options(cls, register): 'or not set when running a Pants command.') register('--zipkin-sample-rate', advanced=True, default=100.0, help='Rate at which to sample Zipkin traces. Value 0.0 - 100.0.') + register('--zipkin-trace-v2', advanced=True, type=bool, default=False, + help='If enabled, the zipkin spans are tracked for v2 engine execution progress.') def initialize(self, run_tracker, all_options, start_time=None): """Initialize with the given RunTracker. 
diff --git a/src/python/pants/reporting/zipkin_reporter.py b/src/python/pants/reporting/zipkin_reporter.py index 7effbb0c035..8567300a704 100644 --- a/src/python/pants/reporting/zipkin_reporter.py +++ b/src/python/pants/reporting/zipkin_reporter.py @@ -97,6 +97,7 @@ def start_workunit(self, workunit): sample_rate=self.sample_rate, # Value between 0.0 and 100.0 ) self.trace_id = zipkin_attrs.trace_id + self.parent_id = zipkin_attrs.span_id # temporary code span = zipkin_span( service_name=service_name, @@ -146,8 +147,10 @@ def bulk_record_workunits(self, metrics): ) span.zipkin_attrs = ZipkinAttrs( trace_id=self.trace_id, - span_id=workunit['span_id'], - parent_span_id=workunit.get("parent_id", None), + span_id=generate_random_64bit_string(), + # span_id=workunit['span_id'], + parent_span_id=workunit.get("parent_id", self.parent_id), # TODO change it when we properly pass parent id + # parent_span_id=workunit.get("parent_id", None), flags='0', # flags: stores flags header. Currently unused is_sampled=True, ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 024f9845c3e..bca370fd020 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -15,9 +15,11 @@ use crate::core::{Failure, TypeId}; use crate::handles::maybe_drop_handles; use crate::nodes::{NodeKey, TryInto, WrappedNode}; use crate::rule_graph::RuleGraph; +use crate::scheduler::Session; use crate::tasks::Tasks; use crate::types::Types; use boxfuture::{BoxFuture, Boxable}; +use core::clone::Clone; use fs::{self, safe_create_dir_all_ioerror, PosixFS, ResettablePool, Store}; use graph::{EntryId, Graph, NodeContext}; use log::debug; @@ -227,13 +229,15 @@ impl Core { pub struct Context { pub entry_id: EntryId, pub core: Arc, + pub session: Session, } impl Context { - pub fn new(entry_id: EntryId, core: Arc) -> Context { + pub fn new(entry_id: EntryId, core: Arc, session: Session) -> Context { Context { entry_id: entry_id, core: core, + session: session, } 
} @@ -267,6 +271,7 @@ impl NodeContext for Context { Context { entry_id: entry_id, core: self.core.clone(), + session: self.session.clone(), } } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index e233e7010b1..5f6ab2844a8 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -71,7 +71,6 @@ use fs::{GlobMatching, MemFS, PathStat}; use futures::Future; use hashing::Digest; use log::error; -use crate::scheduler::WorkUnit; // TODO: Consider renaming and making generic for collections of PyResults. #[repr(C)] @@ -317,26 +316,25 @@ pub extern "C" fn scheduler_metrics( let mut values = scheduler .metrics(session) .into_iter() - .flat_map(|(metric, value)| { - vec![externs::store_utf8(metric), externs::store_i64(value)] + .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) + .collect::>(); + let workunits = session.get_workunits().lock(); + let workunits = workunits + .iter() + .map(|workunit| { + let values = vec![ + externs::store_utf8("name"), + externs::store_utf8(&workunit.name), + externs::store_utf8("start_timestamp"), + externs::store_f64(workunit.start_timestamp), + externs::store_utf8("end_timestamp"), + externs::store_f64(workunit.end_timestamp), + externs::store_utf8("span_id"), + externs::store_utf8(&workunit.span_id), + ]; + externs::store_dict(&values) }) .collect::>(); - let mut workunits = session.workunits.lock(); - let start_time = std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap().as_secs() as f64; - let test_workunit = WorkUnit { - name: "test_workunit".to_string(), - start_timestamp: start_time, - end_timestamp: start_time + 1.0, - span_id: "2222222222222222".to_string(), - }; - workunits.push(test_workunit); - let workunits = workunits.iter().map(|workunit| { - let values = vec![externs::store_utf8("name"), externs::store_utf8(&workunit.name), - externs::store_utf8("start_timestamp"), externs::store_f64(workunit.start_timestamp), - 
externs::store_utf8("end_timestamp"), externs::store_f64(workunit.end_timestamp), - externs::store_utf8("span_id"), externs::store_utf8(&workunit.span_id)]; - externs::store_dict(&values) - }).collect::>(); values.push(externs::store_utf8("engine_workunits")); values.push(externs::store_tuple(&workunits)); externs::store_dict(&values).into() @@ -551,12 +549,14 @@ pub extern "C" fn nodes_destroy(raw_nodes_ptr: *mut RawNodes) { #[no_mangle] pub extern "C" fn session_create( scheduler_ptr: *mut Scheduler, + should_record_zipkin_spans: bool, should_render_ui: bool, ui_worker_count: u64, ) -> *const Session { with_scheduler(scheduler_ptr, |scheduler| { Box::into_raw(Box::new(Session::new( scheduler, + should_record_zipkin_spans, should_render_ui, ui_worker_count as usize, ))) diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 77d7aebe408..f0ca65d2342 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -28,6 +28,7 @@ use fs::{ use hashing; use process_execution::{self, CommandRunner}; +use crate::scheduler::WorkUnit; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer}; pub type NodeFuture = BoxFuture; @@ -1039,6 +1040,11 @@ impl Node for NodeKey { type Error = Failure; fn run(self, context: Context) -> NodeFuture { + let start_timestamp = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as f64; + let context2 = context.clone(); match self { NodeKey::DigestFile(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::DownloadedFile(n) => n.run(context).map(|v| v.into()).to_boxed(), @@ -1049,6 +1055,20 @@ impl Node for NodeKey { NodeKey::Snapshot(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::Task(n) => n.run(context).map(|v| v.into()).to_boxed(), } + .inspect(move |_: &NodeResult| { + let end_timestamp = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as f64; + let workunit 
= WorkUnit { + name: "Node".to_string(), + start_timestamp: start_timestamp, + end_timestamp: end_timestamp, + span_id: "aaaaaaaaaaaaaaaa".to_string(), + }; + context2.session.add_workunit(workunit); + }) + .to_boxed() } fn digest(res: NodeResult) -> Option { diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 1755bdc1eb2..67490d610ce 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -26,7 +26,7 @@ use ui::EngineDisplay; /// Both Scheduler and Session are exposed to python and expected to be used by multiple threads, so /// they use internal mutability in order to avoid exposing locks to callers. /// -pub struct Session { +struct InnerSession { // The total size of the graph at Session-creation time. preceding_graph_size: usize, // The set of roots that have been requested within this session. @@ -34,9 +34,12 @@ pub struct Session { // If enabled, the display that will render the progress of the V2 engine. display: Option>, // A place to store info about workunits in rust part - pub workunits: Arc>>, + workunits: Mutex>, } +#[derive(Clone)] +pub struct Session(Arc); + pub struct WorkUnit { pub name: String, pub start_timestamp: f64, @@ -45,24 +48,46 @@ pub struct WorkUnit { } impl Session { - pub fn new(scheduler: &Scheduler, should_render_ui: bool, ui_worker_count: usize) -> Session { - Session { + pub fn new( + scheduler: &Scheduler, + _should_record_zipkin_spans: bool, + should_render_ui: bool, + ui_worker_count: usize, + ) -> Session { + let inner_session = InnerSession { preceding_graph_size: scheduler.core.graph.len(), roots: Mutex::new(HashSet::new()), display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new), - workunits: Arc::new(Mutex::new(Vec::new())) - } + workunits: Mutex::new(Vec::new()), + }; + Session(Arc::new(inner_session)) } fn extend(&self, new_roots: &[Root]) { - let mut roots = self.roots.lock(); + let mut roots = self.0.roots.lock(); 
roots.extend(new_roots.iter().cloned()); } fn root_nodes(&self) -> Vec { - let roots = self.roots.lock(); + let roots = self.0.roots.lock(); roots.iter().map(|r| r.clone().into()).collect() } + + pub fn preceding_graph_size(&self) -> usize { + self.0.preceding_graph_size + } + + pub fn display(&self) -> &Option> { + &self.0.display + } + + pub fn get_workunits(&self) -> &Mutex> { + &self.0.workunits + } + + pub fn add_workunit(&self, workunit: WorkUnit) { + self.0.workunits.lock().push(workunit); + } } pub struct ExecutionRequest { @@ -178,7 +203,10 @@ impl Scheduler { .graph .reachable_digest_count(&session.root_nodes()) as i64, ); - m.insert("preceding_graph_size", session.preceding_graph_size as i64); + m.insert( + "preceding_graph_size", + session.preceding_graph_size() as i64, + ); m.insert("resulting_graph_size", self.core.graph.len() as i64); m } @@ -258,6 +286,7 @@ impl Scheduler { // individual Future in the join was (eventually) mapped into success. let context = RootContext { core: self.core.clone(), + session: session.clone(), }; let (sender, receiver) = mpsc::channel(); @@ -270,7 +299,7 @@ impl Scheduler { .collect(); // Lock the display for the remainder of the execution, and grab a reference to it. 
- let mut maybe_display = match &session.display { + let mut maybe_display = match &session.display() { &Some(ref d) => Some(d.lock()), &None => None, }; @@ -353,13 +382,14 @@ pub type RootResult = Result; #[derive(Clone)] struct RootContext { core: Arc, + session: Session, } impl NodeContext for RootContext { type Node = NodeKey; fn clone_for(&self, entry_id: EntryId) -> Context { - Context::new(entry_id, self.core.clone()) + Context::new(entry_id, self.core.clone(), self.session.clone()) } fn graph(&self) -> &Graph { From c90e51d176011d7b9fbb90e03593f3dc01f0d1e0 Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Fri, 8 Mar 2019 15:50:43 +0000 Subject: [PATCH 05/10] Add flag for zipkin spans for v2 engine --- src/python/pants/engine/scheduler.py | 6 +++ src/python/pants/goal/context.py | 4 +- src/python/pants/reporting/report.py | 4 +- src/python/pants/reporting/reporter.py | 2 +- src/python/pants/reporting/zipkin_reporter.py | 8 ++-- src/rust/engine/src/lib.rs | 41 ++++++++++--------- src/rust/engine/src/nodes.rs | 22 ++++------ src/rust/engine/src/scheduler.rs | 29 ++++++++++--- 8 files changed, 68 insertions(+), 48 deletions(-) diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 00fd1e098f1..c17e8720bbe 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -446,6 +446,12 @@ def metrics(self): """Returns metrics for this SchedulerSession as a dict of metric name to metric value.""" return self._scheduler._metrics(self._session) + def metrics_have_engine_workunits(self): + return "engine_workunits" in self.metrics() + + def engine_workunits(self): + return self.metrics().get("engine_workunits") + def with_fork_context(self, func): return self._scheduler.with_fork_context(func) diff --git a/src/python/pants/goal/context.py b/src/python/pants/goal/context.py index 8040ef253bc..98532faa5d5 100644 --- a/src/python/pants/goal/context.py +++ b/src/python/pants/goal/context.py @@ 
-151,7 +151,9 @@ def executing(self): yield metrics = self._scheduler.metrics() self.run_tracker.pantsd_stats.set_scheduler_metrics(metrics) - self.run_tracker.report.bulk_record_workunits(metrics) + if self._scheduler.metrics_have_engine_workunits(): + engine_workunits = self._scheduler.engine_workunits() + self.run_tracker.report.bulk_record_workunits(engine_workunits) self._set_affected_target_count_in_runtracker() def _set_target_root_count_in_runtracker(self): diff --git a/src/python/pants/reporting/report.py b/src/python/pants/reporting/report.py index f613ead9f99..b981d444931 100644 --- a/src/python/pants/reporting/report.py +++ b/src/python/pants/reporting/report.py @@ -141,7 +141,7 @@ def _notify(self): for reporter in self._reporters.values(): reporter.handle_output(workunit, label, s) - def bulk_record_workunits(self, metrics): + def bulk_record_workunits(self, engine_workunits): with self._lock: for reporter in self._reporters.values(): - reporter.bulk_record_workunits(metrics) + reporter.bulk_record_workunits(engine_workunits) diff --git a/src/python/pants/reporting/reporter.py b/src/python/pants/reporting/reporter.py index eda80483da5..42c9fa4e2c3 100644 --- a/src/python/pants/reporting/reporter.py +++ b/src/python/pants/reporting/reporter.py @@ -47,7 +47,7 @@ def end_workunit(self, workunit): """A workunit has finished.""" pass - def bulk_record_workunits(self, metrics): + def bulk_record_workunits(self, engine_workunits): """A collection of workunits from Rust (engine) part""" pass diff --git a/src/python/pants/reporting/zipkin_reporter.py b/src/python/pants/reporting/zipkin_reporter.py index 8567300a704..6fd9eeebf08 100644 --- a/src/python/pants/reporting/zipkin_reporter.py +++ b/src/python/pants/reporting/zipkin_reporter.py @@ -133,9 +133,8 @@ def close(self): logger.debug("Zipkin trace may be located at this URL {}/traces/{}".format(endpoint, self.trace_id)) - def bulk_record_workunits(self, metrics): - """A collection of workunits from Rust 
(engine) part""" - engine_workunits = metrics["engine_workunits"] + def bulk_record_workunits(self, engine_workunits): + """A collection of workunits from v2 engine part""" for workunit in engine_workunits: duration = workunit['end_timestamp'] - workunit['start_timestamp'] @@ -147,8 +146,7 @@ def bulk_record_workunits(self, metrics): ) span.zipkin_attrs = ZipkinAttrs( trace_id=self.trace_id, - span_id=generate_random_64bit_string(), - # span_id=workunit['span_id'], + span_id=workunit['span_id'], parent_span_id=workunit.get("parent_id", self.parent_id), # TODO change it when we properly pass parent id # parent_span_id=workunit.get("parent_id", None), flags='0', # flags: stores flags header. Currently unused diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 5f6ab2844a8..d1a8b7e989a 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -318,25 +318,28 @@ pub extern "C" fn scheduler_metrics( .into_iter() .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) .collect::>(); - let workunits = session.get_workunits().lock(); - let workunits = workunits - .iter() - .map(|workunit| { - let values = vec![ - externs::store_utf8("name"), - externs::store_utf8(&workunit.name), - externs::store_utf8("start_timestamp"), - externs::store_f64(workunit.start_timestamp), - externs::store_utf8("end_timestamp"), - externs::store_f64(workunit.end_timestamp), - externs::store_utf8("span_id"), - externs::store_utf8(&workunit.span_id), - ]; - externs::store_dict(&values) - }) - .collect::>(); - values.push(externs::store_utf8("engine_workunits")); - values.push(externs::store_tuple(&workunits)); + let values_ref = &mut values; + let workunits_option = session.get_workunits(); + if let &Some(ref workunits_mutex) = workunits_option { + let workunits = workunits_mutex.lock() + .iter() + .map(|workunit| { + let workunit_zipkin_trace_info = vec![ + externs::store_utf8("name"), + 
externs::store_utf8(&workunit.name), + externs::store_utf8("start_timestamp"), + externs::store_f64(workunit.start_timestamp), + externs::store_utf8("end_timestamp"), + externs::store_f64(workunit.end_timestamp), + externs::store_utf8("span_id"), + externs::store_utf8(&workunit.span_id), + ]; + externs::store_dict(&workunit_zipkin_trace_info) + }) + .collect::>(); + values_ref.push(externs::store_utf8("engine_workunits")); + values_ref.push(externs::store_tuple(&workunits)); + } externs::store_dict(&values).into() }) }) diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index f0ca65d2342..11c99de44c1 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -28,7 +28,6 @@ use fs::{ use hashing; use process_execution::{self, CommandRunner}; -use crate::scheduler::WorkUnit; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer}; pub type NodeFuture = BoxFuture; @@ -1041,9 +1040,9 @@ impl Node for NodeKey { fn run(self, context: Context) -> NodeFuture { let start_timestamp = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as f64; + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as f64; let context2 = context.clone(); match self { NodeKey::DigestFile(n) => n.run(context).map(|v| v.into()).to_boxed(), @@ -1057,16 +1056,11 @@ impl Node for NodeKey { } .inspect(move |_: &NodeResult| { let end_timestamp = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as f64; - let workunit = WorkUnit { - name: "Node".to_string(), - start_timestamp: start_timestamp, - end_timestamp: end_timestamp, - span_id: "aaaaaaaaaaaaaaaa".to_string(), - }; - context2.session.add_workunit(workunit); + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as f64; + let node_name = "Node".to_string(); + context2.session.add_workunit(node_name, start_timestamp, end_timestamp); }) 
.to_boxed() } diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 67490d610ce..1e35e00f5a3 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -18,6 +18,8 @@ use indexmap::IndexMap; use log::{debug, info, warn}; use parking_lot::Mutex; use ui::EngineDisplay; +use rand::thread_rng; +use rand::Rng; /// /// A Session represents a related series of requests (generally: one run of the pants CLI) on an @@ -34,7 +36,7 @@ struct InnerSession { // If enabled, the display that will render the progress of the V2 engine. display: Option>, // A place to store info about workunits in rust part - workunits: Mutex>, + workunits: Option>>, } #[derive(Clone)] @@ -50,15 +52,16 @@ pub struct WorkUnit { impl Session { pub fn new( scheduler: &Scheduler, - _should_record_zipkin_spans: bool, + should_record_zipkin_spans: bool, should_render_ui: bool, ui_worker_count: usize, ) -> Session { + let workunits = if should_record_zipkin_spans {Some(Mutex::new(Vec::new()))} else {None}; let inner_session = InnerSession { preceding_graph_size: scheduler.core.graph.len(), roots: Mutex::new(HashSet::new()), display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new), - workunits: Mutex::new(Vec::new()), + workunits: workunits, }; Session(Arc::new(inner_session)) } @@ -81,15 +84,29 @@ impl Session { &self.0.display } - pub fn get_workunits(&self) -> &Mutex> { + pub fn get_workunits(&self) -> &Option>> { &self.0.workunits } - pub fn add_workunit(&self, workunit: WorkUnit) { - self.0.workunits.lock().push(workunit); + pub fn add_workunit(&self, node_name: String, start_timestamp: f64, end_timestamp: f64) { + let w = &self.0.workunits; + if let &Some(ref x) = w { + let workunit = WorkUnit { + name: node_name, + start_timestamp: start_timestamp, + end_timestamp: end_timestamp, + span_id: generate_random_64bit_string(), + }; + x.lock().push(workunit)}; } } +fn generate_random_64bit_string() -> String { + let mut 
rng = thread_rng(); + let random_u64: u64 = rng.gen(); + format!("{:16.x}", random_u64) +} + pub struct ExecutionRequest { // Set of roots for an execution, in the order they were declared. pub roots: Vec, From 60fe1f89b68b02a91739adec5152d38fa2b917d8 Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Tue, 12 Mar 2019 18:25:28 +0000 Subject: [PATCH 06/10] Address comments --- src/python/pants/reporting/zipkin_reporter.py | 6 ++- src/rust/engine/src/lib.rs | 39 ++++++++------- src/rust/engine/src/nodes.rs | 49 +++++++++++++++---- src/rust/engine/src/scheduler.rs | 34 +++++-------- 4 files changed, 76 insertions(+), 52 deletions(-) diff --git a/src/python/pants/reporting/zipkin_reporter.py b/src/python/pants/reporting/zipkin_reporter.py index 6fd9eeebf08..c02f2be5dc7 100644 --- a/src/python/pants/reporting/zipkin_reporter.py +++ b/src/python/pants/reporting/zipkin_reporter.py @@ -147,8 +147,10 @@ def bulk_record_workunits(self, engine_workunits): span.zipkin_attrs = ZipkinAttrs( trace_id=self.trace_id, span_id=workunit['span_id'], - parent_span_id=workunit.get("parent_id", self.parent_id), # TODO change it when we properly pass parent id - # parent_span_id=workunit.get("parent_id", None), + # TODO change it when we properly pass parent_id to the v2 engine Nodes + # TODO Pass parent_id with ExecutionRequest when v2 engine is called by a workunit + # TODO pass parent_id when v2 engine Node is called by another v2 engine Node + parent_span_id=workunit.get("parent_id", self.parent_id), flags='0', # flags: stores flags header. 
Currently unused is_sampled=True, ) diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index d1a8b7e989a..85d8771a293 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -319,27 +319,28 @@ pub extern "C" fn scheduler_metrics( .flat_map(|(metric, value)| vec![externs::store_utf8(metric), externs::store_i64(value)]) .collect::>(); let values_ref = &mut values; - let workunits_option = session.get_workunits(); - if let &Some(ref workunits_mutex) = workunits_option { - let workunits = workunits_mutex.lock() - .iter() - .map(|workunit| { - let workunit_zipkin_trace_info = vec![ - externs::store_utf8("name"), - externs::store_utf8(&workunit.name), - externs::store_utf8("start_timestamp"), - externs::store_f64(workunit.start_timestamp), - externs::store_utf8("end_timestamp"), - externs::store_f64(workunit.end_timestamp), - externs::store_utf8("span_id"), - externs::store_utf8(&workunit.span_id), - ]; - externs::store_dict(&workunit_zipkin_trace_info) - }) - .collect::>(); + if session.should_record_zipkin_spans() { + let workunits = &session + .get_workunits() + .lock() + .iter() + .map(|workunit| { + let workunit_zipkin_trace_info = vec![ + externs::store_utf8("name"), + externs::store_utf8(&workunit.name), + externs::store_utf8("start_timestamp"), + externs::store_f64(workunit.start_timestamp), + externs::store_utf8("end_timestamp"), + externs::store_f64(workunit.end_timestamp), + externs::store_utf8("span_id"), + externs::store_utf8(&workunit.span_id), + ]; + externs::store_dict(&workunit_zipkin_trace_info) + }) + .collect::>(); values_ref.push(externs::store_utf8("engine_workunits")); values_ref.push(externs::store_tuple(&workunits)); - } + }; externs::store_dict(&values).into() }) }) diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 11c99de44c1..2806fc80231 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -28,7 +28,10 @@ use fs::{ use hashing; use 
process_execution::{self, CommandRunner}; +use crate::scheduler::WorkUnit; use graph::{Entry, Node, NodeError, NodeTracer, NodeVisualizer}; +use rand::thread_rng; +use rand::Rng; pub type NodeFuture = BoxFuture; @@ -1039,10 +1042,16 @@ impl Node for NodeKey { type Error = Failure; fn run(self, context: Context) -> NodeFuture { - let start_timestamp = std::time::SystemTime::now() + let node_name_and_start_timestamp = if context.session.should_record_zipkin_spans() { + let node_name = format!("{}", self); + let start_timestamp_duration = std::time::SystemTime::now() .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as f64; + .unwrap(); + let start_timestamp = duration_as_float_secs(&start_timestamp_duration); + Some((node_name, start_timestamp)) + } else { + None + }; let context2 = context.clone(); match self { NodeKey::DigestFile(n) => n.run(context).map(|v| v.into()).to_boxed(), @@ -1055,12 +1064,19 @@ impl Node for NodeKey { NodeKey::Task(n) => n.run(context).map(|v| v.into()).to_boxed(), } .inspect(move |_: &NodeResult| { - let end_timestamp = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as f64; - let node_name = "Node".to_string(); - context2.session.add_workunit(node_name, start_timestamp, end_timestamp); + if context2.session.should_record_zipkin_spans() { + let end_timestamp_duration = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap(); + let end_timestamp = duration_as_float_secs(&end_timestamp_duration); + let (node_name, start_timestamp) = node_name_and_start_timestamp.expect("If flag --reporting-zipkin-trace-v2 is enabled (should_record_zipkin_spans returns true) a node_name and a start_timestamp should be recorded before running a NodeKey."); + let workunit = WorkUnit { + name: node_name, + start_timestamp: start_timestamp, + end_timestamp: end_timestamp, + span_id: generate_random_64bit_string(), + }; + 
context2.session.add_workunit(workunit)}; }) .to_boxed() } @@ -1086,6 +1102,21 @@ impl Node for NodeKey { } } +fn duration_as_float_secs(duration: &Duration) -> f64 { + // Returning value is formed by representing duration as a whole number of seconds (u64) plus + // a whole number of microseconds (u32) turned into a f64 type. + // Converting the duration to f64 decreases precision. + let whole_secs_in_duration = duration.as_secs() as f64; + let fract_part_of_duration_in_micros = f64::from(duration.subsec_micros()); + whole_secs_in_duration + fract_part_of_duration_in_micros / 1_000_000.0 +} + +fn generate_random_64bit_string() -> String { + let mut rng = thread_rng(); + let random_u64: u64 = rng.gen(); + format!("{:16.x}", random_u64) +} + impl Display for NodeKey { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 1e35e00f5a3..8f185fa8d89 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -18,8 +18,6 @@ use indexmap::IndexMap; use log::{debug, info, warn}; use parking_lot::Mutex; use ui::EngineDisplay; -use rand::thread_rng; -use rand::Rng; /// /// A Session represents a related series of requests (generally: one run of the pants CLI) on an @@ -35,8 +33,10 @@ struct InnerSession { roots: Mutex>, // If enabled, the display that will render the progress of the V2 engine. display: Option>, + // If enabled, Zipkin spans for v2 engine will be collected. 
+ should_record_zipkin_spans: bool, // A place to store info about workunits in rust part - workunits: Option>>, + workunits: Mutex>, } #[derive(Clone)] @@ -56,12 +56,12 @@ impl Session { should_render_ui: bool, ui_worker_count: usize, ) -> Session { - let workunits = if should_record_zipkin_spans {Some(Mutex::new(Vec::new()))} else {None}; let inner_session = InnerSession { preceding_graph_size: scheduler.core.graph.len(), roots: Mutex::new(HashSet::new()), display: EngineDisplay::create(ui_worker_count, should_render_ui).map(Mutex::new), - workunits: workunits, + should_record_zipkin_spans: should_record_zipkin_spans, + workunits: Mutex::new(Vec::new()), }; Session(Arc::new(inner_session)) } @@ -84,27 +84,17 @@ impl Session { &self.0.display } - pub fn get_workunits(&self) -> &Option>> { - &self.0.workunits + pub fn should_record_zipkin_spans(&self) -> bool { + self.0.should_record_zipkin_spans } - pub fn add_workunit(&self, node_name: String, start_timestamp: f64, end_timestamp: f64) { - let w = &self.0.workunits; - if let &Some(ref x) = w { - let workunit = WorkUnit { - name: node_name, - start_timestamp: start_timestamp, - end_timestamp: end_timestamp, - span_id: generate_random_64bit_string(), - }; - x.lock().push(workunit)}; + pub fn get_workunits(&self) -> &Mutex> { + &self.0.workunits } -} -fn generate_random_64bit_string() -> String { - let mut rng = thread_rng(); - let random_u64: u64 = rng.gen(); - format!("{:16.x}", random_u64) + pub fn add_workunit(&self, workunit: WorkUnit) { + self.0.workunits.lock().push(workunit); + } } pub struct ExecutionRequest { From 934dcf76d150e306cca2bcde3f68ccf8f7f6753b Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Tue, 12 Mar 2019 19:28:12 +0000 Subject: [PATCH 07/10] Address comments --- src/python/pants/reporting/zipkin_reporter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/python/pants/reporting/zipkin_reporter.py b/src/python/pants/reporting/zipkin_reporter.py index 
c02f2be5dc7..557b338c44c 100644 --- a/src/python/pants/reporting/zipkin_reporter.py +++ b/src/python/pants/reporting/zipkin_reporter.py @@ -97,7 +97,10 @@ def start_workunit(self, workunit): sample_rate=self.sample_rate, # Value between 0.0 and 100.0 ) self.trace_id = zipkin_attrs.trace_id - self.parent_id = zipkin_attrs.span_id # temporary code + # TODO delete this line when parent_id will be passed in v2 engine: + # - with ExecutionRequest when Nodes from v2 engine are called by a workunit; + # - when a v2 engine Node is called by another v2 engine Node. + self.parent_id = zipkin_attrs.span_id span = zipkin_span( service_name=service_name, From 034319f5d65ce4fdca960b8cba38738f926c844d Mon Sep 17 00:00:00 2001 From: Ekaterina Tyurina Date: Fri, 15 Mar 2019 16:57:05 +0000 Subject: [PATCH 08/10] Add integration test --- src/python/pants/goal/context.py | 4 ++-- .../pants/pantsd/service/scheduler_service.py | 5 +++-- .../pants_test/engine/scheduler_test_base.py | 2 +- .../pants_test/engine/test_scheduler.py | 18 ++++++++++++++++ .../reporting/test_reporting_integration.py | 21 +++++++++++++++++++ tests/python/pants_test/test_base.py | 2 +- 6 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/python/pants/goal/context.py b/src/python/pants/goal/context.py index 98532faa5d5..690b4641c7d 100644 --- a/src/python/pants/goal/context.py +++ b/src/python/pants/goal/context.py @@ -151,8 +151,8 @@ def executing(self): yield metrics = self._scheduler.metrics() self.run_tracker.pantsd_stats.set_scheduler_metrics(metrics) - if self._scheduler.metrics_have_engine_workunits(): - engine_workunits = self._scheduler.engine_workunits() + engine_workunits = self._scheduler.engine_workunits() + if engine_workunits: self.run_tracker.report.bulk_record_workunits(engine_workunits) self._set_affected_target_count_in_runtracker() diff --git a/src/python/pants/pantsd/service/scheduler_service.py b/src/python/pants/pantsd/service/scheduler_service.py index 26b3c7e5e1f..8904d93dd0d 
100644 --- a/src/python/pants/pantsd/service/scheduler_service.py +++ b/src/python/pants/pantsd/service/scheduler_service.py @@ -174,8 +174,9 @@ def prefork(self, options, options_bootstrapper): if graph_len > 0: self._logger.debug('graph len was {}, waiting for initial watchman event'.format(graph_len)) self._watchman_is_running.wait() - - session = self._graph_helper.new_session(options.for_global_scope().v2_ui) + v2_ui = options.for_global_scope().v2_ui + zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2 + session = self._graph_helper.new_session(zipkin_trace_v2, v2_ui) if options.for_global_scope().loop: return session, self._prefork_loop(session, options, options_bootstrapper) else: diff --git a/tests/python/pants_test/engine/scheduler_test_base.py b/tests/python/pants_test/engine/scheduler_test_base.py index 675bdec4aaa..11aa427380a 100644 --- a/tests/python/pants_test/engine/scheduler_test_base.py +++ b/tests/python/pants_test/engine/scheduler_test_base.py @@ -63,7 +63,7 @@ def mk_scheduler(self, union_rules, DEFAULT_EXECUTION_OPTIONS, include_trace_on_error=include_trace_on_error) - return scheduler.new_session() + return scheduler.new_session(zipkin_trace_v2=False) def context_with_scheduler(self, scheduler, *args, **kwargs): return self.context(*args, scheduler=scheduler, **kwargs) diff --git a/tests/python/pants_test/engine/test_scheduler.py b/tests/python/pants_test/engine/test_scheduler.py index b688ad83647..bda6281d076 100644 --- a/tests/python/pants_test/engine/test_scheduler.py +++ b/tests/python/pants_test/engine/test_scheduler.py @@ -214,9 +214,27 @@ def test_get_type_match_failure(self): self.scheduler.product_request(A, [Params(TypeCheckFailWrapper(A()))]) def test_trace_includes_rule_exception_traceback(self): +<<<<<<< Updated upstream # Execute a request that will trigger the nested raise, and then directly inspect its trace. 
request = self.scheduler.execution_request([A], [B()]) self.scheduler.execute(request) +======= + rules = [ + RootRule(B), + nested_raise, + ] + + scheduler = create_scheduler(rules) + request = scheduler._native.new_execution_request() + subject = B() + scheduler.add_root_selection(request, subject, A) + session = scheduler.new_session(zipkin_trace_v2=False) + scheduler._run_and_return_roots(session._session, request) + + trace = '\n'.join(scheduler.graph_trace(request)) + # NB removing location info to make trace repeatable + trace = remove_locations_from_traceback(trace) +>>>>>>> Stashed changes trace = remove_locations_from_traceback('\n'.join(self.scheduler.trace(request))) assert_equal_with_printing(self, dedent(''' diff --git a/tests/python/pants_test/reporting/test_reporting_integration.py b/tests/python/pants_test/reporting/test_reporting_integration.py index c09303aedf5..646107b1f7b 100644 --- a/tests/python/pants_test/reporting/test_reporting_integration.py +++ b/tests/python/pants_test/reporting/test_reporting_integration.py @@ -237,6 +237,27 @@ def test_zipkin_reporter_with_zero_sample_rate(self): num_of_traces = len(ZipkinHandler.traces) self.assertEqual(num_of_traces, 0) + def test_zipkin_reporter_for_v2_engine(self): + ZipkinHandler = zipkin_handler() + with http_server(ZipkinHandler) as port: + endpoint = "http://localhost:{}".format(port) + command = [ + '--reporting-zipkin-endpoint={}'.format(endpoint), + '--reporting-zipkin-trace-v2', + 'cloc', + 'src/python/pants:version' + ] + + pants_run = self.run_pants(command) + self.assert_success(pants_run) + + num_of_traces = len(ZipkinHandler.traces) + self.assertEqual(num_of_traces, 1) + + trace = ZipkinHandler.traces[-1] + self.assertTrue(any("Scandir" in span['name'] for span in trace), + "There is no span that contains 'Scandir' in it's name. 
The trace: {}".format(trace)) + @staticmethod def find_spans_by_name(trace, name): return [span for span in trace if span['name'] == name] diff --git a/tests/python/pants_test/test_base.py b/tests/python/pants_test/test_base.py index e892a2677fd..3d8ae1de2db 100644 --- a/tests/python/pants_test/test_base.py +++ b/tests/python/pants_test/test_base.py @@ -399,7 +399,7 @@ def _init_engine(cls): options_bootstrapper=OptionsBootstrapper.create(args=['--pants-config-files=[]']), build_configuration=cls.build_config(), build_ignore_patterns=None, - ).new_session() + ).new_session(zipkin_trace_v2=False) cls._scheduler = graph_session.scheduler_session cls._build_graph, cls._address_mapper = graph_session.create_build_graph( TargetRoots([]), cls._build_root() From e8210702ae96581277d9a083f528c658100f6fc7 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Sat, 16 Mar 2019 11:57:28 -0700 Subject: [PATCH 09/10] remove conflict markers --- .../python/pants_test/engine/test_scheduler.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/tests/python/pants_test/engine/test_scheduler.py b/tests/python/pants_test/engine/test_scheduler.py index bda6281d076..b688ad83647 100644 --- a/tests/python/pants_test/engine/test_scheduler.py +++ b/tests/python/pants_test/engine/test_scheduler.py @@ -214,27 +214,9 @@ def test_get_type_match_failure(self): self.scheduler.product_request(A, [Params(TypeCheckFailWrapper(A()))]) def test_trace_includes_rule_exception_traceback(self): -<<<<<<< Updated upstream # Execute a request that will trigger the nested raise, and then directly inspect its trace. 
request = self.scheduler.execution_request([A], [B()]) self.scheduler.execute(request) -======= - rules = [ - RootRule(B), - nested_raise, - ] - - scheduler = create_scheduler(rules) - request = scheduler._native.new_execution_request() - subject = B() - scheduler.add_root_selection(request, subject, A) - session = scheduler.new_session(zipkin_trace_v2=False) - scheduler._run_and_return_roots(session._session, request) - - trace = '\n'.join(scheduler.graph_trace(request)) - # NB removing location info to make trace repeatable - trace = remove_locations_from_traceback(trace) ->>>>>>> Stashed changes trace = remove_locations_from_traceback('\n'.join(self.scheduler.trace(request))) assert_equal_with_printing(self, dedent(''' From 6c891a895cf742bb10be4a42d240c45256d4902f Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Sat, 16 Mar 2019 22:32:57 -0700 Subject: [PATCH 10/10] switch usages of log to tokio trace --- src/rust/engine/Cargo.lock | 88 ++++++++++++++++++- src/rust/engine/Cargo.toml | 7 ++ src/rust/engine/fs/Cargo.toml | 5 +- src/rust/engine/fs/brfs/Cargo.toml | 5 +- src/rust/engine/fs/brfs/src/main.rs | 4 +- src/rust/engine/fs/src/glob_matching.rs | 1 - src/rust/engine/fs/src/lib.rs | 3 + src/rust/engine/fs/src/store.rs | 1 - src/rust/engine/process_execution/Cargo.toml | 5 +- src/rust/engine/process_execution/src/lib.rs | 3 + .../engine/process_execution/src/local.rs | 2 - .../engine/process_execution/src/remote.rs | 1 - src/rust/engine/src/context.rs | 1 - src/rust/engine/src/externs.rs | 70 +++++++++------ src/rust/engine/src/lib.rs | 4 +- src/rust/engine/src/scheduler.rs | 1 - 16 files changed, 156 insertions(+), 45 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index b5846dbfcdb..3bcd63eb730 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -171,13 +171,16 @@ dependencies = [ "futures-timer 0.1.1 
(git+https://github.com/pantsbuild/futures-timer?rev=0b747e565309a58537807ab43c674d8951f9e5a0)", "hashing 0.0.1", "libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "serverset 0.0.1", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "testutil 0.0.1", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio?rev=92d51202ef5e6b99004d77300de1aba0f8835513)", + "tokio-trace-futures 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-log 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-macros 0.1.0 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", ] [[package]] @@ -413,6 +416,10 @@ dependencies = [ "tar_api 0.0.1", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio?rev=92d51202ef5e6b99004d77300de1aba0f8835513)", + "tokio-trace-futures 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-log 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-macros 0.1.0 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", "ui 0.0.1", "url 1.7.2 
(registry+https://github.com/rust-lang/crates.io-index)", ] @@ -524,7 +531,6 @@ dependencies = [ "itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "lmdb 0.8.0 (git+https://github.com/pantsbuild/lmdb-rs.git?rev=06bdfbfc6348f6804127176e561843f214fc17f8)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "mock 0.0.1", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -534,6 +540,10 @@ dependencies = [ "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "testutil 0.0.1", + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio?rev=92d51202ef5e6b99004d77300de1aba0f8835513)", + "tokio-trace-futures 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-log 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-macros 0.1.0 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", "uuid 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1269,7 +1279,6 @@ dependencies = [ "futures-timer 0.1.1 (git+https://github.com/pantsbuild/futures-timer?rev=0b747e565309a58537807ab43c674d8951f9e5a0)", "grpcio 0.3.0 (git+https://github.com/pantsbuild/grpc-rs.git?rev=4dfafe9355dc996d7d0702e7386a6fedcd9734c0)", "hashing 0.0.1", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "mock 0.0.1", "protobuf 2.0.6 
(registry+https://github.com/rust-lang/crates.io-index)", "resettable 0.0.1", @@ -1279,6 +1288,10 @@ dependencies = [ "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-process 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio?rev=92d51202ef5e6b99004d77300de1aba0f8835513)", + "tokio-trace-futures 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-log 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", + "tokio-trace-macros 0.1.0 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", ] [[package]] @@ -1365,7 +1378,7 @@ name = "protoc" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2186,6 +2199,66 @@ dependencies = [ "tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-trace" +version = "0.0.1" +source = "git+https://github.com/tokio-rs/tokio#92d51202ef5e6b99004d77300de1aba0f8835513" +dependencies = [ + "tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-trace" +version = "0.0.1" +source = "git+https://github.com/tokio-rs/tokio?rev=92d51202ef5e6b99004d77300de1aba0f8835513#92d51202ef5e6b99004d77300de1aba0f8835513" +dependencies = [ + "tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-trace-core" +version = 
"0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-trace-futures" +version = "0.0.1" +source = "git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926#ee38e0030e3b249b09f0432f4d9140d15048a926" +dependencies = [ + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio)", +] + +[[package]] +name = "tokio-trace-log" +version = "0.0.1" +source = "git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926#ee38e0030e3b249b09f0432f4d9140d15048a926" +dependencies = [ + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio)", + "tokio-trace-subscriber 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)", +] + +[[package]] +name = "tokio-trace-macros" +version = "0.1.0" +source = "git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926#ee38e0030e3b249b09f0432f4d9140d15048a926" +dependencies = [ + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio)", +] + +[[package]] +name = "tokio-trace-subscriber" +version = "0.0.1" +source = "git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926#ee38e0030e3b249b09f0432f4d9140d15048a926" +dependencies = [ + "tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio)", +] + [[package]] name = "tokio-udp" version = "0.1.3" @@ -2757,6 +2830,13 @@ dependencies = [ "checksum tokio-tcp 0.1.3 
(registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" "checksum tokio-threadpool 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "742e511f6ce2298aeb86fc9ea0d8df81c2388c6ebae3dc8a7316e8c9df0df801" "checksum tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2910970404ba6fa78c5539126a9ae2045d62e3713041e447f695f41405a120c6" +"checksum tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio)" = "" +"checksum tokio-trace 0.0.1 (git+https://github.com/tokio-rs/tokio?rev=92d51202ef5e6b99004d77300de1aba0f8835513)" = "" +"checksum tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "350c9edade9830dc185ae48ba45667a445ab59f6167ef6d0254ec9d2430d9dd3" +"checksum tokio-trace-futures 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)" = "" +"checksum tokio-trace-log 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)" = "" +"checksum tokio-trace-macros 0.1.0 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)" = "" +"checksum tokio-trace-subscriber 0.0.1 (git+https://github.com/tokio-rs/tokio-trace-nursery?rev=ee38e0030e3b249b09f0432f4d9140d15048a926)" = "" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 
a31f00059fa..f34a6c6c488 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -95,7 +95,14 @@ reqwest = { version = "0.9.10", default_features = false, features = ["rustls-tl resettable = { path = "resettable" } smallvec = "0.6" tokio = "0.1" +tokio-trace = { git = "https://github.com/tokio-rs/tokio", rev = "92d51202ef5e6b99004d77300de1aba0f8835513" } +tokio-trace-log = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-futures = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-macros = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } tempfile = "3" ui = { path = "ui" } url = "1.7.1" tar_api = { path = "tar_api" } + +[patch.crates-io] +tokio-trace = { git = "https://github.com/tokio-rs/tokio" } diff --git a/src/rust/engine/fs/Cargo.toml b/src/rust/engine/fs/Cargo.toml index 1c1c800833d..a5705c225e4 100644 --- a/src/rust/engine/fs/Cargo.toml +++ b/src/rust/engine/fs/Cargo.toml @@ -23,7 +23,6 @@ indexmap = "1.0.2" itertools = "0.7.2" lazy_static = "1" lmdb = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "06bdfbfc6348f6804127176e561843f214fc17f8" } -log = "0.4" parking_lot = "0.6" protobuf = { version = "2.0.4", features = ["with-bytes"] } serverset = { path = "../serverset" } @@ -31,6 +30,10 @@ sha2 = "0.8" serde = "1.0" serde_derive = "1.0" tempfile = "3" +tokio-trace = { git = "https://github.com/tokio-rs/tokio", rev = "92d51202ef5e6b99004d77300de1aba0f8835513" } +tokio-trace-log = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-futures = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-macros = { git = 
"https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } uuid = { version = "0.7.1", features = ["v4"] } [dev-dependencies] diff --git a/src/rust/engine/fs/brfs/Cargo.toml b/src/rust/engine/fs/brfs/Cargo.toml index 7fbe30da698..fabcdaff5c5 100644 --- a/src/rust/engine/fs/brfs/Cargo.toml +++ b/src/rust/engine/fs/brfs/Cargo.toml @@ -18,11 +18,14 @@ futures = "^0.1.16" futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" } hashing = { path = "../../hashing" } libc = "0.2.39" -log = "0.4.1" parking_lot = "0.6" protobuf = { version = "2.0.4", features = ["with-bytes"] } serverset = { path = "../../serverset" } time = "0.1.39" +tokio-trace = { git = "https://github.com/tokio-rs/tokio", rev = "92d51202ef5e6b99004d77300de1aba0f8835513" } +tokio-trace-log = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-futures = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-macros = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } [dev-dependencies] bytes = "0.4.5" diff --git a/src/rust/engine/fs/brfs/src/main.rs b/src/rust/engine/fs/brfs/src/main.rs index 76f48d87159..60d29724370 100644 --- a/src/rust/engine/fs/brfs/src/main.rs +++ b/src/rust/engine/fs/brfs/src/main.rs @@ -26,6 +26,9 @@ // Arc can be more clear than needing to grok Orderings: #![allow(clippy::mutex_atomic)] +#[macro_use] +extern crate tokio_trace; + use bazel_protos; use clap; use dirs; @@ -44,7 +47,6 @@ use time; use futures::future::Future; use hashing::{Digest, Fingerprint}; -use log::{debug, error, warn}; use parking_lot::Mutex; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; diff --git 
a/src/rust/engine/fs/src/glob_matching.rs b/src/rust/engine/fs/src/glob_matching.rs index 403017505bc..5570ff3cf18 100644 --- a/src/rust/engine/fs/src/glob_matching.rs +++ b/src/rust/engine/fs/src/glob_matching.rs @@ -11,7 +11,6 @@ use futures::future; use futures::Future; use glob::Pattern; use indexmap::{map::Entry::Occupied, IndexMap, IndexSet}; -use log::warn; use crate::{ Dir, GitignoreStyleExcludes, GlobExpansionConjunction, GlobParsedSource, GlobSource, diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index 2a818ebc22e..bb21f52eb22 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -26,6 +26,9 @@ // Arc can be more clear than needing to grok Orderings: #![allow(clippy::mutex_atomic)] +#[macro_use] +extern crate tokio_trace; + mod glob_matching; pub use crate::glob_matching::GlobMatching; mod snapshot; diff --git a/src/rust/engine/fs/src/store.rs b/src/rust/engine/fs/src/store.rs index 4083da7de0b..5b8911a7e6e 100644 --- a/src/rust/engine/fs/src/store.rs +++ b/src/rust/engine/fs/src/store.rs @@ -691,7 +691,6 @@ mod local { self, Cursor, Database, DatabaseFlags, Environment, EnvironmentCopyFlags, EnvironmentFlags, RwTransaction, Transaction, WriteFlags, }; - use log::{debug, error}; use sha2::Sha256; use std; use std::collections::{BinaryHeap, HashMap}; diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 1a6361205a3..e660ae712e6 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -15,7 +15,6 @@ fs = { path = "../fs" } futures = "^0.1.16" grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355dc996d7d0702e7386a6fedcd9734c0", default_features = false, features = ["protobuf-codec", "secure"] } hashing = { path = "../hashing" } -log = "0.4" protobuf = { version = "2.0.4", features = ["with-bytes"] } resettable = { path = "../resettable" } sha2 = "0.8" @@ -25,6 
+24,10 @@ futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b time = "0.1.40" tokio-codec = "0.1" tokio-process = "0.2.1" +tokio-trace = { git = "https://github.com/tokio-rs/tokio", rev = "92d51202ef5e6b99004d77300de1aba0f8835513" } +tokio-trace-log = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-futures = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } +tokio-trace-macros = { git = "https://github.com/tokio-rs/tokio-trace-nursery", rev = "ee38e0030e3b249b09f0432f4d9140d15048a926" } [dev-dependencies] mock = { path = "../testutil/mock" } diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index e1445185315..cd2203714ce 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -26,6 +26,9 @@ // Arc can be more clear than needing to grok Orderings: #![allow(clippy::mutex_atomic)] +#[macro_use] +extern crate tokio_trace; + use boxfuture::BoxFuture; use bytes::Bytes; use std::collections::{BTreeMap, BTreeSet}; diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index e55d1ac668c..f9841fb2382 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -1,10 +1,8 @@ -use log; use tempfile; use boxfuture::{try_future, BoxFuture, Boxable}; use fs::{self, GlobExpansionConjunction, GlobMatching, PathGlobs, Snapshot, StrictGlobMatching}; use futures::{future, Future, Stream}; -use log::info; use std::collections::{BTreeSet, HashSet}; use std::ffi::OsStr; use std::fs::create_dir_all; diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 042fdf5f28b..85646958412 100644 --- 
a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -13,7 +13,6 @@ use futures::{future, Future, Stream}; use futures_timer::Delay; use grpcio; use hashing::{Digest, Fingerprint}; -use log::{debug, trace, warn}; use protobuf::{self, Message, ProtobufEnum}; use sha2::Sha256; use time; diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index bca370fd020..dc0c36023be 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -22,7 +22,6 @@ use boxfuture::{BoxFuture, Boxable}; use core::clone::Clone; use fs::{self, safe_create_dir_all_ioerror, PosixFS, ResettablePool, Store}; use graph::{EntryId, Graph, NodeContext}; -use log::debug; use process_execution::{self, BoundedCommandRunner, CommandRunner}; use rand::seq::SliceRandom; use reqwest; diff --git a/src/rust/engine/src/externs.rs b/src/rust/engine/src/externs.rs index f34534bc01e..6e7cf08f0e3 100644 --- a/src/rust/engine/src/externs.rs +++ b/src/rust/engine/src/externs.rs @@ -13,9 +13,10 @@ use crate::core::{Failure, Function, Key, TypeId, Value}; use crate::handles::{DroppingHandle, Handle}; use crate::interning::Interns; use lazy_static::lazy_static; -use log; +use log::{Level, LevelFilter, Log, Metadata, Record}; use num_enum::CustomTryInto; use parking_lot::RwLock; +use tokio_trace_log::LogTracer; pub fn eval(python: &str) -> Result { with_externs(|e| (e.eval)(e.context, python.as_ptr(), python.len() as u64)).into() @@ -294,9 +295,7 @@ lazy_static! { // would need to be acquired for every single logging statement). // Please don't mutate it. // Please. -static mut LOGGER: FfiLogger = FfiLogger { - level_filter: log::LevelFilter::Off, -}; +static mut LOGGER: FfiLogger = FfiLogger { log_tracer: None }; /// /// Set the static Externs for this process. 
All other methods of this module will fail @@ -702,29 +701,29 @@ enum PythonLogLevel { Critical = 50, } -impl From for PythonLogLevel { - fn from(level: log::Level) -> Self { +impl From for PythonLogLevel { + fn from(level: Level) -> Self { match level { - log::Level::Error => PythonLogLevel::Error, - log::Level::Warn => PythonLogLevel::Warn, - log::Level::Info => PythonLogLevel::Info, - log::Level::Debug => PythonLogLevel::Debug, - log::Level::Trace => PythonLogLevel::Trace, + Level::Error => PythonLogLevel::Error, + Level::Warn => PythonLogLevel::Warn, + Level::Info => PythonLogLevel::Info, + Level::Debug => PythonLogLevel::Debug, + Level::Trace => PythonLogLevel::Trace, } } } -impl From for log::LevelFilter { +impl From for LevelFilter { fn from(level: PythonLogLevel) -> Self { match level { - PythonLogLevel::NotSet => log::LevelFilter::Off, - PythonLogLevel::Trace => log::LevelFilter::Trace, - PythonLogLevel::Debug => log::LevelFilter::Debug, - PythonLogLevel::Info => log::LevelFilter::Info, - PythonLogLevel::Warn => log::LevelFilter::Warn, - PythonLogLevel::Error => log::LevelFilter::Error, + PythonLogLevel::NotSet => LevelFilter::Off, + PythonLogLevel::Trace => LevelFilter::Trace, + PythonLogLevel::Debug => LevelFilter::Debug, + PythonLogLevel::Info => LevelFilter::Info, + PythonLogLevel::Warn => LevelFilter::Warn, + PythonLogLevel::Error => LevelFilter::Error, // Rust doesn't have a Critical, so treat them like Errors. - PythonLogLevel::Critical => log::LevelFilter::Error, + PythonLogLevel::Critical => LevelFilter::Error, } } } @@ -733,7 +732,7 @@ impl From for log::LevelFilter { /// FfiLogger is an implementation of log::Log which asks the Python logging system to log via cffi. /// struct FfiLogger { - level_filter: log::LevelFilter, + log_tracer: Option, } impl FfiLogger { @@ -741,28 +740,33 @@ impl FfiLogger { // If either of the above are violated, expect a panic. 
pub fn init(&'static mut self, max_level: u8) { let max_python_level = max_level.try_into_PythonLogLevel(); - self.level_filter = { + + let level_filter = { match max_python_level { Ok(python_level) => { - let level: log::LevelFilter = python_level.into(); + let level: LevelFilter = python_level.into(); level } Err(err) => panic!("Unrecognised log level from python: {}: {}", max_level, err), } }; + self.log_tracer = Some(LogTracer::with_filter(level_filter)); - log::set_max_level(self.level_filter); + log::set_max_level(level_filter); log::set_logger(self) .expect("Failed to set logger (maybe you tried to call init multiple times?)"); } } -impl log::Log for FfiLogger { - fn enabled(&self, metadata: &log::Metadata<'_>) -> bool { - metadata.level() <= self.level_filter +impl Log for FfiLogger { + fn enabled(&self, metadata: &Metadata<'_>) -> bool { + match &self.log_tracer { + Some(tracer) => tracer.enabled(metadata), + None => false, + } } - fn log(&self, record: &log::Record<'_>) { + fn log(&self, record: &Record<'_>) { if !self.enabled(record.metadata()) { return; } @@ -775,8 +779,16 @@ impl log::Log for FfiLogger { message.as_ptr(), message.len() as u64, ) - }) + }); + /* TODO: extract the above python log from the trace! */ + if let Some(tracer) = &self.log_tracer { + tracer.log(record); + } } - fn flush(&self) {} + fn flush(&self) { + if let Some(tracer) = &self.log_tracer { + tracer.flush(); + } + } } diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 85d8771a293..a6ced2b99fc 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -30,6 +30,9 @@ // other unsafeness. #![allow(clippy::not_unsafe_ptr_arg_deref)] +#[macro_use] +extern crate tokio_trace; + pub mod cffi_externs; mod context; mod core; @@ -70,7 +73,6 @@ use crate::types::Types; use fs::{GlobMatching, MemFS, PathStat}; use futures::Future; use hashing::Digest; -use log::error; // TODO: Consider renaming and making generic for collections of PyResults. 
#[repr(C)] diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 8f185fa8d89..92540737189 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -15,7 +15,6 @@ use crate::nodes::{NodeKey, Select, Tracer, TryInto, Visualizer}; use crate::selectors; use graph::{EntryId, Graph, InvalidationResult, NodeContext}; use indexmap::IndexMap; -use log::{debug, info, warn}; use parking_lot::Mutex; use ui::EngineDisplay;