Skip to content

Commit

Permalink
feat: Implement paginators
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jun 17, 2022
1 parent d0e1977 commit 5a184a4
Show file tree
Hide file tree
Showing 6 changed files with 845 additions and 95 deletions.
36 changes: 18 additions & 18 deletions samples/sample_tap_gitlab/gitlab_rest_streams.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Sample tap stream test for tap-gitlab."""

from pathlib import Path
from typing import Any, Dict, List, Optional, cast
from __future__ import annotations

import requests
from pathlib import Path
from typing import Any, cast

from singer_sdk.authenticators import SimpleAuthenticator
from singer_sdk.pagination import SimpleHeaderPaginator
from singer_sdk.streams.rest import RESTStream
from singer_sdk.typing import (
ArrayType,
Expand All @@ -21,7 +22,7 @@
DEFAULT_URL_BASE = "https://gitlab.com/api/v4"


class GitlabStream(RESTStream):
class GitlabStream(RESTStream[str]):
"""Sample tap test for gitlab."""

_LOG_REQUEST_METRIC_URLS = True
Expand All @@ -39,8 +40,8 @@ def authenticator(self) -> SimpleAuthenticator:
)

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
self, context: dict | None, next_page_token: str | None
) -> dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params: dict = {}
if next_page_token:
Expand All @@ -50,21 +51,20 @@ def get_url_params(
params["order_by"] = self.replication_key
return params

def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
"""Return token for identifying next page or None if not applicable."""
next_page_token = response.headers.get("X-Next-Page", None)
if next_page_token:
self.logger.debug(f"Next page token retrieved: {next_page_token}")
return next_page_token
def get_new_paginator(self) -> SimpleHeaderPaginator:
"""Return a new paginator for GitLab API endpoints.
Returns:
A new paginator.
"""
return SimpleHeaderPaginator(None, "X-Next-Page")


class ProjectBasedStream(GitlabStream):
"""Base class for streams that are keys based on project ID."""

@property
def partitions(self) -> List[dict]:
def partitions(self) -> list[dict]:
"""Return a list of partition key dicts (if applicable), otherwise None."""
if "{project_id}" in self.path:
return [
Expand Down Expand Up @@ -162,7 +162,7 @@ class EpicsStream(ProjectBasedStream):

# schema_filepath = SCHEMAS_DIR / "epics.json"

def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
def get_child_context(self, record: dict, context: dict | None) -> dict:
"""Perform post processing, including queuing up any child stream types."""
# Ensure child state record(s) are created
return {
Expand All @@ -183,8 +183,8 @@ class EpicIssuesStream(GitlabStream):
parent_stream_type = EpicsStream # Stream should wait for parents to complete.

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
self, context: dict | None, next_page_token: str | None
) -> dict[str, Any]:
"""Return a dictionary of values to be used in parameterization."""
result = super().get_url_params(context, next_page_token)
if not context or "epic_id" not in context:
Expand Down
Loading

0 comments on commit 5a184a4

Please sign in to comment.