Skip to content

Commit

Permalink
Revert "Add new EngineAware method metadata() (#11030)" (#11047)
Browse files Browse the repository at this point in the history
This commit introduced the `cpython` dependency for `workunit_store`, which has caused some CI issues. Revert it for now until a solution can be found that doesn't introduce this dependency.
  • Loading branch information
gshuflin authored Oct 26, 2020
1 parent db64830 commit 5c1aaa0
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 190 deletions.
8 changes: 1 addition & 7 deletions src/python/pants/engine/engine_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from abc import ABC
from typing import Any, Dict, Optional
from typing import Dict, Optional

from pants.engine.fs import Snapshot
from pants.util.logging import LogLevel
Expand Down Expand Up @@ -56,9 +56,3 @@ def artifacts(self) -> Optional[Dict[str, Snapshot]]:
`artifacts` is a mapping of arbitrary string keys to `Snapshot`s.
"""
return None

def metadata(self) -> Optional[Dict[str, Any]]:
"""If implemented, adds arbitrary key-value pairs to the `metadata` entry of the `@rule`
workunit."""

return None
73 changes: 0 additions & 73 deletions src/python/pants/engine/internals/engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,79 +571,6 @@ def a_rule(n: int) -> Output:
artifacts = workunit["artifacts"]
assert artifacts["some_arbitrary_key"] == EMPTY_SNAPSHOT

def test_metadata_on_engine_aware_type(self) -> None:
@dataclass(frozen=True)
class Output(EngineAwareReturnType):
val: int

def metadata(self):
return {"k1": 1, "k2": "a string", "k3": [1, 2, 3]}

@rule(desc="a_rule")
def a_rule(n: int) -> Output:
return Output(val=n)

rules = [a_rule, QueryRule(Output, (int,))]
scheduler = self.mk_scheduler(
rules, include_trace_on_error=False, should_report_workunits=True
)

tracker = WorkunitTracker()
handler = StreamingWorkunitHandler(
scheduler,
callbacks=[tracker.add],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.TRACE,
)
with handler.session():
scheduler.product_request(Output, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
workunit = next(
item for item in finished if item["name"] == "pants.engine.internals.engine_test.a_rule"
)

metadata = workunit["metadata"]
assert metadata == {"k1": 1, "k2": "a string", "k3": [1, 2, 3]}

def test_metadata_non_string_key_behavior(self) -> None:
# If someone passes a non-string key in a metadata() method,
# this should fail to produce a meaningful metadata entry on
# the workunit (with a warning), but not fail.

@dataclass(frozen=True)
class Output(EngineAwareReturnType):
val: int

def metadata(self):
return {10: "foo", "other_key": "other value"}

@rule(desc="a_rule")
def a_rule(n: int) -> Output:
return Output(val=n)

rules = [a_rule, QueryRule(Output, (int,))]
scheduler = self.mk_scheduler(
rules, include_trace_on_error=False, should_report_workunits=True
)

tracker = WorkunitTracker()
handler = StreamingWorkunitHandler(
scheduler,
callbacks=[tracker.add],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.TRACE,
)
with handler.session():
scheduler.product_request(Output, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
workunit = next(
item for item in finished if item["name"] == "pants.engine.internals.engine_test.a_rule"
)

assert workunit["metadata"] == {}


@dataclass(frozen=True)
class ComplicatedInput:
Expand Down
1 change: 0 additions & 1 deletion src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 55 additions & 27 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use store::Store;
use tempfile::TempDir;
use testutil::data::{TestData, TestDirectory, TestTree};
use testutil::{owned_string_vec, relative_paths};
use workunit_store::{WorkunitState, WorkunitStore};
use workunit_store::{Level, SpanId, Workunit, WorkunitMetadata, WorkunitState, WorkunitStore};

use crate::remote::{digest, CommandRunner, ExecutionError, OperationOrStatus};
use crate::{
Expand Down Expand Up @@ -1830,51 +1830,64 @@ async fn remote_workunits_are_stored() {
.await
.unwrap();

let got_workunit_items: HashSet<(String, WorkunitState)> =
workunit_store.with_latest_workunits(log::Level::Trace, |_, completed| {
completed
.iter()
.map(|workunit| (workunit.name.clone(), workunit.state.clone()))
.collect()
});
let span_id = SpanId::new();
let got_workunits = workunits_with_constant_span_id(&mut workunit_store, span_id);

use concrete_time::Duration;
use concrete_time::TimeSpan;

let wanted_workunit_items = hashset! {
(String::from("remote execution action scheduling"),
WorkunitState::Completed {
time_span: TimeSpan {
start: Duration::new(0, 0),
duration: Duration::new(1, 0),
}
let want_workunits = hashset! {
Workunit {
name: String::from("remote execution action scheduling"),
state: WorkunitState::Completed {
time_span: TimeSpan {
start: Duration::new(0, 0),
duration: Duration::new(1, 0),
}
},
span_id,
parent_id: None,
metadata: WorkunitMetadata::with_level(Level::Debug),
},
),
(String::from("remote execution worker input fetching"),
WorkunitState::Completed {
Workunit {
name: String::from("remote execution worker input fetching"),
state: WorkunitState::Completed {
time_span: TimeSpan {
start: Duration::new(2, 0),
duration: Duration::new(1, 0),
}
}),
(String::from("remote execution worker command executing"),
WorkunitState::Completed {
},
span_id,
parent_id: None,
metadata: WorkunitMetadata::with_level(Level::Debug),
},
Workunit {
name: String::from("remote execution worker command executing"),
state: WorkunitState::Completed {
time_span: TimeSpan {
start: Duration::new(4, 0),
duration: Duration::new(1, 0),
}
}),
(String::from("remote execution worker output uploading"),
WorkunitState::Completed {
},
span_id,
parent_id: None,
metadata: WorkunitMetadata::with_level(Level::Debug),
},
Workunit {
name: String::from("remote execution worker output uploading"),
state: WorkunitState::Completed {
time_span: TimeSpan {
start: Duration::new(6, 0),
duration: Duration::new(1, 0),
}
}),

},
span_id,
parent_id: None,
metadata: WorkunitMetadata::with_level(Level::Debug),
}
};

assert!(got_workunit_items.is_superset(&wanted_workunit_items));
assert!(got_workunits.is_superset(&want_workunits));
}

#[tokio::test]
Expand Down Expand Up @@ -2104,6 +2117,21 @@ async fn extract_output_files_from_response_no_prefix() {
)
}

pub(crate) fn workunits_with_constant_span_id(
workunit_store: &mut WorkunitStore,
span_id: SpanId,
) -> HashSet<Workunit> {
workunit_store.with_latest_workunits(log::Level::Trace, |_, completed_workunits| {
completed_workunits
.iter()
.map(|workunit| Workunit {
span_id,
..workunit.clone()
})
.collect()
})
}

pub fn echo_foo_request() -> MultiPlatformProcess {
let mut req = Process::new(owned_string_vec(&["/bin/echo", "-n", "foo"]));
req.timeout = Some(Duration::from_millis(5000));
Expand Down
8 changes: 0 additions & 8 deletions src/rust/engine/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,6 @@ impl Value {
Err(arc_handle) => arc_handle.clone_ref(py),
}
}

pub fn new_from_arc(handle: Arc<PyObject>) -> Value {
Value(handle)
}

pub fn consume_into_arc(self) -> Arc<PyObject> {
self.0
}
}

impl PartialEq for Value {
Expand Down
40 changes: 0 additions & 40 deletions src/rust/engine/src/externs/engine_aware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,46 +43,6 @@ impl EngineAwareInformation for Message {
}
}

pub struct Metadata {}

impl EngineAwareInformation for Metadata {
type MaybeOutput = Vec<(String, Value)>;

fn retrieve(_types: &Types, value: &Value) -> Option<Self::MaybeOutput> {
let metadata_val = match externs::call_method(&value, "metadata", &[]) {
Ok(value) => value,
Err(py_err) => {
let failure = Failure::from_py_err(py_err);
log::error!("Error calling `metadata` method: {}", failure);
return None;
}
};

let metadata_val = externs::check_for_python_none(metadata_val)?;
let gil = Python::acquire_gil();
let py = gil.python();

let mut output = Vec::new();
let metadata_dict: &PyDict = metadata_val.cast_as::<PyDict>(py).ok()?;

for (key, value) in metadata_dict.items(py).into_iter() {
let key_name: String = match key.extract(py) {
Ok(s) => s,
Err(e) => {
log::error!(
"Error in EngineAware.metadata() implementation - non-string key: {:?}",
e
);
return None;
}
};

output.push((key_name, Value::from(value)));
}
Some(output)
}
}

pub struct Artifacts {}

impl EngineAwareInformation for Artifacts {
Expand Down
13 changes: 0 additions & 13 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,19 +890,6 @@ async fn workunit_to_py_value(workunit: &Workunit, core: &Arc<Core>) -> CPyResul
))
}

let mut user_metadata_entries = Vec::new();
for (user_metadata_key, value_arc) in workunit.metadata.user_metadata.iter() {
user_metadata_entries.push((
externs::store_utf8(user_metadata_key.as_str()),
Value::new_from_arc(value_arc.clone()),
));
}

dict_entries.push((
externs::store_utf8("metadata"),
externs::store_dict(user_metadata_entries)?,
));

if let Some(stdout_digest) = &workunit.metadata.stdout.as_ref() {
artifact_entries.push((
externs::store_utf8("stdout_digest"),
Expand Down
17 changes: 2 additions & 15 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,6 @@ pub struct PythonRuleOutput {
new_level: Option<log::Level>,
message: Option<String>,
new_artifacts: Vec<(String, hashing::Digest)>,
new_metadata: Vec<(String, Value)>,
}

#[async_trait]
Expand Down Expand Up @@ -1035,24 +1034,21 @@ impl WrappedNode for Task {
}

if result_type == product {
let (new_level, message, new_artifacts, new_metadata) = if can_modify_workunit {
let (new_level, message, new_artifacts) = if can_modify_workunit {
(
engine_aware::EngineAwareLevel::retrieve(&context.core.types, &result_val),
engine_aware::Message::retrieve(&context.core.types, &result_val),
engine_aware::Artifacts::retrieve(&context.core.types, &result_val)
.unwrap_or_else(Vec::new),
engine_aware::Metadata::retrieve(&context.core.types, &result_val)
.unwrap_or_else(Vec::new),
)
} else {
(None, None, Vec::new(), Vec::new())
(None, None, Vec::new())
};
Ok(PythonRuleOutput {
value: result_val,
new_level,
message,
new_artifacts,
new_metadata,
})
} else {
Err(throw(&format!(
Expand Down Expand Up @@ -1262,7 +1258,6 @@ impl Node for NodeKey {
stdout: None,
stderr: None,
artifacts: Vec::new(),
user_metadata: Vec::new(),
};
let metadata2 = metadata.clone();

Expand All @@ -1287,8 +1282,6 @@ impl Node for NodeKey {
let mut level = metadata.level;
let mut message = None;
let mut artifacts = Vec::new();
let mut user_metadata = Vec::new();

let mut result = match self {
NodeKey::DigestFile(n) => n.run_wrapped_node(context).map_ok(NodeOutput::Digest).await,
NodeKey::DownloadedFile(n) => n.run_wrapped_node(context).map_ok(NodeOutput::Digest).await,
Expand Down Expand Up @@ -1319,7 +1312,6 @@ impl Node for NodeKey {
}
message = python_rule_output.message;
artifacts = python_rule_output.new_artifacts;
user_metadata = python_rule_output.new_metadata;
NodeOutput::Value(python_rule_output.value)
})
.await
Expand All @@ -1341,10 +1333,6 @@ impl Node for NodeKey {
level,
message,
artifacts,
user_metadata: user_metadata
.into_iter()
.map(|(key, val)| (key, val.consume_into_arc()))
.collect(),
..metadata
};
(result, final_metadata)
Expand Down Expand Up @@ -1465,7 +1453,6 @@ impl TryFrom<NodeOutput> for PythonRuleOutput {
new_level: None,
message: None,
new_artifacts: Vec::new(),
new_metadata: Vec::new(),
}),
_ => Err(()),
}
Expand Down
1 change: 0 additions & 1 deletion src/rust/engine/workunit_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ rand = "0.6"
tokio = { version = "0.2.22", features = ["rt-util"] }
petgraph = "0.4.5"
log = "0.4"
cpython = "0.5"
Loading

0 comments on commit 5c1aaa0

Please sign in to comment.