Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Remove memory reservation in JoinLeftData used in HashJoin #13751

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ struct JoinLeftData {
/// Counter of running probe-threads, potentially
/// able to update `visited_indices_bitmap`
probe_threads_counter: AtomicUsize,
/// Memory reservation that tracks memory used by `hash_map` hash table
/// `batch`. Cleared on drop.
_reservation: MemoryReservation,
Copy link
Contributor

@korowa korowa Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case reservation is not stored in JoinLeftData, it'll be dropped right after collect_left_input due to being unneeded anymore. And due to MemoryReservation calls free on its drop, it looks like that with this patch, during the whole join execution build side data will be untracked by memory pool.

So the initial idea of this attribute was to make reservation to live as long as JoinLeftData exists.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPD: some "logs" (additional prints 😞 ) just to check join execution before/after this patch:

// Before
---- joins::hash_join::tests::join_inner_one_no_shared_column_names stdout ----
Waiting build side
Reserving 324 bytes
Reserving 124 bytes
Fetching probe batch
Processing probe batch
Fetching probe batch
Processing unmatched build side
Freeing 448 bytes

// After
---- joins::hash_join::tests::join_inner_one_no_shared_column_names stdout ----
Waiting build side
Reserving 324 bytes
Reserving 124 bytes
Freeing 448 bytes
Fetching probe batch
Processing probe batch
Fetching probe batch
Processing unmatched build side

so yes, the memory is "freed" before join completes its execution, which doesn't seem to be an expected behavior.

Copy link
Contributor

@jayzhan211 jayzhan211 Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I think this is not an issue is because it only matters when we run 'collect_left_join', we track the memory and free after the build side is done. Although, the memory is clean while the hash map is not freed. BUT, since we don't have expected memory change in the probe stage, the memory is not helpful for the probe stage. That's why I think this will not cause any issues if we drop the reservation early.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, the memory is clean while the hash map is not freed

That actually is the issue, because there is single memory pool for a RuntimeEnv, and it's intended to be used by multiple sessions or at least by the multiple operators within the same query. Even in case of single query like select * from a join b order by a.field untracked build side may cause OOM due to SortExec will consider MemoryPool as being empty, while there is a build side data in memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After join, we can release hash table memory on the build side, why will we keep it when we are running Sort or other upstream operator?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting build side
Reserving 324 bytes
Reserving 124 bytes
Fetching probe batch
Processing probe batch
Fetching probe batch
Processing unmatched build side
Freeing 448 bytes

-> SortExec, no memory used for join

Copy link
Contributor

@korowa korowa Dec 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all operators streams are running simultaneously, consuming each other and requesting memory from the pool. For the query above (select * from a join b order by a.field) the timeline will be roughly like following:

  1. Sort stream is started and waiting for the Join results
  2. Join stream is started, and in order to produce some output it collects build side data reserving memory in the pool
  3. Join stream completed with build side and produces the output batch (build side is still alive since it's required for producing join output)
  4. Sort stream receives input batch from the join, and starts accumulating + sorting the data, making memory reservations in memory pool
  5. steps 3-4 are repeated multiple times, while more and more memory being reserved by Sort (since it needs to read and sort all the input before producing its output)
  6. Join completes processing of probe side, returns None
  7. Sort completes processing of join output, finishes processing of internally accumulated data, and gets destroyed freeing OS memory
  8. Join is not required anymore, and it also dropped freeing OS memory

Previously, the memory in the pool for join build side was "freed" on step 8, so while sort operator requested for additional memory, there was information in the pool that build side data still exists and takes N bytes, and now, after this patch, the reservation is deleted in the end of step 2.

So, now when Sort will start requesting for the memory, DF will report that memory pool is empty, while build side still exists in OS memory, which is a potential source of OOM -- e.g:

  • running on 5G memory machine
  • having memory pool set to 4G
  • having build side with the size of 3G
  • having probe side with the size of 10G

At the start of Sort input processing (step 4 from above) DF will think that there are 4G available in the pool (while actually OS will have 2G available, due to in-memory join build side), so the sort will try to accumulate up to 4G before spilling, and likely will be OOM killed after reading ~2G of data (when OS memory will be exhausted).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And anyway -- the general idea of MemoryPool is to "mirror" OS memory usage for potentially unbounded memory consumers (such as join build side / aggregation accumulator / etc), so I don't think that removal of memory tracking from one of these consumers is a proper direction to go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. This makes sense.

}

impl JoinLeftData {
Expand All @@ -102,14 +99,12 @@ impl JoinLeftData {
batch: RecordBatch,
visited_indices_bitmap: SharedBitmapBuilder,
probe_threads_counter: AtomicUsize,
reservation: MemoryReservation,
) -> Self {
Self {
hash_map,
batch,
visited_indices_bitmap,
probe_threads_counter,
_reservation: reservation,
}
}

Expand Down Expand Up @@ -902,7 +897,6 @@ async fn collect_left_input(
single_batch,
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,
);

Ok(data)
Expand Down Expand Up @@ -1019,6 +1013,7 @@ impl BuildSide {
/// └─ ProcessProbeBatch
///
/// ```
#[derive(Debug, Clone)]
enum HashJoinStreamState {
/// Initial state for HashJoinStream indicating that build-side data not collected yet
WaitBuildSide,
Expand All @@ -1044,6 +1039,7 @@ impl HashJoinStreamState {
}

/// Container for HashJoinStreamState::ProcessProbeBatch related data
#[derive(Debug, Clone)]
struct ProcessProbeBatchState {
/// Current probe-side batch
batch: RecordBatch,
Expand Down
Loading