-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreams.py
142 lines (110 loc) · 3.52 KB
/
streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Stream type classes for tap-neon."""
from __future__ import annotations
import typing as t
from tap_neon.client import NeonStream
if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
__all__ = [
"Projects",
"Operations",
"Branches",
"Databases",
"Roles",
"Endpoints",
]
class Projects(NeonStream):
"""Projects stream."""
name = "projects"
path = "/projects"
primary_keys = ("id",)
replication_key = None
swagger_ref = "ProjectListItem"
records_jsonpath = "$.projects[*]"
def get_child_context(
self,
record: Record,
context: Context | None, # noqa: ARG002
) -> dict[str, t.Any]:
"""Return the child context for this record.
Args:
record: The record to get the child context for.
context: The parent context.
Returns:
The child context.
"""
return {"project_id": record["id"]}
def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
"""Initialize the projects stream."""
super().__init__(*args, **kwargs)
self._schema["properties"]["default_endpoint_settings"]["properties"][
"autoscaling_limit_min_cu"
]["minimum"] = 0
self._schema["properties"]["default_endpoint_settings"]["properties"][
"autoscaling_limit_max_cu"
]["minimum"] = 0
class Operations(NeonStream):
"""Operations stream."""
name = "operations"
path = "/projects/{project_id}/operations"
primary_keys = ("id",)
replication_key = None
swagger_ref = "Operation"
records_jsonpath = "$.operations[*]"
next_page_token_jsonpath = "$.pagination.cursor" # noqa: S105
parent_stream_type = Projects
class Branches(NeonStream):
"""Branches stream."""
name = "branches"
path = "/projects/{project_id}/branches"
primary_keys = ("id",)
replication_key = None
swagger_ref = "Branch"
records_jsonpath = "$.branches[*]"
parent_stream_type = Projects
def get_child_context(
self,
record: Record,
context: Context | None, # noqa: ARG002
) -> dict[str, t.Any]:
"""Add branch_id to context.
Args:
record: A record dict.
context: A context dict.
Returns:
The updated context dict.
"""
return {
"project_id": record["project_id"],
"branch_id": record["id"],
}
class Databases(NeonStream):
"""Databases stream."""
name = "databases"
path = "/projects/{project_id}/branches/{branch_id}/databases"
primary_keys = ("id",)
replication_key = None
swagger_ref = "Database"
records_jsonpath = "$.databases[*]"
parent_stream_type = Branches
class Roles(NeonStream):
"""Roles stream."""
name = "roles"
path = "/projects/{project_id}/branches/{branch_id}/roles"
primary_keys = ("name",)
replication_key = None
swagger_ref = "Role"
records_jsonpath = "$.roles[*]"
parent_stream_type = Branches
class Endpoints(NeonStream):
"""Endpoints stream."""
name = "endpoints"
path = "/projects/{project_id}/endpoints"
primary_keys = ("id",)
replication_key = None
swagger_ref = "Endpoint"
records_jsonpath = "$.endpoints[*]"
parent_stream_type = Projects
def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
"""Initialize the endpoints stream."""
super().__init__(*args, **kwargs)
self._schema["properties"]["pooler_mode"]["enum"].append("")