Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Tap level SQLAlchemy connector instance shared with Streams #1773

Closed
BuzzCutNorman opened this issue Jun 19, 2023 · 0 comments · Fixed by #1861
Closed

feat: Tap level SQLAlchemy connector instance shared with Streams #1773

BuzzCutNorman opened this issue Jun 19, 2023 · 0 comments · Fixed by #1861
Labels
kind/Feature New feature or request valuestream/SDK

Comments

@BuzzCutNorman
Copy link
Contributor

BuzzCutNorman commented Jun 19, 2023

Feature scope

Taps (catalog, state, stream maps, tests, etc.)

Description

Currently an SQL tap will open and hold a session to the source for each stream selected. This means if a tap has 100 streams\tables selected the SQL tap will open 100 session and hold them until the meltano run is finished. It would be nice if SQL taps could utilize a single connector instance and leverage the SQLAlchemy pool of connection it has.

This could be accomplished If an instance of the tap's connector class is initialized and held as a property and that connector property is passed on to SQLStream instances when they are initialized by the discover_streams method. The changes to the SQLTap class might look like this:

class SQLTap(Tap):
    """A specialized Tap for extracting from SQL streams."""

    # Stream class used to initialize new SQL streams from their catalog declarations.
    default_stream_class: type[SQLStream]

    default_connector_class = SQLConnector
    _tap_connector: SQLConnector = None

    ... comments and code removed to show the change better

    @property
    def tap_connector(self) -> SQLConnector:
        """The connector object.

        Returns:
            The connector object.
        """
        if self._tap_connector is None:
            self._tap_connector = self.default_connector_class(dict(self.config))
        return self._tap_connector

    @property
    def catalog_dict(self) -> dict:

        ... some code removed to assist with clarity
        
        connector = self.tap_connector

        ... some code removed to assist with clarity

    def discover_streams(self) -> list[Stream]:
        """Initialize all available streams and return them as a list.

        Returns:
            List of discovered Stream objects.
        """
        result: list[Stream] = []
        for catalog_entry in self.catalog_dict["streams"]:
            result.append(
                self.default_stream_class(
                    tap=self,
                    catalog_entry=catalog_entry,
                    connector=self.tap_connector
                )
            )

        return result

An example of this working in a tap can be found in tap-mssql--buzzcutnorman:tap.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/Feature New feature or request valuestream/SDK
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant