Skip to content

Commit

Permalink
Fix: multiple message processing overlap bug when SSL mode is 'disabl…
Browse files Browse the repository at this point in the history
…ed'. (#400)
  • Loading branch information
isoos authored Dec 20, 2024
1 parent 8d57261 commit 548cb7a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 3.4.5

- `close(force: true)` to indicate intent to force-close pending queries and resources. [#396](https://github.com/isoos/postgresql-dart/pull/396) by [davidmartos96](https://github.com/davidmartos96)
- Fix: multiple message processing overlap bug when SSL mode was `disabled`.

## 3.4.4

Expand Down
14 changes: 8 additions & 6 deletions lib/src/v3/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ StreamTransformer<Uint8List, ServerMessage> _readMessages(
}
}

Future<void> handleChunk(Uint8List bytes) async {
Future<void> handleChunk() async {
try {
await framer.addBytes(bytes);
// await framer.addBytes(bytes);
emitFinishedMessages();
} catch (e, st) {
listener.addErrorSync(e, st);
Expand All @@ -80,10 +80,12 @@ StreamTransformer<Uint8List, ServerMessage> _readMessages(

// Don't cancel this subscription on error! If the listener wants that,
// they'll unsubscribe in time after we forward it synchronously.
final rawSubscription =
rawStream.listen(handleChunk, cancelOnError: false)
..onError(listener.addErrorSync)
..onDone(listener.closeSync);
final rawSubscription = rawStream
// TODO: figure out a better way to handle multiple callbacks to framer
.asyncMap(framer.addBytes)
.listen((_) => handleChunk(), cancelOnError: false)
..onError(listener.addErrorSync)
..onDone(listener.closeSync);

listener.onPause = () {
paused = true;
Expand Down
14 changes: 12 additions & 2 deletions test/event_after_closing_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,18 @@ void main() {
_print('Inserted ${numBatches * batchSize} rows in ${sw.elapsed}');
}

test('issue#398', () async {
final conn = await server.newConnection();
test('issue#398 ssl:disabled', () async {
final conn = await server.newConnection(sslMode: SslMode.disable);
await createTableAndPopulate(conn);

final rows = await conn.execute('SELECT * FROM large_table');
_print('SELECTED ROWS ${rows.length}');

await conn.close();
});

test('issue#398 ssl:require', () async {
final conn = await server.newConnection(sslMode: SslMode.require);
await createTableAndPopulate(conn);

final rows = await conn.execute('SELECT * FROM large_table');
Expand Down

0 comments on commit 548cb7a

Please sign in to comment.