-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1830 from fishtown-analytics/feature/improve-rpc-…
…compile-performance Feature/improve rpc compile performance (#1824)
- Loading branch information
Showing
58 changed files
with
1,291 additions
and
1,158 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,108 +1,142 @@ | ||
import threading | ||
from importlib import import_module | ||
from typing import Type, Dict, TypeVar | ||
from typing import Type, Dict, Any | ||
|
||
from dbt.exceptions import RuntimeException | ||
from dbt.include.global_project import PACKAGES | ||
from dbt.logger import GLOBAL_LOGGER as logger | ||
from dbt.contracts.connection import Credentials | ||
from dbt.contracts.connection import Credentials, HasCredentials | ||
|
||
from dbt.adapters.base.impl import BaseAdapter | ||
from dbt.adapters.base.plugin import AdapterPlugin | ||
|
||
# TODO: we can't import these because they cause an import cycle. | ||
# currently RuntimeConfig needs to figure out default quoting for its adapter. | ||
# We should push that elsewhere when we fixup project/profile stuff | ||
# Instead here are some import loop avoiding-hacks right now. And Profile has | ||
# to call into load_plugin to get credentials, so adapter/relation don't work | ||
RuntimeConfig = TypeVar('RuntimeConfig') | ||
BaseAdapter = TypeVar('BaseAdapter') | ||
BaseRelation = TypeVar('BaseRelation') | ||
# Profile has to call into load_plugin to get credentials, so adapter/relation | ||
# don't work | ||
BaseRelation = Any | ||
|
||
ADAPTER_TYPES: Dict[str, Type[BaseAdapter]] = {} | ||
|
||
_ADAPTERS: Dict[str, BaseAdapter] = {} | ||
_ADAPTER_LOCK = threading.Lock() | ||
Adapter = BaseAdapter | ||
|
||
|
||
def get_adapter_class_by_name(adapter_name: str) -> Type[BaseAdapter]: | ||
with _ADAPTER_LOCK: | ||
if adapter_name in ADAPTER_TYPES: | ||
return ADAPTER_TYPES[adapter_name] | ||
class AdpaterContainer: | ||
def __init__(self): | ||
self.lock = threading.Lock() | ||
self.adapters: Dict[str, Adapter] = {} | ||
self.adapter_types: Dict[str, Type[Adapter]] = {} | ||
|
||
adapter_names = ", ".join(ADAPTER_TYPES.keys()) | ||
def get_adapter_class_by_name(self, name: str) -> Type[Adapter]: | ||
with self.lock: | ||
if name in self.adapter_types: | ||
return self.adapter_types[name] | ||
|
||
message = "Invalid adapter type {}! Must be one of {}" | ||
formatted_message = message.format(adapter_name, adapter_names) | ||
raise RuntimeException(formatted_message) | ||
names = ", ".join(self.adapter_types.keys()) | ||
|
||
message = f"Invalid adapter type {name}! Must be one of {names}" | ||
raise RuntimeException(message) | ||
|
||
def get_relation_class_by_name(adapter_name: str) -> Type[BaseRelation]: | ||
adapter = get_adapter_class_by_name(adapter_name) | ||
return adapter.Relation | ||
def get_relation_class_by_name(self, name: str) -> Type[BaseRelation]: | ||
adapter = self.get_adapter_class_by_name(name) | ||
return adapter.Relation | ||
|
||
def load_plugin(self, name: str) -> Type[Credentials]: | ||
# this doesn't need a lock: in the worst case we'll overwrite PACKAGES | ||
# and adapter_type entries with the same value, as they're all | ||
# singletons | ||
try: | ||
mod = import_module('.' + name, 'dbt.adapters') | ||
except ImportError as e: | ||
logger.info("Error importing adapter: {}".format(e)) | ||
raise RuntimeException( | ||
"Could not find adapter type {}!".format(name) | ||
) | ||
if not hasattr(mod, 'Plugin'): | ||
raise RuntimeException( | ||
f'Could not find plugin in {name} plugin module' | ||
) | ||
plugin: AdapterPlugin = mod.Plugin # type: ignore | ||
plugin_type = plugin.adapter.type() | ||
|
||
def load_plugin(adapter_name: str) -> Credentials: | ||
# this doesn't need a lock: in the worst case we'll overwrite PACKAGES and | ||
# _ADAPTER_TYPE entries with the same value, as they're all singletons | ||
try: | ||
mod = import_module('.' + adapter_name, 'dbt.adapters') | ||
except ImportError as e: | ||
logger.info("Error importing adapter: {}".format(e)) | ||
raise RuntimeException( | ||
"Could not find adapter type {}!".format(adapter_name) | ||
) | ||
plugin = mod.Plugin | ||
if plugin_type != name: | ||
raise RuntimeException( | ||
f'Expected to find adapter with type named {name}, got ' | ||
f'adapter with type {plugin_type}' | ||
) | ||
|
||
if plugin.adapter.type() != adapter_name: | ||
raise RuntimeException( | ||
'Expected to find adapter with type named {}, got adapter with ' | ||
'type {}' | ||
.format(adapter_name, plugin.adapter.type()) | ||
) | ||
with self.lock: | ||
# things do hold the lock to iterate over it so we need it to add | ||
self.adapter_types[name] = plugin.adapter | ||
|
||
with _ADAPTER_LOCK: | ||
# things do hold the lock to iterate over it so we need ot to add stuff | ||
ADAPTER_TYPES[adapter_name] = plugin.adapter | ||
PACKAGES[plugin.project_name] = plugin.include_path | ||
|
||
PACKAGES[plugin.project_name] = plugin.include_path | ||
for dep in plugin.dependencies: | ||
self.load_plugin(dep) | ||
|
||
for dep in plugin.dependencies: | ||
load_plugin(dep) | ||
return plugin.credentials | ||
|
||
return plugin.credentials | ||
def register_adapter(self, config: HasCredentials) -> None: | ||
adapter_name = config.credentials.type | ||
adapter_type = self.get_adapter_class_by_name(adapter_name) | ||
|
||
with self.lock: | ||
if adapter_name in self.adapters: | ||
# this shouldn't really happen... | ||
return | ||
|
||
def get_adapter(config: RuntimeConfig) -> BaseAdapter: | ||
adapter_name = config.credentials.type | ||
adapter: Adapter = adapter_type(config) # type: ignore | ||
self.adapters[adapter_name] = adapter | ||
|
||
# Atomically check to see if we already have an adapter | ||
if adapter_name in _ADAPTERS: | ||
return _ADAPTERS[adapter_name] | ||
def lookup_adapter(self, adapter_name: str) -> Adapter: | ||
return self.adapters[adapter_name] | ||
|
||
adapter_type = get_adapter_class_by_name(adapter_name) | ||
def reset_adapters(self): | ||
"""Clear the adapters. This is useful for tests, which change configs. | ||
""" | ||
with self.lock: | ||
for adapter in self.adapters.values(): | ||
adapter.cleanup_connections() | ||
self.adapters.clear() | ||
|
||
with _ADAPTER_LOCK: | ||
# check again, in case something was setting it before | ||
if adapter_name in _ADAPTERS: | ||
return _ADAPTERS[adapter_name] | ||
def cleanup_connections(self): | ||
"""Only clean up the adapter connections list without resetting the actual | ||
adapters. | ||
""" | ||
with self.lock: | ||
for adapter in self.adapters.values(): | ||
adapter.cleanup_connections() | ||
|
||
adapter = adapter_type(config) | ||
_ADAPTERS[adapter_name] = adapter | ||
return adapter | ||
|
||
FACTORY: AdpaterContainer = AdpaterContainer() | ||
|
||
|
||
def register_adapter(config: HasCredentials) -> None: | ||
FACTORY.register_adapter(config) | ||
|
||
|
||
def get_adapter(config: HasCredentials): | ||
return FACTORY.lookup_adapter(config.credentials.type) | ||
|
||
|
||
def reset_adapters(): | ||
"""Clear the adapters. This is useful for tests, which change configs. | ||
""" | ||
with _ADAPTER_LOCK: | ||
for adapter in _ADAPTERS.values(): | ||
adapter.cleanup_connections() | ||
_ADAPTERS.clear() | ||
FACTORY.reset_adapters() | ||
|
||
|
||
def cleanup_connections(): | ||
"""Only clean up the adapter connections list without resetting the actual | ||
adapters. | ||
""" | ||
with _ADAPTER_LOCK: | ||
for adapter in _ADAPTERS.values(): | ||
adapter.cleanup_connections() | ||
FACTORY.cleanup_connections() | ||
|
||
|
||
def get_adapter_class_by_name(name: str) -> Type[BaseAdapter]: | ||
return FACTORY.get_adapter_class_by_name(name) | ||
|
||
|
||
def get_relation_class_by_name(name: str) -> Type[BaseRelation]: | ||
return FACTORY.get_relation_class_by_name(name) | ||
|
||
|
||
def load_plugin(name: str) -> Type[Credentials]: | ||
return FACTORY.load_plugin(name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.