-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjf_poll.py
75 lines (59 loc) · 2.42 KB
/
jf_poll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import json
import os
import sys
import time

import requests
from requests.structures import CaseInsensitiveDict
def notification(content, date="20220505", check_grammar=False) -> None:
    """Announce *content* to the user.

    The current implementation only prints the message to stdout.

    Args:
        content: Message to announce; passed to ``print`` unchanged.
        date: Not used by the current implementation. Kept so existing
            callers that pass it keep working.
        check_grammar: Not used by the current implementation. Kept for
            the same reason as ``date``.
    """
    # TODO: replace print with, or add the call to the trrrBird block
    print(content)
# Read the improvement identifiers and the API bearer token from the
# environment. All three are required: if any is missing the script reports
# it and stops with a non-zero exit status.
try:
    # Example values:
    # improvement_id = "690875a6-2ac9-46bc-99f1-de195a326eae"
    # improvement_slug = "improvement-improvement-test-4166174"
    improvement_id = os.environ['IMPROVEMENT_ID']
    improvement_slug = os.environ['IMPROVEMENT_SLUG']
    bearer = os.environ['JF_TOKEN']
except KeyError as missing:
    # os.environ raises KeyError carrying the name of the missing variable.
    print("ENVIRONMENT VARIABLES NOT AVAILABLE")
    print("Missing variable: %s" % missing.args[0])
    # sys.exit(1) instead of quit(): quit() is an interactive helper added by
    # the site module and would report success (status 0) here.
    sys.exit(1)
# Not read anywhere in the visible script; kept so the module-level name
# stays available.
progress = -1
# Last status recorded by the polling loop; "" means none recorded yet.
status = ""

# Configuring JF call
url = "https://api.ly.fish/api/v2/query"
headers = CaseInsensitiveDict()
headers["Accept"] = "application/json"
headers["Authorization"] = "Bearer " + bearer
headers["Content-Type"] = "application/json"

# The request body is built as a dict and serialized with json.dumps so that
# the slug and id are always escaped correctly, whatever characters they
# contain.
# NOTE(review): the two "[email protected]" literals look like an e-mail
# obfuscation artifact from wherever this file was copied; they are kept as
# found -- confirm the intended type identifiers against the original source.
query_body = {
    "query": {
        "description": "Fetch all contracts linked to " + improvement_slug,
        "type": "object",
        "required": ["id", "type"],
        "properties": {"id": {"const": improvement_id}},
        "$$links": {
            "is implemented by": {
                "type": "object",
                "required": ["type"],
                "additionalProperties": False,
                "properties": {"type": {"const": "[email protected]"}},
            },
        },
    },
    "options": {
        "limit": 1,
        "mask": {
            "type": "object",
            "required": ["loop"],
            "properties": {
                "loop": {"enum": ["[email protected]", None], "type": "string"},
            },
        },
    },
}
data = json.dumps(query_body)
# Seconds to wait for the API before giving up on a single request; without
# a timeout requests.post can block forever.
REQUEST_TIMEOUT_SECONDS = 30
# Seconds to wait between two consecutive queries.
POLL_INTERVAL_SECONDS = 5

# Looping queries to JF to detect status changes. The loop only ends when a
# request or the parsing of its response fails, in which case the script
# exits with a non-zero status.
while True:
    try:
        resp = requests.post(
            url, headers=headers, data=data, timeout=REQUEST_TIMEOUT_SECONDS
        )
        # Turn HTTP error statuses into an exception so they are reported as
        # an access failure rather than as a confusing parsing error below.
        resp.raise_for_status()
    except requests.RequestException as e:
        print("EXCEPTION accesing JF.")
        print(e)
        sys.exit(1)

    try:
        content = resp.json()
        first = content['data'][0]
        newstatus = first['data']['status']
        name = first['name']
        description = first['data']['description']
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # ValueError: body is not JSON; the others: JSON lacks the expected
        # fields (for example, no result was returned).
        print("EXCEPTION accesing data. ")
        print(e)
        sys.exit(1)

    print("Read status: %s" % newstatus)
    if status == "":
        print("Registering initial status value: %s" % newstatus)
        status = newstatus
    elif status != newstatus:
        print("Recording new status: %s" % newstatus)
        status = newstatus
        # NOTE(review): the original indentation was lost; this check is
        # placed inside the "status changed" branch so the notification is
        # sent once, on the transition to "completed". Confirm that this is
        # the intended nesting (an improvement that is already completed on
        # the first query is not announced).
        if status == "completed":
            print("IMPROVEMENT COMPLETE")
            notification("We have completed the %s improvement!! %s" % (name, description))

    time.sleep(POLL_INTERVAL_SECONDS)