added colorize-logs cli option

remove colorize logs language

rename duplicate test name

rename to something useful

add cli override

add tests

rename to correct test number

update changelog

fix flake errors and code cleanup

move feature to Features section of changelog

link issues and add names to list of contributors

Update test/integration/061_use_colors_tests/test_no_use_colors.py

Co-authored-by: Jacob Beck <[email protected]>

simplify test query

refactor

properly set default

update tests

Update CHANGELOG.md

remove rebase leftovers

get rid of threads again

Parse selectors

This doesn't use hologram, as it needs more permissive schemas than what hologram provides.
Added some unit tests

Create selector config file, handle selector argument

Added tests
Added RPC and CLI support
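
The selector-parsing commits above note that hologram was too strict for this config; a permissive, hand-rolled recursive parser is the natural shape. The sketch below is illustrative only (names and normalized form are assumptions, not dbt's implementation):

```python
from typing import Any, Dict, Union

# Illustrative sketch only (not dbt's implementation): a hand-rolled,
# permissive parser for nested selector definitions. A strict schema
# library struggles here because the spec is a recursive union of
# shorthand strings and dicts.
SelectorSpec = Union[str, Dict[str, Any]]

def parse_selector(spec: SelectorSpec) -> Dict[str, Any]:
    """Normalize a selector spec into an {'op': ..., ...} tree."""
    if isinstance(spec, str):
        # shorthand strings like "tag:nightly", or a bare node name
        if ':' in spec:
            method, _, value = spec.partition(':')
        else:
            method, value = 'fqn', spec
        return {'op': 'criterion', 'method': method, 'value': value}
    if 'union' in spec:
        return {'op': 'union',
                'components': [parse_selector(s) for s in spec['union']]}
    if 'intersection' in spec:
        return {'op': 'intersection',
                'components': [parse_selector(s) for s in spec['intersection']]}
    if 'method' in spec:
        return {'op': 'criterion',
                'method': spec['method'], 'value': spec.get('value')}
    raise ValueError(f'invalid selector spec: {spec!r}')
```

The recursion is what makes this hard to express as a fixed schema: any component of a union may itself be a union, an intersection, or a shorthand string.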

add changelog

Compile assets as part of docs generate

Release dbt v0.17.1

Bump version: 0.17.1 → 0.17.2a1

add environment variables for logging

changelog fix

Release dbt v0.17.2b1

Fix fast-fail not logging original error message

Apply suggestions from code review

Co-authored-by: Jacob Beck <[email protected]>

azure pipelines silently messing with $PATH again

clean without a profile

Update CHANGELOG.md

Co-authored-by: Jeremy Cohen <[email protected]>

Always close connections in release()

Handle the fallout of closing connections in release

- close() implies rollback, so do not call rollback separately first
- make sure to not open new connections for executors in single-threaded mode
- logging cleanups
- fix a test case that never acquired connections
- to cancel other connections, one must first acquire a connection for the master thread
- change a number of release() calls to rollback
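
The contract these bullets describe can be sketched with stand-in objects (illustrative names, not dbt's actual connection manager): release() always closes, and close() performs the rollback itself when a transaction is open.

```python
# Stand-in objects to illustrate the release/close/rollback contract above;
# structure is simplified, not dbt's actual implementation.
class FakeConnection:
    def __init__(self) -> None:
        self.state = 'open'
        self.transaction_open = True
        self.rolled_back = False

def _rollback(conn: FakeConnection) -> None:
    if not conn.transaction_open:
        raise RuntimeError('tried to rollback with no open transaction')
    conn.rolled_back = True
    conn.transaction_open = False

def close(conn: FakeConnection) -> FakeConnection:
    # close() implies rollback, so callers never roll back first
    if conn.transaction_open:
        _rollback(conn)
    conn.state = 'closed'
    return conn

def release(conn: FakeConnection) -> None:
    # always close the connection; close() handles any open transaction
    close(conn)
```

With this shape, "release vs rollback" becomes a one-line distinction: release tears the connection down entirely, rollback only ends the transaction.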

release vs rollback

alter the tests so we can run rpc tests on snowflake

only try to cancel open connections

missed the snowflake rpc tests

PR feedback:
 - fix conftest addinivalue_line calls
 - deprecate the release argument to execute_macro

Release dbt v0.17.2rc1

add a test for cross-schema catalog generation, fix redshift to work for schema overrides

fix a test with too-strict bounds, any <=2 value is ok here

Docs site updates for 0.17.2: fix code block background

fix the changelog

Release dbt v0.17.2
rsenseman committed Jul 30, 2020
1 parent f6918f5 commit 2d88d3a
Showing 88 changed files with 3,073 additions and 417 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.18.0b1
current_version = 0.17.2
parse = (?P<major>\d+)
\.(?P<minor>\d+)
\.(?P<patch>\d+)
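
The version-parsing pattern above can be exercised directly with Python's `re` module. Only the three visible capture groups are shown in this hunk; handling for pre-release suffixes (e.g. the `b1` in `0.17.2b1`) presumably lives in the truncated part of the pattern.

```python
import re

# The three visible capture groups of the bumpversion `parse` pattern,
# joined into one string.
PATTERN = re.compile(
    r'(?P<major>\d+)'
    r'\.(?P<minor>\d+)'
    r'\.(?P<patch>\d+)'
)

match = PATTERN.match('0.17.2')
assert match is not None
print(match.groupdict())  # {'major': '0', 'minor': '17', 'patch': '2'}
```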
911 changes: 911 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

26 changes: 15 additions & 11 deletions core/dbt/adapters/base/connections.py
@@ -88,6 +88,11 @@ def clear_transaction(self) -> None:
self.begin()
self.commit()

def rollback_if_open(self) -> None:
conn = self.get_if_exists()
if conn is not None and conn.handle and conn.transaction_open:
self._rollback(conn)

@abc.abstractmethod
def exception_handler(self, sql: str) -> ContextManager:
"""Create a context manager that handles exceptions caused by database
@@ -172,11 +177,9 @@ def release(self) -> None:
return

try:
if conn.state == 'open':
if conn.transaction_open is True:
self._rollback(conn)
else:
self.close(conn)
# always close the connection. close() calls _rollback() if there
# is an open transaction
self.close(conn)
except Exception:
# if rollback or close failed, remove our busted connection
self.clear_thread_connection()
@@ -226,11 +229,10 @@ def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
logger.debug('On {}: Close'.format(connection.name))
logger.debug(f'On {connection.name}: Close')
connection.handle.close()
else:
logger.debug('On {}: No close available on handle'
.format(connection.name))
logger.debug(f'On {connection.name}: No close available on handle')

@classmethod
def _rollback(cls, connection: Connection) -> None:
@@ -243,10 +245,11 @@ def _rollback(cls, connection: Connection) -> None:

if connection.transaction_open is False:
raise dbt.exceptions.InternalException(
'Tried to rollback transaction on connection "{}", but '
'it does not have one open!'.format(connection.name))
f'Tried to rollback transaction on connection '
f'"{connection.name}", but it does not have one open!'
)

logger.debug('On {}: ROLLBACK'.format(connection.name))
logger.debug(f'On {connection.name}: ROLLBACK')
cls._rollback_handle(connection)

connection.transaction_open = False
@@ -264,6 +267,7 @@ def close(cls, connection: Connection) -> Connection:
return connection

if connection.transaction_open and connection.handle:
logger.debug('On {}: ROLLBACK'.format(connection.name))
cls._rollback_handle(connection)
connection.transaction_open = False

80 changes: 40 additions & 40 deletions core/dbt/adapters/base/impl.py
@@ -309,13 +309,6 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
# databases
return info_schema_name_map

def _list_relations_get_connection(
self, schema_relation: BaseRelation
) -> List[BaseRelation]:
name = f'list_{schema_relation.database}_{schema_relation.schema}'
with self.connection_named(name):
return self.list_relations_without_caching(schema_relation)

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
@@ -325,10 +318,16 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:

cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = [
tpe.submit(self._list_relations_get_connection, cache_schema)
for cache_schema in cache_schemas
]
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
fut = tpe.submit_connected(
self,
f'list_{cache_schema.database}_{cache_schema.schema}',
self.list_relations_without_caching,
cache_schema
)
futures.append(fut)

for future in as_completed(futures):
# if we can't read the relations we need to just raise anyway,
# so just call future.result() and let that raise on failure
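
The hunk above replaces plain `tpe.submit` with a `submit_connected` call that names the connection after the schema being listed. A minimal sketch of such a helper (names assumed from the diff, not dbt's exact API) could look like this:

```python
# Illustrative sketch of a `submit_connected` helper: the submitted callable
# runs inside a named connection that the worker thread itself enters and
# leaves. FakeAdapter is a stand-in, not a dbt class.
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List

class FakeAdapter:
    """Stand-in adapter recording which named connections were used."""
    def __init__(self) -> None:
        self.names: List[str] = []

    @contextmanager
    def connection_named(self, name: str) -> Iterator[None]:
        self.names.append(name)  # acquire
        yield                    # release would happen on exit

class ConnectingExecutor(ThreadPoolExecutor):
    def submit_connected(self, adapter: FakeAdapter, conn_name: str,
                         fn: Callable[..., Any], *args: Any) -> Future:
        def connected(*a: Any) -> Any:
            # the connection is acquired on the worker thread, not the caller
            with adapter.connection_named(conn_name):
                return fn(*a)
        return self.submit(connected, *args)
```

This mirrors the call shape in the diff: `tpe.submit_connected(self, f'list_{db}_{schema}', self.list_relations_without_caching, cache_schema)`.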
@@ -932,8 +931,10 @@ def execute_macro(
execution context.
:param kwargs: An optional dict of keyword args used to pass to the
macro.
:param release: If True, release the connection after executing.
:param release: Ignored.
"""
if release is not False:
deprecations.warn('execute-macro-release')
if kwargs is None:
kwargs = {}
if context_override is None:
@@ -969,11 +970,7 @@
macro_function = MacroGenerator(macro, macro_context)

with self.connections.exception_handler(f'macro {macro_name}'):
try:
result = macro_function(**kwargs)
finally:
if release:
self.release_connection()
result = macro_function(**kwargs)
return result
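
The change above keeps `release` in the signature but warns and ignores it. That deprecate-but-accept pattern can be sketched generically (a stub, not dbt's implementation):

```python
# Illustrative sketch of deprecating a keyword argument while keeping the
# signature compatible, as the diff above does for execute_macro's `release`.
import warnings

def execute_macro_sketch(macro_name: str, release: bool = False) -> str:
    if release is not False:
        # any explicitly passed non-default value hits the deprecation path
        warnings.warn(
            'the `release` argument to execute_macro is deprecated and ignored',
            DeprecationWarning,
            stacklevel=2,
        )
    return f'ran {macro_name}'
```

Keeping the parameter means existing callers (including packages pinned to older macros) keep working while the warning steers them off the argument.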

@classmethod
@@ -998,24 +995,17 @@ def _get_one_catalog(
manifest: Manifest,
) -> agate.Table:

name = '.'.join([
str(information_schema.database),
'information_schema'
])

with self.connection_named(name):
kwargs = {
'information_schema': information_schema,
'schemas': schemas
}
table = self.execute_macro(
GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
release=True,
# pass in the full manifest so we get any local project
# overrides
manifest=manifest,
)
kwargs = {
'information_schema': information_schema,
'schemas': schemas
}
table = self.execute_macro(
GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest so we get any local project
# overrides
manifest=manifest,
)

results = self._catalog_filter_table(table, manifest)
return results
@@ -1026,10 +1016,21 @@ def get_catalog(
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = [
tpe.submit(self._get_one_catalog, info, schemas, manifest)
for info, schemas in schema_map.items() if len(schemas) > 0
]
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = '.'.join([
str(info.database),
'information_schema'
])

fut = tpe.submit_connected(
self, name,
self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)

return catalogs, exceptions
@@ -1056,7 +1057,6 @@ def calculate_freshness(
table = self.execute_macro(
FRESHNESS_MACRO_NAME,
kwargs=kwargs,
release=True,
manifest=manifest
)
# now we have a 1-row table of the maximum `loaded_at_field` value and
6 changes: 5 additions & 1 deletion core/dbt/adapters/sql/connections.py
@@ -6,6 +6,7 @@

import dbt.clients.agate_helper
import dbt.exceptions
from dbt.contracts.connection import Connection, ConnectionState
from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import Connection
from dbt.logger import GLOBAL_LOGGER as logger
@@ -38,7 +39,10 @@ def cancel_open(self) -> List[str]:

# if the connection failed, the handle will be None so we have
# nothing to cancel.
if connection.handle is not None:
if (
connection.handle is not None and
connection.state == ConnectionState.OPEN
):
self.cancel(connection)
if connection.name is not None:
names.append(connection.name)
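
The guard added above ("only try to cancel open connections") is easy to state as a standalone predicate. A simplified sketch with a stand-in state enum (not dbt's classes; the real code also collects names outside the cancel guard):

```python
# Illustrative sketch of the "only cancel open connections" guard above.
from enum import Enum
from typing import List, Optional

class ConnectionState(Enum):
    INIT = 'init'
    OPEN = 'open'
    CLOSED = 'closed'
    FAIL = 'fail'

class Conn:
    def __init__(self, name: str, handle: Optional[object],
                 state: ConnectionState) -> None:
        self.name = name
        self.handle = handle
        self.state = state

def cancel_open(connections: List[Conn]) -> List[str]:
    cancelled = []
    for connection in connections:
        # a failed connection has no handle, and a closed one has nothing
        # left to cancel, so skip both
        if connection.handle is not None and connection.state is ConnectionState.OPEN:
            cancelled.append(connection.name)  # stand-in for self.cancel(connection)
    return cancelled
```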
38 changes: 37 additions & 1 deletion core/dbt/config/project.py
@@ -19,6 +19,8 @@
from dbt.exceptions import RecursionException
from dbt.exceptions import SemverException
from dbt.exceptions import validator_error_message
from dbt.exceptions import RuntimeException
from dbt.graph import SelectionSpec
from dbt.helper_types import NoValue
from dbt.semver import VersionSpecifier
from dbt.semver import versions_compatible
@@ -37,6 +39,11 @@
from hologram import ValidationError

from .renderer import DbtProjectYamlRenderer
from .selectors import (
selector_config_from_data,
selector_data_from_root,
SelectorConfig,
)


INVALID_VERSION_ERROR = """\
@@ -211,10 +218,12 @@ class PartialProject:

def render(self, renderer):
packages_dict = package_data_from_root(self.project_root)
selectors_dict = selector_data_from_root(self.project_root)
return Project.render_from_dict(
self.project_root,
self.project_dict,
packages_dict,
selectors_dict,
renderer,
)

@@ -295,6 +304,7 @@ class Project:
test_paths: List[str]
analysis_paths: List[str]
docs_paths: List[str]
asset_paths: List[str]
target_path: str
snapshot_paths: List[str]
clean_targets: List[str]
@@ -310,6 +320,7 @@
vars: VarProvider
dbt_version: List[VersionSpecifier]
packages: Dict[str, Any]
selectors: SelectorConfig
query_comment: QueryComment
config_version: int

@@ -351,6 +362,7 @@ def from_project_config(
cls,
project_dict: Dict[str, Any],
packages_dict: Optional[Dict[str, Any]] = None,
selectors_dict: Optional[Dict[str, Any]] = None,
) -> 'Project':
"""Create a project from its project and package configuration, as read
by yaml.safe_load().
@@ -401,6 +413,7 @@
)

docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths)
asset_paths: List[str] = value_or(cfg.asset_paths, [])
target_path: str = value_or(cfg.target_path, 'target')
clean_targets: List[str] = value_or(cfg.clean_targets, [target_path])
log_path: str = value_or(cfg.log_path, 'logs')
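
The `value_or` helper used throughout this hunk follows the usual default-if-None pattern; a minimal equivalent consistent with its use here (assumption: it falls back only on `None`, so an explicit empty list or empty string is respected):

```python
from typing import Optional, TypeVar

T = TypeVar('T')

def value_or(value: Optional[T], default: T) -> T:
    """Return `value` unless it is None, in which case return `default`."""
    return default if value is None else value
```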
@@ -464,6 +477,11 @@
except ValidationError as e:
raise DbtProjectError(validator_error_message(e)) from e

try:
selectors = selector_config_from_data(selectors_dict)
except ValidationError as e:
raise DbtProjectError(validator_error_message(e)) from e

project = cls(
project_name=name,
version=version,
Expand All @@ -475,6 +493,7 @@ def from_project_config(
test_paths=test_paths,
analysis_paths=analysis_paths,
docs_paths=docs_paths,
asset_paths=asset_paths,
target_path=target_path,
snapshot_paths=snapshot_paths,
clean_targets=clean_targets,
@@ -488,6 +507,7 @@
snapshots=snapshots,
dbt_version=dbt_version,
packages=packages,
selectors=selectors,
query_comment=query_comment,
sources=sources,
vars=vars_value,
@@ -527,6 +547,7 @@ def to_project_config(self, with_packages=False):
'test-paths': self.test_paths,
'analysis-paths': self.analysis_paths,
'docs-paths': self.docs_paths,
'asset-paths': self.asset_paths,
'target-path': self.target_path,
'snapshot-paths': self.snapshot_paths,
'clean-targets': self.clean_targets,
@@ -568,14 +589,21 @@ def render_from_dict(
project_root: str,
project_dict: Dict[str, Any],
packages_dict: Dict[str, Any],
selectors_dict: Dict[str, Any],
renderer: DbtProjectYamlRenderer,
) -> 'Project':
rendered_project = renderer.render_data(project_dict)
rendered_project['project-root'] = project_root
package_renderer = renderer.get_package_renderer()
rendered_packages = package_renderer.render_data(packages_dict)
selectors_renderer = renderer.get_selector_renderer()
rendered_selectors = selectors_renderer.render_data(selectors_dict)
try:
return cls.from_project_config(rendered_project, rendered_packages)
return cls.from_project_config(
rendered_project,
rendered_packages,
rendered_selectors,
)
except DbtProjectError as exc:
if exc.path is None:
exc.path = os.path.join(project_root, 'dbt_project.yml')
@@ -659,6 +687,14 @@ def as_v1(self, all_projects: Iterable[str]):
project.packages = self.packages
return project

def get_selector(self, name: str) -> SelectionSpec:
if name not in self.selectors:
raise RuntimeException(
f'Could not find selector named {name}, expected one of '
f'{list(self.selectors)}'
)
return self.selectors[name]


def v2_vars_to_v1(
dst: Dict[str, Any], src_vars: Dict[str, Any], project_names: Set[str]
9 changes: 9 additions & 0 deletions core/dbt/config/renderer.py
@@ -69,6 +69,9 @@ def name(self):
def get_package_renderer(self) -> BaseRenderer:
return PackageRenderer(self.context)

def get_selector_renderer(self) -> BaseRenderer:
return SelectorRenderer(self.context)

def should_render_keypath_v1(self, keypath: Keypath) -> bool:
if not keypath:
return True
@@ -206,3 +209,9 @@ class PackageRenderer(BaseRenderer):
@property
def name(self):
return 'Packages config'


class SelectorRenderer(BaseRenderer):
@property
def name(self):
return 'Selector config'
