Skip to content

Commit

Permalink
resolves xxgreg#75
Browse files Browse the repository at this point in the history
Added `executeMulti` and decided to add `queryMulti` as well because
some people might want to use the insert returning form and that
needs the `query` not `execute`
  • Loading branch information
lteacher committed May 10, 2016
1 parent 93a0700 commit 707b913
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 54 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ out
test/test_config.yaml
/.pub
/.idea

.packages
.atom
95 changes: 55 additions & 40 deletions lib/src/postgresql_impl/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ConnectionImpl implements Connection {

TransactionState _transactionState = unknown;
TransactionState get transactionState => _transactionState;

@deprecated TransactionState get transactionStatus => _transactionState;

final String _databaseName;
Expand All @@ -42,32 +42,32 @@ class ConnectionImpl implements Connection {
int _msgLength;
int _secretKey;
bool _isUtcTimeZone = false;

int _backendPid;
final _getDebugName;

int get backendPid => _backendPid;

String get debugName => _getDebugName();

String toString() => '$debugName:$_backendPid';

final Map<String,String> _parameters = new Map<String, String>();

Map<String,String> _parametersView;

Map<String,String> get parameters {
if (_parametersView == null)
_parametersView = new UnmodifiableMapView(_parameters);
return _parametersView;
}

Stream get messages => _messages.stream;

@deprecated Stream<Message> get unhandled => messages;

final StreamController _messages = new StreamController.broadcast();

static Future<ConnectionImpl> connect(
String uri,
{Duration connectionTimeout,
Expand All @@ -76,43 +76,43 @@ class ConnectionImpl implements Connection {
TypeConverter typeConverter,
String getDebugName(),
Future<Socket> mockSocketConnect(String host, int port)}) {

return new Future.sync(() {

var settings = new Settings.fromUri(uri);

//FIXME Currently this timeout doesn't cancel the socket connection
//FIXME Currently this timeout doesn't cancel the socket connection
// process.
// There is a bug open about adding a real socket connect timeout
// parameter to Socket.connect() if this happens then start using it.
// http://code.google.com/p/dart/issues/detail?id=19120
if (connectionTimeout == null)
connectionTimeout = new Duration(seconds: 180);

getDebugName = getDebugName == null ? () => 'pgconn' : getDebugName;

var onTimeout = () => throw new PostgresqlException(
'Postgresql connection timed out. Timeout: $connectionTimeout.',
getDebugName());

var connectFunc = mockSocketConnect == null
? Socket.connect
: mockSocketConnect;

Future<Socket> future = connectFunc(settings.host, settings.port)
.timeout(connectionTimeout, onTimeout: onTimeout);

if (settings.requireSsl) future = _connectSsl(future);

return future.timeout(connectionTimeout, onTimeout: onTimeout).then((socket) {

var conn = new ConnectionImpl._private(socket, settings,
applicationName, timeZone, typeConverter, getDebugName);
socket.listen(conn._readData,
applicationName, timeZone, typeConverter, getDebugName);

socket.listen(conn._readData,
onError: conn._handleSocketError,
onDone: conn._handleSocketClosed);

conn._state = socketConnected;
conn._sendStartupMessage();
return conn._connected.future;
Expand Down Expand Up @@ -281,7 +281,7 @@ class ConnectionImpl implements Connection {
_destroy();

var msg = closed ? 'Socket closed unexpectedly.' : 'Socket error.';

if (!_hasConnected) {
_connected.completeError(new PostgresqlException(msg, _getDebugName(),
exception: error));
Expand Down Expand Up @@ -426,7 +426,7 @@ class ConnectionImpl implements Connection {

var ex = new PostgresqlException(msg.message, _getDebugName(),
serverMessage: msg);

if (msgType == _MSG_ERROR_RESPONSE) {
if (!_hasConnected) {
_state = closed;
Expand All @@ -452,24 +452,24 @@ class ConnectionImpl implements Connection {
assert(_buffer.bytesAvailable >= length);
var name = _buffer.readUtf8String(10000);
var value = _buffer.readUtf8String(10000);

warn(msg) {
_messages.add(new ClientMessageImpl(
severity: 'WARNING',
message: msg,
connectionName: _getDebugName()));
}

_parameters[name] = value;

// Cache this value so that it doesn't need to be looked up from the map.
if (name == 'TimeZone') {
_isUtcTimeZone = value == 'UTC';
}

if (name == 'client_encoding' && value != 'UTF8') {
warn('client_encoding parameter must remain as UTF8 for correct string '
'handling. client_encoding is: "$value".');
warn('client_encoding parameter must remain as UTF8 for correct string '
'handling. client_encoding is: "$value".');
}
}

Expand All @@ -496,6 +496,21 @@ class ConnectionImpl implements Connection {
}
}

Stream queryMulti(String sql,[Iterable values]) async* {
for (var valueSet in values) {
yield* await query(sql,valueSet);
}
}

Future<int> executeMulti(String sql,[Iterable values]) async {
var futures = values.fold(new List(),(prev,curr) {
prev.add(execute(sql,curr));
return prev;
});

return await new Stream.fromFutures(futures).reduce((i,j) => i+j);
}

Future runInTransaction(Future operation(), [Isolation isolation = readCommitted]) {

var begin = 'begin';
Expand Down Expand Up @@ -571,7 +586,7 @@ class ConnectionImpl implements Connection {

int count = _buffer.readInt16();
var list = new List<_Column>(count);

for (int i = 0; i < count; i++) {
var name = _buffer.readUtf8String(length); //TODO better maxSize.
int fieldId = _buffer.readInt32();
Expand Down Expand Up @@ -615,12 +630,12 @@ class ConnectionImpl implements Connection {
var col = _query._columns[index];
if (col.isBinary) throw new PostgresqlException(
'Binary result set parsing is not implemented.', _getDebugName());

var str = _buffer.readUtf8StringN(colSize);
var value = _typeConverter.decode(str, col.fieldType,

var value = _typeConverter.decode(str, col.fieldType,
isUtcTimeZone: _isUtcTimeZone, getConnectionName: _getDebugName);

_query._rowData[index] = value;
}

Expand All @@ -642,7 +657,7 @@ class ConnectionImpl implements Connection {
}

void close() {

if (_state == closed)
return;

Expand All @@ -658,7 +673,7 @@ class ConnectionImpl implements Connection {
_query = null;
}
}

Future flushing;
try {
var msg = new MessageBuffer();
Expand All @@ -676,7 +691,7 @@ class ConnectionImpl implements Connection {
exception: e,
stackTrace: st));
}

// Wait for socket flush to succeed or fail before closing the connection.
flushing.whenComplete(_destroy);
}
Expand Down
93 changes: 80 additions & 13 deletions test/postgresql_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ Settings loadSettings(){
main() {

String validUri = loadSettings().toUri();

group('Connect', () {

test('Connect', () {
Expand Down Expand Up @@ -417,41 +416,41 @@ main() {
"('-Infinity', '-Infinity');");
conn.execute("insert into dart_unit_test values "
"(@0, @0), (@1, @1), (@2, @2), (@3, @3), (@4, @4), (@5, @5);",
[-0.0, double.NAN, double.INFINITY, double.NEGATIVE_INFINITY, 1e30,
[-0.0, double.NAN, double.INFINITY, double.NEGATIVE_INFINITY, 1e30,
1e-30]);

conn.query('select a, b from dart_unit_test').toList().then(
expectAsync((rows) {
expect(rows[0][0], equals(1.1.toDouble()));
expect(rows[0][1], equals(2.2.toDouble()));

expect(rows[1][0], equals(-0.0));
expect(rows[1][1], equals(-0.0));

expect(rows[2][0], isNaN);
expect(rows[2][1], isNaN);

expect(rows[3][0], equals(double.INFINITY));
expect(rows[3][1], equals(double.INFINITY));

expect(rows[4][0], equals(double.NEGATIVE_INFINITY));
expect(rows[4][1], equals(double.NEGATIVE_INFINITY));

expect(rows[5][0], equals(-0.0));
expect(rows[5][1], equals(-0.0));

expect(rows[6][0], isNaN);
expect(rows[6][1], isNaN);

expect(rows[7][0], equals(double.INFINITY));
expect(rows[7][1], equals(double.INFINITY));

expect(rows[8][0], equals(double.NEGATIVE_INFINITY));
expect(rows[8][1], equals(double.NEGATIVE_INFINITY));

expect(rows[9][0], equals(1e30));
expect(rows[9][1], equals(1e30));

expect(rows[10][0], equals(1e-30));
expect(rows[10][1], equals(1e-30));
})
Expand Down Expand Up @@ -520,6 +519,74 @@ main() {

});

group('Multi Execute', () {
Connection conn;

setUp(() {
return connect(validUri).then((c) => conn = c);
});

tearDown(() {
if (conn != null) conn.close();
});

test('Rows affected', () {
conn.execute('create temporary table dart_unit_test (a int)');

conn.executeMulti('insert into dart_unit_test values (@0)',[
[1],[2]
]).then(
expectAsync((rowsAffected) {
expect(rowsAffected, equals(2));
})
);

conn.executeMulti('insert into dart_unit_test values (@a)',[
{'a':1},
{'a':2},
{'a':3}
]).then(
expectAsync((rowsAffected) {
expect(rowsAffected, equals(3));
})
);
});
});

group('Multi Query', () {
Connection conn;

setUp(() {
return connect(validUri).then((c) => conn = c);
});

tearDown(() {
if (conn != null) conn.close();
});

test('Insert id', () async {
conn.execute('create temporary table dart_unit_test (id serial,a int)');

var rows = await conn.queryMulti('insert into dart_unit_test (a) values (@0) returning id',[
[44],[55],[66]
]).toList();

expect(rows[0].id, equals(1));
expect(rows[1].id, equals(2));
expect(rows[2].id, equals(3));

rows = await conn.queryMulti('insert into dart_unit_test (a) values (@0) returning id',[
{'a': 22},
{'a': 33},
{'a': 44}
]).toList();

expect(rows[0].id, equals(4));
expect(rows[1].id, equals(5));
expect(rows[2].id, equals(6));
});
});

group('PgException', () {

Connection conn;
Expand Down

0 comments on commit 707b913

Please sign in to comment.