Skip to content

Commit

Permalink
Merging in MySQL support #2482
Browse files Browse the repository at this point in the history
  • Loading branch information
danielballan authored and changhiskhan committed Jan 22, 2013
1 parent 3718bc3 commit a835118
Show file tree
Hide file tree
Showing 2 changed files with 409 additions and 93 deletions.
205 changes: 128 additions & 77 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Collection of query wrappers / abstractions to both facilitate data
retrieval and to reduce dependency on DB-specific API.
"""
from datetime import datetime
from datetime import datetime, date

import numpy as np
import traceback
Expand Down Expand Up @@ -158,99 +158,150 @@ def read_frame(sql, con, index_col=None, coerce_float=True):
frame_query = read_frame


def write_frame(frame, name=None, con=None, flavor='sqlite', append=False):
def write_frame(frame, name, con, flavor='sqlite', if_exists='fail', **kwargs):
"""
Write records stored in a DataFrame to SQLite. The index will currently be
dropped
Write records stored in a DataFrame to a SQL database.
Parameters
----------
frame: DataFrame
name: name of SQL table
conn: an open SQL database connection object
flavor: {'sqlite', 'mysql', 'oracle'}, default 'sqlite'
if_exists: {'fail', 'replace', 'append'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if does not exist.
"""
if flavor == 'sqlite':
schema = get_sqlite_schema(frame, name)
else:
raise NotImplementedError

if not append and not has_table(name, con):
con.execute(schema)
if 'append' in kwargs:
import warnings
warnings.warn("append is deprecated, use if_exists instead",
FutureWarning)
if kwargs['append']:
if_exists='append'
else:
if_exists='fail'
exists = table_exists(name, con, flavor)
if if_exists == 'fail' and exists:
raise ValueError, "Table '%s' already exists." % name

#create or drop-recreate if necessary
create = None
if exists and if_exists == 'replace':
create = "DROP TABLE %s" % name
elif not exists:
create = get_schema(frame, name, flavor)

if create is not None:
cur = con.cursor()
cur.execute(create)
cur.close()

cur = con.cursor()
# Replace spaces in DataFrame column names with _.
safe_names = [s.replace(' ', '_').strip() for s in frame.columns]
flavor_picker = {'sqlite' : _write_sqlite,
'mysql' : _write_mysql}

func = flavor_picker.get(flavor, None)
if func is None:
raise NotImplementedError
func(frame, name, safe_names, cur)
cur.close()
con.commit()

wildcards = ','.join(['?'] * len(frame.columns))
insert_sql = 'INSERT INTO %s VALUES (%s)' % (name, wildcards)
def _write_sqlite(frame, table, names, cur):
bracketed_names = ['[' + column + ']' for column in names]
col_names = ','.join(bracketed_names)
wildcards = ','.join(['?'] * len(names))
insert_query = 'INSERT INTO %s (%s) VALUES (%s)' % (
table, col_names, wildcards)
data = [tuple(x) for x in frame.values]
con.executemany(insert_sql, data)


def has_table(name, con):
"""
Check if a given SQLite table exists.
cur.executemany(insert_query, data)

def _write_mysql(frame, table, names, cur):
bracketed_names = ['`' + column + '`' for column in names]
col_names = ','.join(bracketed_names)
wildcards = ','.join([r'%s'] * len(names))
insert_query = "INSERT INTO %s (%s) VALUES (%s)" % (
table, col_names, wildcards)
data = [tuple(x) for x in frame.values]
cur.executemany(insert_query, data)

def table_exists(name, con, flavor):
flavor_map = {
'sqlite': ("SELECT name FROM sqlite_master "
"WHERE type='table' AND name='%s';") % name,
'mysql' : "SHOW TABLES LIKE '%s'" % name}
query = flavor_map.get(flavor, None)
if query is None:
raise NotImplementedError
return len(tquery(query, con)) > 0

Parameters
----------
name: string
SQLite table name
con: DB connection object
"""
sqlstr = "SELECT name FROM sqlite_master WHERE type='table' AND name='%s'" % name
rs = tquery(sqlstr, con)
return len(rs) > 0
def get_sqltype(pytype, flavor):
sqltype = {'mysql': 'VARCHAR (63)',
'sqlite': 'TEXT'}

if issubclass(pytype, np.floating):
sqltype['mysql'] = 'FLOAT'
sqltype['sqlite'] = 'REAL'

def get_sqlite_schema(frame, name, dtypes=None, keys=None):
template = """
CREATE TABLE %(name)s (
%(columns)s%(keystr)s
);"""
if issubclass(pytype, np.integer):
#TODO: Refine integer size.
sqltype['mysql'] = 'BIGINT'
sqltype['sqlite'] = 'INTEGER'

column_types = []
if issubclass(pytype, np.datetime64) or pytype is datetime:
# Caution: np.datetime64 is also a subclass of np.number.
sqltype['mysql'] = 'DATETIME'
sqltype['sqlite'] = 'TIMESTAMP'

frame_types = frame.dtypes
for k in frame_types.index:
dt = frame_types[k]
if pytype is datetime.date:
sqltype['mysql'] = 'DATE'
sqltype['sqlite'] = 'TIMESTAMP'

if issubclass(dt.type, (np.integer, np.bool_)):
sqltype = 'INTEGER'
elif issubclass(dt.type, np.floating):
sqltype = 'REAL'
else:
sqltype = 'TEXT'
if issubclass(pytype, np.bool_):
sqltype['sqlite'] = 'INTEGER'

if dtypes is not None:
sqltype = dtypes.get(k, sqltype)
return sqltype[flavor]

column_types.append((k, sqltype))
columns = ',\n '.join('[%s] %s' % x for x in column_types)
def get_schema(frame, name, flavor, keys=None):
"Return a CREATE TABLE statement to suit the contents of a DataFrame."
lookup_type = lambda dtype: get_sqltype(dtype.type, flavor)
# Replace spaces in DataFrame column names with _.
safe_columns = [s.replace(' ', '_').strip() for s in frame.dtypes.index]
column_types = zip(safe_columns, map(lookup_type, frame.dtypes))
if flavor == 'sqlite':
columns = ',\n '.join('[%s] %s' % x for x in column_types)
else:
columns = ',\n '.join('`%s` %s' % x for x in column_types)

keystr = ''
if keys is not None:
if isinstance(keys, basestring):
keys = (keys,)
keystr = ', PRIMARY KEY (%s)' % ','.join(keys)
return template % {'name': name, 'columns': columns, 'keystr': keystr}


#------------------------------------------------------------------------------
# Query formatting

_formatters = {
datetime: lambda dt: "'%s'" % date_format(dt),
str: lambda x: "'%s'" % x,
np.str_: lambda x: "'%s'" % x,
unicode: lambda x: "'%s'" % x,
float: lambda x: "%.8f" % x,
int: lambda x: "%s" % x,
type(None): lambda x: "NULL",
np.float64: lambda x: "%.10f" % x,
bool: lambda x: "'%s'" % x,
}


def format_query(sql, *args):
template = """CREATE TABLE %(name)s (
%(columns)s
%(keystr)s
);"""
create_statement = template % {'name': name, 'columns': columns,
'keystr': keystr}
return create_statement

def sequence2dict(seq):
"""Helper function for cx_Oracle.
For each element in the sequence, creates a dictionary item equal
to the element and keyed by the position of the item in the list.
>>> sequence2dict(("Matt", 1))
{'1': 'Matt', '2': 1}
Source:
http://www.gingerandjohn.com/archives/2004/02/26/cx_oracle-executemany-example/
"""
"""
processed_args = []
for arg in args:
if isinstance(arg, float) and isnull(arg):
arg = None

formatter = _formatters[type(arg)]
processed_args.append(formatter(arg))

return sql % tuple(processed_args)
d = {}
for k,v in zip(range(1, 1 + len(seq)), seq):
d[str(k)] = v
return d
Loading

0 comments on commit a835118

Please sign in to comment.