ARROW-30: [Python] Routines for converting between arrow::Array/Table and pandas.DataFrame #46

Closed · wants to merge 6 commits
ci/travis_script_python.sh (1 addition, 7 deletions):

```diff
@@ -48,17 +48,11 @@ python_version_tests() {
 
   python setup.py build_ext --inplace
 
-  py.test -vv -r sxX pyarrow
+  python -m pytest -vv -r sxX pyarrow
 }
 
 # run tests for python 2.7 and 3.5
 python_version_tests 2.7
 python_version_tests 3.5
 
-# if [ $TRAVIS_OS_NAME == "linux" ]; then
-#     valgrind --tool=memcheck py.test -vv -r sxX arrow
-# else
-#     py.test -vv -r sxX arrow
-# fi
-
 popd
```
cpp/README.md (3 additions, 3 deletions):

```diff
@@ -42,12 +42,12 @@
 
 ### Building/Running benchmarks
 
-Follow the directions for simple build except run cmake
+Follow the directions for simple build except run cmake
 with the `--ARROW_BUILD_BENCHMARKS` parameter set correctly:
 
     cmake -DARROW_BUILD_BENCHMARKS=ON ..
 
-and instead of make unittest run either `make; ctest` to run both unit tests
+and instead of make unittest run either `make; ctest` to run both unit tests
 and benchmarks or `make runbenchmark` to run only the benchmark tests.
 
 Benchmark logs will be placed in the build directory under `build/benchmark-logs`.
@@ -60,4 +60,4 @@ variables
 
 * Googletest: `GTEST_HOME` (only required to build the unit tests)
 * Google Benchmark: `GBENCHMARK_HOME` (only required if building benchmarks)
-
+* Flatbuffers: `FLATBUFFERS_HOME` (only required for the IPC extensions)
```
cpp/src/arrow/array.h (7 additions, 6 deletions):

```diff
@@ -34,13 +34,10 @@ class Buffer;
 //
 // The base class is only required to have a null bitmap buffer if the null
 // count is greater than 0
-//
-// Any buffers used to initialize the array have their references "stolen". If
-// you wish to use the buffer beyond the lifetime of the array, you need to
-// explicitly increment its reference count
 class Array {
  public:
-  Array(const TypePtr& type, int32_t length, int32_t null_count = 0,
+  Array(const std::shared_ptr<DataType>& type, int32_t length,
+      int32_t null_count = 0,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr);
 
   virtual ~Array() {}
@@ -60,11 +57,15 @@ class Array {
     return null_bitmap_;
   }
 
+  const uint8_t* null_bitmap_data() const {
+    return null_bitmap_data_;
+  }
+
   bool EqualsExact(const Array& arr) const;
   virtual bool Equals(const std::shared_ptr<Array>& arr) const = 0;
 
  protected:
-  TypePtr type_;
+  std::shared_ptr<DataType> type_;
   int32_t null_count_;
   int32_t length_;
 
```
cpp/src/arrow/types/string.cc (10 additions):

```diff
@@ -20,8 +20,18 @@
 #include <sstream>
 #include <string>
 
+#include "arrow/type.h"
+
 namespace arrow {
 
+const std::shared_ptr<DataType> STRING(new StringType());
+
+StringArray::StringArray(int32_t length,
+    const std::shared_ptr<Buffer>& offsets,
+    const ArrayPtr& values, int32_t null_count,
+    const std::shared_ptr<Buffer>& null_bitmap) :
+    StringArray(STRING, length, offsets, values, null_count, null_bitmap) {}
+
 std::string CharType::ToString() const {
   std::stringstream s;
   s << "char(" << size << ")";
```
cpp/src/arrow/types/string.h (1 addition, 3 deletions):

```diff
@@ -79,9 +79,7 @@ class StringArray : public ListArray {
       const std::shared_ptr<Buffer>& offsets,
       const ArrayPtr& values,
       int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr) :
-      StringArray(std::make_shared<StringType>(), length, offsets, values,
-          null_count, null_bitmap) {}
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
 
   // Compute the pointer t
   const uint8_t* GetValue(int i, int32_t* out_length) const {
```
cpp/src/arrow/util/buffer.h (42 additions):

```diff
@@ -18,11 +18,13 @@
 #ifndef ARROW_UTIL_BUFFER_H
 #define ARROW_UTIL_BUFFER_H
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <memory>
 
 #include "arrow/util/macros.h"
+#include "arrow/util/status.h"
 
 namespace arrow {
 
@@ -146,6 +148,46 @@ class PoolBuffer : public ResizableBuffer {
   MemoryPool* pool_;
 };
 
+static constexpr int64_t MIN_BUFFER_CAPACITY = 1024;
+
+class BufferBuilder {
+ public:
+  explicit BufferBuilder(MemoryPool* pool) :
+      pool_(pool),
+      capacity_(0),
+      size_(0) {}
+
+  Status Append(const uint8_t* data, int length) {
+    if (capacity_ < length + size_) {
+      if (capacity_ == 0) {
+        buffer_ = std::make_shared<PoolBuffer>(pool_);
+      }
+      capacity_ = std::max(MIN_BUFFER_CAPACITY, capacity_);
+      while (capacity_ < length + size_) {
+        capacity_ *= 2;
+      }
+      RETURN_NOT_OK(buffer_->Resize(capacity_));
+      data_ = buffer_->mutable_data();
+    }
+    memcpy(data_ + size_, data, length);
+    size_ += length;
+    return Status::OK();
+  }
+
+  std::shared_ptr<Buffer> Finish() {
+    auto result = buffer_;
+    buffer_ = nullptr;
+    return result;
+  }
+
+ private:
+  std::shared_ptr<PoolBuffer> buffer_;
+  MemoryPool* pool_;
+  uint8_t* data_;
+  int64_t capacity_;
+  int64_t size_;
+};
+
 } // namespace arrow
 
 #endif // ARROW_UTIL_BUFFER_H
```
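The new `BufferBuilder` grows its backing `PoolBuffer` geometrically: capacity starts at `MIN_BUFFER_CAPACITY` (1024 bytes) and doubles until the pending append fits, so a long run of small appends costs amortized O(1) copying per byte. A rough Python sketch of the same growth policy, for illustration only (the class and method names below are mine, not part of the PR):

```python
MIN_BUFFER_CAPACITY = 1024

class BufferBuilderSketch:
    """Mirror of the C++ BufferBuilder's amortized-doubling growth policy."""

    def __init__(self):
        self.buf = bytearray()
        self.capacity = 0
        self.size = 0

    def append(self, data: bytes) -> None:
        if self.capacity < self.size + len(data):
            # Start at the minimum capacity, then double until the write fits.
            self.capacity = max(MIN_BUFFER_CAPACITY, self.capacity)
            while self.capacity < self.size + len(data):
                self.capacity *= 2
            # Stands in for PoolBuffer::Resize + mutable_data()
            self.buf.extend(b"\x00" * (self.capacity - len(self.buf)))
        self.buf[self.size:self.size + len(data)] = data
        self.size += len(data)

    def finish(self) -> bytes:
        # Hand off the accumulated bytes, like BufferBuilder::Finish
        result = bytes(self.buf[:self.size])
        self.buf, self.capacity, self.size = bytearray(), 0, 0
        return result

builder = BufferBuilderSketch()
builder.append(b"arrow")
builder.append(b" buffers")
assert builder.finish() == b"arrow buffers"
```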
python/CMakeLists.txt (5 additions, 1 deletion):

```diff
@@ -220,9 +220,12 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}")
 
 ## Python and libraries
 find_package(PythonLibsNew REQUIRED)
+find_package(NumPy REQUIRED)
 include(UseCython)
 
 include_directories(SYSTEM
+  ${NUMPY_INCLUDE_DIRS}
+  ${PYTHON_INCLUDE_DIRS}
   src)
 
 ############################################################
@@ -409,11 +412,12 @@ add_subdirectory(src/pyarrow/util)
 
 set(PYARROW_SRCS
   src/pyarrow/common.cc
+  src/pyarrow/config.cc
   src/pyarrow/helpers.cc
-  src/pyarrow/init.cc
   src/pyarrow/status.cc
 
   src/pyarrow/adapters/builtin.cc
+  src/pyarrow/adapters/pandas.cc
 )
 
 set(LINK_LIBS
```
python/pyarrow/__init__.py (6 additions, 2 deletions):

```diff
@@ -17,7 +17,11 @@
 
 # flake8: noqa
 
-from pyarrow.array import (Array, from_pylist, total_allocated_bytes,
+import pyarrow.config
+
+from pyarrow.array import (Array,
+                           from_pandas_series, from_pylist,
+                           total_allocated_bytes,
                            BooleanArray, NumericArray,
                            Int8Array, UInt8Array,
                            ListArray, StringArray)
@@ -37,4 +41,4 @@
                            list_, struct, field,
                            DataType, Field, Schema, schema)
 
-from pyarrow.array import RowBatch
+from pyarrow.array import RowBatch, Table, from_pandas_dataframe
```
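With these exports, the new conversion entry points are reachable from the top-level `pyarrow` namespace. A minimal sketch of the Series-level path, assuming a built pyarrow with pandas installed; exactly which dtypes the conversion supports at this commit, and the mask polarity (True marks a null), are assumptions:

```python
import numpy as np
import pandas as pd

import pyarrow

s = pd.Series([1.5, 2.5, 3.5])

# Convert a Series to an Arrow array; the optional boolean mask is assumed
# to mark entries that become nulls in the result.
arr = pyarrow.from_pandas_series(s, mask=np.array([False, True, False]))
print(arr.null_count)  # expected: 1

# Array.from_pandas is a convenience wrapper over the same code path.
arr2 = pyarrow.Array.from_pandas(s)
print(arr2.null_count)  # expected: 0
```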
python/pyarrow/array.pyx (135 additions):

```diff
@@ -22,6 +22,8 @@
 from pyarrow.includes.libarrow cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 
+import pyarrow.config
+
 from pyarrow.compat import frombytes, tobytes
 from pyarrow.error cimport check_status
 
@@ -44,6 +46,10 @@ cdef class Array:
         self.type = DataType()
         self.type.init(self.sp_array.get().type())
 
+    @staticmethod
+    def from_pandas(obj, mask=None):
+        return from_pandas_series(obj, mask)
+
     property null_count:
 
         def __get__(self):
@@ -160,7 +166,15 @@ cdef class StringArray(Array):
 cdef dict _array_classes = {
     Type_NA: NullArray,
     Type_BOOL: BooleanArray,
+    Type_UINT8: UInt8Array,
+    Type_UINT16: UInt16Array,
+    Type_UINT32: UInt32Array,
+    Type_UINT64: UInt64Array,
     Type_INT8: Int8Array,
+    Type_INT16: Int16Array,
+    Type_INT32: Int32Array,
+    Type_INT64: Int64Array,
+    Type_FLOAT: FloatArray,
     Type_DOUBLE: DoubleArray,
     Type_LIST: ListArray,
     Type_STRING: StringArray,
@@ -194,6 +208,49 @@ def from_pylist(object list_obj, DataType type=None):
 
     return box_arrow_array(sp_array)
 
+
+def from_pandas_series(object series, object mask=None):
+    cdef:
+        shared_ptr[CArray] out
+
+    series_values = series_as_ndarray(series)
+
+    if mask is None:
+        check_status(pyarrow.PandasToArrow(pyarrow.GetMemoryPool(),
+                                           series_values, &out))
+    else:
+        mask = series_as_ndarray(mask)
+        check_status(pyarrow.PandasMaskedToArrow(
+            pyarrow.GetMemoryPool(), series_values, mask, &out))
+
+    return box_arrow_array(out)
+
+
+def from_pandas_dataframe(object df, name=None):
+    cdef:
+        list names = []
+        list arrays = []
+
+    for name in df.columns:
+        col = df[name]
+        arr = from_pandas_series(col)
+
+        names.append(name)
+        arrays.append(arr)
+
+    return Table.from_arrays(names, arrays, name=name)
+
+
+cdef object series_as_ndarray(object obj):
+    import pandas as pd
+
+    if isinstance(obj, pd.Series):
+        result = obj.values
+    else:
+        result = obj
+
+    return result
+
 #----------------------------------------------------------------------
 # Table-like data structures
 
@@ -225,3 +282,81 @@ cdef class RowBatch:
 
     def __getitem__(self, i):
         return self.arrays[i]
+
+
+cdef class Table:
+    '''
+    Do not call this class's constructor directly.
+    '''
+    cdef:
+        shared_ptr[CTable] sp_table
+        CTable* table
+
+    def __cinit__(self):
+        pass
+
+    cdef init(self, const shared_ptr[CTable]& table):
+        self.sp_table = table
+        self.table = table.get()
+
+    @staticmethod
+    def from_pandas(df, name=None):
+        pass
+
+    @staticmethod
+    def from_arrays(names, arrays, name=None):
+        cdef:
+            Array arr
+            Table result
+            c_string c_name
+            vector[shared_ptr[CField]] fields
+            vector[shared_ptr[CColumn]] columns
+            shared_ptr[CSchema] schema
+            shared_ptr[CTable] table
+
+        cdef int K = len(arrays)
+
+        fields.resize(K)
+        columns.resize(K)
+        for i in range(K):
+            arr = arrays[i]
+            c_name = tobytes(names[i])
+
+            fields[i].reset(new CField(c_name, arr.type.sp_type, True))
+            columns[i].reset(new CColumn(fields[i], arr.sp_array))
+
+        if name is None:
+            c_name = ''
+        else:
+            c_name = tobytes(name)
+
+        schema.reset(new CSchema(fields))
+        table.reset(new CTable(c_name, schema, columns))
+
+        result = Table()
+        result.init(table)
+
+        return result
+
+    def to_pandas(self):
+        """
+        Convert the arrow::Table to a pandas DataFrame
+        """
+        cdef:
+            PyObject* arr
+            shared_ptr[CColumn] col
+
+        import pandas as pd
+
+        names = []
+        data = []
+        for i in range(self.table.num_columns()):
+            col = self.table.column(i)
+            check_status(pyarrow.ArrowToPandas(col, &arr))
+            names.append(frombytes(col.get().name()))
+            data.append(<object> arr)
+
+            # One ref count too many
+            Py_XDECREF(arr)
+
+        return pd.DataFrame(dict(zip(names, data)), columns=names)
```
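Taken together, these pieces give a full DataFrame round trip: `from_pandas_dataframe` converts each column with `from_pandas_series` and assembles a `Table` via `from_arrays`, and `Table.to_pandas` converts each `CColumn` back through `pyarrow.ArrowToPandas`. A sketch of the round trip, assuming the numeric dtypes used here are covered by the conversion at this commit:

```python
import numpy as np
import pandas as pd

import pyarrow

df = pd.DataFrame({'ints': np.array([1, 2, 3], dtype=np.int64),
                   'doubles': np.array([1.5, 2.5, 3.5])})

# DataFrame -> arrow::Table: one Arrow array and one schema field per column.
table = pyarrow.from_pandas_dataframe(df)

# arrow::Table -> DataFrame: to_pandas preserves column order via
# `columns=names` in the DataFrame constructor.
df_roundtrip = table.to_pandas()
print(df_roundtrip.equals(df))  # expected: True
```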
python/pyarrow/config.pyx (12 additions, 1 deletion):

```diff
@@ -2,7 +2,18 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
-cdef extern from 'pyarrow/init.h' namespace 'pyarrow':
+cdef extern from 'pyarrow/do_import_numpy.h':
+    pass
+
+cdef extern from 'pyarrow/numpy_interop.h' namespace 'pyarrow':
+    int import_numpy()
+
+cdef extern from 'pyarrow/config.h' namespace 'pyarrow':
     void pyarrow_init()
+    void pyarrow_set_numpy_nan(object o)
 
+import_numpy()
 pyarrow_init()
+
+import numpy as np
+pyarrow_set_numpy_nan(np.nan)
```
Review comments on python/pyarrow/config.pyx:

Member: Why do we explicitly need numpy's nan here? Couldn't we use libc.math.NAN or libc.math.isnan instead? Then PyObject_is_null could save a memory access to the heap.

Member: Ok, got it: in PyObject_is_null we compare memory addresses, not content.

Member (Author): Yep, this is a pandas thing -- there are object arrays with numpy.nan floating around in them -- otherwise you have to PyFloat_Check and then unbox to double, and only then verify that it is NaN.
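The trade-off under discussion, sketched in Python (the helper names are illustrative, not pyarrow internals): the identity test compares a single pointer against the `np.nan` singleton registered through `pyarrow_set_numpy_nan`, while the value-based test must type-check and unbox each element first.

```python
import math

import numpy as np

# Identity check: compare the PyObject address against the np.nan singleton,
# as the pyarrow_set_numpy_nan / PyObject_is_null pairing does.
def is_nan_by_identity(obj) -> bool:
    return obj is np.nan

# Value check: the libc.math.isnan route needs a type check plus an unbox.
def is_nan_by_value(obj) -> bool:
    return isinstance(obj, float) and math.isnan(obj)

# pandas object columns reuse the np.nan float object, so both agree here.
values = np.array([1.0, np.nan, 'text'], dtype=object)
print([is_nan_by_identity(v) for v in values])  # [False, True, False]
print([is_nan_by_value(v) for v in values])     # [False, True, False]
```

Note that a NaN produced elsewhere (e.g. `float('nan')`) would fail the identity test while passing the value test, which is why the identity shortcut only works for the shared numpy singleton that pandas uses.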
