Skip to content

Commit

Permalink
Expose intrinsics in intrinsics.py via a native call response for g…
Browse files Browse the repository at this point in the history
…enerators.
  • Loading branch information
stuhood committed May 31, 2024
1 parent 3b0ebf3 commit 1b7c9f1
Show file tree
Hide file tree
Showing 17 changed files with 834 additions and 782 deletions.
62 changes: 60 additions & 2 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,28 @@ from typing import (

from typing_extensions import Self

from pants.engine.fs import (
CreateDigest,
DigestContents,
DigestEntries,
DigestSubset,
NativeDownloadFile,
PathGlobs,
Paths,
)
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
NativeParsedJavascriptDependencies,
NativeParsedPythonDependencies,
)
from pants.engine.internals.scheduler import Workunit, _PathGlobsAndRootCollection
from pants.engine.internals.session import SessionValues
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
from pants.engine.internals.session import RunId, SessionValues
from pants.engine.process import (
FallibleProcessResult,
InteractiveProcess,
InteractiveProcessResult,
Process,
)

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
Expand Down Expand Up @@ -472,6 +491,45 @@ EMPTY_SNAPSHOT: Snapshot

def default_cache_path() -> str: ...

# ------------------------------------------------------------------------------
# Intrinsics
# ------------------------------------------------------------------------------

async def create_digest_to_digest(
create_digest: CreateDigest,
) -> Digest: ...
async def path_globs_to_digest(
path_globs: PathGlobs,
) -> Digest: ...
async def path_globs_to_paths(
path_globs: PathGlobs,
) -> Paths: ...
async def download_file_to_digest(
native_download_file: NativeDownloadFile,
) -> Digest: ...
async def digest_to_snapshot(digest: Digest) -> Snapshot: ...
async def directory_digest_to_digest_contents(digest: Digest) -> DigestContents: ...
async def directory_digest_to_digest_entries(digest: Digest) -> DigestEntries: ...
async def merge_digests_request_to_digest(merge_digests: MergeDigests) -> Digest: ...
async def remove_prefix_request_to_digest(remove_prefix: RemovePrefix) -> Digest: ...
async def add_prefix_request_to_digest(add_prefix: AddPrefix) -> Digest: ...
async def process_request_to_process_result(
process: Process, process_execution_environment: ProcessExecutionEnvironment
) -> FallibleProcessResult: ...
async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest: ...
async def session_values() -> SessionValues: ...
async def run_id() -> RunId: ...
async def interactive_process(
process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment
) -> InteractiveProcessResult: ...
async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult: ...
async def parse_python_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedPythonDependencies: ...
async def parse_javascript_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedJavascriptDependencies: ...

# ------------------------------------------------------------------------------
# `pantsd`
# ------------------------------------------------------------------------------
Expand Down
149 changes: 149 additions & 0 deletions src/python/pants/engine/intrinsics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

from pants.engine.fs import (
AddPrefix,
CreateDigest,
Digest,
DigestContents,
DigestEntries,
DigestSubset,
MergeDigests,
NativeDownloadFile,
PathGlobs,
Paths,
RemovePrefix,
Snapshot,
)
from pants.engine.internals import native_engine
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
NativeParsedJavascriptDependencies,
NativeParsedPythonDependencies,
)
from pants.engine.internals.native_engine import NativeDependenciesRequest
from pants.engine.internals.session import RunId, SessionValues
from pants.engine.process import (
FallibleProcessResult,
InteractiveProcess,
InteractiveProcessResult,
Process,
ProcessExecutionEnvironment,
)
from pants.engine.rules import _uncacheable_rule, collect_rules, rule


@rule
async def create_digest_to_digest(
create_digest: CreateDigest,
) -> Digest:
return await native_engine.create_digest_to_digest(create_digest)


@rule
async def path_globs_to_digest(
path_globs: PathGlobs,
) -> Digest:
return await native_engine.path_globs_to_digest(path_globs)


@rule
async def path_globs_to_paths(
path_globs: PathGlobs,
) -> Paths:
return await native_engine.path_globs_to_paths(path_globs)


@rule
async def download_file_to_digest(
native_download_file: NativeDownloadFile,
) -> Digest:
return await native_engine.download_file_to_digest(native_download_file)


@rule
async def digest_to_snapshot(digest: Digest) -> Snapshot:
return await native_engine.digest_to_snapshot(digest)


@rule
async def directory_digest_to_digest_contents(digest: Digest) -> DigestContents:
return await native_engine.directory_digest_to_digest_contents(digest)


@rule
async def directory_digest_to_digest_entries(digest: Digest) -> DigestEntries:
return await native_engine.directory_digest_to_digest_entries(digest)


@rule
async def merge_digests_request_to_digest(merge_digests: MergeDigests) -> Digest:
return await native_engine.merge_digests_request_to_digest(merge_digests)


@rule
async def remove_prefix_request_to_digest(remove_prefix: RemovePrefix) -> Digest:
return await native_engine.remove_prefix_request_to_digest(remove_prefix)


@rule
async def add_prefix_request_to_digest(add_prefix: AddPrefix) -> Digest:
return await native_engine.add_prefix_request_to_digest(add_prefix)


@rule
async def process_request_to_process_result(
process: Process, process_execution_environment: ProcessExecutionEnvironment
) -> FallibleProcessResult:
return await native_engine.process_request_to_process_result(
process, process_execution_environment
)


@rule
async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest:
return await native_engine.digest_subset_to_digest(digest_subset)


@rule
async def session_values() -> SessionValues:
return await native_engine.session_values()


@rule
async def run_id() -> RunId:
return await native_engine.run_id()


@_uncacheable_rule
async def interactive_process(
process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment
) -> InteractiveProcessResult:
return await native_engine.interactive_process(process, process_execution_environment)


