Skip to content

Commit

Permalink
Merge pull request #7952 from jorisvandenbossche/sql-schema
Browse files Browse the repository at this point in the history
ENH: add schema support to sql functions
  • Loading branch information
jorisvandenbossche committed Aug 31, 2014
2 parents f505097 + 1e90ba3 commit 5ef3cc3
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 55 deletions.
2 changes: 1 addition & 1 deletion ci/requirements-2.6.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pytz==2013b
http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
html5lib==1.0b2
numexpr==1.4.2
sqlalchemy==0.7.1
sqlalchemy==0.7.4
pymysql==0.6.0
psycopg2==2.5
scipy==0.11.0
Expand Down
14 changes: 14 additions & 0 deletions doc/source/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3320,6 +3320,20 @@ to pass to :func:`pandas.to_datetime`:
You can check if a table exists using :func:`~pandas.io.sql.has_table`

Schema support
~~~~~~~~~~~~~~

.. versionadded:: 0.15.0

Reading from and writing to different schemas is supported through the ``schema``
keyword in the :func:`~pandas.read_sql_table` and :func:`~pandas.DataFrame.to_sql`
functions. Note however that this depends on the database flavor (sqlite does not
have schemas). For example:

.. code-block:: python

    df.to_sql('table', engine, schema='other_schema')
    pd.read_sql_table('table', engine, schema='other_schema')

Querying
~~~~~~~~
Expand Down
7 changes: 7 additions & 0 deletions doc/source/v0.15.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ Enhancements

- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`)
- Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`).
- Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`).
  For example:

  .. code-block:: python

     df.to_sql('table', engine, schema='other_schema')
     pd.read_sql_table('table', engine, schema='other_schema')

- Added support for bool, uint8, uint16 and uint32 datatypes in ``to_stata`` (:issue:`7097`, :issue:`7365`)

Expand Down
11 changes: 7 additions & 4 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,8 @@ def to_msgpack(self, path_or_buf=None, **kwargs):
from pandas.io import packers
return packers.to_msgpack(path_or_buf, self, **kwargs)

def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
index_label=None, chunksize=None):
def to_sql(self, name, con, flavor='sqlite', schema=None, if_exists='fail',
index=True, index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.
Expand All @@ -932,6 +932,9 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
'mysql' is deprecated and will be removed in future versions, but it
will be further supported through SQLAlchemy engines.
schema : string, default None
Specify the schema (if database flavor supports this). If None, use
default schema.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
Expand All @@ -949,8 +952,8 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
"""
from pandas.io import sql
sql.to_sql(
self, name, con, flavor=flavor, if_exists=if_exists, index=index,
index_label=index_label, chunksize=chunksize)
self, name, con, flavor=flavor, schema=schema, if_exists=if_exists,
index=index, index_label=index_label, chunksize=chunksize)

def to_pickle(self, path):
"""
Expand Down
100 changes: 59 additions & 41 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _is_sqlalchemy_engine(con):
try:
import sqlalchemy
_SQLALCHEMY_INSTALLED = True

from distutils.version import LooseVersion
ver = LooseVersion(sqlalchemy.__version__)
# For sqlalchemy versions < 0.8.2, the BIGINT type is recognized
Expand All @@ -47,7 +47,7 @@ def _is_sqlalchemy_engine(con):
if ver < '0.8.2':
from sqlalchemy import BigInteger
from sqlalchemy.ext.compiler import compiles

@compiles(BigInteger, 'sqlite')
def compile_big_int_sqlite(type_, compiler, **kw):
return 'INTEGER'
Expand Down Expand Up @@ -145,7 +145,7 @@ def _safe_fetch(cur):
if not isinstance(result, list):
result = list(result)
return result
except Exception as e: # pragma: no cover
except Exception as e: # pragma: no cover
excName = e.__class__.__name__
if excName == 'OperationalError':
return []
Expand Down Expand Up @@ -187,7 +187,7 @@ def tquery(sql, con=None, cur=None, retry=True):
con.commit()
except Exception as e:
excName = e.__class__.__name__
if excName == 'OperationalError': # pragma: no cover
if excName == 'OperationalError': # pragma: no cover
print('Failed to commit, may need to restart interpreter')
else:
raise
Expand All @@ -199,7 +199,7 @@ def tquery(sql, con=None, cur=None, retry=True):
if result and len(result[0]) == 1:
# python 3 compat
result = list(lzip(*result)[0])
elif result is None: # pragma: no cover
elif result is None: # pragma: no cover
result = []

return result
Expand Down Expand Up @@ -253,8 +253,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None):
#------------------------------------------------------------------------------
#--- Read and write to DataFrames

def read_sql_table(table_name, con, index_col=None, coerce_float=True,
parse_dates=None, columns=None):
def read_sql_table(table_name, con, schema=None, index_col=None,
coerce_float=True, parse_dates=None, columns=None):
"""Read SQL database table into a DataFrame.
Given a table name and an SQLAlchemy engine, returns a DataFrame.
Expand All @@ -266,6 +266,9 @@ def read_sql_table(table_name, con, index_col=None, coerce_float=True,
Name of SQL table in database
con : SQLAlchemy engine
Sqlite DBAPI connection mode not supported
schema : string, default None
Name of SQL schema in database to query (if database flavor supports this).
If None, use default schema (default).
index_col : string, optional
Column to set as index
coerce_float : boolean, default True
Expand Down Expand Up @@ -298,7 +301,7 @@ def read_sql_table(table_name, con, index_col=None, coerce_float=True,
"SQLAlchemy engines.")
import sqlalchemy
from sqlalchemy.schema import MetaData
meta = MetaData(con)
meta = MetaData(con, schema=schema)
try:
meta.reflect(only=[table_name])
except sqlalchemy.exc.InvalidRequestError:
Expand Down Expand Up @@ -437,8 +440,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
coerce_float=coerce_float, parse_dates=parse_dates)


def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
index_label=None, chunksize=None):
def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
index=True, index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.
Expand All @@ -455,6 +458,9 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
'mysql' is deprecated and will be removed in future versions, but it
will be further supported through SQLAlchemy engines.
schema : string, default None
Name of SQL schema in database to write to (if database flavor supports
this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
Expand All @@ -473,18 +479,19 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
if if_exists not in ('fail', 'replace', 'append'):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

pandas_sql = pandasSQL_builder(con, flavor=flavor)
pandas_sql = pandasSQL_builder(con, schema=schema, flavor=flavor)

if isinstance(frame, Series):
frame = frame.to_frame()
elif not isinstance(frame, DataFrame):
raise NotImplementedError

pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
index_label=index_label, chunksize=chunksize)
index_label=index_label, schema=schema,
chunksize=chunksize)


def has_table(table_name, con, flavor='sqlite'):
def has_table(table_name, con, flavor='sqlite', schema=None):
"""
Check if DataBase has named table.
Expand All @@ -500,12 +507,15 @@ def has_table(table_name, con, flavor='sqlite'):
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
'mysql' is deprecated and will be removed in future versions, but it
will be further supported through SQLAlchemy engines.
schema : string, default None
Name of SQL schema in database to write to (if database flavor supports
this). If None, use default schema (default).
Returns
-------
boolean
"""
pandas_sql = pandasSQL_builder(con, flavor=flavor)
pandas_sql = pandasSQL_builder(con, flavor=flavor, schema=schema)
return pandas_sql.has_table(table_name)

table_exists = has_table
Expand All @@ -515,15 +525,15 @@ def has_table(table_name, con, flavor='sqlite'):
"and will be removed in future versions. "
"MySQL will be further supported with SQLAlchemy engines.")

def pandasSQL_builder(con, flavor=None, meta=None, is_cursor=False):
def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False):
"""
Convenience function to return the correct PandasSQL subclass based on the
provided parameters
"""
# When support for DBAPI connections is removed,
# is_cursor should not be necessary.
if _is_sqlalchemy_engine(con):
return PandasSQLAlchemy(con, meta=meta)
return PandasSQLAlchemy(con, schema=schema, meta=meta)
else:
if flavor == 'mysql':
warnings.warn(_MYSQL_WARNING, FutureWarning)
Expand All @@ -540,24 +550,26 @@ class PandasSQLTable(PandasObject):
"""
# TODO: support for multiIndex
def __init__(self, name, pandas_sql_engine, frame=None, index=True,
if_exists='fail', prefix='pandas', index_label=None):
if_exists='fail', prefix='pandas', index_label=None,
schema=None):
self.name = name
self.pd_sql = pandas_sql_engine
self.prefix = prefix
self.frame = frame
self.index = self._index_name(index, index_label)
self.schema = schema

if frame is not None:
# We want to write a frame
if self.pd_sql.has_table(self.name):
if self.pd_sql.has_table(self.name, self.schema):
if if_exists == 'fail':
raise ValueError("Table '%s' already exists." % name)
elif if_exists == 'replace':
self.pd_sql.drop_table(self.name)
self.pd_sql.drop_table(self.name, self.schema)
self.table = self._create_table_statement()
self.create()
elif if_exists == 'append':
self.table = self.pd_sql.get_table(self.name)
self.table = self.pd_sql.get_table(self.name, self.schema)
if self.table is None:
self.table = self._create_table_statement()
else:
Expand All @@ -568,13 +580,13 @@ def __init__(self, name, pandas_sql_engine, frame=None, index=True,
self.create()
else:
# no data provided, read-only mode
self.table = self.pd_sql.get_table(self.name)
self.table = self.pd_sql.get_table(self.name, self.schema)

if self.table is None:
raise ValueError("Could not init table '%s'" % name)

def exists(self):
return self.pd_sql.has_table(self.name)
return self.pd_sql.has_table(self.name, self.schema)

def sql_schema(self):
from sqlalchemy.schema import CreateTable
Expand Down Expand Up @@ -709,7 +721,7 @@ def _create_table_statement(self):
columns = [Column(name, typ)
for name, typ in column_names_and_types]

return Table(self.name, self.pd_sql.meta, *columns)
return Table(self.name, self.pd_sql.meta, *columns, schema=self.schema)

def _harmonize_columns(self, parse_dates=None):
""" Make a data_frame's column type align with an sql_table
Expand Down Expand Up @@ -830,11 +842,11 @@ class PandasSQLAlchemy(PandasSQL):
using SQLAlchemy to handle DataBase abstraction
"""

def __init__(self, engine, meta=None):
def __init__(self, engine, schema=None, meta=None):
self.engine = engine
if not meta:
from sqlalchemy.schema import MetaData
meta = MetaData(self.engine)
meta = MetaData(self.engine, schema=schema)

self.meta = meta

Expand All @@ -843,9 +855,10 @@ def execute(self, *args, **kwargs):
return self.engine.execute(*args, **kwargs)

def read_table(self, table_name, index_col=None, coerce_float=True,
parse_dates=None, columns=None):
parse_dates=None, columns=None, schema=None):

table = PandasSQLTable(table_name, self, index=index_col)
table = PandasSQLTable(
table_name, self, index=index_col, schema=schema)
return table.read(coerce_float=coerce_float,
parse_dates=parse_dates, columns=columns)

Expand All @@ -868,26 +881,31 @@ def read_sql(self, sql, index_col=None, coerce_float=True,
return data_frame

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None, chunksize=None):
index_label=None, schema=None, chunksize=None):
table = PandasSQLTable(
name, self, frame=frame, index=index, if_exists=if_exists,
index_label=index_label)
index_label=index_label, schema=schema)
table.insert(chunksize)

@property
def tables(self):
return self.meta.tables

def has_table(self, name):
return self.engine.has_table(name)
def has_table(self, name, schema=None):
return self.engine.has_table(name, schema or self.meta.schema)

def get_table(self, table_name):
return self.meta.tables.get(table_name)
def get_table(self, table_name, schema=None):
schema = schema or self.meta.schema
if schema:
return self.meta.tables.get('.'.join([schema, table_name]))
else:
return self.meta.tables.get(table_name)

def drop_table(self, table_name):
if self.engine.has_table(table_name):
self.meta.reflect(only=[table_name])
self.get_table(table_name).drop()
def drop_table(self, table_name, schema=None):
schema = schema or self.meta.schema
if self.engine.has_table(table_name, schema):
self.meta.reflect(only=[table_name], schema=schema)
self.get_table(table_name, schema).drop()
self.meta.clear()

def _create_sql_schema(self, frame, table_name):
Expand Down Expand Up @@ -1113,7 +1131,7 @@ def _fetchall_as_list(self, cur):
return result

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None, chunksize=None):
index_label=None, schema=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.
Expand All @@ -1133,7 +1151,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=index_label)
table.insert(chunksize)

def has_table(self, name):
def has_table(self, name, schema=None):
flavor_map = {
'sqlite': ("SELECT name FROM sqlite_master "
"WHERE type='table' AND name='%s';") % name,
Expand All @@ -1142,10 +1160,10 @@ def has_table(self, name):

return len(self.execute(query).fetchall()) > 0

def get_table(self, table_name):
def get_table(self, table_name, schema=None):
return None # not supported in Legacy mode

def drop_table(self, name):
def drop_table(self, name, schema=None):
drop_sql = "DROP TABLE %s" % name
self.execute(drop_sql)

Expand Down
Loading

0 comments on commit 5ef3cc3

Please sign in to comment.