-
Notifications
You must be signed in to change notification settings - Fork 915
/
join_utils.cu
155 lines (135 loc) · 6.88 KB
/
join_utils.cu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <join/join_common_utils.cuh>
#include <rmm/exec_policy.hpp>
#include <thrust/copy.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/scatter.h>
#include <thrust/sequence.h>
namespace cudf {
namespace detail {
// Reports whether a join of `left` and `right` with the given `join_type` can be
// answered without doing any matching work. Returns true when either table reports
// is_empty(), or when a table that the join kind depends on has zero rows.
bool is_trivial_join(table_view const& left, table_view const& right, join_kind join_type)
{
  // Either side reports is_empty(): there is nothing to join.
  if (left.is_empty() || right.is_empty()) { return true; }

  bool const left_has_no_rows  = (0 == left.num_rows());
  bool const right_has_no_rows = (0 == right.num_rows());

  switch (join_type) {
    // A left join with a zero-row left table is trivial.
    case join_kind::LEFT_JOIN: return left_has_no_rows;
    // An inner join is trivial as soon as either table has zero rows.
    case join_kind::INNER_JOIN: return left_has_no_rows || right_has_no_rows;
    // A left semi join (contains) is trivial when the right table has zero rows.
    case join_kind::LEFT_SEMI_JOIN: return right_has_no_rows;
    // Every other join kind is treated as non-trivial here.
    default: return false;
  }
}
// Builds the index pair for a left join in which no left row is paired with a right row.
// Both returned vectors have left.num_rows() elements and are allocated from `mr`:
//   first  - the sequence 0, 1, 2, ... (one entry per left row)
//   second - JoinNoneValue in every position
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
get_trivial_left_join_indices(table_view const& left,
                              rmm::cuda_stream_view stream,
                              rmm::mr::device_memory_resource* mr)
{
  using index_vector = rmm::device_uvector<size_type>;

  auto const row_count = left.num_rows();

  // Allocate both outputs up front; all work is ordered on `stream`.
  auto left_map  = std::make_unique<index_vector>(row_count, stream, mr);
  auto right_map = std::make_unique<index_vector>(row_count, stream, mr);

  // Left side: each row refers to itself.
  thrust::sequence(rmm::exec_policy(stream), left_map->begin(), left_map->end(), 0);

  // Right side: every entry is the "no match" marker.
  thrust::uninitialized_fill(
    rmm::exec_policy(stream), right_map->begin(), right_map->end(), JoinNoneValue);

  return std::make_pair(std::move(left_map), std::move(right_map));
}
// Appends the contents of pair `b` to pair `a` and returns the combined pair.
//
// Both inputs are consumed: the returned pair is moved out of `a` (or out of `b`
// when `a` is empty), so neither argument should be used afterwards.
// CUDF_EXPECTS rejects a pair whose two vectors differ in size.
VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream)
{
  CUDF_EXPECTS(a.first->size() == a.second->size(),
               "Mismatch between sizes of vectors in vector pair");
  CUDF_EXPECTS(b.first->size() == b.second->size(),
               "Mismatch between sizes of vectors in vector pair");

  // Nothing to append to / nothing to append: hand back the other pair untouched.
  if (a.first->is_empty()) { return std::move(b); }
  if (b.first->is_empty()) { return std::move(a); }

  // The checks above guarantee first/second share a length within each pair.
  auto const a_len      = a.first->size();
  auto const merged_len = a_len + b.first->size();

  // Grow `a` in place so it can hold the elements of `b` after its own.
  a.first->resize(merged_len, stream);
  a.second->resize(merged_len, stream);

  // Copies all of `src` into `dst`, starting right after the original contents of `a`.
  auto const append_to_tail = [&](auto& dst, auto& src) {
    thrust::copy(rmm::exec_policy(stream), src.begin(), src.end(), dst.begin() + a_len);
  };
  append_to_tail(*a.first, *b.first);
  append_to_tail(*a.second, *b.second);

  return std::move(a);
}
/**
 * @brief Builds the index pairs for the right-table rows that are not referenced by
 * `right_indices`.
 *
 * The returned pair is (left indices, right indices):
 *  - second: the row positions in [0, right_table_row_count) that were not marked as
 *    referenced by `right_indices`, in increasing order;
 *  - first: a vector of the same length filled with JoinNoneValue.
 *
 * @param right_indices Right-side indices produced by a previous join; read only.
 * @param left_table_row_count Number of rows in the left table; 0 selects the fast path.
 * @param right_table_row_count Number of rows in the right table.
 * @param stream CUDA stream on which allocations and algorithms are ordered.
 * @param mr Memory resource used to allocate the two returned vectors.
 */
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
get_left_join_indices_complement(std::unique_ptr<rmm::device_uvector<size_type>>& right_indices,
                                 size_type left_table_row_count,
                                 size_type right_table_row_count,
                                 rmm::cuda_stream_view stream,
                                 rmm::mr::device_memory_resource* mr)
{
  // Get array of indices that do not appear in right_indices

  // Vector allocated for unmatched result. It is handed back to the caller, so it is
  // allocated from `mr`. It starts at the maximum possible size and is shrunk below.
  auto right_indices_complement =
    std::make_unique<rmm::device_uvector<size_type>>(right_table_row_count, stream, mr);

  // If left table is empty in a full join call then all rows of the right table
  // should be represented in the joined indices. This is an optimization since
  // if left table is empty and full join is called all the elements in
  // right_indices will be JoinNoneValue, i.e. -1. This if path should
  // produce exactly the same result as the else path but will be faster.
  if (left_table_row_count == 0) {
    thrust::sequence(rmm::exec_policy(stream),
                     right_indices_complement->begin(),
                     right_indices_complement->end(),
                     0);
  } else {
    // Scratch flag per right-table row: 1 = not referenced by right_indices (yet).
    // This buffer is temporary, so it deliberately stays on the default resource.
    auto invalid_index_map =
      std::make_unique<rmm::device_uvector<size_type>>(right_table_row_count, stream);
    thrust::uninitialized_fill(
      rmm::exec_policy(stream), invalid_index_map->begin(), invalid_index_map->end(), int32_t{1});

    // Functor to check for index validity since left joins can create invalid indices
    // (presumably accepts values in [0, right_table_row_count) - confirm against valid_range).
    valid_range<size_type> valid(0, right_table_row_count);

    // invalid_index_map[right_indices[i]] = 0 for every i in [0, right_indices->size())
    // whose index passes `valid`, marking those right-table rows as referenced.
    thrust::scatter_if(rmm::exec_policy(stream),
                       thrust::make_constant_iterator(0),
                       thrust::make_constant_iterator(0) + right_indices->size(),
                       right_indices->begin(),      // Index locations
                       right_indices->begin(),      // Stencil - Check if index location is valid
                       invalid_index_map->begin(),  // Output indices
                       valid);                      // Stencil Predicate

    size_type begin_counter = static_cast<size_type>(0);
    size_type end_counter   = static_cast<size_type>(right_table_row_count);

    // Compact the row positions whose flag is still non-zero into the output.
    size_type indices_count = thrust::copy_if(rmm::exec_policy(stream),
                                              thrust::make_counting_iterator(begin_counter),
                                              thrust::make_counting_iterator(end_counter),
                                              invalid_index_map->begin(),
                                              right_indices_complement->begin(),
                                              thrust::identity{}) -
                              right_indices_complement->begin();
    // Trim the output to the number of positions actually written.
    right_indices_complement->resize(indices_count, stream);
  }

  // Left-side partner for every unmatched right row is the "no match" marker.
  // Also returned to the caller, hence allocated from `mr`.
  auto left_invalid_indices =
    std::make_unique<rmm::device_uvector<size_type>>(right_indices_complement->size(), stream, mr);
  thrust::uninitialized_fill(rmm::exec_policy(stream),
                             left_invalid_indices->begin(),
                             left_invalid_indices->end(),
                             JoinNoneValue);

  return std::make_pair(std::move(left_invalid_indices), std::move(right_indices_complement));
}
} // namespace detail
} // namespace cudf