Skip to content

Commit

Permalink
Merge pull request #8 from imubit/add-max-results-to-read-period
Browse files Browse the repository at this point in the history
Add support for the `max_results` argument
  • Loading branch information
cloud-rocket authored Nov 15, 2023
2 parents a1e1917 + fd845e6 commit e1552e9
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 31 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ dagent exec list_tags --conn_name=pi
dagent exec read_tag_values_period --conn_name=pi --tags="['sinusoid', 'sinusoidu']" --first_timestamp=*-100h --last_timestamp=*
dagent exec copy_period --src_conn=pi --tags="['SINUSOID', 'sinusoidu']" --dest_conn=csv --dest_group='sinus.csv' --first_timestamp=*-100h --last_timestamp=*
```


## Troubleshooting

### OSIsoft.AF.PI.PITimeoutException when reading historical data

Increase the SDK data access timeout settings on the client machine. The SDK has two timeouts: a connection timeout and a data access timeout. The connection timeout defaults to 10 seconds, and the data access timeout defaults to 60 seconds. A data access timeout is the most likely cause of this error.
* Launch AboutPI-SDK.exe.
* Navigate to the Connections tab.
* Select the PI Data Archive in question.
* Increase the Data Access Timeout and Connection Timeout to 120 seconds or more.
45 changes: 29 additions & 16 deletions src/data_agent_osisoft_pi/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ def timestamp_to_datetime(timestamp):

MAP_TIME_FREQUENCY_TO_PI = {"raw data": None}

DEFAULT_PAGE_SIZE = 200000


def cast2python(val):
if str(type(val)) == "<class 'System.DateTime'>":
Expand Down Expand Up @@ -121,13 +119,17 @@ class OsisoftPiConnector(AbstractConnector):
("compressing", {"Type": "int", "Name": "Compression"}),
("changer", {"Type": "str", "Name": "Modified By"}),
]
DEFAULT_PAGE_SIZE = 200000
ABSOLUTE_MAX_VALUES_TO_READ = 1000000000

def __init__(self, conn_name="pi_client", server_name="default", **kwargs):
super(OsisoftPiConnector, self).__init__(conn_name)
self._server = None
self._server_name = server_name
self._page_size = (
int(kwargs["page_size"]) if "page_size" in kwargs else DEFAULT_PAGE_SIZE
int(kwargs["page_size"])
if "page_size" in kwargs
else OsisoftPiConnector.DEFAULT_PAGE_SIZE
)

@staticmethod
Expand All @@ -146,7 +148,7 @@ def list_connection_fields():
"name": "Data Read Page Size",
"type": "list",
"values": ["200000", "20000", "10000", "5000"],
"default_value": DEFAULT_PAGE_SIZE,
"default_value": OsisoftPiConnector.DEFAULT_PAGE_SIZE,
"optional": False,
},
}
Expand Down Expand Up @@ -189,6 +191,7 @@ def connect(self):
)

try:
# https://docs.aveva.com/bundle/af-sdk/page/html/M_OSIsoft_AF_PI_PIServer_Connect.htm
self._server.Connect(force=True)

log.debug(f"Connected to {self._server_name}, page_size={self._page_size}")
Expand Down Expand Up @@ -313,6 +316,7 @@ def read_tag_values_period(
first_timestamp=None,
last_timestamp=None,
time_frequency=None,
max_results=None,
result_format="dataframe",
progress_callback=None,
):
Expand All @@ -325,6 +329,8 @@ def read_tag_values_period(
if isinstance(last_timestamp, datetime):
last_timestamp = last_timestamp.strftime("%Y/%m/%d %H:%M:%S")

total_values_to_read = max_results or self.ABSOLUTE_MAX_VALUES_TO_READ

assert result_format in ["dataframe", "series", "tuple"]

names = tags
Expand Down Expand Up @@ -364,20 +370,18 @@ def read_tag_values_period(
time_span = AFTimeSpan.Parse(freq)

next_start_time = start_time
next_end_time = (
time_span.Multiply(next_start_time, self._page_size)
if time_span.Multiply(next_start_time, self._page_size)
< time_range.EndTime
else time_range.EndTime
)

# print('starting')
# print(f'range: {time_range} next_start_time={next_start_time}, next_end_time={next_end_time}')

while next_start_time < time_range.EndTime:
while (
next_start_time < time_range.EndTime
and total_values_to_read > 0
):
values_to_read = min(self._page_size, total_values_to_read)

next_end_time = (
time_span.Multiply(next_start_time, self._page_size)
if time_span.Multiply(next_start_time, self._page_size)
time_span.Multiply(next_start_time, values_to_read - 1)
if time_span.Multiply(next_start_time, values_to_read - 1)
< time_range.EndTime
else time_range.EndTime
)
Expand All @@ -392,6 +396,8 @@ def read_tag_values_period(
if records.Count == 0:
break

total_values_to_read -= records.Count

formatted_data = {
timestamp_to_datetime(val.Timestamp.UtcTime): val.Value
for val in records
Expand Down Expand Up @@ -419,19 +425,26 @@ def read_tag_values_period(
# print('starting')
# print(f'range: {time_range} next_start_time={next_start_time}')

while next_start_time < time_range.EndTime:
while (
next_start_time < time_range.EndTime
and total_values_to_read > 0
):
page_time_range = AFTimeRange(
next_start_time, time_range.EndTime
)

values_to_read = min(self._page_size, total_values_to_read)

# https://docs.aveva.com/bundle/af-sdk/page/html/M_OSIsoft_AF_PI_PIPoint_RecordedValues.htm
records = pt.RecordedValues(
page_time_range, boundary, "", False, self._page_size
page_time_range, boundary, "", False, values_to_read
)

if records.Count == 0:
break

total_values_to_read -= records.Count

formatted_data = {
timestamp_to_datetime(val.Timestamp.UtcTime): val.Value
for val in records
Expand Down
47 changes: 32 additions & 15 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,18 @@ def test_read_tag_values_period_interpolated(target_conn):
time_frequency="1 minute",
)

assert len(df) == 6001
assert len(df) == 6000
assert list(df.columns) == ["SINUSOIDU"]

df = target_conn.read_tag_values_period(
["sinusoidu"],
first_timestamp="*-100h",
last_timestamp="*",
time_frequency="1 minute",
max_results=10,
)

assert len(df) == 10
assert list(df.columns) == ["SINUSOIDU"]

df = target_conn.read_tag_values_period(
Expand All @@ -96,23 +107,11 @@ def test_read_tag_values_period_interpolated(target_conn):
time_frequency="3 minutes",
)

assert len(df) == 2001
assert len(df) == 2000
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]


def test_read_tag_values_period(target_conn):
# target_conn.read_tag_values(["sinusoid", "sinusoidu", "cdt158", "cdm158"],
# first_timestamp='2022/09/02 00:00:05',
# last_timestamp='2022/09/10 00:00:10',
# )

# df = target_conn.read_tag_values_period(["sinusoid", "sinusoidu"],
# first_timestamp='*-100h',
# last_timestamp='*',
# )
# assert 72 > len(df) > 10
# assert list(df.columns) == ['SINUSOID', 'SINUSOIDU']

def test_read_tag_values_period_recorded(target_conn):
df = target_conn.read_tag_values_period(
["sinusoid", "sinusoidu"],
# first_timestamp='*-100h',
Expand All @@ -129,6 +128,24 @@ def test_read_tag_values_period(target_conn):
)
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]

df = target_conn.read_tag_values_period(
["sinusoid", "sinusoidu"],
first_timestamp="2020-04-15 12:00:00",
last_timestamp="2020-05-16 12:00:00",
max_results=10,
)
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]
assert len(df) == 10

df = target_conn.read_tag_values_period(
["sinusoid", "sinusoidu"],
first_timestamp="2020-04-15 12:00:00",
last_timestamp="2020-12-16 12:00:00",
max_results=2000,
)
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]
assert len(df) == 2000


def test_read_tag_attributes(target_conn):
res = target_conn.read_tag_attributes(
Expand Down

0 comments on commit e1552e9

Please sign in to comment.