Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for the issue 3503. #3519

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions source/adios2/toolkit/format/dataman/DataManSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,17 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName,
{
input_data += j.position;
}

if (j.shape.size() > 0 and j.shape[0] > 1 and j.start.size() > 0 and
j.start.size() == j.count.size() and
j.start.size() == varStart.size() and
j.start.size() == varCount.size())
/* single values */
if (j.shape.empty() or
std::all_of(j.shape.begin(), j.shape.end(),
[&](size_t i) { return i == 1; }))
{
std::memcpy(reinterpret_cast<char *>(outputData), input_data,
sizeof(T));
}
else if (j.start.size() > 0 and j.start.size() == j.count.size() and
j.start.size() == varStart.size() and
j.start.size() == varCount.size())
{
if (m_ContiguousMajor)
{
Expand All @@ -284,10 +290,12 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName,
sizeof(T), j.start, j.count, varMemStart, varMemCount);
}
}
if (j.shape.empty() or (j.shape.size() == 1 and j.shape[0] == 1))
else
{
std::memcpy(reinterpret_cast<char *>(outputData), input_data,
sizeof(T));
throw std::runtime_error(
"DataManSerializer::GeData end with Step \" + "
"std::to_string(step) +\n"
" \" Var \" + varName failed");
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions testing/adios2/engine/dataman/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ foreach(tst IN ITEMS
)
endforeach()

if (ADIOS2_HAVE_Python)
python_add_test(NAME Test.Engine.DataMan1D.Serial SCRIPT TestDataMan1D.py)
python_add_test(NAME Test.Engine.DataMan1xN.Serial SCRIPT TestDataMan1xN.py)
python_add_test(NAME Test.Engine.DataManSingleValues SCRIPT TestDataManSingleValues.py)
endif()

if(ADIOS2_HAVE_ZFP)
gtest_add_tests_helper(2DZfp MPI_NONE DataMan Engine.DataMan. "")
set_tests_properties(${Test.Engine.DataMan.2DZfp-TESTS}
Expand Down
91 changes: 91 additions & 0 deletions testing/adios2/engine/dataman/TestDataMan1D.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env python
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# TestDataMan1D.py: test for 1D data transfer by reading in Python
# Created on: March 3, 2023
# Author: Dmitry Ganyushin [email protected]
from multiprocessing import Process
import unittest
import numpy as np
import adios2


class TestDataMan1D(unittest.TestCase):

def setUp(self):
self.conf = {
"IPAddress": "127.0.0.1",
"Port": "12306",
"Timeout": "5",
"TransportMode": "reliable",
"RendezvousReaderCount": "1",
}
self.Nx = 10
self.fill_value = 1.0
self.shape = [self.Nx]

def test_run(self):

s = Process(target=self.thread_send)
r = Process(target=self.thread_receive)

s.start()
r.start()

r.join()
s.join()

def thread_send(self):
data = np.full(shape=self.shape, fill_value=self.fill_value)
shape = data.shape
count = shape
start = (0,) * len(shape)

adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Server")
wan.SetEngine("Dataman")

wan.SetParameters(self.conf)
writer = wan.Open("testdata", adios2.Mode.Write)
sendbuffer = wan.DefineVariable("np_data", data, shape,
start, count, adios2.ConstantDims)
self.assertIsNotNone(sendbuffer)
if sendbuffer:
writer.BeginStep()
writer.Put(sendbuffer, data, adios2.Mode.Deferred)
writer.EndStep()
else:
raise ValueError("DefineVariable failed")

writer.Close()

def thread_receive(self):
data = np.zeros(shape=self.shape)
adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Client")
wan.SetEngine("Dataman")
wan.SetParameters(self.conf)
reader = wan.Open("testdata", adios2.Mode.Read)
while True:
stepStatus = reader.BeginStep()
if stepStatus == adios2.StepStatus.OK:
recvar = wan.InquireVariable("np_data")
self.assertIsNotNone(recvar)
bufshape = recvar.Shape()
self.assertTrue(bufshape[0] == self.Nx)
reader.Get(recvar, data, adios2.Mode.Sync)

elif stepStatus == adios2.StepStatus.EndOfStream:
break
else:
raise StopIteration()
reader.EndStep()
reader.Close()
self.assertTrue(all([data[i] == self.fill_value for i
in range(len(data))]))


if __name__ == '__main__':
unittest.main()
92 changes: 92 additions & 0 deletions testing/adios2/engine/dataman/TestDataMan1xN.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/env python
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# TestDataMan1D.py: test for 1D data transfer by reading in Python
# Created on: March 3, 2023
# Author: Dmitry Ganyushin [email protected]
from multiprocessing import Process
import unittest
import numpy as np
import adios2


class TestDataMan1D(unittest.TestCase):

def setUp(self):
self.conf = {
"IPAddress": "127.0.0.1",
"Port": "12306",
"Timeout": "5",
"TransportMode": "reliable",
"RendezvousReaderCount": "1",
}
self.Nx = 10
self.fill_value = 1.0
self.shape = [1, self.Nx]

def test_run(self):

s = Process(target=self.thread_send)
r = Process(target=self.thread_receive)

s.start()
r.start()

r.join()
s.join()

def thread_send(self):
data = np.full(shape=self.shape, fill_value=self.fill_value)
shape = data.shape
count = shape
start = (0,) * len(shape)

adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Server")
wan.SetEngine("Dataman")

wan.SetParameters(self.conf)
writer = wan.Open("testdata", adios2.Mode.Write)
sendbuffer = wan.DefineVariable("np_data", data, shape,
start, count, adios2.ConstantDims)
self.assertIsNotNone(sendbuffer)
if sendbuffer:
writer.BeginStep()
writer.Put(sendbuffer, data, adios2.Mode.Deferred)
writer.EndStep()
else:
raise ValueError("DefineVariable failed")

writer.Close()

def thread_receive(self):
data = np.zeros(shape=self.shape)
adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Client")
wan.SetEngine("Dataman")
wan.SetParameters(self.conf)
reader = wan.Open("testdata", adios2.Mode.Read)
while True:
stepStatus = reader.BeginStep()
if stepStatus == adios2.StepStatus.OK:
recvar = wan.InquireVariable("np_data")
self.assertIsNotNone(recvar)
bufshape = recvar.Shape()
self.assertTrue(bufshape[0] == 1)
self.assertTrue(bufshape[1] == self.Nx)
reader.Get(recvar, data, adios2.Mode.Sync)

elif stepStatus == adios2.StepStatus.EndOfStream:
break
else:
raise StopIteration()
reader.EndStep()
reader.Close()
self.assertTrue(all([data[0][i] == self.fill_value for i
in range(len(data))]))


if __name__ == '__main__':
unittest.main()
89 changes: 89 additions & 0 deletions testing/adios2/engine/dataman/TestDataManSingleValues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# TestDataMan1D.py: test for 1D data transfer by reading in Python
# Created on: March 3, 2023
# Author: Dmitry Ganyushin [email protected]
from multiprocessing import Process
import unittest
import numpy as np
import adios2


class TestDataMan1D(unittest.TestCase):

def setUp(self):
self.conf = {
"IPAddress": "127.0.0.1",
"Port": "12306",
"Timeout": "5",
"TransportMode": "reliable",
"RendezvousReaderCount": "1",
}
self.Nx = 1
self.fill_value = 1.0
self.shape = [self.Nx]

def test_run(self):

s = Process(target=self.thread_send)
r = Process(target=self.thread_receive)

s.start()
r.start()

r.join()
s.join()

def thread_send(self):
data = np.full(shape=self.shape, fill_value=self.fill_value)
shape = data.shape
count = shape
start = (0,) * len(shape)

adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Server")
wan.SetEngine("Dataman")

wan.SetParameters(self.conf)
writer = wan.Open("testdata", adios2.Mode.Write)
sendbuffer = wan.DefineVariable("np_data", data, shape,
start, count, adios2.ConstantDims)
self.assertIsNotNone(sendbuffer)
if sendbuffer:
writer.BeginStep()
writer.Put(sendbuffer, data, adios2.Mode.Deferred)
writer.EndStep()
else:
raise ValueError("DefineVariable failed")

writer.Close()

def thread_receive(self):
data = np.zeros(shape=self.shape)
adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Client")
wan.SetEngine("Dataman")
wan.SetParameters(self.conf)
reader = wan.Open("testdata", adios2.Mode.Read)
while True:
stepStatus = reader.BeginStep()
if stepStatus == adios2.StepStatus.OK:
recvar = wan.InquireVariable("np_data")
self.assertIsNotNone(recvar)
reader.Get(recvar, data, adios2.Mode.Sync)

elif stepStatus == adios2.StepStatus.EndOfStream:
break
else:
raise StopIteration()
reader.EndStep()
reader.Close()
self.assertTrue(all([data[i] == self.fill_value for i
in range(len(data))]))


if __name__ == '__main__':
unittest.main()