-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add new load and save ops for storing model params in a single file #7780
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. */ | ||
#include <cstdint>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>

#include "paddle/framework/op_registry.h"
#include "paddle/platform/device_context.h"
|
||
namespace paddle { | ||
namespace operators { | ||
|
||
class LoadCombineOp : public framework::OperatorBase { | ||
public: | ||
LoadCombineOp(const std::string &type, | ||
const framework::VariableNameMap &inputs, | ||
const framework::VariableNameMap &outputs, | ||
const framework::AttributeMap &attrs) | ||
: OperatorBase(type, inputs, outputs, attrs) {} | ||
void Run(const framework::Scope &scope, | ||
const platform::Place &place) const override { | ||
auto filename = Attr<std::string>("file_path"); | ||
auto position_counter = Attr<int>("position_counter"); | ||
|
||
std::ifstream fin(filename); | ||
PADDLE_ENFORCE(static_cast<bool>(fin), | ||
"Cannot open file %s for load_combine op", filename); | ||
|
||
auto out_var_name = Output("Out"); | ||
auto *out_var = scope.FindVar(out_var_name); | ||
PADDLE_ENFORCE(out_var != nullptr, "Output variable %s cannot be found", | ||
out_var_name); | ||
|
||
auto *tensor = out_var->GetMutable<framework::LoDTensor>(); | ||
|
||
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); | ||
auto &dev_ctx = *pool.Get(place); | ||
|
||
uint64_t data_length; | ||
char *buffer = NULL; | ||
for (int i = 0; i <= position_counter; i++) { | ||
if (!buffer) delete[] buffer; | ||
|
||
// Error checking | ||
PADDLE_ENFORCE(static_cast<bool>(fin), "Cannot read more from file %s", | ||
filename); | ||
|
||
// Read a fixed-width int, to get the number of bytes | ||
// for the serialized data. | ||
fin.read(reinterpret_cast<char *>(&data_length), sizeof(data_length)); | ||
|
||
// Error checking | ||
PADDLE_ENFORCE(static_cast<bool>(fin), "Cannot read more from file %s", | ||
filename); | ||
|
||
buffer = new char[data_length]; | ||
|
||
// Read the serialized data into the buffer | ||
fin.read(buffer, data_length); | ||
} | ||
|
||
std::string current_serialized_data; | ||
current_serialized_data.assign(buffer, data_length); | ||
|
||
// Create an input string stream | ||
std::istringstream ist(current_serialized_data); | ||
DeserializeFromStream(ist, tensor, dev_ctx); | ||
|
||
if (!buffer) delete[] buffer; // delete the last allocated memory | ||
|
||
if (platform::is_gpu_place(place)) { | ||
// copy CPU to GPU | ||
framework::LoDTensor cpu_tensor; | ||
cpu_tensor.ShareDataWith(*tensor); | ||
cpu_tensor.set_lod(tensor->lod()); | ||
|
||
// reset tensor | ||
out_var->Clear(); | ||
tensor = out_var->GetMutable<framework::LoDTensor>(); | ||
tensor->set_lod(cpu_tensor.lod()); | ||
Copy(cpu_tensor, place, dev_ctx, tensor); | ||
} | ||
} | ||
}; | ||
|
||
// Describes the output, attributes, and documentation of load_combine for
// the op registry. The original doc comment wrongly described the *save*
// behavior ("combines ... into a file"); this op loads from a file.
class LoadCombineOpProtoMaker : public framework::OpProtoAndCheckerMaker {
 public:
  LoadCombineOpProtoMaker(OpProto *proto, OpAttrChecker *op_checker)
      : OpProtoAndCheckerMaker(proto, op_checker) {
    AddOutput("Out", "(Tensor) The tensor to be loaded by the load_combine op");
    // Index of the record to deserialize; must be non-negative.
    AddAttr<int>("position_counter",
                 "(int) "
                 "It specifies the relative ordering of different parameters.")
        .AddCustomChecker([](const int &counter) { return counter >= 0; });
    // Source path; must be non-empty.
    AddAttr<std::string>("file_path",
                         "(string) "
                         "Variable will be load_combined from \"file_path\".")
        .AddCustomChecker(
            [](const std::string &path) { return !path.empty(); });
    AddComment(R"DOC(
LoadCombine Operator.

LoadCombine operator loads a tensor variable from a single file that stores
multiple serialized tensors; "position_counter" selects which record to
deserialize.

)DOC");
  }
};
} // namespace operators | ||
} // namespace paddle | ||
namespace ops = paddle::operators;

