Commit: Add distributed pass framework: including PassBase/PassTest/PassUtils (#36643)

* add split_program

* make ut faster

* increase ut timeout

* make result deterministic

* add fuse_all_reduce pass

* add ut framework, update

* fix ut framework

* remove useless code

* add coverage support

* update

* fix CI

* fix some bugs and fix ci coverage

* fix conflict
sneaxiy authored Nov 15, 2021
1 parent b44db69 commit 12339fa
Showing 21 changed files with 1,592 additions and 39 deletions.
102 changes: 102 additions & 0 deletions paddle/fluid/framework/ir/graph_helper.cc
@@ -545,6 +545,108 @@ void GraphToProgram(const Graph &graph, ProgramDesc *program,
program->CopyFrom(program_pb);
}

static std::vector<std::vector<ir::Node::Dep>> GetOpDependencies(
const BlockDesc &block, const std::unordered_set<ir::Node *> &nodes) {
auto block_ops = block.AllOps();
size_t op_num = block_ops.size();
std::unordered_map<const ir::Node *, std::unordered_set<const ir::Node *>>
preceding_ops(op_num);
std::unordered_map<const ir::Node *, size_t> preceding_deps(op_num);
std::unordered_map<const ir::Node *, std::unordered_set<const ir::Node *>>
pending_ops(op_num);

std::queue<const ir::Node *> ready_ops;
for (const auto *node : nodes) {
if (!node->IsOp()) continue;

auto &tmp_preceding_ops = preceding_ops[node];
for (const auto *in_var : node->inputs) {
for (const auto *in_op : in_var->inputs) {
tmp_preceding_ops.insert(in_op);
}
}
if (tmp_preceding_ops.empty()) {
ready_ops.push(node);
}
preceding_deps[node] = tmp_preceding_ops.size();

auto &tmp_pending_ops = pending_ops[node];
for (const auto *out_var : node->outputs) {
for (const auto *out_op : out_var->outputs) {
tmp_pending_ops.insert(out_op);
}
}
}

std::unordered_map<const ir::Node *, std::unordered_set<const ir::Node *>>
all_preceding_ops;
while (!ready_ops.empty()) {
const auto *cur_op = ready_ops.front();
ready_ops.pop();

auto &all_preceding_ops_of_cur_op = all_preceding_ops[cur_op];
for (const auto *preceding_op : preceding_ops.at(cur_op)) {
all_preceding_ops_of_cur_op.insert(preceding_op);
auto &prev_preceding_ops = all_preceding_ops[preceding_op];
all_preceding_ops_of_cur_op.insert(prev_preceding_ops.begin(),
prev_preceding_ops.end());
}

for (const auto *pending_op : pending_ops.at(cur_op)) {
if (--preceding_deps.at(pending_op) == 0) {
ready_ops.push(pending_op);
}
}
}

std::unordered_map<uint64_t, size_t> op_id_to_idx(op_num);
for (const auto *op_desc : block_ops) {
size_t op_idx = op_id_to_idx.size();
PADDLE_ENFORCE_EQ(
op_id_to_idx.emplace(op_desc->Id(), op_idx).second, true,
platform::errors::InvalidArgument(
"There should not be duplicate op id: %d", op_desc->Id()));
}

std::vector<std::vector<ir::Node::Dep>> dep_matrix(op_num);
for (size_t i = 0; i < op_num; ++i) {
dep_matrix[i].resize(op_num, ir::Node::Dep::kNoDep);
dep_matrix[i][i] = ir::Node::Dep::kSame;
}

auto get_op_idx_by_id = [&op_id_to_idx](uint64_t op_id) {
auto iter = op_id_to_idx.find(op_id);
PADDLE_ENFORCE_NE(iter, op_id_to_idx.end(),
platform::errors::InvalidArgument(
"Cannot find OpDesc with id %d", op_id));
return iter->second;
};

for (const auto &pair : all_preceding_ops) {
const auto *cur_op_node = pair.first;
size_t op_idx_1 = get_op_idx_by_id(cur_op_node->Op()->Id());
for (const auto *preceding_op_node : pair.second) {
size_t op_idx_2 = get_op_idx_by_id(preceding_op_node->Op()->Id());
dep_matrix[op_idx_1][op_idx_2] = ir::Node::Dep::kAfter;
dep_matrix[op_idx_2][op_idx_1] = ir::Node::Dep::kBefore;
}
}
return dep_matrix;
}

std::vector<std::vector<std::vector<ir::Node::Dep>>> GetOpDependencies(
const ProgramDesc &program) {
ir::Graph graph(program);
size_t block_num = program.Size();
std::vector<std::vector<std::vector<ir::Node::Dep>>> deps;
deps.reserve(block_num);
for (size_t i = 0; i < block_num; ++i) {
deps.emplace_back(
GetOpDependencies(program.Block(i), graph.GetSubGraph(i)->Nodes()));
}
return deps;
}

} // namespace ir
} // namespace framework
} // namespace paddle
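
The new static `GetOpDependencies` walks each block's op graph in topological order (Kahn's algorithm), accumulates the transitive set of preceding ops for every op, and flattens the result into a pairwise matrix of `ir::Node::Dep` values. The sketch below mirrors that algorithm in plain Python on a toy adjacency list; the op names and the string labels standing in for the `Dep` enum are illustrative only, not Paddle API.

```python
from collections import deque

def op_dependencies(preceding):
    """preceding: dict mapping each op to the set of ops producing its inputs."""
    ops = list(preceding)
    pending = {op: set() for op in ops}              # direct successors
    for op, prevs in preceding.items():
        for p in prevs:
            pending[p].add(op)
    remaining = {op: len(prevs) for op, prevs in preceding.items()}
    ready = deque(op for op, n in remaining.items() if n == 0)

    all_preceding = {op: set() for op in ops}        # transitive predecessors
    while ready:
        cur = ready.popleft()
        for p in preceding[cur]:
            all_preceding[cur].add(p)
            all_preceding[cur] |= all_preceding[p]
        for nxt in pending[cur]:
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                ready.append(nxt)

    idx = {op: i for i, op in enumerate(ops)}
    matrix = [["no_dep"] * len(ops) for _ in ops]    # Dep::kNoDep analogue
    for op in ops:
        matrix[idx[op]][idx[op]] = "same"            # Dep::kSame
        for p in all_preceding[op]:
            matrix[idx[op]][idx[p]] = "after"        # op runs after p
            matrix[idx[p]][idx[op]] = "before"       # p runs before op
    return ops, matrix

# Toy block: read -> fc -> loss, plus an independent init op.
ops, matrix = op_dependencies(
    {"read": set(), "fc": {"read"}, "loss": {"fc"}, "init": set()})
for op, row in zip(ops, matrix):
    print(op, row)
```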
3 changes: 3 additions & 0 deletions paddle/fluid/framework/ir/graph_helper.h
@@ -124,6 +124,9 @@ std::vector<ir::Node *> TopologySortGraphByDescOrder(const Graph &graph);
void GraphToProgram(const Graph &graph, ProgramDesc *p_program,
const SortKind *sort_kind = nullptr);

std::vector<std::vector<std::vector<ir::Node::Dep>>> GetOpDependencies(
const ProgramDesc &program);

} // namespace ir
} // namespace framework
} // namespace paddle
1 change: 1 addition & 0 deletions paddle/fluid/framework/ir/node.h
@@ -63,6 +63,7 @@ class Node {
}

enum class Type { kOperation, kVariable };
enum class Dep { kSame = 0, kBefore = 1, kAfter = 2, kNoDep = 3 };
#if !defined(_WIN32) // msvc not support constexpr correctly.
static constexpr char kControlDepVarName[] = "__control_var";
#else
113 changes: 82 additions & 31 deletions paddle/fluid/operators/coalesce_tensor_op.cc
@@ -87,8 +87,8 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
void Compute(const framework::ExecutionContext &context) const override {
auto in_var_names = context.InputNames("Input");
auto out_var_names = context.OutputNames("Output");
-    auto &in_vars = context.MultiInputVar("Input");
-    auto out_vars = context.MultiOutputVar("Output");
+    const auto &in_tensors = context.MultiInput<framework::LoDTensor>("Input");
+    auto out_tensors = context.MultiOutput<framework::LoDTensor>("Output");

PADDLE_ENFORCE_GT(in_var_names.size(), static_cast<size_t>(0),
platform::errors::InvalidArgument(
@@ -101,30 +101,61 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
in_var_names.size(), out_var_names.size()));

// Input & Output check: only support LoDTensor
-    for (size_t i = 0; i < in_var_names.size(); ++i) {
-      PADDLE_ENFORCE_NOT_NULL(
-          in_vars[i],
-          platform::errors::NotFound("The input variable %s of CoalesceTensor "
-                                     "operator does not exist.",
-                                     in_var_names[i]));
-      PADDLE_ENFORCE_NOT_NULL(
-          out_vars[i],
-          platform::errors::NotFound("The output variable %s of CoalesceTensor "
-                                     "operator does not exist.",
-                                     out_var_names[i]));
-      PADDLE_ENFORCE_EQ(in_vars[i]->IsType<framework::LoDTensor>(), true,
-                        platform::errors::InvalidArgument(
-                            "The input variable %s of CoalesceTensor operator "
-                            "is not LoDTensor.",
-                            in_var_names[i]));
-      PADDLE_ENFORCE_EQ(out_vars[i]->IsType<framework::LoDTensor>(), true,
-                        platform::errors::InvalidArgument(
-                            "The output variable %s of CoalesceTensor operator "
-                            "is not LoDTensor.",
-                            out_var_names[i]));
-    }
-
-    auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
+    bool has_not_init_in_vars = false;
+    for (size_t i = 0; i < in_tensors.size(); ++i) {
+      PADDLE_ENFORCE_NOT_NULL(
+          in_tensors[i], platform::errors::InvalidArgument(
+                             "The %d-th input tensor cannot be nullptr.", i));
+      PADDLE_ENFORCE_NOT_NULL(
+          out_tensors[i], platform::errors::InvalidArgument(
+                              "The %d-th output tensor cannot be nullptr.", i));
+      if (!in_tensors[i]->IsInitialized()) {
+        has_not_init_in_vars = true;
+      }
+    }
+
+    if (has_not_init_in_vars) {
+      const auto &concated_shapes =
+          context.Attr<std::vector<int64_t>>("concated_shapes");
+      const auto &concated_ranks =
+          context.Attr<std::vector<int64_t>>("concated_ranks");
+      PADDLE_ENFORCE_EQ(concated_ranks.size(), out_tensors.size(),
+                        platform::errors::InvalidArgument(
+                            "The attribute(concated_ranks) length must be "
+                            "equal to the output tensor number."));
+      int64_t accumulated_ranks = 0;
+      for (size_t i = 0; i < in_tensors.size(); ++i) {
+        framework::DDim dims(concated_shapes.data() + accumulated_ranks,
+                             concated_ranks[i]);
+        if (!in_tensors[i]->IsInitialized()) {
+          PADDLE_ENFORCE_EQ(
+              in_tensors[i], out_tensors[i],
+              platform::errors::InvalidArgument(
+                  "The %d-th output tensor must be the same as the %d-th "
+                  "input tensor when the %d-th input tensor is not "
+                  "initialized.",
+                  i, i, i));
+          out_tensors[i]->Resize(dims);
+        } else {
+          PADDLE_ENFORCE_EQ(
+              in_tensors[i]->dims(), dims,
+              platform::errors::InvalidArgument(
+                  "The %d-th input tensor shape does not match the "
+                  "attribute(concated_shapes) and "
+                  "attribute(concated_ranks).",
+                  i));
+        }
+        accumulated_ranks += concated_ranks[i];
+        PADDLE_ENFORCE_LE(accumulated_ranks, concated_shapes.size(),
+                          platform::errors::InvalidArgument(
+                              "The attribute(concated_shapes) and "
+                              "attribute(concated_ranks) do not match."));
+      }
+      PADDLE_ENFORCE_EQ(accumulated_ranks, concated_shapes.size(),
+                        platform::errors::InvalidArgument(
+                            "The attribute(concated_shapes) and "
+                            "attribute(concated_ranks) do not match."));
+    }

bool use_align = context.Attr<bool>("use_align");
auto align_size = context.Attr<int>("align_size");
auto size_of_dtype = context.Attr<int>("user_defined_size_of_dtype");
@@ -141,8 +172,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
} else {
// Init the output as input
for (size_t i = 0; i < in_tensors.size(); ++i) {
-        out_vars[i]->GetMutable<framework::LoDTensor>()->Resize(
-            in_tensors[i]->dims());
+        out_tensors[i]->Resize(in_tensors[i]->dims());
}
}

@@ -160,11 +190,13 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {

// Alloc the continuous space
auto fused_tensor = context.Output<framework::LoDTensor>("FusedOutput");
-    fused_tensor->Resize(framework::make_ddim({static_cast<int64_t>(numel)}))
-        .mutable_data(context.GetPlace(), dtype);
+    void *fused_tensor_ptr =
+        fused_tensor
+            ->Resize(framework::make_ddim({static_cast<int64_t>(numel)}))
+            .mutable_data(context.GetPlace(), dtype);
+    VLOG(10) << "Fused tensor addr " << fused_tensor_ptr;

    // Init the continuous space
-    auto out_tensors = context.MultiOutput<framework::LoDTensor>("Output");
size_t offset = 0;
if (context.Attr<bool>("copy_data")) {
#ifdef PADDLE_WITH_ASCEND_CL
@@ -257,10 +289,6 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
std::stringstream ss;
ss << "alloc_space_for_vars: ";
for (size_t i = 0; i < var_names.size(); ++i) {
-      PADDLE_ENFORCE_EQ(lod_tensors[i]->IsInitialized(), true,
-                        platform::errors::InvalidArgument(
-                            "Tensor `%s` is not initialized.", var_names[i]));
-
auto size = lod_tensors[i]->numel();
PADDLE_ENFORCE_GT(
size, 0,
@@ -272,11 +300,13 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
place, align_size) /
size_of_dtype
: static_cast<size_t>(size);
+      const void *ptr = lod_tensors[i]->IsInitialized()
+                            ? lod_tensors[i]->data<void>()
+                            : nullptr;
      VLOG(4) << size << " " << len;
      ss << "input(" << var_names[i] << ") dim:(" << lod_tensors[i]->dims()
         << ") "
-         << " address:" << lod_tensors[i]->data<void>() << " len: " << len
-         << ", ";
+         << " address:" << ptr << " len: " << len << ", ";
*numel += len;
}
VLOG(10) << ss.str();
@@ -328,6 +358,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel {
}

protected:
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext &context) const override {
auto dtype = static_cast<framework::proto::VarType::Type>(
context.Attr<int>("dtype"));
return framework::OpKernelType(dtype, context.GetPlace());
}

framework::OpKernelType GetKernelTypeForVar(
const std::string &var_name, const framework::Tensor &tensor,
const framework::OpKernelType &expected_kernel_type) const override {
@@ -386,6 +423,20 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
"make sure the shape of these two vars are identical with "
"each other, this attr is added.")
.SetDefault(-1);
AddAttr<std::vector<int64_t>>(
"concated_shapes",
"The concated shapes of each shape of the input tensors. "
"If any of the input tensors are not inited, this is used to "
"init the output tensor shape, together with "
"attribute(concated_ranks).")
.SetDefault({});
AddAttr<std::vector<int64_t>>(
"concated_ranks",
"The concated ranks of each rank of the input tensors. "
"If any of the input tensors are not inited, this is used to "
"init the output tensor shape, together with "
"attribute(concated_shapes).")
.SetDefault({});
AddComment(R"DOC(
CoalesceTensor Operator.
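
The two new attributes describe the expected shape of every tensor in the fused group as one flat list: `concated_ranks[i]` says how many entries of `concated_shapes` belong to the i-th tensor, and the kernel walks the flat list with an accumulated offset. Below is a minimal Python illustration of that encoding; the helper name is made up and not part of the operator.

```python
def split_concated_shapes(concated_shapes, concated_ranks):
    """Recover the per-tensor dims from the flattened attribute encoding."""
    shapes, offset = [], 0
    for rank in concated_ranks:
        shapes.append(list(concated_shapes[offset:offset + rank]))
        offset += rank
    # Mirrors the kernel's final check: every shape value must be consumed.
    assert offset == len(concated_shapes), \
        "concated_shapes and concated_ranks do not match"
    return shapes

# Three tensors with shapes [2, 3], [4] and [5, 6, 7]:
print(split_concated_shapes([2, 3, 4, 5, 6, 7], [2, 1, 3]))
```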
7 changes: 7 additions & 0 deletions paddle/fluid/pybind/ir.cc
@@ -193,6 +193,13 @@ void BindNode(py::module *m) {
.value("Operation", Node::Type::kOperation)
.value("Variable", Node::Type::kVariable)
.export_values();

py::enum_<Node::Dep>(node, "Dep")
.value("Same", Node::Dep::kSame)
.value("Before", Node::Dep::kBefore)
.value("After", Node::Dep::kAfter)
.value("NoDep", Node::Dep::kNoDep)
.export_values();
}

class PYBIND11_HIDDEN PassAttrGetterSetterRegistry {
6 changes: 5 additions & 1 deletion paddle/fluid/pybind/protobuf.cc
@@ -19,6 +19,7 @@ limitations under the License. */
#include <tuple>

#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/process_mesh_desc.h"
#include "paddle/fluid/framework/program_desc.h"
@@ -81,7 +82,10 @@ void BindProgramDesc(pybind11::module *m) {
},
pybind11::arg("version") = pd::kCurProgramVersion)
.def("_version",
-           [](pd::ProgramDesc &self) -> int64_t { return self.Version(); });
+           [](pd::ProgramDesc &self) -> int64_t { return self.Version(); })
+      .def("get_op_deps", [](const framework::ProgramDesc &program) {
+        return framework::ir::GetOpDependencies(program);
+      });
}

void BindProcessMeshDesc(pybind11::module *m) {
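
Together with the `Node.Dep` enum bound in ir.cc above, the new `get_op_deps` binding lets Python code ask how any two ops in a block are ordered. A rough usage sketch follows; the Python-side entry points used here (`Program.desc`, `core.Node.Dep`) are assumptions inferred from these bindings rather than documented API.

```python
import paddle
from paddle.fluid import core  # assumed location of the Node.Dep binding

paddle.enable_static()

main_prog = paddle.static.Program()
with paddle.static.program_guard(main_prog):
    x = paddle.static.data(name="x", shape=[None, 8], dtype="float32")
    y = paddle.static.nn.fc(x, size=4)
    loss = paddle.mean(y)

# One matrix per block; deps[b][i][j] relates the i-th and j-th ops of block b.
deps = main_prog.desc.get_op_deps()
block0 = deps[0]
print(block0[0][0] == core.Node.Dep.Same)  # an op compared with itself
print(block0[-1][0])                       # expected Dep.After: mean runs after the first op
```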
18 changes: 17 additions & 1 deletion python/paddle/distributed/fleet/launch_utils.py
@@ -465,6 +465,18 @@ def __init__(self):
self.cmd = None


_run_with_coverage = False


def run_with_coverage(*args):
global _run_with_coverage
assert len(args) <= 1, "len(args) {} should <= 1".format(len(args))
if len(args) == 1:
assert isinstance(args[0], bool)
_run_with_coverage = args[0]
return _run_with_coverage


def start_local_trainers(cluster,
pod,
training_script,
@@ -518,7 +530,11 @@ def start_local_trainers(cluster,

current_env.update(proc_env)

-        cmd = [sys.executable, "-u", training_script] + training_script_args
+        coverage_args = []
+        if run_with_coverage():
+            coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
+        cmd = [sys.executable, "-u"] + coverage_args + [training_script
+        ] + training_script_args

logger.debug("start trainer proc{} env:{}".format(cmd, current_env))

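`run_with_coverage` is a small module-level getter/setter: called with a bool it sets the flag, called with no arguments it returns it, and when the flag is on every local trainer is launched through `coverage run --branch -p` so that per-process data files can be combined afterwards. A minimal sketch of the intended call pattern is below; the training script name and its arguments are placeholders.

```python
import sys
from paddle.distributed.fleet import launch_utils

launch_utils.run_with_coverage(True)     # turn the flag on
assert launch_utils.run_with_coverage()  # read it back

# This mirrors how start_local_trainers builds each trainer command line.
coverage_args = (["-m", "coverage", "run", "--branch", "-p"]
                 if launch_utils.run_with_coverage() else [])
cmd = [sys.executable, "-u"] + coverage_args + ["train.py", "--epochs", "1"]
print(cmd)
```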
(Diffs for the remaining 14 changed files are not shown.)