Revert "optimize pyreader" #13787

Merged · 1 commit · Oct 10, 2018
1 change: 0 additions & 1 deletion paddle/fluid/API.spec
@@ -178,7 +178,6 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
-paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
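For reference, the ArgSpec removed above corresponds to the pre-revert call shape sketched below. This is an illustration, not part of the diff; `image` and `label` are hypothetical stand-ins for real feed variables.

import paddle.fluid as fluid

# Hypothetical feed variables, only to illustrate the deleted signature:
image = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

# Call shape implied by the deleted ArgSpec entry; this API exists only
# before this revert is applied:
reader = fluid.layers.create_py_reader_by_data(
    capacity=64, feed_list=[image, label], name=None, use_double_buffer=True)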
3 changes: 2 additions & 1 deletion paddle/fluid/CMakeLists.txt
@@ -12,5 +12,6 @@ endif(NOT WIN32)
if(WITH_INFERENCE)
  # NOTE: please add subdirectory inference at last.
  add_subdirectory(inference)
-  add_subdirectory(train)
endif()

+add_subdirectory(train)
325 changes: 114 additions & 211 deletions python/paddle/fluid/layers/io.py
@@ -30,8 +30,7 @@

__all__ = [
    'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
-    'random_data_generator', 'py_reader', 'create_py_reader_by_data',
-    'Preprocessor', 'load'
+    'random_data_generator', 'py_reader', 'Preprocessor', 'load'
]
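The visible effect of this `__all__` change (a hedged illustration, not part of the diff): `py_reader` stays importable from the public layers namespace, while `create_py_reader_by_data` disappears once the revert lands.

import paddle.fluid as fluid

hasattr(fluid.layers, 'py_reader')                 # True before and after this revert
hasattr(fluid.layers, 'create_py_reader_by_data')  # True only before this revert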


@@ -471,158 +470,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
    return monkey_patch_reader_methods(main_prog_var)


-def _py_reader(capacity,
-               shapes,
-               dtypes,
-               lod_levels=None,
-               name=None,
-               use_double_buffer=True,
-               feed_list=None):
-
-    if feed_list is not None:
-        if not isinstance(feed_list, list):
-            raise TypeError("feed_list should be a list of Variable"
-                            " instead of " + str(type(feed_list)))
-        lod_levels = []
-        dtypes = []
-        shape_concat = []
-        ranks = []
-        shapes = []
-
-        for data in feed_list:
-            dtypes.append(data.dtype)
-            shape_concat.extend(data.shape)
-            ranks.append(len(data.shape))
-            shapes.append(data.shape)
-            lod_levels.append(data.lod_level)
-    else:
-        dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
-        shape_concat = []
-        ranks = []
-
-        for shape in shapes:
-            shape_concat.extend(shape)
-            ranks.append(len(shape))
-
-        if lod_levels is None:
-            lod_levels = [0] * len(shapes)
-
-    if name is None:
-        queue_name = unique_name('lod_tensor_blocking_queue')
-        reader_name = unique_name('create_py_reader')
-        double_buffer_name = unique_name('double_buffer')
-    else:
-        queue_name = "_".join([name, "queue"])
-        reader_name = "_".join([name, "reader"])
-        double_buffer_name = "_".join([name, "double_buffer"])
-
-    var = global_scope().var(queue_name)
-    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
-
-    startup_blk = default_startup_program().current_block()
-    startup_var = startup_blk.create_var(name=reader_name)
-    startup_blk.append_op(
-        type='create_py_reader',
-        inputs={'blocking_queue': [queue_name]},
-        outputs={'Out': [startup_var]},
-        attrs={
-            'shape_concat': shape_concat,
-            'lod_levels': lod_levels,
-            'ranks': ranks
-        })
-
-    startup_var.desc.set_dtypes(dtypes)
-    startup_var.persistable = True
-
-    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
-                                      startup_var)
-
-    reader = monkey_patch_reader_methods(main_prog_var)
-    if use_double_buffer:
-        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
-        # we return a double buffer reader. However, the reset method comes from
-        # py_reader.
-        double_buffer_reader.reset = reader.reset
-        reader = double_buffer_reader
-
-    # monkey patch py_reader special methods
-    reader.queue = feed_queue
-    current_reset_method = reader.reset
-    reader.thread = None
-    reader.tensor_provider = None
-    reader.exited = False
-
-    def start_provide_thread(func):
-        def __provider_thread__():
-            for tensors in func():
-                array = core.LoDTensorArray()
-                for item in tensors:
-                    if not isinstance(item, core.LoDTensor):
-                        tmp = core.LoDTensor()
-                        tmp.set(item, core.CPUPlace())
-                        item = tmp
-
-                    array.append(item)
-
-                if reader.exited:
-                    break
-                feed_queue.push(array)
-                if reader.exited:
-                    break
-            feed_queue.close()
-
-        reader.thread = threading.Thread(target=__provider_thread__)
-        reader.thread.daemon = True
-        reader.thread.start()
-
-    def __set_tensor_provider__(func):
-        reader.tensor_provider = func
-
-    def __set_paddle_reader__(paddle_reader):
-        with program_guard(Program(), Program()):
-            actual_feed_list = feed_list
-            if actual_feed_list is None:
-                actual_feed_list = []
-                counter = 0
-                for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
-                    name = str(counter)
-                    actual_feed_list.append(
-                        data(
-                            name=name,
-                            dtype=dtype,
-                            shape=shape,
-                            lod_level=lod_level))
-                    counter += 1
-
-            feeder = DataFeeder(
-                feed_list=actual_feed_list, place=core.CPUPlace())
-            paddle_reader = feeder.decorate_reader(
-                paddle_reader, multi_devices=False)
-
-        def __tensor_provider__():
-            for slots in paddle_reader():
-                yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
-
-        __set_tensor_provider__(__tensor_provider__)
-
-    def __reset__():
-        current_reset_method()
-        if reader.thread is not None and reader.tensor_provider is not None:
-            reader.exited = True
-            reader.thread.join()
-            reader.exited = False
-
-    def __start__():
-        start_provide_thread(reader.tensor_provider)
-
-    reader.reset = __reset__
-    reader.decorate_tensor_provider = __set_tensor_provider__
-    reader.decorate_paddle_reader = __set_paddle_reader__
-    reader.start = __start__
-
-    return reader
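The deleted helper above centralizes the feeding mechanism: a daemon thread pulls batches from a user-supplied provider and pushes them into a bounded blocking queue that the `create_py_reader` op drains. The standard-library sketch below mirrors that pattern as an analogy only, assuming nothing Paddle-specific; `queue.Queue` stands in for the LoDTensor blocking queue and a sentinel object stands in for `feed_queue.close()`.

import queue
import threading

def start_provider(provider, capacity=64):
    """Run `provider` on a daemon thread, feeding a bounded queue (analogy only)."""
    q = queue.Queue(maxsize=capacity)  # plays the role of init_lod_tensor_blocking_queue
    sentinel = object()                # plays the role of feed_queue.close()

    def worker():
        for batch in provider():       # like __provider_thread__ iterating func()
            q.put(batch)               # blocks when full, as feed_queue.push does
        q.put(sentinel)                # signal end-of-data to the consumer

    t = threading.Thread(target=worker)
    t.daemon = True                    # mirrors reader.thread.daemon = True
    t.start()
    return q, t, sentinel

A consumer would then `get()` from `q` until it sees the sentinel, the moral equivalent of catching `fluid.core.EOFException` once the real queue is closed.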


def py_reader(capacity,
              shapes,
              dtypes,
@@ -747,72 +594,128 @@ def py_reader(capacity,
    >>> except fluid.core.EOFException:
    >>>     test_reader.reset()
    """
-    return _py_reader(
-        capacity=capacity,
-        shapes=shapes,
-        dtypes=dtypes,
-        lod_levels=lod_levels,
-        name=name,
-        use_double_buffer=use_double_buffer)
+    dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
+    shape_concat = []
+    ranks = []
+
+    for shape in shapes:
+        shape_concat.extend(shape)
+        ranks.append(len(shape))

-def create_py_reader_by_data(capacity,
-                             feed_list,
-                             name=None,
-                             use_double_buffer=True):
-    """
-    Create a Python reader for data feeding from Python.
+    if lod_levels is None:
+        lod_levels = [0] * len(shapes)

-    This layer returns a Reader Variable.
+    if name is None:
+        queue_name = unique_name('lod_tensor_blocking_queue')
+        reader_name = unique_name('create_py_reader')
+        double_buffer_name = unique_name('double_buffer')
+    else:
+        queue_name = "_".join([name, "queue"])
+        reader_name = "_".join([name, "reader"])
+        double_buffer_name = "_".join([name, "double_buffer"])

-    Works much like py_reader, except that its input is a feed_list
-    instead of shapes, dtypes and lod_levels.
+    var = global_scope().var(queue_name)
+    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

-    Args:
-        capacity(int): The buffer capacity maintained by :code:`py_reader`.
-        feed_list(list(Variable)): The data feed list.
-        name(basestring): The prefix of the queue name and reader name. If None,
-            names are generated automatically.
-        use_double_buffer(bool): Whether use double buffer or not.
+    startup_blk = default_startup_program().current_block()
+    startup_var = startup_blk.create_var(name=reader_name)
+    startup_blk.append_op(
+        type='create_py_reader',
+        inputs={'blocking_queue': [queue_name]},
+        outputs={'Out': [startup_var]},
+        attrs={
+            'shape_concat': shape_concat,
+            'lod_levels': lod_levels,
+            'ranks': ranks
+        })

-    Returns:
-        Variable: A Reader from which we can get feeding data.
+    startup_var.desc.set_dtypes(dtypes)
+    startup_var.persistable = True

-    Examples:
+    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
+                                      startup_var)

-    The basic usage of :code:`create_py_reader_by_data` is as follows:
+    reader = monkey_patch_reader_methods(main_prog_var)
+    if use_double_buffer:
+        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
+        # we return a double buffer reader. However, the reset method comes from
+        # py_reader.
+        double_buffer_reader.reset = reader.reset
+        reader = double_buffer_reader

-    >>> import paddle
-    >>> import paddle.fluid as fluid
-    >>> import paddle.dataset.mnist as mnist
-    >>>
-    >>> image = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
-    >>> label = fluid.layers.data(name='label', shape=[1], dtype='int64')
-    >>> reader = fluid.layers.create_py_reader_by_data(capacity=64,
-    >>>                                                feed_list=[image, label])
-    >>> reader.decorate_paddle_reader(
-    >>>     paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=32),
-    >>>                           buf_size=500))
-    >>>
-    >>> img, label = fluid.layers.read_file(reader)
-    >>> loss = network(img, label)  # some network definition
-    >>>
-    >>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
-    >>>
-    >>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
-    >>> for epoch_id in range(10):
-    >>>     reader.start()
-    >>>     try:
-    >>>         while True:
-    >>>             exe.run(fetch_list=[loss.name])
-    >>>     except fluid.core.EOFException:
-    >>>         reader.reset()
-    """
-    return _py_reader(
-        capacity=capacity,
-        shapes=None,
-        dtypes=None,
-        lod_levels=None,
-        name=name,
-        use_double_buffer=use_double_buffer,
-        feed_list=feed_list)
+    # monkey patch py_reader special methods
+    reader.queue = feed_queue
+    current_reset_method = reader.reset
+    reader.thread = None
+    reader.tensor_provider = None
+    reader.exited = False
+
+    def start_provide_thread(func):
+        def __provider_thread__():
+            for tensors in func():
+                array = core.LoDTensorArray()
+                for item in tensors:
+                    if not isinstance(item, core.LoDTensor):
+                        tmp = core.LoDTensor()
+                        tmp.set(item, core.CPUPlace())
+                        item = tmp
+
+                    array.append(item)
+
+                if reader.exited:
+                    break
+                feed_queue.push(array)
+                if reader.exited:
+                    break
+            feed_queue.close()
+
+        reader.thread = threading.Thread(target=__provider_thread__)
+        reader.thread.daemon = True
+        reader.thread.start()
+
+    def __set_tensor_provider__(func):
+        reader.tensor_provider = func
+
+    def __set_paddle_reader__(paddle_reader):
+        with program_guard(Program(), Program()):
+            feed_list = []
+            counter = 0
+            for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
+                name = str(counter)
+                feed_list.append(
+                    data(
+                        name=name,
+                        dtype=dtype,
+                        shape=shape,
+                        lod_level=lod_level))
+                counter += 1
+
+            feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
+            paddle_reader = feeder.decorate_reader(
+                paddle_reader, multi_devices=False)
+
+        def __tensor_provider__():
+            for slots in paddle_reader():
+                yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
+
+        __set_tensor_provider__(__tensor_provider__)
+
+    def __reset__():
+        current_reset_method()
+        if reader.thread is not None and reader.tensor_provider is not None:
+            reader.exited = True
+            reader.thread.join()
+            reader.exited = False
+
+    def __start__():
+        start_provide_thread(reader.tensor_provider)
+
+    reader.reset = __reset__
+    reader.decorate_tensor_provider = __set_tensor_provider__
+    reader.decorate_paddle_reader = __set_paddle_reader__
+    reader.start = __start__
+
+    return reader
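To recap the restored interface, here is a minimal usage sketch assembled from the py_reader docstring fragments above; it is not a verbatim excerpt, `network` is a placeholder for some network definition, and the batch sizes are arbitrary.

import paddle
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist

# Declare the reader by explicit shapes/dtypes -- the signature this revert keeps.
reader = fluid.layers.py_reader(capacity=64,
                                shapes=[(-1, 1, 28, 28), (-1, 1)],
                                dtypes=['float32', 'int64'])
# Feed it from an ordinary Python batch reader.
reader.decorate_paddle_reader(
    paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=32), buf_size=500))

img, label = fluid.layers.read_file(reader)
loss = network(img, label)  # placeholder: some network definition

fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
for epoch_id in range(10):
    reader.start()               # launches the provider thread
    try:
        while True:
            exe.run(fetch_list=[loss.name])
    except fluid.core.EOFException:
        reader.reset()           # queue exhausted; re-arm for the next epoch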


def open_files(filenames,