Skip to content

Commit

Permalink
Merge pull request #5720 from vuule/remove-cuio-new-calls
Browse files Browse the repository at this point in the history
[REVIEW] [CUIO] Replace owning raw pointers with std::unique_ptr
  • Loading branch information
vuule authored Jul 21, 2020
2 parents b081ebc + 41f4ee2 commit ed84164
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 151 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
- PR #5708 Add support for `dummy_na` in `get_dummies`
- PR #5709 Update java build to help cu-spacial with java bindings
- PR #5713 Remove old NVTX utilities
- PR #5720 Replace owning raw pointers with std::unique_ptr
- PR #5702 Add inherited methods to python docs and other docs fixes

## Bug Fixes
Expand Down
15 changes: 6 additions & 9 deletions cpp/src/io/comp/io_uncomp.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <memory>
#include <string>
#include <vector>

Expand All @@ -35,15 +36,11 @@ enum {
IO_UNCOMP_STREAM_TYPE_ZSTD = 10,
};

void io_uncompress_single_h2d(const void* src,
size_t src_size,
int strm_type,
std::vector<char>& dst);
std::vector<char> io_uncompress_single_h2d(const void* src, size_t src_size, int stream_type);

void getUncompressedHostData(const char* h_data,
size_t num_bytes,
const std::string& compression,
std::vector<char>& h_uncomp_data);
std::vector<char> getUncompressedHostData(const char* h_data,
size_t num_bytes,
const std::string& compression);

class HostDecompressor {
public:
Expand All @@ -54,7 +51,7 @@ class HostDecompressor {
virtual ~HostDecompressor() {}

public:
static HostDecompressor* Create(int stream_type);
static std::unique_ptr<HostDecompressor> Create(int stream_type);
};

} // namespace io
Expand Down
108 changes: 45 additions & 63 deletions cpp/src/io/comp/uncomp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,43 +277,39 @@ int cpu_inflate_vector(std::vector<char> &dst, const uint8_t *comp_data, size_t
return (zerr == Z_STREAM_END) ? Z_OK : zerr;
}

