-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
executable file
·116 lines (86 loc) · 3.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python3
#
# (C) 2020 Jannik Vogel
#
# Licensed under AGPLv3 only.
# See LICENSE.md for more information.
#
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
import requests
# Address the redirect server listens on
HOST = "localhost"
PORT = 8000

# GitHub repository, branch and workflow whose artifact is looked up
OWNER = "xqemu"
REPO = "xqemu"
BRANCH = "master"
WORKFLOW_NAME = "Build (Windows)"
WORKFLOW_EVENT = "push"
ARTIFACT_NAME = "xqemu-release"

# Base URL of the GitHub REST API
API_URL = "https://api.github.com"
def get_workflow_id(workflow_name, timeout=10):
    """Return the id of the repository workflow called *workflow_name*.

    Lists the workflows of OWNER/REPO through the GitHub REST API and
    picks the first one whose name matches exactly.

    Args:
        workflow_name: Workflow name to look for.
        timeout: Seconds to wait for GitHub before giving up.

    Returns:
        The workflow id, or None if no listed workflow has that name.

    Raises:
        requests.RequestException: If the request fails, times out, or
            GitHub answers with an HTTP error status.
    """
    response = requests.get(
        "%s/repos/%s/%s/actions/workflows" % (API_URL, OWNER, REPO),
        # Request the largest page so the workflow is less likely to be
        # cut off by the default page size.
        params={"per_page": 100},
        # Without a timeout a stalled connection would block forever.
        timeout=timeout,
    )
    print(response.status_code)
    # Surface HTTP errors (e.g. rate limiting) directly instead of as a
    # KeyError on the missing 'workflows' key below.
    response.raise_for_status()
    payload = response.json()
    for workflow in payload['workflows']:
        if workflow['name'] == workflow_name:
            print(workflow['name'])
            return workflow['id']
    return None  #FIXME: Exception
def get_latest_workflow_run_id(workflow_id, workflow_event, timeout=10):
    """Return the id of the first successful run of a workflow on BRANCH.

    Lists the runs of *workflow_id* through the GitHub REST API and returns
    the first one that is completed, succeeded, was built from BRANCH and
    was triggered by *workflow_event*.

    Args:
        workflow_id: Id of the workflow whose runs are listed.
        workflow_event: Triggering event a run must have (e.g. "push").
        timeout: Seconds to wait for GitHub before giving up.

    Returns:
        The id of the first matching run, or None if none matches.

    Raises:
        requests.RequestException: If the request fails, times out, or
            GitHub answers with an HTTP error status.
    """
    response = requests.get(
        "%s/repos/%s/%s/actions/workflows/%s/runs" % (API_URL, OWNER, REPO, workflow_id),
        # Let GitHub pre-filter, so a matching run is found even when it is
        # not among the most recent runs of the workflow overall.
        params={
            "branch": BRANCH,
            "event": workflow_event,
            "status": "success",
            "per_page": 100,
        },
        # Without a timeout a stalled connection would block forever.
        timeout=timeout,
    )
    print(response.status_code)
    # Surface HTTP errors directly instead of as a KeyError below.
    response.raise_for_status()
    payload = response.json()
    # NOTE(review): "latest" relies on GitHub listing runs newest first,
    # as the original code already assumed - confirm against the API docs.
    for workflow_run in payload['workflow_runs']:
        # The checks below repeat the server-side filters on purpose, so the
        # result stays correct even if a query parameter were ignored.
        # Only consider completed runs
        if workflow_run['status'] != "completed":
            continue
        if workflow_run['conclusion'] != "success":
            continue
        # Match by BRANCH
        if workflow_run['head_branch'] != BRANCH:
            continue
        # Match by event
        if workflow_run['event'] != workflow_event:
            continue
        return workflow_run['id']
    return None  #FIXME: Exception
