Do not do rereading step in fread #2558

Merged on Aug 6, 2020 (40 commits)
Changes from 39 commits
5714801
Rename ordered_task -> OrderedTask
st-pasha Jul 28, 2020
e9c9ee8
Rename wait_task -> WaitTask
st-pasha Jul 28, 2020
690c245
Rename ordered_scheduler -> OrderedJob
st-pasha Jul 28, 2020
4ef8873
Orderedjob member variables with _'s
st-pasha Jul 28, 2020
e33e29e
OrderedJob.tasks_ is now a vector of unique_ptr's
st-pasha Jul 28, 2020
3a7da2c
Rename class thread_team -> ThreadTeam
st-pasha Jul 28, 2020
c372d6b
Create new implementation of parallel_for_ordered() loop
st-pasha Jul 28, 2020
230a9b1
Use new ordered loop in generate_string_column()
st-pasha Jul 28, 2020
76d27ed
Simplify map_str2str()
st-pasha Jul 29, 2020
8c5abc5
Use new parallel_for_ordered2() in test_parallel_for_ordered()
st-pasha Jul 29, 2020
9abf6aa
Use parallel_for_ordered2() in test_progress_ordered
st-pasha Jul 29, 2020
80a086c
Use new parallel_for_ordered2 in write_to_csv
st-pasha Jul 29, 2020
403935f
job_->abort_execution() is superfluous
st-pasha Jul 29, 2020
c399ba8
Use new parallel_for_ordered2 in fread
st-pasha Jul 29, 2020
59b7abf
Remove old code and final renames
st-pasha Jul 29, 2020
f096efb
Introduce local types in Parallel reader
st-pasha Jul 29, 2020
ac05c04
Merge branch 'master' into ps-typebumps
st-pasha Jul 31, 2020
a35aa49
tests for parallel ordered loop moved into separate file
st-pasha Jul 31, 2020
8d465ca
Added tests for throwing exceptions in an ordered loop
st-pasha Jul 31, 2020
edb5951
WaitTask -> NoopTask
st-pasha Jul 31, 2020
a8adeec
Added a test for set_num_iterations()
st-pasha Jul 31, 2020
c3dbd8c
Added test for wait_until_all_finalized()
st-pasha Jul 31, 2020
46b582b
Add a test for single-threaded set_num_iters()
st-pasha Jul 31, 2020
487c68b
Added a noop test for parallel ordered loop
st-pasha Aug 1, 2020
ad9af5f
Added method OrderedTask::super_ordered()
st-pasha Aug 1, 2020
eb97d8e
Simplify wait_until_all_finalized()
st-pasha Aug 1, 2020
295eb07
Inline ParallelReader::order_chunk()
st-pasha Aug 3, 2020
b756851
Perform on-the-fly type bumps in fread
st-pasha Aug 4, 2020
4feb656
Cleanup some of the 'type-bumping' logic
st-pasha Aug 4, 2020
993c54b
Removed "Prepare for re-reading"
st-pasha Aug 4, 2020
8abc9f9
Remove is_in_buffer()
st-pasha Aug 4, 2020
5022a9b
More cleanup
st-pasha Aug 4, 2020
7a467ec
Remove OutputColumn::present_in_buffer flag
st-pasha Aug 5, 2020
a887974
Restore stats calculation in fread
st-pasha Aug 5, 2020
965943a
ignore iris.xls.zip in tests
st-pasha Aug 5, 2020
3475efe
do not report skipped files in large fread test
st-pasha Aug 5, 2020
41e1e20
remove WORK_REREAD
st-pasha Aug 5, 2020
ca4d84a
Remove reread stats from FreadObserver
st-pasha Aug 5, 2020
6ca460f
remove commented-out code
st-pasha Aug 5, 2020
9f5b6aa
Remove -Wno-implicit-int-float-conversion flag, not portable
st-pasha Aug 6, 2020
1 change: 1 addition & 0 deletions Makefile
@@ -30,6 +30,7 @@ clean::
rm -rf build
rm -rf dist
rm -rf datatable.egg-info
rm -rf htmlcov
rm -f *.so
rm -f src/datatable/lib/_datatable*.pyd
rm -f src/datatable/lib/_datatable*.so
1 change: 1 addition & 0 deletions ci/ext.py
@@ -342,6 +342,7 @@ def build_extension(cmd, verbosity=3):
"-Wno-switch-enum",
"-Wno-weak-template-vtables",
"-Wno-weak-vtables",
"-Wno-implicit-int-float-conversion"
)
else:
ext.compiler.add_compiler_flag(
26 changes: 3 additions & 23 deletions src/core/csv/fread.cc
@@ -62,7 +62,6 @@ std::unique_ptr<DataTable> FreadReader::read_all()
int nUserBumped = 0;
for (size_t i = 0; i < ncols; i++) {
auto& col = preframe.column(i);
col.reset_type_bumped();
if (col.is_dropped()) {
ndropped++;
continue;
@@ -100,37 +99,18 @@ std::unique_ptr<DataTable> FreadReader::read_all()
//****************************************************************************
// [6] Read the data
//****************************************************************************
bool firstTime = true;

auto typesPtr = preframe.get_ptypes();
dt::read::PT* types = typesPtr.data(); // This pointer is valid until `typesPtr` goes out of scope

read: // we'll return here to reread any columns with out-of-sample type exceptions
{
auto _ = logger_.section("[6] Read the data");
job->set_message(firstTime? "Reading data" : "Rereading data");
dt::progress::subtask subwork(*job, firstTime? WORK_READ : WORK_REREAD);
job->set_message("Reading data");
dt::progress::subtask subwork(*job, WORK_READ);
dt::read::FreadParallelReader scr(*this, types);
scr.read_all();
subwork.done();

if (firstTime) {
fo.t_data_read = fo.t_data_reread = wallclock();
} else {
fo.t_data_reread = wallclock();
}
size_t ncols_to_reread = preframe.n_columns_to_reread();
xassert((ncols_to_reread > 0) == reread_scheduled);
if (ncols_to_reread) {
fo.n_cols_reread += ncols_to_reread;
D() << dt::log::plural(ncols_to_reread, "column")
<< " need to be re-read because their types have changed";
preframe.prepare_for_rereading();
firstTime = false;
reread_scheduled = false;
goto read;
}

fo.t_data_read = wallclock();
fo.n_rows_read = preframe.nrows_written();
fo.n_cols_read = preframe.n_columns_in_output();
}
1 change: 1 addition & 0 deletions src/core/csv/reader.cc
@@ -899,6 +899,7 @@ void GenericReader::report_columns_to_python() {
auto& coli = preframe.column(i);
py::robj elem = newTypesList[i];
coli.set_rtype(elem.to_int64());
coli.outcol().set_stype(coli.get_stype());
if (newNamesList && coli.get_rtype() != RT::RDrop) {
XAssert(j < newNamesList.size());
elem = newNamesList[j++];
1 change: 0 additions & 1 deletion src/core/csv/reader.h
@@ -122,7 +122,6 @@ class GenericReader
public:
static constexpr size_t WORK_PREPARE = 2;
static constexpr size_t WORK_READ = 100;
static constexpr size_t WORK_REREAD = 60;
static constexpr size_t WORK_DECODE_UTF16 = 50;
std::shared_ptr<dt::progress::work> job;
Buffer input_mbuf;
20 changes: 6 additions & 14 deletions src/core/csv/reader_fread.cc
@@ -495,7 +495,8 @@ int64_t FreadReader::parse_single_line(dt::read::ParseContext& fctx)
fctx.skip_whitespace();

const char* fieldStart = tch;
auto ptype_iter = col.get_ptype_iterator(&fctx.quoteRule);
auto ptype_iter = dt::read::PtypeIterator(
col.get_ptype(), col.get_rtype(), &fctx.quoteRule);
while (true) {
// Try to parse using the regular field parser
tch = fieldStart;
@@ -532,6 +533,7 @@ int64_t FreadReader::parse_single_line(dt::read::ParseContext& fctx)
}
if (j < ncols && ptype_iter.has_incremented()) {
col.set_ptype(*ptype_iter);
col.outcol().set_stype(col.get_stype());
}
j++;

@@ -949,7 +951,6 @@ FreadObserver::FreadObserver(const dt::read::GenericReader& g_) : g(g_) {
t_column_types_detected = 0;
t_frame_allocated = 0;
t_data_read = 0;
t_data_reread = 0;
time_read_data = 0.0;
time_push_data = 0.0;
input_size = 0;
@@ -958,7 +959,6 @@ FreadObserver::FreadObserver(const dt::read::GenericReader& g_) : g(g_) {
n_lines_sampled = 0;
n_rows_allocated = 0;
n_cols_allocated = 0;
n_cols_reread = 0;
allocation_size = 0;
read_data_nthreads = 0;
}
@@ -974,8 +974,7 @@ void FreadObserver::report() {
t_parse_parameters_detected <= t_column_types_detected &&
t_column_types_detected <= t_frame_allocated &&
t_frame_allocated <= t_data_read &&
t_data_read <= t_data_reread &&
t_data_reread <= t_end &&
t_data_read <= t_end &&
read_data_nthreads > 0);
double total_time = std::max(t_end - t_start + g.t_open_input, 1e-6);
int total_minutes = static_cast<int>(total_time/60);
@@ -984,11 +983,10 @@ void FreadObserver::report() {
double types_time = t_column_types_detected - t_parse_parameters_detected;
double alloc_time = t_frame_allocated - t_column_types_detected;
double read_time = t_data_read - t_frame_allocated;
double reread_time = t_data_reread - t_data_read;
double makedt_time = t_end - t_data_reread;
double makedt_time = t_end - t_data_read;
double t_read = time_read_data.load() / read_data_nthreads;
double t_push = time_push_data.load() / read_data_nthreads;
double time_wait_data = read_time + reread_time - t_read - t_push;
double time_wait_data = read_time - t_read - t_push;
int p = total_time < 10 ? 5 :
total_time < 100 ? 6 :
total_time < 1000 ? 7 : 8;
@@ -1021,12 +1019,6 @@ void FreadObserver::report() {
g.d() << " + " << ff(p, 3, read_time) << "s ("
<< ff(2, 0, 100 * read_time / total_time) << "%)"
<< " reading data";
if (n_cols_reread) {
g.d() << " + " << ff(p, 3, reread_time) << "s ("
<< ff(2, 0, 100 * reread_time / total_time) << "%)"
<< " re-reading " << n_cols_reread
<< " columns due to out-of-sample type exceptions";
}
g.d() << " = " << ff(p, 3, t_read) << "s (" << ff(2, 0, 100 * t_read / total_time) << "%) reading into row-major buffers";
g.d() << " = " << ff(p, 3, t_push) << "s (" << ff(2, 0, 100 * t_push / total_time) << "%) saving into the output frame";
g.d() << " = " << ff(p, 3, time_wait_data) << "s (" << ff(2, 0, 100 * time_wait_data / total_time) << "%) waiting";
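With the reread stage gone, the timing report reduces to simple arithmetic: the wall-clock duration of the read stage, minus the per-thread averages of the parse and push timers, is what gets reported as "waiting". The sketch below restates that arithmetic in a self-contained form. The `ReadTimings` struct and the `breakdown()` function are hypothetical names introduced only for illustration; the formulas follow the lines of `FreadObserver::report()` shown in this diff.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper type, not part of datatable.
struct ReadTimings {
  double read_time;  // wall-clock duration of the "reading data" stage
  double t_read;     // average per-thread time reading into row-major buffers
  double t_push;     // average per-thread time saving into the output frame
  double t_wait;     // remainder, reported as "waiting"
};

// Mirrors the arithmetic in FreadObserver::report() after this change.
// `time_read_data` and `time_push_data` are totals accumulated over all
// threads, so they are divided by the thread count before being compared
// with the wall-clock duration.
ReadTimings breakdown(double t_frame_allocated, double t_data_read,
                      double time_read_data, double time_push_data,
                      std::size_t nthreads) {
  ReadTimings r;
  r.read_time = t_data_read - t_frame_allocated;
  r.t_read = time_read_data / static_cast<double>(nthreads);
  r.t_push = time_push_data / static_cast<double>(nthreads);
  r.t_wait = r.read_time - r.t_read - r.t_push;
  return r;
}
```

For example, a read stage that lasts 2 seconds on 4 threads, with 4 thread-seconds of parsing and 2 thread-seconds of pushing, gives 1 second of reading, 0.5 seconds of pushing and 0.5 seconds of waiting.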
2 changes: 0 additions & 2 deletions src/core/csv/reader_fread.h
@@ -53,7 +53,6 @@ class FreadObserver {
double t_column_types_detected;
double t_frame_allocated;
double t_data_read;
double t_data_reread;
dt::atomic<double> time_read_data;
dt::atomic<double> time_push_data;
size_t n_rows_read;
@@ -62,7 +61,6 @@ class FreadObserver {
size_t n_lines_sampled;
size_t n_rows_allocated;
size_t n_cols_allocated;
size_t n_cols_reread;
size_t allocation_size;
size_t read_data_nthreads;
std::vector<std::string> messages;
35 changes: 35 additions & 0 deletions src/core/csv/reader_parsers.cc
@@ -658,6 +658,41 @@ ParserLibrary::ParserLibrary() {



//------------------------------------------------------------------------------
// PtypeIterator
//------------------------------------------------------------------------------
namespace dt {
namespace read {


PtypeIterator::PtypeIterator(PT pt, RT rt, int8_t* qr_ptr)
: pqr(qr_ptr), rtype(rt), orig_ptype(pt), curr_ptype(pt) {}

PT PtypeIterator::operator*() const {
return curr_ptype;
}

RT PtypeIterator::get_rtype() const {
return rtype;
}

PtypeIterator& PtypeIterator::operator++() {
if (curr_ptype < PT::Str32) {
curr_ptype = static_cast<PT>(curr_ptype + 1);
} else {
*pqr = *pqr + 1;
}
return *this;
}

bool PtypeIterator::has_incremented() const {
return curr_ptype != orig_ptype;
}




}}
//------------------------------------------------------------------------------
// ParserIterator
//------------------------------------------------------------------------------
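The way `parse_single_line()` drives `PtypeIterator` (try the current type, and on failure increment the iterator and retry from the start of the field) can be illustrated with a reduced, self-contained sketch. Everything named below is an assumption made for illustration: the four-value `PT` enum, the `try_parse()` helper, the `detect()` wrapper and the `PtypeIteratorSketch` class are stand-ins, not datatable's actual parser table. Only the increment rule (bump the type until `Str32` is reached, then bump the quote rule) is taken from the `PtypeIterator` code in this diff.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <string>

// Hypothetical, much-reduced stand-in for dt::read::PT.
enum PT : int8_t { Bool01 = 0, Int32 = 1, Float64 = 2, Str32 = 3 };

// Hypothetical parser: returns true if `field` is acceptable as type `pt`.
// The checks are deliberately loose (strtol/strtod semantics).
bool try_parse(PT pt, const std::string& field) {
  if (field.empty()) return true;  // treat an empty field as NA
  char* end = nullptr;
  switch (pt) {
    case Bool01:
      return field == "0" || field == "1";
    case Int32:
      (void) std::strtol(field.c_str(), &end, 10);
      return *end == '\0';
    case Float64:
      (void) std::strtod(field.c_str(), &end);
      return *end == '\0';
    case Str32:
      return true;  // anything can be stored as a string
  }
  return false;
}

// Same increment rule as PtypeIterator in this diff, minus the rtype field.
class PtypeIteratorSketch {
  PT orig_;
  PT curr_;
  int8_t* pqr_;

 public:
  PtypeIteratorSketch(PT pt, int8_t* qr_ptr)
    : orig_(pt), curr_(pt), pqr_(qr_ptr) {}

  PT operator*() const { return curr_; }

  PtypeIteratorSketch& operator++() {
    if (curr_ < Str32) {
      curr_ = static_cast<PT>(curr_ + 1);        // bump to the next wider type
    } else {
      *pqr_ = static_cast<int8_t>(*pqr_ + 1);    // out of types: next quote rule
    }
    return *this;
  }

  bool has_incremented() const { return curr_ != orig_; }
};

// Starting from the column's current type, bump until the field parses.
// In this sketch Str32 always succeeds, so the quote-rule branch is never
// reached and the loop always terminates.
PT detect(PT start, const std::string& field) {
  int8_t quote_rule = 0;
  PtypeIteratorSketch it(start, &quote_rule);
  while (!try_parse(*it, field)) {
    ++it;
  }
  return *it;
}
```

Note that the iterator only ever moves towards wider types: calling `detect(Float64, "7")` leaves the column at `Float64` even though the field would also parse as an integer.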
25 changes: 25 additions & 0 deletions src/core/csv/reader_parsers.h
@@ -162,7 +162,31 @@ class ParserInfo {
//------------------------------------------------------------------------------
// Parser iterators
//------------------------------------------------------------------------------
namespace dt {
namespace read {


class PtypeIterator {
private:
int8_t* pqr;
RT rtype;
PT orig_ptype;
PT curr_ptype;
int64_t : 40;

public:
PtypeIterator(PT pt, RT rt, int8_t* qr_ptr);
PT operator*() const;
PtypeIterator& operator++();
bool has_incremented() const;
RT get_rtype() const;
};


}}


// unused?
class ParserIterator {
private:
int ipt;
@@ -184,6 +208,7 @@ class ParserIterator {
value_type operator*() const;
};

// unused?
Review comment (Contributor):
If we remove the `ParserIterable ParserLibrary::successor_types(dt::read::PT pt) const;` method (which is not used anywhere anyway), the ParserIterable and ParserIterator classes could be safely removed too.

Reply (Contributor Author):
Yes, we need to do more cleanup around the ParserLibrary in general, but it's better to defer that to a future PR.

class ParserIterable {
private:
const dt::read::PT ptype;
68 changes: 60 additions & 8 deletions src/core/parallel/api.h
@@ -134,6 +134,16 @@ void parallel_for_ordered(size_t n_iterations,
function<std::unique_ptr<OrderedTask>()>);


/**
* This class facilitates execution of ordered for-loops. A user
* is expected to derive from this class, overriding methods
* `start(i)`, `order(i)` and `finish(i)`; and then call
* `dt::parallel_for_ordered()` supplying a factory function to
* create instances of the derived class.
*
* The class also has several methods for controlling the execution
* of the ordered loop. See their descriptions below.
*/
class OrderedTask : public ThreadTask {
enum State : size_t;
private:
@@ -149,26 +159,68 @@ class OrderedTask : public ThreadTask {
virtual void start(size_t i);
virtual void order(size_t i);
virtual void finish(size_t i);
size_t get_iteration() const noexcept;

void execute() override; // ThreadTask's API

size_t get_num_iterations() const;
// (This method may only be called from an ordered section).
// Change the number of iterations in the ordered job. The new
// number of iterations cannot be less than the number of
// iterations that were already ordered.
//
// If the new number of iterations is smaller than the original
// total count, then it is guaranteed that no task will be
// ordered or finished past `n`, although it is possible that
// some tasks will have started at an index `n` or above.
//
void set_num_iterations(size_t n);
size_t get_num_iterations() const;

// (This method may only be called from an ordered section).
// By the time this method returns, all tasks with the index
// less than the current will have completed their "finish"
// stage. Furthermore, no new tasks will start a "finish" stage
// until the end of the enclosing "ordered" step.
//
void wait_until_all_finalized();

// (This method may only be called from an ordered section).
// This method performs the following sequence of actions:
// - blocks new tasks from entering the "start" stage;
// - waits until there are no tasks executing either the
// "start" or "finish" stages;
// - executes the payload function `f()`;
// - resumes the multithreaded execution, making sure that all
// iterations beginning with the current one will be restarted.
// More specifically, the current iteration, which was
// executing an ordered section, will not finish. Instead,
// this iteration will start again from the "start" step, then
// execute "order" again, and only after that will it
// "finish".
//
// Thus, this method creates a region of execution which behaves
// as-if the ordered loop was cancelled, then f() executed in a
// single-threaded environment, then the ordered loop restarted
// from the same iteration where it broke off.
//
// The programmer must take care not to create an infinite loop
// by repeatedly calling `super_ordered` on each execution of
// the same task.
//
void super_ordered(std::function<void()> f);

size_t get_iteration() const noexcept;
bool is_starting() const noexcept;
bool is_ordering() const noexcept;
bool is_finishing() const noexcept;

void execute() override; // ThreadTask's API

private:
friend class MultiThreaded_OrderedJob;
friend class SingleThreaded_OrderedJob;
void init_parent(OrderedJob* parent);
bool ready_to_start() const noexcept;
bool ready_to_order() const noexcept;
bool ready_to_finish() const noexcept;
bool is_starting() const noexcept;
bool is_ordering() const noexcept;
bool is_finishing() const noexcept;
void advance_state();
void cancel();
void start_iteration(size_t i);
};
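The restart behaviour described in the `super_ordered()` comment can be modelled with a small single-threaded loop. The sketch below is not datatable's `OrderedTask` and involves no threads at all: `OrderedTaskModel`, `run_ordered_model()`, `BumpingTask` and `demo_log()` are hypothetical names, and the model captures only the observable ordering of the "start", "order" and "finish" steps when one iteration calls `super_ordered()`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical single-threaded model of an ordered task.
class OrderedTaskModel {
 public:
  virtual ~OrderedTaskModel() = default;
  virtual void start(std::size_t) {}
  virtual void order(std::size_t) {}
  virtual void finish(std::size_t) {}

  // Meant to be called from order(): runs f() immediately, then asks the
  // loop to redo the current iteration from its start() step.
  void super_ordered(const std::function<void()>& f) {
    f();
    restart_ = true;
  }

 private:
  friend void run_ordered_model(std::size_t, OrderedTaskModel&);
  bool restart_ = false;
};

// Runs n iterations sequentially. An iteration that called super_ordered()
// repeats start() and order() before it is allowed to finish().
void run_ordered_model(std::size_t n, OrderedTaskModel& task) {
  for (std::size_t i = 0; i < n; ++i) {
    while (true) {
      task.restart_ = false;
      task.start(i);
      task.order(i);
      if (!task.restart_) break;
    }
    task.finish(i);
  }
}

// Example task: iteration 1 discovers that a column is too narrow and
// widens it inside super_ordered(). The `wide` flag guards against calling
// super_ordered() again on the retry, which would loop forever.
struct BumpingTask : OrderedTaskModel {
  std::vector<std::string> log;
  bool wide = false;

  void start(std::size_t i) override {
    log.push_back("start " + std::to_string(i));
  }
  void order(std::size_t i) override {
    log.push_back("order " + std::to_string(i));
    if (i == 1 && !wide) {
      super_ordered([this] {
        wide = true;
        log.push_back("bump");
      });
    }
  }
  void finish(std::size_t i) override {
    log.push_back("finish " + std::to_string(i));
  }
};

// Returns the sequence of steps recorded for a 3-iteration run.
std::vector<std::string> demo_log() {
  BumpingTask task;
  run_ordered_model(3, task);
  return task.log;
}
```

In this model, iteration 1 records "start 1", "order 1", "bump", then "start 1" and "order 1" a second time, and only then "finish 1". That is the as-if behaviour the comment describes: the loop is interrupted, `f()` runs alone, and the loop resumes from the same iteration.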
