Skip to content

Commit

Permalink
Source Github: add custom pagination size for large streams
Browse files Browse the repository at this point in the history
  • Loading branch information
yevhenii-ldv committed Jan 20, 2022
1 parent 9afbbff commit 974e4d8
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.name=airbyte/source-github
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
)

TOKEN_SEPARATOR = ","
DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM = 10
# To scan all the repos within an organization, the organization name can be
# specified with an asterisk, e.g. "airbytehq/*"
ORGANIZATION_PATTERN = re.compile("^.*/\\*$")
Expand Down Expand Up @@ -140,6 +141,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
repository_stats_stream = RepositoryStats(
authenticator=authenticator,
repositories=repositories,
page_size_for_large_streams=config.get("page_size_for_large_streams", DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM),
)
for stream_slice in repository_stats_stream.stream_slices(sync_mode=SyncMode.full_refresh):
next(repository_stats_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), None)
Expand All @@ -153,9 +155,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
repositories = repos + organization_repos

organizations = list({org.split("/")[0] for org in repositories})
page_size = config.get("page_size_for_large_streams", DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM)

organization_args = {"authenticator": authenticator, "organizations": organizations}
repository_args = {"authenticator": authenticator, "repositories": repositories}
repository_args = {"authenticator": authenticator, "repositories": repositories, "page_size_for_large_streams": page_size}
repository_args_with_start_date = {**repository_args, "start_date": config["start_date"]}

default_branches, branches_to_pull = self._get_branches_data(config.get("branch", ""), repository_args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
}
]
},