def get_artifact_id(workflow_run_id, name, timeout=10):
    """Return the id of the artifact called *name* in a workflow run.

    Lists the artifacts of *workflow_run_id* through the GitHub REST API
    and picks the first one whose name matches exactly.

    Args:
        workflow_run_id: Id of the workflow run whose artifacts are listed.
        name: Artifact name to look for.
        timeout: Seconds to wait for GitHub before giving up.

    Returns:
        The artifact id, or None if the run has no listed artifact with
        that name.

    Raises:
        requests.RequestException: If the request fails, times out, or
            GitHub answers with an HTTP error status.
    """
    response = requests.get(
        "%s/repos/%s/%s/actions/runs/%s/artifacts" % (API_URL, OWNER, REPO, workflow_run_id),
        # Request the largest page so the artifact is less likely to be
        # cut off by the default page size.
        params={"per_page": 100},
        # Without a timeout a stalled connection would block forever.
        timeout=timeout,
    )
    print(response.status_code)
    # Surface HTTP errors directly instead of as a KeyError below.
    response.raise_for_status()
    payload = response.json()
    for artifact in payload['artifacts']:
        # Match by name
        if artifact['name'] == name:
            return artifact['id']
    return None  #FIXME: Exception
def get_latest_artifact_url(workflow_name, worfklow_event, artifact_name):
    """Build the GitHub API download URL of the latest matching artifact.

    Resolves workflow name -> workflow id -> latest successful run id ->
    artifact id, then formats the zip download URL for that artifact.

    Args:
        workflow_name: Name of the workflow that produces the artifact.
        worfklow_event: Event that must have triggered the run (e.g.
            "push"). The misspelt parameter name is kept so that existing
            keyword callers keep working.
        artifact_name: Name of the artifact inside the workflow run.

    Returns:
        The artifact zip download URL as a string.

    Raises:
        LookupError: If the workflow, a matching run, or the artifact
            cannot be found.
    """
    workflow_id = get_workflow_id(workflow_name)
    if workflow_id is None:
        # Fail with a clear message rather than a TypeError from "%d" % None.
        raise LookupError("no workflow named %r" % workflow_name)
    print("found workflow %d" % workflow_id)

    workflow_run_id = get_latest_workflow_run_id(workflow_id, worfklow_event)
    if workflow_run_id is None:
        raise LookupError(
            "no successful %r run found for workflow %r" % (worfklow_event, workflow_name)
        )
    print("found workflow_run_id %d" % workflow_run_id)

    artifact_id = get_artifact_id(workflow_run_id, artifact_name)
    if artifact_id is None:
        # Previously this produced a bogus ".../artifacts/None/zip" URL.
        raise LookupError(
            "no artifact named %r in workflow run %s" % (artifact_name, workflow_run_id)
        )
    # Log the artifact id itself (the original printed the run id here).
    print("found artifact_id %d" % artifact_id)

    return "%s/repos/%s/%s/actions/artifacts/%s/zip" % (API_URL, OWNER, REPO, artifact_id)
class httpHandler(BaseHTTPRequestHandler):
    """Request handler that redirects GET requests to the latest artifact."""

    def do_GET(self):
        """Answer a GET request with a 302 redirect to the artifact URL.

        The request path is only printed; the workflow, event and artifact
        names always come from the module-level configuration. If the
        artifact URL cannot be determined, a 502 response is sent so the
        client still receives an HTTP answer.
        """
        #FIXME: Decode settings from URL, so this is generic
        print(self.path)

        workflow_name = WORKFLOW_NAME
        workflow_event = WORKFLOW_EVENT
        artifact_name = ARTIFACT_NAME

        try:
            artifact_url = get_latest_artifact_url(workflow_name, workflow_event, artifact_name)
        except Exception as error:
            # Request boundary: any lookup failure (network error, missing
            # workflow/run/artifact) is logged and reported to the client
            # instead of escaping from the handler.
            self.log_error("artifact lookup failed: %r", error)
            self.send_error(502, "Could not locate the latest artifact")
            return

        # Redirect user
        self.send_response(302)
        self.send_header('Location', artifact_url)
        self.end_headers()
if __name__ == "__main__":
    # Code to test the URL fetcher: flip to True to resolve and print the
    # artifact URL once before the server starts.
    RUN_FETCH_SELF_TEST = False
    if RUN_FETCH_SELF_TEST:
        artifact_url = get_latest_artifact_url(WORKFLOW_NAME, WORKFLOW_EVENT, ARTIFACT_NAME)
        print(artifact_url)

    # The context manager closes the listening socket when the block exits.
    with HTTPServer((HOST, PORT), httpHandler) as server:
        print("Starting server at http://%s:%d" % (HOST, PORT))
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop; exit without a traceback.
            print("Stopping server")