diff --git a/ci/requirements-2.6.txt b/ci/requirements-2.6.txt index fec0a96a3d077..baba82f588ed6 100644 --- a/ci/requirements-2.6.txt +++ b/ci/requirements-2.6.txt @@ -5,7 +5,7 @@ pytz==2013b http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz html5lib==1.0b2 numexpr==1.4.2 -sqlalchemy==0.7.1 +sqlalchemy==0.7.4 pymysql==0.6.0 psycopg2==2.5 scipy==0.11.0 diff --git a/doc/source/io.rst b/doc/source/io.rst index ad0a5bb3b67c9..d60fc234650e0 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -3320,6 +3320,20 @@ to pass to :func:`pandas.to_datetime`: You can check if a table exists using :func:`~pandas.io.sql.has_table` +Schema support +~~~~~~~~~~~~~~ + +.. versionadded:: 0.15.0 + +Reading from and writing to different schemas is supported through the ``schema`` +keyword in the :func:`~pandas.read_sql_table` and :func:`~pandas.DataFrame.to_sql` +functions. Note, however, that this depends on the database flavor (sqlite does not +have schemas). For example: + +.. code-block:: python + + df.to_sql('table', engine, schema='other_schema') + pd.read_sql_table('table', engine, schema='other_schema') Querying ~~~~~~~~ diff --git a/doc/source/v0.15.0.txt b/doc/source/v0.15.0.txt index bf959141e29b2..9bfd3dee8c40a 100644 --- a/doc/source/v0.15.0.txt +++ b/doc/source/v0.15.0.txt @@ -430,6 +430,13 @@ Enhancements - Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`) - Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`). +- Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`). + For example: + +.. code-block:: python + + df.to_sql('table', engine, schema='other_schema') + pd.read_sql_table('table', engine, schema='other_schema') - Added support for bool, uint8, uint16 and uint32 datatypes in ``to_stata`` (:issue:`7097`, :issue:`7365`) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index d56095b6300a4..42814c7eca4a4 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -915,8 +915,8 @@ def to_msgpack(self, path_or_buf=None, **kwargs): from pandas.io import packers return packers.to_msgpack(path_or_buf, self, **kwargs) - def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True, - index_label=None, chunksize=None): + def to_sql(self, name, con, flavor='sqlite', schema=None, if_exists='fail', + index=True, index_label=None, chunksize=None): """ Write records stored in a DataFrame to a SQL database. @@ -932,6 +932,9 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True, The flavor of SQL to use. Ignored when using SQLAlchemy engine. 'mysql' is deprecated and will be removed in future versions, but it will be further supported through SQLAlchemy engines. + schema : string, default None + Specify the schema (if database flavor supports this). If None, use + default schema. if_exists : {'fail', 'replace', 'append'}, default 'fail' - fail: If table exists, do nothing. - replace: If table exists, drop it, recreate it, and insert data.
@@ -949,8 +952,8 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True, """ from pandas.io import sql sql.to_sql( - self, name, con, flavor=flavor, if_exists=if_exists, index=index, - index_label=index_label, chunksize=chunksize) + self, name, con, flavor=flavor, schema=schema, if_exists=if_exists, + index=index, index_label=index_label, chunksize=chunksize) def to_pickle(self, path): """ diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 434fc409f671b..b72c41e45c9ca 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -38,7 +38,7 @@ def _is_sqlalchemy_engine(con): try: import sqlalchemy _SQLALCHEMY_INSTALLED = True - + from distutils.version import LooseVersion ver = LooseVersion(sqlalchemy.__version__) # For sqlalchemy versions < 0.8.2, the BIGINT type is recognized @@ -47,7 +47,7 @@ def _is_sqlalchemy_engine(con): if ver < '0.8.2': from sqlalchemy import BigInteger from sqlalchemy.ext.compiler import compiles - + @compiles(BigInteger, 'sqlite') def compile_big_int_sqlite(type_, compiler, **kw): return 'INTEGER' @@ -145,7 +145,7 @@ def _safe_fetch(cur): if not isinstance(result, list): result = list(result) return result - except Exception as e: # pragma: no cover + except Exception as e: # pragma: no cover excName = e.__class__.__name__ if excName == 'OperationalError': return [] @@ -187,7 +187,7 @@ def tquery(sql, con=None, cur=None, retry=True): con.commit() except Exception as e: excName = e.__class__.__name__ - if excName == 'OperationalError': # pragma: no cover + if excName == 'OperationalError': # pragma: no cover print('Failed to commit, may need to restart interpreter') else: raise @@ -199,7 +199,7 @@ def tquery(sql, con=None, cur=None, retry=True): if result and len(result[0]) == 1: # python 3 compat result = list(lzip(*result)[0]) - elif result is None: # pragma: no cover + elif result is None: # pragma: no cover result = [] return result @@ -253,8 +253,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None): #------------------------------------------------------------------------------ #--- Read and write to DataFrames -def read_sql_table(table_name, con, index_col=None, coerce_float=True, - parse_dates=None, columns=None): +def read_sql_table(table_name, con, schema=None, index_col=None, + coerce_float=True, parse_dates=None, columns=None): """Read SQL database table into a DataFrame. Given a table name and an SQLAlchemy engine, returns a DataFrame. @@ -266,6 +266,9 @@ def read_sql_table(table_name, con, index_col=None, coerce_float=True, Name of SQL table in database con : SQLAlchemy engine Sqlite DBAPI connection mode not supported + schema : string, default None + Name of SQL schema in database to query (if database flavor supports this). + If None, use default schema (default). 
index_col : string, optional Column to set as index coerce_float : boolean, default True @@ -298,7 +301,7 @@ def read_sql_table(table_name, con, index_col=None, coerce_float=True, "SQLAlchemy engines.") import sqlalchemy from sqlalchemy.schema import MetaData - meta = MetaData(con) + meta = MetaData(con, schema=schema) try: meta.reflect(only=[table_name]) except sqlalchemy.exc.InvalidRequestError: @@ -437,8 +440,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, coerce_float=coerce_float, parse_dates=parse_dates) -def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True, - index_label=None, chunksize=None): +def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', + index=True, index_label=None, chunksize=None): """ Write records stored in a DataFrame to a SQL database. @@ -455,6 +458,9 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True, The flavor of SQL to use. Ignored when using SQLAlchemy engine. 'mysql' is deprecated and will be removed in future versions, but it will be further supported through SQLAlchemy engines. + schema : string, default None + Name of SQL schema in database to write to (if database flavor supports + this). If None, use default schema (default). if_exists : {'fail', 'replace', 'append'}, default 'fail' - fail: If table exists, do nothing. - replace: If table exists, drop it, recreate it, and insert data. @@ -473,7 +479,7 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True, if if_exists not in ('fail', 'replace', 'append'): raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) - pandas_sql = pandasSQL_builder(con, flavor=flavor) + pandas_sql = pandasSQL_builder(con, schema=schema, flavor=flavor) if isinstance(frame, Series): frame = frame.to_frame() @@ -481,10 +487,11 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True, raise NotImplementedError pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index, - index_label=index_label, chunksize=chunksize) + index_label=index_label, schema=schema, + chunksize=chunksize) -def has_table(table_name, con, flavor='sqlite'): +def has_table(table_name, con, flavor='sqlite', schema=None): """ Check if DataBase has named table. @@ -500,12 +507,15 @@ def has_table(table_name, con, flavor='sqlite'): The flavor of SQL to use. Ignored when using SQLAlchemy engine. 'mysql' is deprecated and will be removed in future versions, but it will be further supported through SQLAlchemy engines. + schema : string, default None + Name of SQL schema in database to write to (if database flavor supports + this). If None, use default schema (default). Returns ------- boolean """ - pandas_sql = pandasSQL_builder(con, flavor=flavor) + pandas_sql = pandasSQL_builder(con, flavor=flavor, schema=schema) return pandas_sql.has_table(table_name) table_exists = has_table @@ -515,7 +525,7 @@ def has_table(table_name, con, flavor='sqlite'): "and will be removed in future versions. " "MySQL will be further supported with SQLAlchemy engines.") -def pandasSQL_builder(con, flavor=None, meta=None, is_cursor=False): +def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False): """ Convenience function to return the correct PandasSQL subclass based on the provided parameters @@ -523,7 +533,7 @@ def pandasSQL_builder(con, flavor=None, meta=None, is_cursor=False): # When support for DBAPI connections is removed, # is_cursor should not be necessary. 
if _is_sqlalchemy_engine(con): - return PandasSQLAlchemy(con, meta=meta) + return PandasSQLAlchemy(con, schema=schema, meta=meta) else: if flavor == 'mysql': warnings.warn(_MYSQL_WARNING, FutureWarning) @@ -540,24 +550,26 @@ class PandasSQLTable(PandasObject): """ # TODO: support for multiIndex def __init__(self, name, pandas_sql_engine, frame=None, index=True, - if_exists='fail', prefix='pandas', index_label=None): + if_exists='fail', prefix='pandas', index_label=None, + schema=None): self.name = name self.pd_sql = pandas_sql_engine self.prefix = prefix self.frame = frame self.index = self._index_name(index, index_label) + self.schema = schema if frame is not None: # We want to write a frame - if self.pd_sql.has_table(self.name): + if self.pd_sql.has_table(self.name, self.schema): if if_exists == 'fail': raise ValueError("Table '%s' already exists." % name) elif if_exists == 'replace': - self.pd_sql.drop_table(self.name) + self.pd_sql.drop_table(self.name, self.schema) self.table = self._create_table_statement() self.create() elif if_exists == 'append': - self.table = self.pd_sql.get_table(self.name) + self.table = self.pd_sql.get_table(self.name, self.schema) if self.table is None: self.table = self._create_table_statement() else: @@ -568,13 +580,13 @@ def __init__(self, name, pandas_sql_engine, frame=None, index=True, self.create() else: # no data provided, read-only mode - self.table = self.pd_sql.get_table(self.name) + self.table = self.pd_sql.get_table(self.name, self.schema) if self.table is None: raise ValueError("Could not init table '%s'" % name) def exists(self): - return self.pd_sql.has_table(self.name) + return self.pd_sql.has_table(self.name, self.schema) def sql_schema(self): from sqlalchemy.schema import CreateTable @@ -709,7 +721,7 @@ def _create_table_statement(self): columns = [Column(name, typ) for name, typ in column_names_and_types] - return Table(self.name, self.pd_sql.meta, *columns) + return Table(self.name, self.pd_sql.meta, *columns, schema=self.schema) def _harmonize_columns(self, parse_dates=None): """ Make a data_frame's column type align with an sql_table @@ -830,11 +842,11 @@ class PandasSQLAlchemy(PandasSQL): using SQLAlchemy to handle DataBase abstraction """ - def __init__(self, engine, meta=None): + def __init__(self, engine, schema=None, meta=None): self.engine = engine if not meta: from sqlalchemy.schema import MetaData - meta = MetaData(self.engine) + meta = MetaData(self.engine, schema=schema) self.meta = meta @@ -843,9 +855,10 @@ def execute(self, *args, **kwargs): return self.engine.execute(*args, **kwargs) def read_table(self, table_name, index_col=None, coerce_float=True, - parse_dates=None, columns=None): + parse_dates=None, columns=None, schema=None): - table = PandasSQLTable(table_name, self, index=index_col) + table = PandasSQLTable( + table_name, self, index=index_col, schema=schema) return table.read(coerce_float=coerce_float, parse_dates=parse_dates, columns=columns) @@ -868,26 +881,31 @@ def read_sql(self, sql, index_col=None, coerce_float=True, return data_frame def to_sql(self, frame, name, if_exists='fail', index=True, - index_label=None, chunksize=None): + index_label=None, schema=None, chunksize=None): table = PandasSQLTable( name, self, frame=frame, index=index, if_exists=if_exists, - index_label=index_label) + index_label=index_label, schema=schema) table.insert(chunksize) @property def tables(self): return self.meta.tables - def has_table(self, name): - return self.engine.has_table(name) + def has_table(self, name, schema=None): + 
return self.engine.has_table(name, schema or self.meta.schema) - def get_table(self, table_name): - return self.meta.tables.get(table_name) + def get_table(self, table_name, schema=None): + schema = schema or self.meta.schema + if schema: + return self.meta.tables.get('.'.join([schema, table_name])) + else: + return self.meta.tables.get(table_name) - def drop_table(self, table_name): - if self.engine.has_table(table_name): - self.meta.reflect(only=[table_name]) - self.get_table(table_name).drop() + def drop_table(self, table_name, schema=None): + schema = schema or self.meta.schema + if self.engine.has_table(table_name, schema): + self.meta.reflect(only=[table_name], schema=schema) + self.get_table(table_name, schema).drop() self.meta.clear() def _create_sql_schema(self, frame, table_name): @@ -1113,7 +1131,7 @@ def _fetchall_as_list(self, cur): return result def to_sql(self, frame, name, if_exists='fail', index=True, - index_label=None, chunksize=None): + index_label=None, schema=None, chunksize=None): """ Write records stored in a DataFrame to a SQL database. @@ -1133,7 +1151,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True, index_label=index_label) table.insert(chunksize) - def has_table(self, name): + def has_table(self, name, schema=None): flavor_map = { 'sqlite': ("SELECT name FROM sqlite_master " "WHERE type='table' AND name='%s';") % name, @@ -1142,10 +1160,10 @@ def has_table(self, name): return len(self.execute(query).fetchall()) > 0 - def get_table(self, table_name): + def get_table(self, table_name, schema=None): return None # not supported in Legacy mode - def drop_table(self, name): + def drop_table(self, name, schema=None): drop_sql = "DROP TABLE %s" % name self.execute(drop_sql) diff --git a/pandas/io/tests/test_sql.py b/pandas/io/tests/test_sql.py index 0d55f4c1dbcd8..93c95169a60d1 100644 --- a/pandas/io/tests/test_sql.py +++ b/pandas/io/tests/test_sql.py @@ -28,7 +28,7 @@ from datetime import datetime, date, time -from pandas import DataFrame, Series, Index, MultiIndex, isnull +from pandas import DataFrame, Series, Index, MultiIndex, isnull, concat from pandas import date_range, to_datetime, to_timedelta import pandas.compat as compat from pandas.compat import StringIO, range, lrange, string_types @@ -457,12 +457,12 @@ def test_roundtrip(self): tm.assert_frame_equal(result, self.test_frame1) def test_roundtrip_chunksize(self): - sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn, + sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn, index=False, flavor='sqlite', chunksize=2) result = sql.read_sql_query( 'SELECT * FROM test_frame_roundtrip', con=self.conn) - tm.assert_frame_equal(result, self.test_frame1) + tm.assert_frame_equal(result, self.test_frame1) def test_execute_sql(self): # drop_sql = "DROP TABLE IF EXISTS test" # should already be done @@ -591,13 +591,13 @@ def test_to_sql_index_label_multiindex(self): index_label='C') def test_multiindex_roundtrip(self): - df = DataFrame.from_records([(1,2.1,'line1'), (2,1.5,'line2')], + df = DataFrame.from_records([(1,2.1,'line1'), (2,1.5,'line2')], columns=['A','B','C'], index=['A','B']) df.to_sql('test_multiindex_roundtrip', self.conn) - result = sql.read_sql_query('SELECT * FROM test_multiindex_roundtrip', + result = sql.read_sql_query('SELECT * FROM test_multiindex_roundtrip', self.conn, index_col=['A','B']) - tm.assert_frame_equal(df, result, check_index_type=True) + tm.assert_frame_equal(df, result, check_index_type=True) def test_integer_col_names(self): df = DataFrame([[1, 
2], [3, 4]], columns=[0, 1]) @@ -1196,8 +1196,8 @@ class TestPostgreSQLAlchemy(_TestSQLAlchemy): flavor = 'postgresql' def connect(self): - return sqlalchemy.create_engine( - 'postgresql+{driver}://postgres@localhost/pandas_nosetest'.format(driver=self.driver)) + url = 'postgresql+{driver}://postgres@localhost/pandas_nosetest' + return sqlalchemy.create_engine(url.format(driver=self.driver)) def setup_driver(self): try: @@ -1213,6 +1213,61 @@ def tearDown(self): for table in c.fetchall(): self.conn.execute("DROP TABLE %s" % table[0]) + def test_schema_support(self): + # only test this for postgresql (schemas not supported in mysql/sqlite) + df = DataFrame({'col1':[1, 2], 'col2':[0.1, 0.2], 'col3':['a', 'n']}) + + # create a schema + self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;") + self.conn.execute("CREATE SCHEMA other;") + + # write dataframe to different schemas + df.to_sql('test_schema_public', self.conn, index=False) + df.to_sql('test_schema_public_explicit', self.conn, index=False, + schema='public') + df.to_sql('test_schema_other', self.conn, index=False, schema='other') + + # read dataframes back in + res1 = sql.read_sql_table('test_schema_public', self.conn) + tm.assert_frame_equal(df, res1) + res2 = sql.read_sql_table('test_schema_public_explicit', self.conn) + tm.assert_frame_equal(df, res2) + res3 = sql.read_sql_table('test_schema_public_explicit', self.conn, + schema='public') + tm.assert_frame_equal(df, res3) + res4 = sql.read_sql_table('test_schema_other', self.conn, + schema='other') + tm.assert_frame_equal(df, res4) + self.assertRaises(ValueError, sql.read_sql_table, 'test_schema_other', + self.conn, schema='public') + + ## different if_exists options + + # create a schema + self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;") + self.conn.execute("CREATE SCHEMA other;") + + # write dataframe with different if_exists options + df.to_sql('test_schema_other', self.conn, schema='other', index=False) + df.to_sql('test_schema_other', self.conn, schema='other', index=False, + if_exists='replace') + df.to_sql('test_schema_other', self.conn, schema='other', index=False, + if_exists='append') + res = sql.read_sql_table('test_schema_other', self.conn, schema='other') + tm.assert_frame_equal(concat([df, df], ignore_index=True), res) + + ## specifying schema in user-provided meta + + engine2 = self.connect() + meta = sqlalchemy.MetaData(engine2, schema='other') + pdsql = sql.PandasSQLAlchemy(engine2, meta=meta) + pdsql.to_sql(df, 'test_schema_other2', index=False) + pdsql.to_sql(df, 'test_schema_other2', index=False, if_exists='replace') + pdsql.to_sql(df, 'test_schema_other2', index=False, if_exists='append') + res1 = sql.read_sql_table('test_schema_other2', self.conn, schema='other') + res2 = pdsql.read_table('test_schema_other2') + tm.assert_frame_equal(res1, res2) + #------------------------------------------------------------------------------ #--- Test Sqlite / MySQL fallback @@ -1295,7 +1350,7 @@ def test_datetime_date(self): tm.assert_frame_equal(res, df.astype(str)) elif self.flavor == 'mysql': tm.assert_frame_equal(res, df) - + def test_datetime_time(self): # test support for datetime.time raise nose.SkipTest("datetime.time not supported for sqlite fallback")
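For context, the sketch below shows how the new ``schema`` keyword introduced by this patch is meant to be used end to end. It is only an illustration, not part of the change: it assumes a reachable PostgreSQL server, the example connection URL and the schema name ``other`` shown here (sqlite has no schema support, so a schema-capable SQLAlchemy engine is required).

.. code-block:: python

    # Hypothetical end-to-end sketch of the schema keyword added in this patch.
    # The connection URL and the schema name 'other' are assumptions chosen to
    # mirror the new test_schema_support test; adapt them to your database.
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://postgres@localhost/pandas_nosetest')
    engine.execute("DROP SCHEMA IF EXISTS other CASCADE;")
    engine.execute("CREATE SCHEMA other;")

    df = pd.DataFrame({'col1': [1, 2], 'col2': [0.1, 0.2], 'col3': ['a', 'n']})

    # write to the non-default schema, then append to the same table
    df.to_sql('example_table', engine, schema='other', index=False)
    df.to_sql('example_table', engine, schema='other', index=False,
              if_exists='append')

    # read the table back from the 'other' schema
    res = pd.read_sql_table('example_table', engine, schema='other')

Reading the same table without ``schema='other'`` raises a ``ValueError``, which is exactly what the new ``test_schema_support`` test asserts.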