-
Notifications
You must be signed in to change notification settings - Fork 0
/
github_activity.py
40 lines (32 loc) · 983 Bytes
/
github_activity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio
import os
from dataclasses import dataclass
from typing import Any, List

from github import Github
from temporalio import activity
# Task queue name constant; presumably the Temporal task queue shared by the
# worker and whatever starts the workflow -- confirm against the importing code.
TASK_QUEUE_NAME = "stargazers"
# Module-level PyGithub client authenticated with the access token read from
# the GITHUB_TOKEN environment variable. The lookup runs at import time, so
# importing this module raises KeyError when GITHUB_TOKEN is not set.
g = Github(os.environ["GITHUB_TOKEN"])
@dataclass
class GitHubRepo:
    """Input for the ``star_gazers`` activity: identifies one GitHub repository."""

    # Repository identifier handed to ``Github.get_repo`` (presumably the
    # "owner/repo" full name -- confirm against callers).
    name: str
def _next_user_row(stargazers: Any) -> Any:
    """Pull the next user from *stargazers* and flatten it into a plain dict.

    Args:
        stargazers: An iterator over PyGithub user objects.

    Returns:
        A dict of the exported user fields, or ``None`` once the iterator is
        exhausted.

    PyGithub is synchronous: advancing the iterator and reading attributes can
    issue HTTP requests, so this helper is meant to run off the event loop.
    """
    user = next(stargazers, None)
    if user is None:
        return None
    return {
        "login": user.login,
        "followers": user.followers,
        "following": user.following,
        "public_gists": user.public_gists,
        "public_repos": user.public_repos,
        "created_at": user.created_at.date().isoformat(),
        "email": user.email,
        "bio": user.bio,
        "blog": user.blog,
        "hireable": user.hireable,
    }


@activity.defn
async def star_gazers(gh: GitHubRepo) -> List[Any]:
    """Collect profile data for every stargazer of the repository ``gh.name``.

    Args:
        gh: Identifies the repository to inspect.

    Returns:
        One dict per stargazer with the keys ``login``, ``followers``,
        ``following``, ``public_gists``, ``public_repos``, ``created_at``
        (ISO date string), ``email``, ``bio``, ``blog`` and ``hireable``.

    Raises:
        Whatever PyGithub raises for a failed request (for example an unknown
        repository); the exception propagates unchanged.
    """
    # The PyGithub calls block on network I/O. Running them in the default
    # executor and awaiting the result keeps the worker's event loop free, so
    # the heartbeats recorded below can actually be delivered while we wait.
    loop = asyncio.get_running_loop()
    repo = await loop.run_in_executor(None, g.get_repo, gh.name)
    stargazers = iter(repo.get_stargazers())

    users: List[Any] = []
    activity.heartbeat("Getting stargazers")
    while True:
        user_row = await loop.run_in_executor(None, _next_user_row, stargazers)
        if user_row is None:
            break
        users.append(user_row)
        # Heartbeat after every user so a long stargazer list does not run
        # past the activity's heartbeat timeout.
        activity.heartbeat(f"Fetched {len(users)} stargazers")
    return users