/* --------------------------------------------------------------------------*/
/**
* @brief Uncompresses a gzip/zip/bzip2/xz file stored in system memory.
*
* The result is allocated and stored in a vector.
* If decompression fails, the failure is reported through CUDF_EXPECTS and no usable output is produced.
*
* @param[in] src Pointer to the compressed data in system memory
* @param[in] src_size The size of the compressed data, in bytes
* @param[in] strm_type Type of compression of the input data
* @param[out] dst Vector containing the uncompressed output
* @param[in] stream_type Type of compression of the input data
*
* @return Vector containing the uncompressed output
*/
/* ----------------------------------------------------------------------------*/
void io_uncompress_single_h2d(const void *src,
size_t src_size,
int strm_type,
std::vector<char> &dst)
std::vector<char> io_uncompress_single_h2d(const void *src, size_t src_size, int stream_type)
{
const uint8_t *raw = (const uint8_t *)src;
const uint8_t *comp_data = nullptr;
size_t comp_len = 0;
size_t uncomp_len = 0;
cudaStream_t strm = (cudaStream_t)0;

CUDF_EXPECTS(src != nullptr, "Decompression: Source cannot be nullptr");
CUDF_EXPECTS(src_size != 0, "Decompression: Source size cannot be 0");

switch (strm_type) {
switch (stream_type) {
case IO_UNCOMP_STREAM_TYPE_INFER:
case IO_UNCOMP_STREAM_TYPE_GZIP: {
gz_archive_s gz;
if (ParseGZArchive(&gz, raw, src_size)) {
strm_type = IO_UNCOMP_STREAM_TYPE_GZIP;
comp_data = gz.comp_data;
comp_len = gz.comp_len;
uncomp_len = gz.isize;
stream_type = IO_UNCOMP_STREAM_TYPE_GZIP;
comp_data = gz.comp_data;
comp_len = gz.comp_len;
uncomp_len = gz.isize;
}
if (strm_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER
if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER
}
case IO_UNCOMP_STREAM_TYPE_ZIP: {
zip_archive_s za;
Expand All @@ -337,10 +333,10 @@ void io_uncompress_single_h2d(const void *src,
size_t file_end = file_start + lfh->comp_size;
if (file_end <= src_size) {
// Pick the first valid file of non-zero size (only 1 file expected in archive)
strm_type = IO_UNCOMP_STREAM_TYPE_ZIP;
comp_data = raw + file_start;
comp_len = lfh->comp_size;
uncomp_len = lfh->uncomp_size;
stream_type = IO_UNCOMP_STREAM_TYPE_ZIP;
comp_data = raw + file_start;
comp_len = lfh->comp_size;
uncomp_len = lfh->uncomp_size;
break;
}
}
Expand All @@ -350,20 +346,20 @@ void io_uncompress_single_h2d(const void *src,
}
}
}
if (strm_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER
if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER
case IO_UNCOMP_STREAM_TYPE_BZIP2:
if (src_size > 4) {
const bz2_file_header_s *fhdr = (const bz2_file_header_s *)raw;
// Check for BZIP2 file signature "BZh1" to "BZh9"
if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' &&
fhdr->blksz >= '1' && fhdr->blksz <= '9') {
strm_type = IO_UNCOMP_STREAM_TYPE_BZIP2;
comp_data = raw;
comp_len = src_size;
uncomp_len = 0;
stream_type = IO_UNCOMP_STREAM_TYPE_BZIP2;
comp_data = raw;
comp_len = src_size;
uncomp_len = 0;
}
}
if (strm_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER
if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER
default:
// Unsupported format
break;
Expand All @@ -376,19 +372,18 @@ void io_uncompress_single_h2d(const void *src,
// ~4:1 compression for initial size
}

if (strm_type == IO_UNCOMP_STREAM_TYPE_GZIP || strm_type == IO_UNCOMP_STREAM_TYPE_ZIP) {
if (stream_type == IO_UNCOMP_STREAM_TYPE_GZIP || stream_type == IO_UNCOMP_STREAM_TYPE_ZIP) {
// INFLATE
dst.resize(uncomp_len);
int zerr = cpu_inflate_vector(dst, comp_data, comp_len);
if (zerr != 0) {
dst.resize(0);
CUDF_EXPECTS(0, "Decompression: error in stream");
}
} else if (strm_type == IO_UNCOMP_STREAM_TYPE_BZIP2) {
std::vector<char> dst(uncomp_len);
CUDF_EXPECTS(cpu_inflate_vector(dst, comp_data, comp_len) == 0,
"Decompression: error in stream");
return dst;
}
if (stream_type == IO_UNCOMP_STREAM_TYPE_BZIP2) {
size_t src_ofs = 0;
size_t dst_ofs = 0;
int bz_err = 0;
dst.resize(uncomp_len);
std::vector<char> dst(uncomp_len);
do {
size_t dst_len = uncomp_len - dst_ofs;
bz_err = cpu_bz2_uncompress(
Expand All @@ -405,13 +400,11 @@ void io_uncompress_single_h2d(const void *src,
dst.resize(uncomp_len);
}
} while (bz_err == BZ_OUTBUFF_FULL);
if (bz_err != 0) {
dst.resize(0);
CUDF_EXPECTS(0, "Decompression: error in stream");
}
} else {
CUDF_EXPECTS(0, "Unsupported compressed stream type");
CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream");
return dst;
}

CUDF_FAIL("Unsupported compressed stream type");
}

/**
Expand All @@ -421,12 +414,12 @@ void io_uncompress_single_h2d(const void *src,
* @param[in] h_data Pointer to the csv data in host memory
* @param[in] num_bytes Size of the input data, in bytes
* @param[in] compression String describing the compression type
* @param[out] h_uncomp_data Vector containing the output uncompressed data
**/
void getUncompressedHostData(const char *h_data,
size_t num_bytes,
const std::string &compression,
std::vector<char> &h_uncomp_data)
*
* @return Vector containing the output uncompressed data
*/
std::vector<char> getUncompressedHostData(const char *h_data,
size_t num_bytes,
const std::string &compression)
{
int comp_type = IO_UNCOMP_STREAM_TYPE_INFER;
if (compression == "gzip")
Expand All @@ -438,15 +431,12 @@ void getUncompressedHostData(const char *h_data,
else if (compression == "xz")
comp_type = IO_UNCOMP_STREAM_TYPE_XZ;

io_uncompress_single_h2d(h_data, num_bytes, comp_type, h_uncomp_data);
return io_uncompress_single_h2d(h_data, num_bytes, comp_type);
}

/* --------------------------------------------------------------------------*/
/**
* @brief ZLIB host decompressor class
*/
/* ----------------------------------------------------------------------------*/

class HostDecompressor_ZLIB : public HostDecompressor {
public:
HostDecompressor_ZLIB(bool gz_hdr_) : gz_hdr(gz_hdr_) {}
Expand All @@ -472,12 +462,9 @@ class HostDecompressor_ZLIB : public HostDecompressor {
const bool gz_hdr;
};

/* --------------------------------------------------------------------------*/
/**
* @brief SNAPPY host decompressor class
*/
/* ----------------------------------------------------------------------------*/

class HostDecompressor_SNAPPY : public HostDecompressor {
public:
HostDecompressor_SNAPPY() {}
Expand Down Expand Up @@ -569,26 +556,21 @@ class HostDecompressor_SNAPPY : public HostDecompressor {
}
};

/* --------------------------------------------------------------------------*/
/**
* @Brief CPU decompression class
*
* @param stream_type[in] compression method (IO_UNCOMP_STREAM_TYPE_XXX)
*
* @returns corresponding HostDecompressor class, nullptr if failure
*/
/* ----------------------------------------------------------------------------*/

HostDecompressor *HostDecompressor::Create(int stream_type)
std::unique_ptr<HostDecompressor> HostDecompressor::Create(int stream_type)
{
HostDecompressor *decompressor;
switch (stream_type) {
case IO_UNCOMP_STREAM_TYPE_GZIP: decompressor = new HostDecompressor_ZLIB(true); break;
case IO_UNCOMP_STREAM_TYPE_INFLATE: decompressor = new HostDecompressor_ZLIB(false); break;
case IO_UNCOMP_STREAM_TYPE_SNAPPY: decompressor = new HostDecompressor_SNAPPY(); break;
default: decompressor = nullptr; break;
case IO_UNCOMP_STREAM_TYPE_GZIP: return std::make_unique<HostDecompressor_ZLIB>(true);
case IO_UNCOMP_STREAM_TYPE_INFLATE: return std::make_unique<HostDecompressor_ZLIB>(false);
case IO_UNCOMP_STREAM_TYPE_SNAPPY: return std::make_unique<HostDecompressor_SNAPPY>();
}
return decompressor;
CUDF_FAIL("Unsupported compression type");
}

} // namespace io
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,8 @@ table_with_metadata reader::impl::read(size_t range_offset,
h_uncomp_data = reinterpret_cast<const char *>(buffer->data());
h_uncomp_size = buffer->size();
} else {
getUncompressedHostData(reinterpret_cast<const char *>(buffer->data()),
buffer->size(),
compression_type_,
h_uncomp_data_owner);
h_uncomp_data_owner = getUncompressedHostData(
reinterpret_cast<const char *>(buffer->data()), buffer->size(), compression_type_);
h_uncomp_data = h_uncomp_data_owner.data();
h_uncomp_size = h_uncomp_data_owner.size();
}
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,8 @@ void reader::impl::decompress_input()
uncomp_data_ = reinterpret_cast<const char *>(buffer_->data());
uncomp_size_ = buffer_->size();
} else {
getUncompressedHostData(reinterpret_cast<const char *>(buffer_->data()),
buffer_->size(),
compression_type,
uncomp_data_owner_);
uncomp_data_owner_ = getUncompressedHostData(
reinterpret_cast<const char *>(buffer_->data()), buffer_->size(), compression_type);
uncomp_data_ = uncomp_data_owner_.data();
uncomp_size_ = uncomp_data_owner_.size();
}
Expand Down
Loading

0 comments on commit ed84164

Please sign in to comment.