// Register load_combine with its proto maker; no gradient op is registered
// (loading parameters has no gradient).
REGISTER_OPERATOR(load_combine, ops::LoadCombineOp,
                  ops::LoadCombineOpProtoMaker);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. */ | ||
|
||
#include <stdint.h> | ||
#include <sys/stat.h> | ||
#include <fstream> | ||
#include <numeric> | ||
#include <sstream> | ||
#include "paddle/framework/data_type.h" | ||
#include "paddle/framework/framework.pb.h" | ||
#include "paddle/framework/lod_tensor.h" | ||
#include "paddle/framework/op_registry.h" | ||
#include "paddle/platform/device_context.h" | ||
|
||
namespace paddle { | ||
namespace operators { | ||
|
||
// TODO(sidgoyal78): These function are needed by other files (save_op), move
// them to paddle::filesystem namespace. (as noted by yuyang18 in save_op).
constexpr char kSEP = '/';

// True iff `filepath` names an existing filesystem entry.
static bool FileExists(const std::string &filepath) {
  struct stat meta;
  return stat(filepath.c_str(), &meta) == 0;
}

// Returns everything before the last path separator, or "" when the path
// contains no directory component.
static std::string DirName(const std::string &filepath) {
  const auto sep_pos = filepath.rfind(kSEP);
  return sep_pos == std::string::npos ? "" : filepath.substr(0, sep_pos);
}
|
||
static void MkDir(const char *path) { | ||
if (mkdir(path, 0755)) { | ||
PADDLE_ENFORCE_EQ(errno, EEXIST, "%s mkdir failed!", path); | ||
} | ||
} | ||
|
||
// Create every missing component of `fullpath`, like `mkdir -p`.
static void MkDirRecursively(const char *fullpath) {
  // Stop at the empty prefix or once an existing entry is reached.
  if (*fullpath == '\0' || FileExists(fullpath)) return;

  // Make sure the parent directory exists before creating this level.
  MkDirRecursively(DirName(fullpath).c_str());
  MkDir(fullpath);
}
|
||
class SaveCombineOp : public framework::OperatorBase { | ||
public: | ||
SaveCombineOp(const std::string &type, | ||
const framework::VariableNameMap &inputs, | ||
const framework::VariableNameMap &outputs, | ||
const framework::AttributeMap &attrs) | ||
: OperatorBase(type, inputs, outputs, attrs) {} | ||
void Run(const framework::Scope &scope, | ||
const platform::Place &place) const override { | ||
auto filename = Attr<std::string>("file_path"); | ||
auto overwrite = Attr<bool>("overwrite"); | ||
auto position_counter = Attr<int>("position_counter"); | ||
|
||
bool is_present = FileExists(filename); | ||
if (is_present && !overwrite && position_counter == 0) { | ||
PADDLE_THROW( | ||
"%s is existed, cannot save_combine to it when overwrite=false", | ||
filename, overwrite); | ||
} | ||
|
||
MkDirRecursively(DirName(filename).c_str()); | ||
|
||
std::ofstream fout; | ||
|
||
// if position_counter is 0, we open the file in write mode, | ||
// otherwise, we open in append mode. | ||
if (position_counter == 0) { | ||
fout.open(filename); | ||
} else { | ||
fout.open(filename, std::ios_base::app); | ||
} | ||
PADDLE_ENFORCE(static_cast<bool>(fout), "Cannot open %s to write", | ||
filename); | ||
|
||
auto iname = Input("X"); | ||
|
||
auto *var = scope.FindVar(iname); | ||
PADDLE_ENFORCE(var != nullptr, | ||
"Cannot find variable %s for save_combine_op", iname); | ||
|
||
PADDLE_ENFORCE(var->IsType<framework::LoDTensor>(), | ||
"SaveCombineOp only support LoDTensor, %s has wrong type", | ||
iname); | ||
|
||
auto &tensor = var->Get<framework::LoDTensor>(); | ||
|
||
// get device context from pool | ||
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); | ||
auto &dev_ctx = *pool.Get(place); | ||
|
||
// Create "output string stream" to get the serialized LodTensor | ||
std::ostringstream str_stream; | ||
framework::SerializeToStream(str_stream, tensor, dev_ctx); | ||
std::string current_serialized_data = str_stream.str(); | ||
|
||
// Save 'current_size' information as a fixed width integer, and | ||
// further save the serialized data using 'current_size' bytes | ||
uint64_t current_size = current_serialized_data.size(); | ||
fout.write(reinterpret_cast<const char *>(¤t_size), | ||
sizeof(current_size)); | ||
fout.write(reinterpret_cast<const char *>(current_serialized_data.c_str()), | ||
static_cast<std::streamsize>(current_size)); | ||
fout.close(); | ||
} | ||
}; | ||
|
||
// Describes the input, attributes, and documentation of save_combine for
// the op registry.
class SaveCombineOpProtoMaker : public framework::OpProtoAndCheckerMaker {
 public:
  SaveCombineOpProtoMaker(OpProto *proto, OpAttrChecker *op_checker)
      : OpProtoAndCheckerMaker(proto, op_checker) {
    AddInput("X", "(Tensor ) Input tensor to be save_combined");
    AddComment(R"DOC(
Save_combine operator

This operator will serialize and write tensor variables to file on disk in a
combined fashion.
)DOC");
    // When false and position_counter == 0, Run() throws if the target file
    // already exists.
    AddAttr<bool>("overwrite",
                  "(boolean, default true)"
                  "Overwrite the output file if exist")
        .SetDefault(true);
    // Index of this tensor within the combined file; 0 truncates the file,
    // larger values append. Must be non-negative.
    AddAttr<int>("position_counter",
                 "(int) "
                 "It specifies the relative ordering of different parameters.")
        .AddCustomChecker([](const int &counter) { return counter >= 0; });
    // Destination path; must be non-empty.
    AddAttr<std::string>(
        "file_path",
        "(string)"
        "The \"file_path\" where the variable will be save_combined.")
        .AddCustomChecker(
            [](const std::string &path) { return !path.empty(); });
  }
};
|
||
} // namespace operators | ||
} // namespace paddle | ||
|
||
namespace ops = paddle::operators;

