This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

remove database_name from stream-id (#29)
* remove database_name from stream-id
* normalize time values to be between 00:00:00 and 23:59:59 and convert to UTC
* bump version
louis-pie authored Oct 14, 2019
1 parent 92bcb15 commit ca03971
Showing 5 changed files with 31 additions and 9 deletions.
setup.py (4 changes: 2 additions & 2 deletions)
@@ -6,7 +6,7 @@
     long_description = f.read()

 setup(name='pipelinewise-tap-postgres',
-      version='1.1.7',
+      version='1.1.8',
       description='Singer.io tap for extracting data from PostgreSQL - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
@@ -17,7 +17,7 @@
           'Programming Language :: Python :: 3 :: Only'
       ],
       install_requires=[
-          'singer-python==5.3.1',
+          'singer-python==5.8.1',
           'psycopg2==2.8.2',
           'strict-rfc3339==0.7',
           'nose==1.3.7'
tap_postgres/__init__.py (4 changes: 2 additions & 2 deletions)
@@ -381,7 +381,7 @@ def discover_columns(connection, table_info):
             entry = {'table_name' : table_name,
                      'stream' : table_name,
                      'metadata' : metadata.to_list(mdata),
-                     'tap_stream_id' : post_db.compute_tap_stream_id(database_name, schema_name, table_name),
+                     'tap_stream_id' : post_db.compute_tap_stream_id(schema_name, table_name),
                      'schema' : schema}

             entries.append(entry)
@@ -413,7 +413,7 @@ def do_discovery(conn_config):
     all_streams = []

     with post_db.open_connection(conn_config) as conn:
-        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='stitch_cursor') as cur:
+        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='pipelinewise') as cur:
             cur.itersize = post_db.cursor_iter_size
             sql = """SELECT datname
                      FROM pg_database
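Passing name= here makes psycopg2 open a server-side cursor, so discovery fetches rows in batches of cur.itersize instead of materialising the whole result set. A minimal standalone sketch of that pattern, using a placeholder DSN, query, and batch size rather than anything from this repository:

# Sketch only: server-side (named) cursor streaming, as used in do_discovery above.
# The DSN, query, and itersize below are illustrative placeholders.
import psycopg2
import psycopg2.extras

with psycopg2.connect('dbname=example_db user=example_user') as conn:
    # name= turns this into a server-side cursor; rows arrive in itersize batches.
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='pipelinewise') as cur:
        cur.itersize = 20000
        cur.execute("SELECT datname FROM pg_database WHERE NOT datistemplate")
        for row in cur:
            print(row['datname'])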
tap_postgres/db.py (25 changes: 23 additions & 2 deletions)
@@ -1,4 +1,6 @@
 import datetime
+import pytz
+from dateutil.parser import parse
 import decimal
 import math
 import psycopg2
@@ -52,6 +54,25 @@ def selected_value_to_singer_value_impl(elem, sql_datatype):
         cleaned_elem = elem
     elif sql_datatype == 'money':
         cleaned_elem = elem
+    elif sql_datatype == 'time with time zone':
+        '''time with time zone values will be converted to UTC and time zone dropped'''
+        # Replace hour=24 with hour=0
+        elem = str(elem)
+        if elem.startswith('24'): elem = elem.replace('24','00',1)
+        # convert to UTC
+        elem = datetime.datetime.strptime(elem, '%H:%M:%S%z')
+        if elem.utcoffset() != datetime.timedelta(seconds=0):
+            LOGGER.warning('time with time zone values are converted to UTC')
+        elem = elem.astimezone(pytz.utc)
+        # drop time zone
+        elem = str(elem.strftime('%H:%M:%S'))
+        cleaned_elem = parse(elem).isoformat().split('T')[1]
+    elif sql_datatype == 'time without time zone':
+        # Replace hour=24 with hour=0
+        elem = str(elem)
+        if elem.startswith('24'):
+            elem = elem.replace('24','00',1)
+        cleaned_elem = parse(elem).isoformat().split('T')[1]
     elif isinstance(elem, datetime.datetime):
         if sql_datatype == 'timestamp with time zone':
             cleaned_elem = elem.isoformat()
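For illustration, the same normalization as a standalone function applied to two made-up sample values (it assumes pytz and python-dateutil are available, as the new imports above require):

# Sketch only: mirrors the 'time with time zone' branch added above.
import datetime
import pytz
from dateutil.parser import parse

def normalize_timetz(value):
    value = str(value)
    # Replace hour=24 with hour=0 so the result stays within 00:00:00-23:59:59
    if value.startswith('24'):
        value = value.replace('24', '00', 1)
    # Convert to UTC, then drop the offset
    as_utc = datetime.datetime.strptime(value, '%H:%M:%S%z').astimezone(pytz.utc)
    return parse(as_utc.strftime('%H:%M:%S')).isoformat().split('T')[1]

print(normalize_timetz('15:30:00-05:00'))  # 20:30:00
print(normalize_timetz('24:00:00+00:00'))  # 00:00:00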
@@ -132,8 +153,8 @@ def hstore_available(conn_info):
         return False


-def compute_tap_stream_id(database_name, schema_name, table_name):
-    return database_name + '-' + schema_name + '-' + table_name
+def compute_tap_stream_id(schema_name, table_name):
+    return schema_name + '-' + table_name


 #NB> numeric/decimal columns in postgres without a specified scale && precision
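The resulting stream ids therefore no longer embed the database name; with made-up names:

# Illustration only; 'my_db', 'public' and 'users' are made-up values.
from tap_postgres import db as post_db

# Old behaviour: compute_tap_stream_id('my_db', 'public', 'users') -> 'my_db-public-users'
# New behaviour:
print(post_db.compute_tap_stream_id('public', 'users'))  # public-users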
tap_postgres/sync_strategies/incremental.py (5 changes: 3 additions & 2 deletions)
@@ -10,7 +10,7 @@

 LOGGER = singer.get_logger()

-UPDATE_BOOKMARK_PERIOD = 1000
+UPDATE_BOOKMARK_PERIOD = 10000

 def fetch_max_replication_key(conn_config, replication_key, schema_name, table_name):
     with post_db.open_connection(conn_config, False) as conn:
@@ -70,7 +70,7 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):
         else:
             LOGGER.info("hstore is UNavailable")

-        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='stitch_cursor') as cur:
+        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor, name='pipelinewise') as cur:
             cur.itersize = post_db.cursor_iter_size
             LOGGER.info("Beginning new incremental replication sync %s", stream_version)
             if replication_key_value:
@@ -96,6 +96,7 @@ def sync_table(conn_info, stream, state, desired_columns, md_map):

             for rec in cur:
                 record_message = post_db.selected_row_to_singer_message(stream, rec, stream_version, desired_columns, time_extracted, md_map)
+
                 singer.write_message(record_message)
                 rows_saved = rows_saved + 1

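UPDATE_BOOKMARK_PERIOD (bumped above from 1000 to 10000) controls how often the incremental sync flushes its bookmark: a STATE message is now emitted every 10,000 rows instead of every 1,000. A hedged sketch of that pattern in the usual Singer style; the surrounding code in the tap may differ in detail:

# Sketch only: periodic bookmark flush in the Singer style; not a verbatim excerpt.
import copy
import singer

UPDATE_BOOKMARK_PERIOD = 10000

def maybe_flush_state(rows_saved, state):
    # Emit a STATE message every UPDATE_BOOKMARK_PERIOD rows so an interrupted
    # sync can resume from the last recorded replication-key value.
    if rows_saved % UPDATE_BOOKMARK_PERIOD == 0:
        singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))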
tap_postgres/sync_strategies/logical_replication.py (2 changes: 1 addition & 1 deletion)
@@ -290,7 +290,7 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):
     for s in streams:
         streams_lookup[s['tap_stream_id']] = s

-    tap_stream_id = post_db.compute_tap_stream_id(conn_info['dbname'], payload['schema'], payload['table'])
+    tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table'])
     if streams_lookup.get(tap_stream_id) is None:
         return state

