Error
Traceback (most recent call last):
  File "lib/python3.11/site-packages/pgcopy/copy.py", line 210, in f
    return formatter(v)
           ^^^^^^^^^^^^
  File "lib/python3.11/site-packages/pgcopy/copy.py", line 179, in <lambda>
    return lambda v: array_formatter(att.typelem, formatter, v)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/pgcopy/copy.py", line 162, in array_formatter
    return str_formatter(struct.pack(''.join(fmt), *data))
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
struct.error: required argument is not an integer
The above exception was the direct cause of the following exception:
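For reference, the last frame fails inside `struct.pack`, and the same message can be reproduced with the standard library alone. This is a minimal sketch, not pgcopy's code: the `">i"` format code and the `pack_error` helper are assumptions chosen for illustration. It only shows that an integer format code rejects values such as floats or `None`.

```python
import struct


def pack_error(value):
    """Return the struct.error message raised when packing `value`
    with an integer format code, or None if packing succeeds.

    `pack_error` and the ">i" format are illustrative assumptions,
    not part of pgcopy.
    """
    try:
        struct.pack(">i", value)
    except struct.error as exc:
        return str(exc)
    return None


# A plain int packs without error.
print(pack_error(7))
# A float or None is rejected by the integer format code.
print(pack_error(1.5))
print(pack_error(None))
```

If the array column is an integer array, an element that reaches pgcopy as a float or `None` would be one possible way to arrive at this traceback.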
For completeness here are the related functions:
pq.write_table(row, parquet_filepath)
parquet_to_table(parquet_filepath, table_name=inspect.stack()[0].function)
def parquet_to_table(filename, table_name=None, database_uri=None, dry_run=False):
    """
    Parquet file to an executable insertion `COPY FROM` into a table

    :param filename: Path to a Parquet file
    :type filename: ```str```

    :param table_name: Table name to use, else use penultimate underscore surrounding word from filename basename
    :type table_name: ```Optional[str]```

    :param database_uri: Database connection string. Defaults to `RDBMS_URI` in your env vars.
    :type database_uri: ```Optional[str]```
    """
    parquet_file = ParquetFile(filename)
    engine = create_engine(
        environ["RDBMS_URI"] if database_uri is None else database_uri
    )
    deque(
        map(
            lambda df: df.to_sql(
                table_name,
                con=engine,
                if_exists="append",
                method=psql_insert_copy,
                index=False,
            ),
            map(methodcaller("to_pandas"), parquet_file.iter_batches()),
        ),
        maxlen=0,
    )
def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    columns = keys[1:] if keys and keys[0] == "index" else keys
    mgr = CopyManager(conn.connection, table.name, columns)
    mgr.copy(map(lambda line: map(parse_col, line), data_iter))
    conn.connection.commit()
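One way to narrow this down before calling `mgr.copy` might be to scan the rows for array cells that contain non-integer elements. The sketch below is a hypothetical diagnostic, not part of pgcopy or pandas. The name `non_integer_elements` is made up, and it assumes the array columns in question are integer arrays, so it is only meaningful for those columns.

```python
from numbers import Integral


def non_integer_elements(rows):
    """Yield (row_index, column_index, element) for every element of an
    array-like cell that is not an integer.

    Hypothetical helper: assumes array cells are iterables other than
    str/bytes and that the target columns are integer arrays.
    """
    for row_idx, row in enumerate(rows):
        for col_idx, cell in enumerate(row):
            # Skip scalars and text; only iterable cells are treated as arrays.
            if isinstance(cell, (str, bytes)) or not hasattr(cell, "__iter__"):
                continue
            for element in cell:
                if not isinstance(element, Integral):
                    yield row_idx, col_idx, element


# Example with made-up rows: the second row holds a float and a None.
sample_rows = [(1, [1, 2, 3]), (2, [4, 5.5, None])]
offenders = list(non_integer_elements(sample_rows))
print(offenders)
```

Running it over the materialised rows from `data_iter` (after `parse_col` has been applied) would show which values, if any, are not integers by the time they are handed to `CopyManager`.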
Unfortunately, I don’t have much experience with Parquet or with df.to_sql. I can try to find some time to look at this, but it likely won’t be this week.
Using your library to batch insert data from Parquet files into PostgreSQL, prepared like so:
Getting this weird error: