Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the use of volatile in ORC #14175

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ __global__ void __launch_bounds__(block_size, 1)
{
using block_scan = cub::BlockScan<uint32_t, block_size, cub::BLOCK_SCAN_WARP_SCANS>;
__shared__ typename block_scan::TempStorage temp_storage;
volatile uint32_t stats_size = 0;
auto t = threadIdx.x;
uint32_t stats_size = 0;
auto t = threadIdx.x;
__syncthreads();
for (thread_index_type start = 0; start < statistics_count; start += block_size) {
uint32_t stats_len = 0, stats_pos;
Expand Down
82 changes: 36 additions & 46 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ struct orcdec_state_s {
* @param[in] base Pointer to raw byte stream data
* @param[in] len Stream length in bytes
*/
static __device__ void bytestream_init(volatile orc_bytestream_s* bs,
uint8_t const* base,
uint32_t len)
static __device__ void bytestream_init(orc_bytestream_s* bs, uint8_t const* base, uint32_t len)
{
uint32_t pos = (len > 0) ? static_cast<uint32_t>(7 & reinterpret_cast<size_t>(base)) : 0;
bs->base = base - pos;
Expand All @@ -160,8 +158,7 @@ static __device__ void bytestream_init(volatile orc_bytestream_s* bs,
* @param[in] bs Byte stream input
* @param[in] bytes_consumed Number of bytes that were consumed
*/
static __device__ void bytestream_flush_bytes(volatile orc_bytestream_s* bs,
uint32_t bytes_consumed)
static __device__ void bytestream_flush_bytes(orc_bytestream_s* bs, uint32_t bytes_consumed)
{
uint32_t pos = bs->pos;
uint32_t len = bs->len;
Expand Down Expand Up @@ -197,7 +194,7 @@ static __device__ void bytestream_fill(orc_bytestream_s* bs, int t)
* @param[in] pos Position in byte stream
* @return byte
*/
inline __device__ uint8_t bytestream_readbyte(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint8_t bytestream_readbyte(orc_bytestream_s* bs, int pos)
{
return bs->buf.u8[pos & (bytestream_buffer_size - 1)];
}
Expand All @@ -209,7 +206,7 @@ inline __device__ uint8_t bytestream_readbyte(volatile orc_bytestream_s* bs, int
* @param[in] pos Position in byte stream
* @result bits
*/
inline __device__ uint32_t bytestream_readu32(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint32_t bytestream_readu32(orc_bytestream_s* bs, int pos)
{
uint32_t a = bs->buf.u32[(pos & (bytestream_buffer_size - 1)) >> 2];
uint32_t b = bs->buf.u32[((pos + 4) & (bytestream_buffer_size - 1)) >> 2];
Expand All @@ -224,7 +221,7 @@ inline __device__ uint32_t bytestream_readu32(volatile orc_bytestream_s* bs, int
* @param[in] numbits number of bits
* @return bits
*/
inline __device__ uint64_t bytestream_readu64(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint64_t bytestream_readu64(orc_bytestream_s* bs, int pos)
{
uint32_t a = bs->buf.u32[(pos & (bytestream_buffer_size - 1)) >> 2];
uint32_t b = bs->buf.u32[((pos + 4) & (bytestream_buffer_size - 1)) >> 2];
Expand All @@ -245,9 +242,7 @@ inline __device__ uint64_t bytestream_readu64(volatile orc_bytestream_s* bs, int
* @param[in] numbits number of bits
* @return decoded value
*/
inline __device__ uint32_t bytestream_readbits(volatile orc_bytestream_s* bs,
int bitpos,
uint32_t numbits)
inline __device__ uint32_t bytestream_readbits(orc_bytestream_s* bs, int bitpos, uint32_t numbits)
{
int idx = bitpos >> 5;
uint32_t a = __byte_perm(bs->buf.u32[(idx + 0) & bytestream_buffer_mask], 0, 0x0123);
Expand All @@ -263,9 +258,7 @@ inline __device__ uint32_t bytestream_readbits(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @return decoded value
*/
inline __device__ uint64_t bytestream_readbits64(volatile orc_bytestream_s* bs,
int bitpos,
uint32_t numbits)
inline __device__ uint64_t bytestream_readbits64(orc_bytestream_s* bs, int bitpos, uint32_t numbits)
{
int idx = bitpos >> 5;
uint32_t a = __byte_perm(bs->buf.u32[(idx + 0) & bytestream_buffer_mask], 0, 0x0123);
Expand All @@ -288,7 +281,7 @@ inline __device__ uint64_t bytestream_readbits64(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
uint32_t& result)
Expand All @@ -304,7 +297,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
int32_t& result)
Expand All @@ -321,7 +314,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
uint64_t& result)
Expand All @@ -337,7 +330,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @param[in] numbits number of bits
* @param[out] result decoded value
*/
inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
inline __device__ void bytestream_readbe(orc_bytestream_s* bs,
int bitpos,
uint32_t numbits,
int64_t& result)
Expand All @@ -354,7 +347,7 @@ inline __device__ void bytestream_readbe(volatile orc_bytestream_s* bs,
* @return length of varint in bytes
*/
template <class T>
inline __device__ uint32_t varint_length(volatile orc_bytestream_s* bs, int pos)
inline __device__ uint32_t varint_length(orc_bytestream_s* bs, int pos)
{
if (bytestream_readbyte(bs, pos) > 0x7f) {
uint32_t next32 = bytestream_readu32(bs, pos + 1);
Expand Down Expand Up @@ -392,7 +385,7 @@ inline __device__ uint32_t varint_length(volatile orc_bytestream_s* bs, int pos)
* @return new position in byte stream buffer
*/
template <class T>
inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int pos, T& result)
inline __device__ int decode_base128_varint(orc_bytestream_s* bs, int pos, T& result)
{
uint32_t v = bytestream_readbyte(bs, pos++);
if (v > 0x7f) {
Expand Down Expand Up @@ -446,7 +439,7 @@ inline __device__ int decode_base128_varint(volatile orc_bytestream_s* bs, int p
/**
* @brief Decodes a signed int128 encoded as base-128 varint (used for decimals)
*/
inline __device__ __int128_t decode_varint128(volatile orc_bytestream_s* bs, int pos)
inline __device__ __int128_t decode_varint128(orc_bytestream_s* bs, int pos)
{
auto byte = bytestream_readbyte(bs, pos++);
__int128_t const sign_mask = -(int32_t)(byte & 1);
Expand All @@ -463,7 +456,7 @@ inline __device__ __int128_t decode_varint128(volatile orc_bytestream_s* bs, int
/**
* @brief Decodes an unsigned 32-bit varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint32_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, uint32_t& result)
{
uint32_t u;
pos = decode_base128_varint<uint32_t>(bs, pos, u);
Expand All @@ -474,7 +467,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint
/**
* @brief Decodes an unsigned 64-bit varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint64_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, uint64_t& result)
{
uint64_t u;
pos = decode_base128_varint<uint64_t>(bs, pos, u);
Expand All @@ -485,7 +478,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, uint
/**
* @brief Signed version of 32-bit decode_varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int32_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, int32_t& result)
{
uint32_t u;
pos = decode_base128_varint<uint32_t>(bs, pos, u);
Expand All @@ -496,7 +489,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int3
/**
* @brief Signed version of 64-bit decode_varint
*/
inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int64_t& result)
inline __device__ int decode_varint(orc_bytestream_s* bs, int pos, int64_t& result)
{
uint64_t u;
pos = decode_base128_varint<uint64_t>(bs, pos, u);
Expand All @@ -514,7 +507,7 @@ inline __device__ int decode_varint(volatile orc_bytestream_s* bs, int pos, int6
* @return number of values decoded
*/
template <class T>
inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, unsigned int t)
inline __device__ void lengths_to_positions(T* vals, uint32_t numvals, unsigned int t)
{
for (uint32_t n = 1; n < numvals; n <<= 1) {
__syncthreads();
Expand All @@ -534,8 +527,8 @@ inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals,
* @return number of values decoded
*/
template <class T>
static __device__ uint32_t Integer_RLEv1(
orc_bytestream_s* bs, volatile orc_rlev1_state_s* rle, volatile T* vals, uint32_t maxvals, int t)
static __device__ uint32_t
Integer_RLEv1(orc_bytestream_s* bs, orc_rlev1_state_s* rle, T* vals, uint32_t maxvals, int t)
{
uint32_t numvals, numruns;
if (t == 0) {
Expand Down Expand Up @@ -642,8 +635,8 @@ static const __device__ __constant__ uint8_t ClosestFixedBitsMap[65] = {
*/
template <class T>
static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
volatile orc_rlev2_state_s* rle,
volatile T* vals,
orc_rlev2_state_s* rle,
T* vals,
uint32_t maxvals,
int t,
bool has_buffered_values = false)
Expand Down Expand Up @@ -883,7 +876,7 @@ static __device__ uint32_t Integer_RLEv2(orc_bytestream_s* bs,
*
* @return 32-bit value
*/
inline __device__ uint32_t rle8_read_bool32(volatile uint32_t* vals, uint32_t bitpos)
inline __device__ uint32_t rle8_read_bool32(uint32_t* vals, uint32_t bitpos)
{
uint32_t a = vals[(bitpos >> 5) + 0];
uint32_t b = vals[(bitpos >> 5) + 1];
Expand All @@ -903,11 +896,8 @@ inline __device__ uint32_t rle8_read_bool32(volatile uint32_t* vals, uint32_t bi
*
* @return number of values decoded
*/
static __device__ uint32_t Byte_RLE(orc_bytestream_s* bs,
volatile orc_byterle_state_s* rle,
volatile uint8_t* vals,
uint32_t maxvals,
int t)
static __device__ uint32_t
Byte_RLE(orc_bytestream_s* bs, orc_byterle_state_s* rle, uint8_t* vals, uint32_t maxvals, int t)
{
uint32_t numvals, numruns;
int r, tr;
Expand Down Expand Up @@ -1006,8 +996,8 @@ static const __device__ __constant__ int64_t kPow5i[28] = {1,
* @return number of values decoded
*/
static __device__ int Decode_Decimals(orc_bytestream_s* bs,
volatile orc_byterle_state_s* scratch,
volatile orcdec_state_s::values& vals,
orc_byterle_state_s* scratch,
orcdec_state_s::values& vals,
int val_scale,
int numvals,
type_id dtype_id,
Expand Down Expand Up @@ -1241,8 +1231,8 @@ __global__ void __launch_bounds__(block_size)
}
__syncthreads();
while (s->top.dict.dict_len > 0) {
uint32_t numvals = min(s->top.dict.dict_len, blockDim.x), len;
volatile uint32_t* vals = s->vals.u32;
uint32_t numvals = min(s->top.dict.dict_len, blockDim.x), len;
uint32_t* vals = s->vals.u32;
bytestream_fill(&s->bs, t);
__syncthreads();
if (is_rlev1(s->chunk.encoding_kind)) {
Expand Down Expand Up @@ -1310,12 +1300,12 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s,
min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x));
if (s->chunk.valid_map_base != nullptr) {
// We have a present stream
uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row);
auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row);
uint32_t valid = (t < nrows && r < rmax)
? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1
: 0;
volatile auto* row_ofs_plus1 = (volatile uint16_t*)&s->u.rowdec.row[s->u.rowdec.nz_count];
uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row);
auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row);
uint32_t valid = (t < nrows && r < rmax)
? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1
: 0;
auto* row_ofs_plus1 = (uint16_t*)&s->u.rowdec.row[s->u.rowdec.nz_count];
uint32_t nz_pos, row_plus1, nz_count = s->u.rowdec.nz_count, last_row;
if (t < nrows) { row_ofs_plus1[t] = valid; }
lengths_to_positions<uint16_t>(row_ofs_plus1, nrows, t);
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ constexpr bool zero_pll_war = true;
struct byterle_enc_state_s {
uint32_t literal_run;
uint32_t repeat_run;
volatile uint32_t rpt_map[(512 / 32) + 1];
uint32_t rpt_map[(512 / 32) + 1];
};

struct intrle_enc_state_s {
Expand All @@ -63,7 +63,7 @@ struct intrle_enc_state_s {
uint32_t literal_w;
uint32_t hdr_bytes;
uint32_t pl_bytes;
volatile uint32_t delta_map[(512 / 32) + 1];
uint32_t delta_map[(512 / 32) + 1];
};

struct strdata_enc_state_s {
Expand Down Expand Up @@ -366,7 +366,7 @@ static __device__ uint32_t IntegerRLE(
using block_reduce = cub::BlockReduce<T, block_size>;
uint8_t* dst = s->stream.data_ptrs[cid] + s->strm_pos[cid];
uint32_t out_cnt = 0;
__shared__ volatile uint64_t block_vmin;
__shared__ uint64_t block_vmin;

while (numvals > 0) {
T v0 = (t < numvals) ? inbuf[(inpos + t) & inmask] : 0;
Expand Down Expand Up @@ -615,7 +615,7 @@ static __device__ void StoreStringData(uint8_t* dst,
* @param[in] t thread id
*/
template <class T>
inline __device__ void lengths_to_positions(volatile T* vals, uint32_t numvals, unsigned int t)
inline __device__ void lengths_to_positions(T* vals, uint32_t numvals, unsigned int t)
{
for (uint32_t n = 1; n < numvals; n <<= 1) {
__syncthreads();
Expand Down Expand Up @@ -1143,7 +1143,7 @@ __global__ void __launch_bounds__(256)
uint32_t comp_block_align)
{
__shared__ __align__(16) StripeStream ss;
__shared__ uint8_t* volatile uncomp_base_g;
__shared__ uint8_t* uncomp_base_g;

auto const padded_block_header_size = util::round_up_unsafe(block_header_size, comp_block_align);
auto const padded_comp_block_size = util::round_up_unsafe(max_comp_blk_size, comp_block_align);
Expand Down Expand Up @@ -1196,8 +1196,8 @@ __global__ void __launch_bounds__(1024)
uint32_t max_comp_blk_size)
{
__shared__ __align__(16) StripeStream ss;
__shared__ uint8_t const* volatile comp_src_g;
__shared__ uint32_t volatile comp_len_g;
__shared__ uint8_t const* comp_src_g;
__shared__ uint32_t comp_len_g;

auto const stripe_id = blockIdx.x;
auto const stream_id = blockIdx.y;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ __global__ void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_gr
: row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row;
for (int j = t4; j < rowgroup_size4; j += 4) {
((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] =
((volatile uint32_t*)&s->rowgroups[i])[j];
((uint32_t*)&s->rowgroups[i])[j];
}
row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows = num_rows;
// Updating in case of struct
Expand Down