@rule
async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult:
return await native_engine.docker_resolve_image(request)


@rule
async def parse_python_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedPythonDependencies:
return await native_engine.parse_python_deps(deps_request)


@rule
async def parse_javascript_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedJavascriptDependencies:
return await native_engine.parse_javascript_deps(deps_request)


def rules():
return [
*collect_rules(),
]
3 changes: 2 additions & 1 deletion src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pants.build_graph.build_configuration import BuildConfiguration
from pants.core.util_rules import environments, system_binaries
from pants.core.util_rules.environments import determine_bootstrap_environment
from pants.engine import desktop, download_file, fs, process
from pants.engine import desktop, download_file, fs, intrinsics, process
from pants.engine.console import Console
from pants.engine.environment import EnvironmentName
from pants.engine.fs import PathGlobs, Snapshot, Workspace
Expand Down Expand Up @@ -276,6 +276,7 @@ def current_executing_goals(session_values: SessionValues) -> CurrentExecutingGo
rules = FrozenOrderedSet(
(
*collect_rules(locals()),
*intrinsics.rules(),
*build_files.rules(),
*fs.rules(),
*dep_rules.rules(),
Expand Down
4 changes: 0 additions & 4 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use crate::intrinsics::Intrinsics;
use crate::nodes::{ExecuteProcess, NodeKey, NodeOutput, NodeResult};
use crate::python::{throw, Failure};
use crate::session::{Session, Sessions};
Expand Down Expand Up @@ -62,7 +61,6 @@ pub struct Core {
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
pub intrinsics: Intrinsics,
pub executor: Executor,
store: Store,
/// The CommandRunners to use for execution, in ascending order of reliability (for the purposes
Expand Down Expand Up @@ -519,7 +517,6 @@ impl Core {
executor: Executor,
tasks: Tasks,
types: Types,
intrinsics: Intrinsics,
build_root: PathBuf,
ignore_patterns: Vec<String>,
use_gitignore: bool,
Expand Down Expand Up @@ -683,7 +680,6 @@ impl Core {
tasks,
rule_graph,
types,
intrinsics,
executor: executor.clone(),
store,
command_runners,
Expand Down
36 changes: 12 additions & 24 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType};
use pyo3::{create_exception, IntoPy, PyAny, PyRef};
use regex::Regex;
use remote::remote_cache::RemoteCacheWarningsBehavior;
use rule_graph::{self, DependencyKey, RuleGraph, RuleId};
use rule_graph::{self, RuleGraph};
use store::RemoteProvider;
use task_executor::Executor;
use workunit_store::{
Expand All @@ -49,14 +49,16 @@ use workunit_store::{

use crate::externs::fs::{possible_store_missing_digest, PyFileDigest};
use crate::externs::process::PyProcessExecutionEnvironment;
use crate::intrinsics;
use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination,
Failure, Function, Intrinsic, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions,
Rule, Scheduler, Session, SessionCore, Tasks, TypeId, Types, Value,
Failure, Function, Key, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
SessionCore, Tasks, TypeId, Types, Value,
};

#[pymodule]
fn native_engine(py: Python, m: &PyModule) -> PyO3Result<()> {
intrinsics::register(py, m)?;
externs::register(py, m)?;
externs::address::register(py, m)?;
externs::fs::register(m)?;
Expand Down Expand Up @@ -701,9 +703,7 @@ fn scheduler_create(
.borrow_mut()
.take()
.ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?;
let intrinsics = Intrinsics::new(&types);
let mut tasks = py_tasks.0.replace(Tasks::new());
tasks.intrinsics_set(&intrinsics);
let tasks = py_tasks.0.replace(Tasks::new());

// NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to
// use `tokio::spawn` since Python does not setup Tokio for the main thread. This also
Expand All @@ -716,7 +716,6 @@ fn scheduler_create(
py_executor.0.clone(),
tasks,
types,
intrinsics,
build_root,
ignore_patterns,
use_gitignore,
Expand Down Expand Up @@ -1022,21 +1021,11 @@ fn session_run_interactive_process(
let interactive_process: Value = interactive_process.into();
let process_config = Value::new(process_config_from_environment.into_py(py));
py.allow_threads(|| {
core.executor.clone().block_on(nodes::maybe_side_effecting(
core.executor.clone().block_on(nodes::task_context(
context.clone(),
true,
&Arc::new(std::sync::atomic::AtomicBool::new(true)),
core.intrinsics.run(
&Intrinsic {
id: RuleId::new("interactive_process"),
product: core.types.interactive_process_result,
inputs: vec![
DependencyKey::new(core.types.interactive_process),
DependencyKey::new(core.types.process_config_from_environment),
],
},
context,
vec![interactive_process, process_config],
),
intrinsics::interactive_process_inner(&context, interactive_process, process_config),
))
})
.map(|v| v.into())
Expand Down Expand Up @@ -1407,8 +1396,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
core.executor.enter(|| {
let result = PyDict::new(py);
for (rule, rule_dependencies) in core.rule_graph.rule_dependencies() {
let Rule::Task(task) = rule else { continue };

let task = rule.0;
let function = &task.func;
let mut dependencies = Vec::new();
for (dependency_key, rule) in rule_dependencies {
Expand All @@ -1421,7 +1409,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
{
continue;
}
let Rule::Task(task) = rule else { continue };
let function = &rule.0.func;

let provided_params = dependency_key
.provided_params
Expand All @@ -1431,7 +1419,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
dependencies.push((
dependency_key.product.as_py_type(py),
provided_params,
task.func.0.value.into_py(py),
function.0.value.into_py(py),
));
}
if dependencies.is_empty() {
Expand Down
Loading

0 comments on commit 1b7c9f1

Please sign in to comment.