
Add multithreaded operators #78

Open
wants to merge 11 commits into main

Conversation

@ritwizsinha commented Jan 12, 2025

Fixes #72

@dentiny (Contributor) commented Jan 12, 2025

nit: Is the formatting correct? Maybe we could run make format before committing (or add it to a pre-commit hook).

@@ -0,0 +1,18 @@
-- Create a temporary table for testing
CREATE TEMPORARY TABLE test_table (
Contributor:

  1. You are testing a heap table here, not a columnstore table
  2. No need for it to be a temp table
  3. A primary key and an auto-increment column impact parallelism
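
A sketch of the test shape these points suggest (illustrative; the USING columnstore syntax appears later in this thread):

-- Plain table (not TEMPORARY), stored as columnstore, with no primary
-- key or auto-increment column that would constrain parallelism
CREATE TABLE test_table (a int, b text) USING columnstore;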


select * from test_table;
-- Drop the temporary table
DROP TABLE test_table;
Contributor:

nit: add a newline at the end of each file

}
return SinkResultType::NEED_MORE_INPUT;
}

SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override {
auto &gstate = input.global_state.Cast<ColumnstoreDeleteGlobalState>();
auto &lstate_delete = input.local_state.Cast<ColumnstoreDeleteLocalState>();
Contributor:

nit: just lstate

Author:

Done

SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override {
auto &gstate = input.global_state.Cast<ColumnstoreDeleteGlobalState>();
auto &lstate_delete = input.local_state.Cast<ColumnstoreDeleteLocalState>();
gstate.row_ids.insert(lstate_delete.local_row_ids.begin(), lstate_delete.local_row_ids.end());
Contributor:

need a lock on gstate to ensure thread safety

Author:

Yeah, of course. I misinterpreted the documentation to mean Combine was thread-safe, but multiple Combine calls can run concurrently. Added a lock.
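
A minimal sketch of the locked Combine, assuming a mutex member (here called merge_lock, a name not in the diff) is added to ColumnstoreDeleteGlobalState, and using the lstate/row_ids names from the review nits in this thread:

SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override {
    auto &gstate = input.global_state.Cast<ColumnstoreDeleteGlobalState>();
    auto &lstate = input.local_state.Cast<ColumnstoreDeleteLocalState>();
    // Multiple Combine calls can run concurrently, so guard the shared set
    lock_guard<mutex> guard(gstate.merge_lock);
    gstate.row_ids.insert(lstate.row_ids.begin(), lstate.row_ids.end());
    return SinkCombineResultType::FINISHED;
}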

@@ -21,6 +21,11 @@ class ColumnstoreDeleteGlobalState : public GlobalSinkState {
ColumnDataCollection return_collection;
};

class ColumnstoreDeleteLocalState : public LocalSinkState {
public:
unordered_set<row_t> local_row_ids;
Contributor:

nit: I would just name it row_ids since the meaning is clear from the context, e.g. lstate.row_ids vs gstate.row_ids

Author:

Done

@@ -101,5 +121,4 @@ unique_ptr<PhysicalOperator> Columnstore::PlanDelete(ClientContext &context, Log
del->children.push_back(std::move(plan));
return std::move(del);
}

Contributor:

nit: add back the trailing newline

Author:

Done

bool IsSink() const override {
return true;
}

bool ParallelSink() const override {
return true;
Contributor:

It seems that DuckDB doesn't always parallelize its PhysicalInsert (See DuckCatalog::PlanInsert)

@ritwizsinha (Author), Jan 16, 2025:

Checked whether the plan supports parallelism and whether the number of threads is greater than 1, as done for PhysicalInsert
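
A minimal sketch of that check, modeled on DuckDB's DuckCatalog::PlanInsert (the exact wiring in this PR may differ):

auto num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
// Parallel streaming insert is only safe when the child plan does not
// need to preserve insertion order
bool parallel_streaming_insert = !PhysicalPlanGenerator::PreserveInsertionOrder(context, *plan);
bool parallel = parallel_streaming_insert && num_threads > 1;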

Contributor:

DuckDB also doesn't parallelize PhysicalInsert when there's RETURNING

: executor(context, bound_defaults), insert_count(0), return_collection(context, types) {
: executor(context, bound_defaults), insert_count(0), return_collection(context, types) {}

ExpressionExecutor executor;
Contributor:

this is not thread-safe to put in global state

Author:

Yeah, replicated what PhysicalInsert does and moved this into the local state
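
A minimal sketch of that change, mirroring PhysicalInsert; the class and member names follow the snippets in this thread, while the constructor signature is illustrative:

class ColumnstoreInsertLocalState : public LocalSinkState {
public:
    ColumnstoreInsertLocalState(ClientContext &context, const vector<LogicalType> &types,
                                const vector<unique_ptr<Expression>> &bound_defaults)
        : executor(context, bound_defaults) {
        insert_chunk.Initialize(Allocator::Get(context), types);
    }

    // ExpressionExecutor is not thread-safe, so each thread owns its own
    ExpressionExecutor executor;
    DataChunk insert_chunk;
};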

}
}
if (return_chunk) {
gstate.return_collection.Append(gstate.chunk);
lstate.return_collection.Append(lstate.chunk);
Contributor:

DuckDB directly writes to gstate.return_collection. It appears that Append is thread-safe

Author:

I am not sure whether Append is thread-safe; at least the documentation doesn't mention it

Contributor:

You are right, ColumnDataCollection::Append() is not thread-safe.
I misread PhysicalInsert::Sink(): gstate.return_collection.Append() is only used under the !parallel branch
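
A minimal sketch of the per-thread pattern the diff above moves to: append RETURNING rows to the local collection in Sink(), then merge under a lock in Combine() (return_lock is an assumed member name; the merge uses ColumnDataCollection::Combine):

// In Sink(): each thread appends to its own collection, no sharing
lstate.return_collection.Append(lstate.chunk);

// In Combine(): merge into the global collection under a lock, since
// Combine calls can themselves run concurrently
lock_guard<mutex> guard(gstate.return_lock);
gstate.return_collection.Combine(lstate.return_collection);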

@dpxcc (Contributor) commented Jan 14, 2025

You also need to parallelize ColumnstoreUpdate

@ritwizsinha ritwizsinha marked this pull request as draft January 16, 2025 20:56
@ritwizsinha ritwizsinha marked this pull request as ready for review January 20, 2025 10:44
@ritwizsinha (Author) commented:
A bit confused about why make installcheck fails for updates in this test case:

CREATE TABLE t (a int, b text) USING columnstore;
INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e');
INSERT INTO t VALUES (2, 'f'), (3, 'g'), (4, 'h');
UPDATE t SET b = a + 1 WHERE a > 3;

While inserting, two Parquet files get generated. But when we then run a simple UPDATE over all of them, only a single file is updated. The chunks passed to the sink operators running in threads differ from those passed in single-threaded execution: in multi-threaded execution both threads receive jumbled data. Checked in gdb, and the chunks passed to the sink operators appear to be overwritten in some way.

@dpxcc (Contributor) commented Jan 21, 2025

I skimmed through ColumnstoreUpdate but didn’t notice anything obviously wrong. You’ll likely need to debug further in GDB

On a separate note, the current ColumnstoreUpdate::Sink() implementation is mostly serial. To improve parallelism, you’ll need to make the lock more fine-grained

Also, can we close #98? I assume it’s related to this PR

D_ASSERT(!return_chunk);
lock_guard<mutex> lock(gstate.insert_lock);
table.Insert(context.client, lstate.insert_chunk);
gstate.insert_count += lstate.insert_chunk.size();
@YuweiXiao (Contributor), Feb 13, 2025:

IIUC, the Parquet write is still serial with this PR?

My expectation of parallelism would be writing N data files concurrently (e.g., each local state produces its own data files). To avoid the small-file problem, it could start with a single file (as in the current implementation) and dynamically open additional files as data keeps flowing in.

BTW, how does DuckDB determine the level of parallelism? Does it simply equal max_threads?

Contributor:

Yep, the current ColumnstoreUpdate::Sink() implementation is still mostly serial.

There is a GUC, mooncake.maximum_threads, to control the total number of threads.
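
For example (illustrative; depending on the GUC's context it may instead need to be set in postgresql.conf):

SET mooncake.maximum_threads = 4;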

op.column_index_map, std::move(op.bound_defaults), op.return_chunk,
parallel_streaming_insert && num_threads > 1);
std::cout << "For parallelism number of threads: " << num_threads << "and streaming insert state"
<< parallel_streaming_insert << std::endl;
Contributor:

Use elog(DEBUG1, ...) for logging

Contributor:

We should just get rid of the temp logging.

Even if we want to log, pd_log is preferred over elog - we don't want to mix pg headers into columnstore code:
https://github.com/duckdb/pg_duckdb/blob/main/include/pgduckdb/logger.hpp#L59
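
For example, a sketch assuming pd_log takes a level plus printf-style arguments, per the linked header:

pd_log(DEBUG1, "parallel insert: num_threads=%d, parallel_streaming_insert=%d",
       static_cast<int>(num_threads), parallel_streaming_insert ? 1 : 0);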

@YuweiXiao (Contributor) commented Feb 13, 2025

(quoting @ritwizsinha's earlier comment about the failing make installcheck test case)

Not sure if this is related, but DELETE/UPDATE comes after a SCAN, right? So is the mapping structure used for row_ids (and maintained in the SCAN) thread-safe?

Development

Successfully merging this pull request may close these issues.

Parallelize columnstore INSERT/UPDATE/DELETE
4 participants