// Register save_combine with its proto maker; no gradient op is registered
// (saving parameters has no gradient).
REGISTER_OPERATOR(save_combine, ops::SaveCombineOp,
                  ops::SaveCombineOpProtoMaker);
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change the implementation of `load_op`? The `load_combined` operator is a little confusing. Maybe we can change the output `Out` of `load_op` to a `Duplicable`: then `Out` will be a vector of `Variable`s, and we can load all the `Variable`s from a file through one `load_op`.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or we can use an `offset` in a file instead of the `position_counter`? And can we avoid reading the content of the file again and again?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can combine it with `load_op`. But in my opinion, it will look more like: Regarding your `Duplicable` suggestion: I am not sure I follow completely, but I still think we will need deserialization logic over this merged-together tensor (which may be the same amount of computation). Regarding your suggestion about `offset`: I think it has 2 aspects (avoiding the `new` and the copy to `istringstream`).
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with it. Maybe can do it in next PRs.
The current LoadCombineOp only process one file each time. Can it process many files at the same time? That is to say, this op loads all parameters at the same time and saves them into one file, and it doesn't need offset or point_counter. Thus, it can avoid reading the content of the file again and again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@luotao1 : Can you please explain a bit? I am not sure if I understand the main idea. (Did you mean : can it process many tensors (instead of files) at the same time?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To explain my point of view a bit more, I thought that the op would be used in the manner currently done in `save_vars()` in `fluid/io.py`. So basically, I assumed that we will have usage which looks like this:
https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/io.py#L89-L97
(with `save` replaced with `save_combine`)
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree.
I think what @luotao1 is saying is that you can construct a save/load program desc for loading/saving all parameters data at once. This program desc has only one load/save op that takes a list of var names as input/output and do the load/save tasks for once. This list can be obtained from the main program desc by traversing variables. By doing this, we can save a lot of time reading/write files. Thus I prefer this option.