from parsons.etl.table import Table
from parsons.databases.redshift.rs_copy_table import RedshiftCopyTable
from parsons.databases.redshift.rs_create_table import RedshiftCreateTable
from parsons.databases.redshift.rs_table_utilities import RedshiftTableUtilities
from parsons.databases.redshift.rs_schema import RedshiftSchema
from parsons.databases.table import BaseTable
from parsons.utilities import files
import psycopg2
import psycopg2.extras
import os
import logging
import json
import pickle
import petl
from contextlib import contextmanager
import datetime
import random

# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
# 100k rows per batch at ~1k bytes each = ~100MB per batch.
QUERY_BATCH_SIZE = 100000

logger = logging.getLogger(__name__)


class Redshift(RedshiftCreateTable, RedshiftCopyTable, RedshiftTableUtilities, RedshiftSchema):
+
"""
+
A Redshift class to connect to the database.
+
+
Args:
+
username: str
+
Required if env variable ``REDSHIFT_USERNAME`` not populated
+
password: str
+
Required if env variable ``REDSHIFT_PASSWORD`` not populated
+
host: str
+
Required if env variable ``REDSHIFT_HOST`` not populated
+
db: str
+
Required if env variable ``REDSHIFT_DB`` not populated
+
port: int
+
Required if env variable ``REDSHIFT_PORT`` not populated. Port 5439 is typical.
+
timeout: int
+
Seconds to timeout if connection not established
+
s3_temp_bucket: str
+
Name of the S3 bucket that will be used for storing data during bulk transfers.
+
Required if you intend to perform bulk data transfers (e.g. the copy_s3 method),
+
and env variable ``S3_TEMP_BUCKET`` is not populated.
+
aws_access_key_id: str
+
The default AWS access key id for copying data from S3 into Redshift
+
when running copy/upsert/etc methods.
+
This will default to environment variable AWS_ACCESS_KEY_ID.
+
aws_secret_access_key: str
+
The default AWS secret access key for copying data from S3 into Redshift
+
when running copy/upsert/etc methods.
+
This will default to environment variable AWS_SECRET_ACCESS_KEY.
+
iam_role: str
+
AWS IAM Role ARN string -- an optional, different way for credentials to
+
be provided in the Redshift copy command that does not require an access key.
+
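**Example (illustrative)**

A minimal usage sketch, assuming the ``REDSHIFT_*`` environment variables
described above are set and that ``Redshift`` is imported from the top-level
``parsons`` package; the table name is a placeholder:

.. code-block:: python

    from parsons import Redshift

    rs = Redshift()  # credentials are read from the environment variables
    tbl = rs.query("SELECT COUNT(*) AS n FROM my_schema.my_table")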
"""
+
+
def __init__(self, username=None, password=None, host=None, db=None, port=None,
+
timeout=10, s3_temp_bucket=None,
+
aws_access_key_id=None, aws_secret_access_key=None, iam_role=None):
+
+
try:
+
self.username = username or os.environ['REDSHIFT_USERNAME']
+
self.password = password or os.environ['REDSHIFT_PASSWORD']
+
self.host = host or os.environ['REDSHIFT_HOST']
+
self.db = db or os.environ['REDSHIFT_DB']
+
self.port = port or os.environ['REDSHIFT_PORT']
+
except KeyError as error:
+
logger.error("Connection info missing. Most include as kwarg or "
+
"env variable.")
+
raise error
+
+
self.timeout = timeout
+
self.dialect = 'redshift'
+
self.s3_temp_bucket = s3_temp_bucket or os.environ.get('S3_TEMP_BUCKET')
+
# We don't check/load the environment variables for aws_* here
+
# because the logic in S3() and rs_copy_table.py does already.
+
self.aws_access_key_id = aws_access_key_id
+
self.aws_secret_access_key = aws_secret_access_key
+
self.iam_role = iam_role
+
+
@contextmanager
+
def connection(self):
+
"""
+
Generate a Redshift connection.
+
The connection is set up as a python "context manager", so it will be closed
+
automatically (and all queries committed) when the connection goes out of scope.
+
+
When using the connection, make sure to put it in a ``with`` block (necessary for
+
any context manager):
+
``with rs.connection() as conn:``
+
+
`Returns:`
+
Psycopg2 `connection` object
+
"""
+
+
# Create a psycopg2 connection and cursor
+
conn = psycopg2.connect(user=self.username, password=self.password,
+
host=self.host, dbname=self.db, port=self.port,
+
connect_timeout=self.timeout)
+
try:
+
yield conn
+
+
conn.commit()
+
finally:
+
conn.close()
+
+
@contextmanager
+
def cursor(self, connection):
+
cur = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
try:
+
yield cur
+
finally:
+
cur.close()
+
+
def query(self, sql, parameters=None):
+
"""
+
Execute a query against the Redshift database. Will return ``None``
+
if the query returns zero rows.
+
+
To include python variables in your query, it is recommended to pass them as parameters,
+
following the `psycopg style <http://initd.org/psycopg/docs/usage.html#passing-parameters-to-sql-queries>`_.
+
Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL
+
injection attacks.
+
+
**Parameter Examples**
+
+
.. code-block:: python
+
+
# Note that the name contains a quote, which could break your query if not escaped
+
# properly.
+
name = "Beatrice O'Brady"
+
sql = "SELECT * FROM my_table WHERE name = %s"
+
rs.query(sql, parameters=[name])
+
+
.. code-block:: python
+
+
names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"]
+
placeholders = ', '.join('%s' for item in names)
+
sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})"
+
rs.query(sql, parameters=names)
+
+
`Args:`
+
sql: str
+
A valid SQL statement
+
parameters: list
+
A list of python variables to be converted into SQL values in your query
+
+
`Returns:`
+
Parsons Table
+
See :ref:`parsons-table` for output options.
+
+
""" # noqa: E501
+
+
with self.connection() as connection:
+
return self.query_with_connection(sql, connection, parameters=parameters)
+
+
def query_with_connection(self, sql, connection, parameters=None, commit=True):
+
"""
+
Execute a query against the Redshift database, with an existing connection.
+
Useful for batching queries together. Will return ``None`` if the query
+
returns zero rows.
+
+
`Args:`
+
sql: str
+
A valid SQL statement
+
connection: obj
+
A connection object obtained from ``redshift.connection()``
+
parameters: list
+
A list of python variables to be converted into SQL values in your query
+
commit: boolean
+
Whether to commit the transaction immediately. If ``False`` the transaction will
+
be committed when the connection goes out of scope and is closed (or you can
+
commit manually with ``connection.commit()``).
+
+
`Returns:`
+
Parsons Table
+
See :ref:`parsons-table` for output options.
+
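**Example (illustrative)**

A sketch of batching two statements in one transaction, assuming ``rs`` is a
connected ``Redshift`` instance; the table name is a placeholder:

.. code-block:: python

    with rs.connection() as connection:
        rs.query_with_connection(
            "CREATE TABLE my_schema.my_table (id int)", connection, commit=False)
        rs.query_with_connection(
            "INSERT INTO my_schema.my_table VALUES (1)", connection, commit=False)
        # Both statements commit together when the connection context exits.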
"""
+
+
# To Do: Have it return an ordered dict to return the
+
# rows in the correct order
+
+
with self.cursor(connection) as cursor:
+
+
logger.debug(f'SQL Query: {sql}')
+
cursor.execute(sql, parameters)
+
+
if commit:
+
connection.commit()
+
+
# If the cursor is empty, don't cause an error
+
if not cursor.description:
+
logger.debug('Query returned 0 rows')
+
return None
+
+
else:
+
+
# Fetch the data in batches, and "pickle" the rows to a temp file.
+
# (We pickle rather than writing to, say, a CSV, so that we maintain
+
# all the type information for each field.)
+
+
temp_file = files.create_temp_file()
+
+
with open(temp_file, 'wb') as f:
+
# Grab the header
+
header = [i[0] for i in cursor.description]
+
pickle.dump(header, f)
+
+
while True:
+
batch = cursor.fetchmany(QUERY_BATCH_SIZE)
+
if not batch:
+
break
+
+
logger.debug(f'Fetched {len(batch)} rows.')
+
for row in batch:
+
pickle.dump(list(row), f)
+
+
# Load a Table from the file
+
final_tbl = Table(petl.frompickle(temp_file))
+
+
logger.debug(f'Query returned {final_tbl.num_rows} rows.')
+
return final_tbl
+
+
def copy_s3(self, table_name, bucket, key, manifest=False, data_type='csv',
+
csv_delimiter=',', compression=None, if_exists='fail', max_errors=0,
+
distkey=None, sortkey=None, padding=None, varchar_max=None,
+
statupdate=True, compupdate=True, ignoreheader=1, acceptanydate=True,
+
dateformat='auto', timeformat='auto', emptyasnull=True,
+
blanksasnull=True, nullas=None, acceptinvchars=True, truncatecolumns=False,
+
columntypes=None, specifycols=None,
+
aws_access_key_id=None, aws_secret_access_key=None, bucket_region=None):
+
"""
+
Copy a file from s3 to Redshift.
+
+
`Args:`
+
table_name: str
+
The schema and table name (e.g. ``tmc.cool_table``) to copy the file into.
+
bucket: str
+
The s3 bucket where the file or manifest is located.
+
key: str
+
The key of the file or manifest in the s3 bucket.
+
manifest: boolean
+
Set to ``True`` if the key points to a manifest file rather than a data file.
+
data_type: str
+
The data type of the file. Only ``csv`` supported currently.
+
csv_delimiter: str
+
The delimiter of the ``csv``. Only relevant if data_type is ``csv``.
+
compression: str
+
If specified (``gzip``), will attempt to decompress the file.
+
if_exists: str
+
If the table already exists, either ``fail``, ``append``, ``drop``
+
or ``truncate`` the table.
+
max_errors: int
+
The maximum number of rows that can error and be skipped before
+
the job fails.
+
distkey: str
+
The column name of the distkey
+
sortkey: str
+
The column name of the sortkey
+
padding: float
+
A percentage padding to add to varchar columns if creating a new table. This is
+
helpful to add a buffer for future copies in which the data might be wider.
+
varchar_max: list
+
A list of columns in which to set the width of the varchar column to 65,535
+
characters.
+
statupdate: boolean
+
Governs automatic computation and refresh of optimizer statistics at the end
+
of a successful COPY command.
+
compupdate: boolean
+
Controls whether compression encodings are automatically applied during a COPY.
+
ignoreheader: int
+
The number of header rows to skip. Ignored if data_type is ``json``.
+
acceptanydate: boolean
+
Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be
+
loaded without generating an error.
+
emptyasnull: boolean
+
Indicates that Amazon Redshift should load empty char and varchar fields
+
as ``NULL``.
+
blanksasnull: boolean
+
Loads blank varchar fields, which consist of only white space characters,
+
as ``NULL``.
+
nullas: str
+
Loads fields that match the given string as NULL.
+
acceptinvchars: boolean
+
Enables loading of data into VARCHAR columns even if the data contains
+
invalid UTF-8 characters.
+
dateformat: str
+
Set the date format. Defaults to ``auto``.
+
timeformat: str
+
Set the time format. Defaults to ``auto``.
+
truncatecolumns: boolean
+
If the table already exists, truncates data in columns to the appropriate number
+
of characters so that it fits the column specification. Applies only to columns
+
with a VARCHAR or CHAR data type, and rows 4 MB or less in size.
+
columntypes: dict
+
Optional map of column name to redshift column type, overriding the usual type
+
inference. You only specify the columns you want to override, e.g.
+
``columntypes={'phone': 'varchar(12)', 'age': 'int'}``.
+
specifycols: boolean
+
Adds a column list to the Redshift `COPY` command, allowing for the source table
+
in an append to have the columns out of order, and to have fewer columns with any
+
leftover target table columns filled in with the `DEFAULT` value.
+
+
This will fail if any of the source table's columns do not match a column in the
+
target table. This will also fail if the target table has an `IDENTITY`
+
column and that column name is among the source table's columns.
+
aws_access_key_id:
+
An AWS access key granted to the bucket where the file is located. Not required
+
if keys are stored as environmental variables.
+
aws_secret_access_key:
+
An AWS secret access key granted to the bucket where the file is located. Not
+
required if keys are stored as environmental variables.
+
bucket_region: str
+
The AWS region that the bucket is located in. This should be provided if the
+
Redshift cluster is located in a different region from the temp bucket.
+
+
`Returns:`
+
Parsons Table or ``None``
+
See :ref:`parsons-table` for output options.
+
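**Example (illustrative)**

A sketch of loading a gzipped CSV from S3, assuming ``rs`` is a connected
``Redshift`` instance; the bucket, key and table names are placeholders:

.. code-block:: python

    rs.copy_s3('my_schema.my_table', 'my-bucket', 'exports/people.csv.gz',
               compression='gzip', if_exists='append')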
"""
+
+
with self.connection() as connection:
+
+
if self._create_table_precheck(connection, table_name, if_exists):
+
# Grab the object from s3
+
from parsons.aws.s3 import S3
+
s3 = S3(aws_access_key_id=aws_access_key_id,
+
aws_secret_access_key=aws_secret_access_key)
+
+
local_path = s3.get_file(bucket, key)
+
+
if data_type == 'csv':
+
tbl = Table.from_csv(local_path, delimiter=csv_delimiter)
+
else:
+
raise TypeError("Invalid data type provided")
+
+
# Create the table
+
sql = self.create_statement(tbl, table_name, padding=padding,
+
distkey=distkey, sortkey=sortkey,
+
varchar_max=varchar_max,
+
columntypes=columntypes)
+
+
self.query_with_connection(sql, connection, commit=False)
+
logger.info(f'{table_name} created.')
+
+
# Copy the table
+
copy_sql = self.copy_statement(table_name, bucket, key, manifest=manifest,
+
data_type=data_type, csv_delimiter=csv_delimiter,
+
compression=compression, max_errors=max_errors,
+
statupdate=statupdate, compupdate=compupdate,
+
aws_access_key_id=aws_access_key_id,
+
aws_secret_access_key=aws_secret_access_key,
+
ignoreheader=ignoreheader, acceptanydate=acceptanydate,
+
emptyasnull=emptyasnull, blanksasnull=blanksasnull,
+
nullas=nullas, acceptinvchars=acceptinvchars,
+
truncatecolumns=truncatecolumns,
+
specifycols=specifycols,
+
dateformat=dateformat, timeformat=timeformat,
+
bucket_region=bucket_region)
+
+
self.query_with_connection(copy_sql, connection, commit=False)
+
logger.info(f'Data copied to {table_name}.')
+
+
def copy(self, tbl, table_name, if_exists='fail', max_errors=0, distkey=None,
+
sortkey=None, padding=None, statupdate=False, compupdate=True, acceptanydate=True,
+
emptyasnull=True, blanksasnull=True, nullas=None, acceptinvchars=True,
+
dateformat='auto', timeformat='auto', varchar_max=None, truncatecolumns=False,
+
columntypes=None, specifycols=None, alter_table=False,
+
aws_access_key_id=None, aws_secret_access_key=None, iam_role=None,
+
cleanup_s3_file=True, template_table=None, temp_bucket_region=None):
+
"""
+
Copy a :ref:`parsons-table` to Redshift.
+
+
`Args:`
+
tbl: obj
+
A Parsons Table.
+
table_name: str
+
The destination table name (ex. ``my_schema.my_table``).
+
if_exists: str
+
If the table already exists, either ``fail``, ``append``, ``drop``
+
or ``truncate`` the table.
+
max_errors: int
+
The maximum number of rows that can error and be skipped before
+
the job fails.
+
distkey: str
+
The column name of the distkey
+
sortkey: str
+
The column name of the sortkey
+
padding: float
+
A percentage padding to add to varchar columns if creating a new table. This is
+
helpful to add a buffer for future copies in which the data might be wider.
+
varchar_max: list
+
A list of columns in which to set the width of the varchar column to 65,535
+
characters.
+
statupdate: boolean
+
Governs automatic computation and refresh of optimizer statistics at the end
+
of a successful COPY command.
+
compupdate: boolean
+
Controls whether compression encodings are automatically applied during a COPY.
+
acceptanydate: boolean
+
Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be
+
loaded without generating an error.
+
emptyasnull: boolean
+
Indicates that Amazon Redshift should load empty char and varchar fields
+
as ``NULL``.
+
blanksasnull: boolean
+
Loads blank varchar fields, which consist of only white space characters,
+
as ``NULL``.
+
nullas: str
+
Loads fields that match the given string as NULL.
+
acceptinvchars: boolean
+
Enables loading of data into VARCHAR columns even if the data contains
+
invalid UTF-8 characters.
+
dateformat: str
+
Set the date format. Defaults to ``auto``.
+
timeformat: str
+
Set the time format. Defaults to ``auto``.
+
truncatecolumns: boolean
+
If the table already exists, truncates data in columns to the appropriate number
+
of characters so that it fits the column specification. Applies only to columns
+
with a VARCHAR or CHAR data type, and rows 4 MB or less in size.
+
columntypes: dict
+
Optional map of column name to redshift column type, overriding the usual type
+
inference. You only specify the columns you want to override, e.g.
+
``columntypes={'phone': 'varchar(12)', 'age': 'int'}``.
+
specifycols: boolean
+
Adds a column list to the Redshift `COPY` command, allowing for the source table
+
in an append to have the columns out of order, and to have fewer columns with any
+
leftover target table columns filled in with the `DEFAULT` value.
+
+
This will fail if any of the source table's columns do not match a column in the
+
target table. This will also fail if the target table has an `IDENTITY`
+
column and that column name is among the source table's columns.
+
alter_table: boolean
+
Will check if the target table varchar widths are wide enough to copy in the
+
table data. If not, will attempt to alter the table to make it wide enough. This
+
will not work with tables that have dependent views.
+
aws_access_key_id:
+
An AWS access key granted to the bucket where the file is located. Not required
+
if keys are stored as environmental variables.
+
aws_secret_access_key:
+
An AWS secret access key granted to the bucket where the file is located. Not
+
required if keys are stored as environmental variables.
+
iam_role: str
+
An AWS IAM Role ARN string; an alternative credential for the COPY command
+
from Redshift to S3. The IAM role must have been assigned to the Redshift
+
instance and have access to the S3 bucket.
+
cleanup_s3_file: boolean
+
The s3 upload is removed by default on cleanup. You can set to False for debugging.
+
template_table: str
+
Instead of specifying columns, columntypes, and/or inference, if there
+
is a pre-existing table that has the same columns/types, then use the template_table
+
table name as the schema for the new table.
+
Unless you set specifycols=False explicitly, a template_table will set it to True
+
temp_bucket_region: str
+
The AWS region that the temp bucket (specified by the TEMP_S3_BUCKET environment
+
variable) is located in. This should be provided if the Redshift cluster is located
+
in a different region from the temp bucket.
+
+
`Returns:`
+
Parsons Table or ``None``
+
See :ref:`parsons-table` for output options.
+
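**Example (illustrative)**

A sketch of copying a small Parsons Table into a new table, assuming ``rs`` is
a connected ``Redshift`` instance; the table name is a placeholder:

.. code-block:: python

    from parsons import Table

    tbl = Table([{'id': 1, 'name': "Beatrice O'Brady"}])
    rs.copy(tbl, 'my_schema.people', if_exists='drop')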
"""
+
+
# Specify the columns for a copy statement.
+
if specifycols or (specifycols is None and template_table):
+
cols = tbl.columns
+
else:
+
cols = None
+
+
with self.connection() as connection:
+
+
# Check to see if the table exists. If it does not or if_exists = drop, then
+
# create the new table.
+
if self._create_table_precheck(connection, table_name, if_exists):
+
if template_table:
+
# Copy the schema from the template table
+
sql = f'CREATE TABLE {table_name} (LIKE {template_table})'
+
else:
+
sql = self.create_statement(tbl, table_name, padding=padding,
+
distkey=distkey, sortkey=sortkey,
+
varchar_max=varchar_max,
+
columntypes=columntypes)
+
self.query_with_connection(sql, connection, commit=False)
+
logger.info(f'{table_name} created.')
+
+
# If alter_table is True, then alter table if the table column widths
+
# are wider than the existing table.
+
if alter_table:
+
self.alter_varchar_column_widths(tbl, table_name)
+
+
# Upload the table to S3
+
key = self.temp_s3_copy(tbl, aws_access_key_id=aws_access_key_id,
+
aws_secret_access_key=aws_secret_access_key)
+
+
try:
+
# Copy to Redshift database.
+
copy_args = {'max_errors': max_errors,
+
'ignoreheader': 1,
+
'statupdate': statupdate,
+
'compupdate': compupdate,
+
'acceptanydate': acceptanydate,
+
'dateformat': dateformat,
+
'timeformat': timeformat,
+
'blanksasnull': blanksasnull,
+
'nullas': nullas,
+
'emptyasnull': emptyasnull,
+
'acceptinvchars': acceptinvchars,
+
'truncatecolumns': truncatecolumns,
+
'specifycols': cols,
+
'aws_access_key_id': aws_access_key_id,
+
'aws_secret_access_key': aws_secret_access_key,
+
'compression': 'gzip',
+
'bucket_region': temp_bucket_region}
+
+
# Copy from S3 to Redshift
+
sql = self.copy_statement(table_name, self.s3_temp_bucket, key, **copy_args)
+
logger.debug(f'Copy SQL command: {sql}')
+
self.query_with_connection(sql, connection, commit=False)
+
+
logger.info(f'Data copied to {table_name}.')
+
+
# Clean up the S3 bucket.
+
finally:
+
if key and cleanup_s3_file:
+
self.temp_s3_delete(key)
+
+
def unload(self, sql, bucket, key_prefix, manifest=True, header=True, delimiter='|',
+
compression='gzip', add_quotes=True, null_as=None, escape=True, allow_overwrite=True,
+
parallel=True, max_file_size='6.2 GB', aws_region=None, aws_access_key_id=None,
+
aws_secret_access_key=None):
+
"""
+
Unload Redshift data to S3 Bucket. This is a more efficient method than running a query
+
to export data as it can export in parallel and directly into an S3 bucket. Consider
+
using this for exports of 10MM or more rows.
+
+
`Args:`
sql: str
+
The SQL string to execute to generate the data to unload.
+
bucket: str
+
The destination S3 bucket
+
key_prefix: str
+
The prefix of the key names that will be written
+
manifest: boolean
+
Creates a manifest file that explicitly lists details for the data files
+
that are created by the UNLOAD process.
+
header: boolean
+
Adds a header line containing column names at the top of each output file.
+
delimiter: str
+
Specifies the character used to separate fields. Defaults to '|'.
+
compression: str
+
One of ``gzip``, ``bzip2`` or ``None``. Unloads data to one or more compressed
+
files per slice. Each resulting file is appended with a ``.gz`` or ``.bz2`` extension.
+
add_quotes: boolean
+
Places quotation marks around each unloaded data field, so that Amazon Redshift
+
can unload data values that contain the delimiter itself.
+
null_as: str
+
Specifies a string that represents a null value in unload files. If this option is
+
not specified, null values are unloaded as zero-length strings for delimited output.
+
escape: boolean
+
For CHAR and VARCHAR columns in delimited unload files, an escape character (\) is
+
placed before every linefeed, carriage return, escape character, and delimiter.
+
allow_overwrite: boolean
+
If ``True``, will overwrite existing files, including the manifest file. If ``False``
+
will fail.
+
parallel: boolean
+
By default, UNLOAD writes data in parallel to multiple files, according to the number
+
of slices in the cluster. The default option is ON or TRUE. If PARALLEL is OFF or
+
FALSE, UNLOAD writes to one or more data files serially, sorted absolutely according
+
to the ORDER BY clause, if one is used.
+
max_file_size: str
+
The maximum size of files UNLOAD creates in Amazon S3. Specify a decimal value between
+
5 MB and 6.2 GB.
+
aws_region: str
+
The AWS Region where the target Amazon S3 bucket is located. REGION is required for
+
UNLOAD to an Amazon S3 bucket that is not in the same AWS Region as the Amazon Redshift
+
cluster.
+
aws_access_key_id:
+
An AWS access key granted to the bucket where the file is located. Not required
+
if keys are stored as environmental variables.
+
aws_secret_access_key:
+
An AWS secret access key granted to the bucket where the file is located. Not
+
required if keys are stored as environmental variables.
+
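**Example (illustrative)**

A sketch of unloading a query result to S3, assuming ``rs`` is a connected
``Redshift`` instance; the bucket and key prefix are placeholders:

.. code-block:: python

    rs.unload("SELECT * FROM my_schema.my_table",
              'my-bucket', 'unloads/my_table/')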
""" # NOQA W605
+
+
# The sql query is provided between single quotes, therefore single
+
# quotes within the actual query must be escaped.
+
# https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html#unload-parameters
+
sql = sql.replace("'", "''")
+
+
statement = f"""
+
UNLOAD ('{sql}') to 's3://{bucket}/{key_prefix}' \n
+
{self.get_creds(aws_access_key_id, aws_secret_access_key)} \n
+
PARALLEL {parallel} \n
+
MAXFILESIZE {max_file_size}
+
"""
+
if manifest:
+
statement += "MANIFEST \n"
+
if header:
+
statement += "HEADER \n"
+
if delimiter:
+
statement += f"DELIMITER as '{delimiter}' \n"
+
if compression:
+
statement += f"{compression.upper()} \n"
+
if add_quotes:
+
statement += "ADDQUOTES \n"
+
if null_as:
+
statement += f"NULL {null_as} \n"
+
if escape:
+
statement += "ESCAPE \n"
+
if allow_overwrite:
+
statement += "ALLOWOVERWRITE \n"
+
if aws_region:
+
statement += f"REGION {aws_region} \n"
+
+
logger.info(f'Unloading data to s3://{bucket}/{key_prefix}')
+
logger.debug(statement)
+
+
return self.query(statement)
+
+
def generate_manifest(self, buckets, aws_access_key_id=None, aws_secret_access_key=None,
+
mandatory=True, prefix=None, manifest_bucket=None, manifest_key=None,
+
path=None):
+
"""
+
Given a list of S3 buckets, generate a manifest file (JSON format). A manifest file
+
allows you to copy multiple files into a single table at once. Once the manifest is
+
generated, you can pass it with the :func:`~parsons.redshift.Redshift.copy_s3` method.
+
+
AWS keys are not required if ``AWS_ACCESS_KEY_ID`` and
+
``AWS_SECRET_ACCESS_KEY`` environmental variables set.
+
+
`Args:`
+
+
buckets: list or str
+
A list of buckets or single bucket from which to generate manifest
+
aws_access_key_id: str
+
AWS access key id to access S3 bucket
+
aws_secret_access_key: str
+
AWS secret access key to access S3 bucket
+
mandatory: boolean
+
The mandatory flag indicates whether the Redshift COPY should
+
terminate if the file does not exist.
+
prefix: str
+
Optional filter for key prefixes
+
manifest_bucket: str
+
Optional bucket to write manifest file.
+
manifest_key: str
+
Optional key name for S3 bucket to write file
+
+
`Returns:`
+
``dict`` of manifest
+
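**Example (illustrative)**

A sketch of building a manifest from one bucket and writing it back to S3,
assuming ``rs`` is a connected ``Redshift`` instance; all names are
placeholders:

.. code-block:: python

    manifest = rs.generate_manifest('my-bucket', prefix='exports/2020/',
                                    manifest_bucket='my-bucket',
                                    manifest_key='manifests/2020.manifest')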
"""
+
+
from parsons.aws import S3
+
s3 = S3(aws_access_key_id=aws_access_key_id,
+
aws_secret_access_key=aws_secret_access_key)
+
+
# Deal with a single bucket being passed, rather than list.
+
if isinstance(buckets, str):
+
buckets = [buckets]
+
+
# Generate manifest file
+
manifest = {'entries': []}
+
for bucket in buckets:
+
+
# Retrieve list of files in bucket
+
key_list = s3.list_keys(bucket, prefix=prefix)
+
for key in key_list:
+
manifest['entries'].append({
+
'url': '/'.join(['s3:/', bucket, key]),
+
'mandatory': mandatory
+
})
+
+
logger.info('Manifest generated.')
+
+
# Save the file to s3 bucket if provided
+
if manifest_key and manifest_bucket:
+
# Dump the manifest to a temp JSON file
+
manifest_path = files.create_temp_file()
+
with open(manifest_path, 'w') as manifest_file_obj:
+
json.dump(manifest, manifest_file_obj, sort_keys=True, indent=4)
+
+
# Upload the file to S3
+
s3.put_file(manifest_bucket, manifest_key, manifest_path)
+
+
logger.info(f'Manifest saved to s3://{manifest_bucket}/{manifest_key}')
+
+
return manifest
+
+
def upsert(self, table_obj, target_table, primary_key, vacuum=True, distinct_check=True,
+
cleanup_temp_table=True, alter_table=True, **copy_args):
+
"""
+
Perform an upsert on an existing table. An upsert is an operation in which records
+
in a table are updated and inserted at the same time. Unlike other SQL databases,
+
it does not exist natively in Redshift.
+
+
`Args:`
+
table_obj: obj
+
A Parsons table object
+
target_table: str
+
The schema and table name to upsert
+
primary_key: str or list
+
The primary key column(s) of the target table
+
vacuum: boolean
+
Re-sorts rows and reclaims space in the specified table. You must be a table owner
+
or super user to effectively vacuum a table; however, the method will not fail
+
if you lack these privileges.
+
distinct_check: boolean
+
Check if the primary key column is distinct. Raise error if not.
+
cleanup_temp_table: boolean
+
A temp table is dropped by default on cleanup. You can set to False for debugging.
+
\**copy_args: kwargs
+
See :func:`~parsons.databases.Redshift.copy` for options.
+
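**Example (illustrative)**

A sketch of upserting on a single-column primary key, assuming ``rs`` is a
connected ``Redshift`` instance; the table name is a placeholder:

.. code-block:: python

    from parsons import Table

    tbl = Table([{'id': 1, 'name': 'Allen Smith'}])
    rs.upsert(tbl, 'my_schema.people', 'id')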
""" # noqa: W605
+
+
if not self.table_exists(target_table):
+
logger.info('Target table does not exist. Copying into newly '
+
'created target table.')
+
self.copy(table_obj, target_table)
+
return None
+
+
if alter_table:
+
# Make target table column widths match incoming table, if necessary
+
self.alter_varchar_column_widths(table_obj, target_table)
+
+
noise = f'{random.randrange(0, 10000):04}'[:4]
+
date_stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M')
+
# Generate a temp table like "table_tmp_20200210_1230_14212"
+
staging_tbl = '{}_stg_{}_{}'.format(target_table, date_stamp, noise)
+
+
if isinstance(primary_key, str):
+
primary_keys = [primary_key]
+
else:
+
primary_keys = primary_key
+
+
if distinct_check:
+
primary_keys_statement = ', '.join(primary_keys)
+
diff = self.query(f'''
+
select (
+
select count(*)
+
from {target_table}
+
) - (
+
SELECT COUNT(*) from (
+
select distinct {primary_keys_statement}
+
from {target_table}
+
)
+
) as total_count
+
''').first
+
if diff > 0:
+
raise ValueError('Primary key column contains duplicate values.')
+
+
with self.connection() as connection:
+
+
try:
+
# Copy to a staging table
+
logger.info(f'Building staging table: {staging_tbl}')
+
if 'compupdate' not in copy_args:
+
# Especially with a lot of columns, compupdate=True can
+
# cause a lot of processing/analysis by Redshift before upload.
+
# Since this is a temporary table, setting compression for each
+
# column is barely impactful.
+
# https://docs.aws.amazon.com/redshift/latest/dg/c_Loading_tables_auto_compress.html
+
copy_args = dict(copy_args, compupdate=False)
+
self.copy(table_obj, staging_tbl,
+
template_table=target_table,
+
alter_table=False, # We just did our own alter table above
+
**copy_args)
+
+
staging_table_name = staging_tbl.split('.')[1]
+
target_table_name = target_table.split('.')[1]
+
+
# Delete rows
+
comparisons = [
+
f'{staging_table_name}.{primary_key} = {target_table_name}.{primary_key}'
+
for primary_key in primary_keys
+
]
+
where_clause = ' and '.join(comparisons)
+
+
sql = f"""
+
DELETE FROM {target_table}
+
USING {staging_tbl}
+
WHERE {where_clause}
+
"""
+
self.query_with_connection(sql, connection, commit=False)
+
logger.debug(f'Target rows deleted from {target_table}.')
+
+
# Insert rows
+
# ALTER TABLE APPEND would be more efficient, but you can't run it in a
+
# transaction block. It's worth the performance hit to not commit until the
+
# end.
+
sql = f"""
+
INSERT INTO {target_table}
+
SELECT * FROM {staging_tbl};
+
"""
+
+
self.query_with_connection(sql, connection, commit=False)
+
logger.info(f'Target rows inserted to {target_table}')
+
+
finally:
+
if cleanup_temp_table:
+
# Drop the staging table
+
self.query_with_connection(f"DROP TABLE IF EXISTS {staging_tbl};",
+
connection, commit=False)
+
logger.info(f'{staging_tbl} staging table dropped.')
+
+
# Vacuum table. You must commit when running this type of transaction.
+
if vacuum:
+
with self.connection() as connection:
+
connection.set_session(autocommit=True)
+
self.query_with_connection(f'VACUUM {target_table};', connection)
+
logger.info(f'{target_table} vacuumed.')
+
+
def alter_varchar_column_widths(self, tbl, table_name):
+
"""
+
Alter the width of varchar columns in a Redshift table to match the widths
+
of a Parsons table. The columns are matched by column name and not their
+
index.
+
+
`Args:`
+
tbl: obj
+
A Parsons table
+
table_name:
+
The target table name (e.g. ``my_schema.my_table``)
+
`Returns:`
+
``None``
+
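**Example (illustrative)**

A sketch of widening the target table's varchar columns before an append,
assuming ``rs`` is a connected ``Redshift`` instance and ``tbl`` is a Parsons
Table; the table name is a placeholder:

.. code-block:: python

    rs.alter_varchar_column_widths(tbl, 'my_schema.people')
    rs.copy(tbl, 'my_schema.people', if_exists='append', alter_table=False)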
"""
+
+
# Make the Parsons table column names match valid Redshift names
+
tbl.table = petl.setheader(tbl.table, self.column_name_validate(tbl.columns))
+
+
# Create a list of column names and max width for string values.
+
pc = {c: tbl.get_column_max_width(c) for c in tbl.columns}
+
+
# Determine the max width of the varchar columns in the Redshift table
+
s, t = self.split_full_table_name(table_name)
+
cols = self.get_columns(s, t)
+
rc = {k: v['max_length'] for k, v in cols.items() if v['data_type'] == 'character varying'} # noqa: E501, E261
+
+
# Figure out if any of the destination table varchar columns are smaller than the
+
# associated Parsons table columns. If they are, then alter column types to expand
+
# their width.
+
for c in set(rc.keys()).intersection(set(pc.keys())):
+
if rc[c] < pc[c]:
+
logger.info(f'{c} not wide enough. Expanding column width.')
+
self.alter_table_column_type(table_name, c, 'varchar', varchar_width=pc[c])
+
+
def alter_table_column_type(self, table_name, column_name, data_type, varchar_width=None):
+
"""
+
Alter a column type of an existing table.
+
+
table_name: str
+
The table name (ex. ``my_schema.my_table``).
+
column_name: str
+
The target column name
+
data_type: str
+
A valid Redshift data type to alter the table to.
+
varchar_width:
+
The new width of the column if of type varchar.
+
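**Example (illustrative)**

A sketch of widening a varchar column, assuming ``rs`` is a connected
``Redshift`` instance; names are placeholders:

.. code-block:: python

    rs.alter_table_column_type('my_schema.people', 'name', 'varchar',
                               varchar_width=1024)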
"""
+
+
sql = f"ALTER TABLE {table_name} ALTER COLUMN {column_name} TYPE {data_type}"
+
+
if varchar_width:
+
sql += f"({varchar_width})"
+
+
with self.connection() as connection:
+
connection.set_session(autocommit=True)
+
self.query_with_connection(sql, connection)
+
logger.info(f'Altered {table_name} {column_name}.')
+
+
def table(self, table_name):
+
# Return a Redshift table object
+
+
return RedshiftTable(self, table_name)
+
+
class RedshiftTable(BaseTable):
    # Redshift table object.

    pass