-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest.py
157 lines (122 loc) · 4.47 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from datetime import datetime, timedelta
from datetime import timezone
import json
import random
import unittest

from adh_sample_library_preview import ADHClient, SdsType, SdsStream

from PIOMFClient import OMFMessageAction, OMFMessageType, PIOMFClient
from program import main, getAppsettings, convertType, convertContainer
class SampleE2ETests(unittest.TestCase):
    """End-to-end test that runs the sample against a freshly built environment."""

    @classmethod
    def test_main(cls):
        """Create the test environment, run ``main(test=True)``, then clean up.

        Cleanup is performed in a ``finally`` clause so that the resources
        created for the test are removed even when environment creation or
        the sample itself raises; the original exception still propagates
        and fails the test.
        """
        # NOTE: kept as a classmethod so existing direct calls
        # (SampleE2ETests.test_main()) continue to work.
        try:
            create_test_environment()
            main(test=True)
        finally:
            cleanup_test_environment()
# JSON definition of the Cds type used by the test: a DateTime key plus a
# single nullable double value.
test_cds_type = {
    "Id": "DataHubToPIType",
    "Name": "DataHubToPIType",
    # NOTE(review): presumably the SdsTypeCode for a compound/object type -- confirm
    "SdsTypeCode": 1,
    "Properties": [
        # Key property: the timestamp each value is indexed by
        {
            "Id": "Timestamp",
            "Name": "Timestamp",
            "IsKey": True,
            "SdsType": {"Name": "DateTime", "SdsTypeCode": 16},
        },
        # Measurement property
        {
            "Id": "Value",
            "Name": "Value",
            "SdsType": {"Name": "NullableDouble", "SdsTypeCode": 114},
        },
    ],
}
# JSON definition of the Cds stream used by the test; its TypeId is the Id of
# the test type ("DataHubToPIType").
test_cds_stream = {
    "TypeId": "DataHubToPIType",
    "Id": "DataHubToPIStream",
    "Name": "DataHubToPIStream",
}
def suppress_error(function):
    """Call ``function`` with no arguments, printing any error instead of raising.

    Any ``Exception`` raised by the call is caught and reported on stdout so
    that a failing best-effort step does not abort the caller. The return
    value of ``function`` is discarded; this helper always returns ``None``.
    """
    try:
        function()
    except Exception as error:
        message = f'Encountered Error: {error}'
        print(message)
    return None
def get_current_time(offset=0):
    """Return the current UTC time, moved ``offset`` seconds into the past.

    Args:
        offset: Number of seconds to subtract from the current time. The
            default of 0 returns "now".

    Returns:
        The timestamp as produced by ``datetime.isoformat()`` with a
        trailing ``'Z'`` appended.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop the tzinfo so isoformat() does not emit "+00:00" and the
    # result keeps its original "<naive ISO timestamp>Z" shape.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return (now_utc - timedelta(seconds=offset)).isoformat() + 'Z'
def create_test_environment():
    """Create the Cds type, stream and sample values that the end-to-end test reads."""
    # Load the DataHub section of the sample configuration
    hub_settings = getAppsettings().get('DataHub')

    # Build the Cds client from the configured connection values
    connection_keys = (
        'ApiVersion', 'TenantId', 'Resource', 'ClientId', 'ClientSecret')
    client = ADHClient(*[hub_settings.get(key) for key in connection_keys])
    namespace = hub_settings.get('NamespaceId')

    # The type is created before the stream that refers to it
    cds_type = SdsType.fromJson(test_cds_type)
    client.Types.getOrCreateType(namespace, cds_type)
    cds_stream = SdsStream.fromJson(test_cds_stream)
    client.Streams.getOrCreateStream(namespace, cds_stream)

    # 1000 random values in [0, 100), spaced 5 seconds apart going back from now
    sample_count = 1000
    samples = [
        {'Timestamp': get_current_time(index * 5), 'Value': 100*random.random()}
        for index in range(sample_count)
    ]
    payload = json.dumps(samples)
    client.Streams.updateValues(namespace, test_cds_stream.get('Id'), payload)

    print('Environment Created!')
def cleanup_test_environment():
    """Remove the Cds and PI (OMF) objects created for the end-to-end test.

    The Cds stream and type are deleted first because those deletions need
    only the Cds client and the namespace id. The namespace lookup and the
    OMF client are set up afterwards, so a failure there can no longer
    prevent the Cds cleanup from running. Each individual delete is wrapped
    in ``suppress_error`` so one failed delete does not stop the others.

    Raises:
        IndexError: If the configured namespace id is not among the
            namespaces returned by the Cds client; the OMF objects cannot
            be addressed without the namespace description used as prefix.
    """
    # Get appsettings
    appsettings = getAppsettings()
    data_hub_appsettings = appsettings.get('DataHub')
    pi_appsettings = appsettings.get('PI')

    # Create a Cds client
    sds_client = ADHClient(
        data_hub_appsettings.get('ApiVersion'),
        data_hub_appsettings.get('TenantId'),
        data_hub_appsettings.get('Resource'),
        data_hub_appsettings.get('ClientId'),
        data_hub_appsettings.get('ClientSecret'))
    namespace_id = data_hub_appsettings.get('NamespaceId')

    # Cleanup Cds stream (before the type it refers to)
    suppress_error(lambda: sds_client.Streams.deleteStream(
        namespace_id, test_cds_stream.get('Id')))

    # Cleanup Cds type
    suppress_error(lambda: sds_client.Types.deleteType(
        namespace_id, test_cds_type.get('Id')))

    # Generate the prefix passed to the OMF conversions: the description of
    # the configured namespace
    namespaces = sds_client.Namespaces.getNamespaces()
    namespace = next((n for n in namespaces if n.Id == namespace_id), None)
    if namespace is None:
        # Same exception type the original namespace[0] lookup raised, but
        # with a message that names the actual problem.
        raise IndexError(
            f'Namespace {namespace_id!r} was not found; '
            'cannot clean up OMF container and type')
    prefix = f'{namespace.Description}'

    # Create an OMF client
    pi_omf_client = PIOMFClient(
        pi_appsettings.get('Resource'),
        pi_appsettings.get('DataArchiveName'),
        pi_appsettings.get('Username'),
        pi_appsettings.get('Password'),
        pi_appsettings.get('OMFVersion', '1.2'),
        pi_appsettings.get('VerifySSL', True)
    )

    # Cleanup OMF container; the stream is given its resolved type before
    # conversion
    resolved_stream = SdsStream.fromJson(test_cds_stream)
    resolved_stream.Type = SdsType.fromJson(test_cds_type)
    suppress_error(lambda: pi_omf_client.omfRequest(
        OMFMessageType.Container, OMFMessageAction.Delete,
        [convertContainer(resolved_stream, prefix)]))

    # Cleanup OMF type
    suppress_error(lambda: pi_omf_client.omfRequest(
        OMFMessageType.Type, OMFMessageAction.Delete,
        [convertType(SdsType.fromJson(test_cds_type), prefix)]))
# Run the test case(s) in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()