Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Update port script
Browse files · Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Apr 22, 2016
1 parent 183caca commit 4063fe0
Showing 1 changed file with 81 additions and 42 deletions.
123 changes: 81 additions & 42 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
@@ -214,6 +214,10 @@ class Porter(object):

self.progress.add_table(table, postgres_size, table_size)

if table == "event_search":
yield self.handle_search_table(postgres_size, table_size, next_chunk)
return

select = (
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
% (table,)
@@ -232,60 +236,95 @@ class Porter(object):
if rows:
next_chunk = rows[-1][0] + 1

if table == "event_search":
# We have to treat event_search differently since it has a
# different structure in the two different databases.
def insert(txn):
sql = (
"INSERT INTO event_search (event_id, room_id, key, sender, vector)"
" VALUES (?,?,?,?,to_tsvector('english', ?))"
)
self._convert_rows(table, headers, rows)

rows_dict = [
dict(zip(headers, row))
for row in rows
]

txn.executemany(sql, [
(
row["event_id"],
row["room_id"],
row["key"],
row["sender"],
row["value"],
)
for row in rows_dict
])

self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
updatevalues={"rowid": next_chunk},
)
else:
self._convert_rows(table, headers, rows)
def insert(txn):
self.postgres_store.insert_many_txn(
txn, table, headers[1:], rows
)

def insert(txn):
self.postgres_store.insert_many_txn(
txn, table, headers[1:], rows
)
self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
updatevalues={"rowid": next_chunk},
)

yield self.postgres_store.execute(insert)

postgres_size += len(rows)

self.progress.update(table, postgres_size)
else:
return

@defer.inlineCallbacks
def handle_search_table(self, postgres_size, table_size, next_chunk):
select = (
"SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
" FROM event_search as es"
" INNER JOIN events AS e USING (event_id, room_id)"
" WHERE es.rowid >= ?"
" ORDER BY es.rowid LIMIT ?"
)

self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
updatevalues={"rowid": next_chunk},
while True:
def r(txn):
txn.execute(select, (next_chunk, self.batch_size,))
rows = txn.fetchall()
headers = [column[0] for column in txn.description]

return headers, rows

headers, rows = yield self.sqlite_store.runInteraction("select", r)

if rows:
next_chunk = rows[-1][0] + 1

# We have to treat event_search differently since it has a
# different structure in the two different databases.
def insert(txn):
sql = (
"INSERT INTO event_search (event_id, room_id, key,"
" sender, vector, origin_server_ts, stream_ordering)"
" VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
)

rows_dict = [
dict(zip(headers, row))
for row in rows
]

txn.executemany(sql, [
(
row["event_id"],
row["room_id"],
row["key"],
row["sender"],
row["value"],
row["origin_server_ts"],
row["stream_ordering"],
)
for row in rows_dict
])

self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
updatevalues={"rowid": next_chunk},
)

yield self.postgres_store.execute(insert)

postgres_size += len(rows)

self.progress.update(table, postgres_size)
self.progress.update("event_search", postgres_size)

else:
return


def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{

0 comments on commit 4063fe0

Please sign in to comment.