"repository": {
"type": "string",
"examples": ["airbytehq/airbyte", "airbytehq/*"],
Expand All @@ -71,6 +70,14 @@
"title": "Branch",
"examples": ["airbytehq/airbyte/master"],
"description": "Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled."
},
"page_size_for_large_streams": {
"type": "integer",
"title": "Page size for large streams",
"minimum": 1,
"maximum": 100,
"default": 10,
"description": "The GitHub connector contains several streams with a large load. The appropriate page size for such streams depends on the size of your repository. We recommend specifying a value between 10 and 30."
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,27 @@
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from requests.exceptions import HTTPError

DEFAULT_PAGE_SIZE = 100


class GithubStream(HttpStream, ABC):
url_base = "https://api.github.com/"

primary_key = "id"
use_cache = True

# GitHub pagination could be from 1 to 100.
page_size = 100
# Detect streams with high API load
large_stream = False

stream_base_params = {}

def __init__(self, repositories: List[str], **kwargs):
def __init__(self, repositories: List[str], page_size_for_large_streams: int, **kwargs):
super().__init__(**kwargs)
self.repositories = repositories

# GitHub pagination could be from 1 to 100.
self.page_size = page_size_for_large_streams if self.large_stream else DEFAULT_PAGE_SIZE

MAX_RETRIES = 3
adapter = requests.adapters.HTTPAdapter(max_retries=MAX_RETRIES)
self._session.mount("https://", adapter)
Expand Down Expand Up @@ -295,6 +300,9 @@ class Organizations(GithubStream):
API docs: https://docs.github.com/en/rest/reference/orgs#get-an-organization
"""

# GitHub pagination could be from 1 to 100.
page_size = 100

def __init__(self, organizations: List[str], **kwargs):
super(GithubStream, self).__init__(**kwargs)
self.organizations = organizations
Expand Down Expand Up @@ -394,7 +402,7 @@ class PullRequests(SemiIncrementalGithubStream):
API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests
"""

page_size = 50
large_stream = True
first_read_override_key = "first_read_override"

def __init__(self, **kwargs):
Expand Down Expand Up @@ -524,7 +532,7 @@ class Comments(IncrementalGithubStream):
API docs: https://docs.github.com/en/rest/reference/issues#list-issue-comments-for-a-repository
"""

page_size = 30 # `comments` is a large stream so it's better to set smaller page size.
large_stream = True

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/issues/comments"
Expand Down Expand Up @@ -637,7 +645,7 @@ class Issues(IncrementalGithubStream):
API docs: https://docs.github.com/en/rest/reference/issues#list-repository-issues
"""

page_size = 50 # `issues` is a large stream so it's better to set smaller page size.
large_stream = True

stream_base_params = {
"state": "all",
Expand All @@ -651,7 +659,7 @@ class ReviewComments(IncrementalGithubStream):
API docs: https://docs.github.com/en/rest/reference/pulls#list-review-comments-in-a-repository
"""

page_size = 30 # `review-comments` is a large stream so it's better to set smaller page size.
large_stream = True

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/pulls/comments"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@responses.activate
@patch("time.sleep")
def test_bad_gateway_retry(time_mock):
args = {"authenticator": None, "repositories": ["test_repo"], "start_date": "start_date"}
args = {"authenticator": None, "repositories": ["test_repo"], "start_date": "start_date", "page_size_for_large_streams": 30}
stream = PullRequestCommentReactions(**args)
stream_slice = {"repository": "test_repo", "id": "id"}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/github.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Your token should have at least the `repo` scope. Depending on which streams you

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.14 | 2022-01-20 | [9664](https://github.com/airbytehq/airbyte/pull/9664) | Add custom pagination size for large streams |
| 0.2.13 | 2021-01-20 | [9619](https://github.com/airbytehq/airbyte/pull/9619) | Fix logging for function `should_retry` |
| 0.2.11 | 2021-01-17 | [9492](https://github.com/airbytehq/airbyte/pull/9492) | Remove optional parameter `Accept` for reaction streams to fix error with 502 HTTP status code in response |
| 0.2.10 | 2021-01-03 | [7250](https://github.com/airbytehq/airbyte/pull/7250) | Use CDK caching and convert PR-related streams to incremental |
Expand Down

1 comment on commit 974e4d8

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube report for Airbyte Connectors Source Github(#9664)

Measures

Name Value Name Value Name Value
Lines of Code 834 Reliability Rating A Duplicated Blocks 0
Duplicated Lines (%) 0.0 Lines to Cover 619 Vulnerabilities 0
Code Smells 13 Security Rating A Quality Gate Status OK
Coverage 40.5 Bugs 0 Blocker Issues 0
Critical Issues 3 Major Issues 5 Minor Issues 5

Detected Issues

Rule File Description Message
python:isort_need_format (MINOR) unit_tests/test_stream.py Please run one of the commands: "isort <path_to_updated_folder>" or "./gradlew format" 1 code part(s) should be updated.
flake8:E501 (MAJOR) source_github/streams.py:66 line too long (82 > 79 characters) line too long (150 > 140 characters)
python:S5886 (MAJOR) source_github/streams.py:76 Function return types should be consistent with their type hint Return a value of type "Union[int, float]" instead of "NoneType" or update function "backoff_time" type hint.
python:S2638 (CRITICAL) source_github/streams.py:725 Method overrides should not change contracts Add missing parameters organization.
python:isort_need_format (MINOR) unit_tests/test_source.py Please run one of the commands: "isort <path_to_updated_folder>" or "./gradlew format" 1 code part(s) should be updated.
python:isort_need_format (MINOR) unit_tests/unit_test.py Please run one of the commands: "isort <path_to_updated_folder>" or "./gradlew format" 1 code part(s) should be updated.
python:isort_need_format (MINOR) main.py Please run one of the commands: "isort <path_to_updated_folder>" or "./gradlew format" 1 code part(s) should be updated.
python:mypy_unknown (MINOR) main.py Unknown error Duplicate module named "main" (also at "./fixtures/main.py")
python:S3776 (CRITICAL) source_github/streams.py:83 Cognitive Complexity of functions should not be too high Refactor this function to reduce its Cognitive Complexity from 22 to the 15 allowed.
flake8:E501 (MAJOR) fixtures/github.py:22 line too long (82 > 79 characters) line too long (151 > 140 characters)
python:S5797 (CRITICAL) fixtures/github.py:79 Constants should not be used as conditions Replace this expression; used as a condition it will always be constant.
python:S112 (MAJOR) source_github/source.py:69 "Exception" and "BaseException" should not be raised Replace this generic exception class with a more specific one.
flake8:C901 (MAJOR) source_github/streams.py:83 flake8:C901 'GithubStream.read_records' is too complex (13)

Coverage (40.5%)

File Coverage File Coverage
fixtures/github.py 0.0 fixtures/main.py 0.0
integration_tests/acceptance.py 0.0 main.py 0.0
setup.py 0.0 source_github/init.py 100.0
source_github/source.py 55.1 source_github/streams.py 56.0
unit_tests/test_source.py 0.0 unit_tests/test_stream.py 0.0
unit_tests/unit_test.py 0.0

Please sign in to comment.