Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to catch error "SocketException: OS Error: Broken pipe" #47538

Closed
ra1u opened this issue Oct 24, 2021 · 12 comments
Closed

Unable to catch error "SocketException: OS Error: Broken pipe" #47538

ra1u opened this issue Oct 24, 2021 · 12 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. library-io

Comments

@ra1u
Copy link

ra1u commented Oct 24, 2021

We send data from dart acting as client to tcp echo server (not included here). At some point we kill this echo server.
It happens that once this happens we can not catch exception on dart that acts as client.
Application terminates with Unhandled exception: error.

import 'dart:async';
import 'dart:io';
import 'dart:convert';

class Connection {
  static final UTF8 = const Utf8Codec();
  Socket _socket;
  StreamIterator<List<int>> _strm_iter;
  Future _future = Future.value();

  Connection._create(this._socket,this._strm_iter);

  static Future<Connection> connect(host, port) {
    return Socket.connect(host, port).then((Socket sock) {
      StreamIterator<List<int>> iter = StreamIterator<List<int>>(sock);
      return Connection._create(sock,iter);
    });
  }

  Future<String> send_to_echo(String s) async {
    List<int> raw = UTF8.encode(s);
    print("socket add");
    _socket.add(raw);
    print("socket added");
    List<int> r = <int>[];
    while (true){
      print("get next");
      bool next = await _strm_iter.moveNext();
      print("got next $next");
      if(!next){
        return Future.error("closed stream"); 
      }
      List<int> packet = _strm_iter.current;
      r = r + packet;
      if( r.length >= raw.length){
        return UTF8.decode(r);
      }
    }
  }
}

Future main(List<String> args) async {
  try {
    Connection con = await Connection.connect('127.0.0.1', 3333);
    Timer.periodic(Duration(milliseconds: 1000), (p0) async {
      try {
        final String s = "ping ${p0.tick}";   
        print("send $s");
        var ret = await con.send_to_echo(s);
        print("recv $ret");
      } catch (e,s) {
        print('timer:$e $s');
      }
    });
  } catch (e,s) {
    print("main:$e $s");
  }
  await Future.delayed(Duration(milliseconds: 10000));
}

And here is last part of output, where app terminates after server is killed.

...
send ping 4
socket add
socket added
get next
got next true
recv ping 4
send ping 5
socket add
socket added
get next
got next true
recv ping 5
send ping 6
socket add
socket added
get next
got next false
timer:closed stream 
Unhandled exception:
SocketException: OS Error: Broken pipe, errno = 32, address = 127.0.0.1, port = 40050
root@kind_rhodes:/workdir#

Dart version running in container "docker.io/google/dart:2.15-dev" on linux

Dart SDK version: 2.15.0-233.0.dev (dev) (Unknown timestamp) on "linux_x64"    
@vsmenon vsmenon added area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. library-io labels Oct 24, 2021
@vsmenon
Copy link
Member

vsmenon commented Oct 24, 2021

Actually, I believe this is working as intended. The callback that you are passing to periodic is async. It's executing (repeatedly) after main is complete. I.e., it's not actually running under the try-catch in main.

@ra1u
Copy link
Author

ra1u commented Oct 24, 2021

I think that is it catch inside timer that should catch exception. Timer acts like while loop.
It has same result (application terminates with - Unhandled exception) if we replace Timer.periodic with while loop so that main becomes

Future main(List<String> args) async {
  try {
    Connection con = await Connection.connect('127.0.0.1', 3333);
    int tick = 0;
    while(true){
      try {
        final String s = "ping ${tick}";   
        print("send $s");
        var ret = await con.send_to_echo(s);
        print("recv $ret");
      } catch (e,s) {
        print('loop:$e $s');
      }
      tick += 1;
      await Future.delayed(Duration(milliseconds: 1000));
    }
  } catch (e,s) {
    print("main:$e $s");
  }
  print("done");
}

@a-siva
Copy link
Contributor

a-siva commented Nov 1, 2021

I modified the above code as follows

Future main(List<String> args) async {
  try {
      Connection con = await Connection.connect('localhost', 3333);
      int tick = 0;
      while(true){
        try {
          final String s = "ping ${tick}";
          print("send $s");
          var ret = await con.send_to_echo(s);
          print("recv $ret");
        } catch (e,s) {
          print('loop:$e $s');
          await con.Close();
          break;  // Connection broken and so exiting the loop.
        }
        tick += 1;
        await Future.delayed(Duration(milliseconds: 1000));
      }
  } catch (e,s) {
    print("main:$e $s");
  }
  print("done");
}

and I do see the message from the catch statement 'loop:closed stream', and the following 'done' being printed.
I just modified the code to close the socket and exit the loop at this point.

@ra1u
Copy link
Author

ra1u commented Nov 2, 2021

@a-siva

Thank you for your help.
Can you show how con.Close() is implemented.

Additionally can you modify your program to delay a little before main terminates and report on result.

For example:

  print("done start");
  await Future.delayed(Duration(milliseconds: 1000));
  print("done done"); 
} 
// end of file

We expect to get "done done" on output.

Reason is that I do catch first exception that is raised by return Future.error("closed stream"); in Connection.send_to_echo , but after that we get unhandled exception if we keep program running. That second exception terminates app.

@a-siva
Copy link
Contributor

a-siva commented Nov 2, 2021

  Future Close() async {
    return _socket.close();
  }

I made the modifications suggested by you as follows

Future main(List<String> args) async {
  try {
    Connection con = await Connection.connect('localhost', 3333);
    int tick = 0;
    while(true){
        try {
          final String s = "ping ${tick}";
          print("send $s");
          var ret = await con.send_to_echo(s);
          print("recv $ret");
        } catch (e,s) {
          print('loop:$e $s');
          await con.Close();
          break;  // Connection broken and so exiting the loop.
        }
        tick += 1;
        await Future.delayed(Duration(milliseconds: 1000));
    }
  } catch (e,s) {
    print("main:$e $s");
  }
  print("done start");
  await Future.delayed(Duration(milliseconds: 1000));
  print("done done");
}

and here is the output when I run it

send ping 0
socket add
socket added
get next
got next true
recv ping 0
send ping 1
socket add
socket added
get next
got next true
recv ping 1
send ping 2
socket add
socket added
get next
got next true
recv ping 2
send ping 3
socket add
socket added
get next
got next true
recv ping 3
send ping 4
socket add
socket added
get next
got next true
recv ping 4
send ping 5
socket add
socket added
get next
got next true
recv ping 5
send ping 6
socket add
socket added
get next
got next false
loop:closed stream 
done start
done done

@ra1u
Copy link
Author

ra1u commented Nov 2, 2021

Thank you @a-siva you this solve an issue.

Is this expected behavior? It seems that one needs to close connection "quick enough" to stop propagating 2nd exception.

@a-siva
Copy link
Contributor

a-siva commented Nov 2, 2021

I believe the exception mechanism seems to work on the read side but we seem to have an issue on the write side which is causing these unhandled exceptions to bubble up. I am investigating this by converitng your send_to_echo method to just do writes without any reads and then killing the server.

@a-siva
Copy link
Contributor

a-siva commented Nov 2, 2021

if the send_to_echo method is modified to only send data and not try to read anything as follows

class Connection {
  .......
  ........
  // Done method in class Connection.
  Future Done() async {
    return _socket.done;
  }

  // added a boolean field 'disconnected' to the 'Connection' class which is initialized to false.
  bool disconnected = false;
  .....
  .....
}

  void send_to_echo(String s) {
    List<int> raw = UTF8.encode(s);
    print("socket add");
    if (!disconnected) {
      _socket.add(raw);
    } else {
      throw 'Socket Disconnected';
    }
    print("socket added");
  }

Future main(List<String> args) async {
  try {
    Connection con = await Connection.connect('localhost', 3333);
    con.Done().then((obj) {
      print('done method on socket called');
      con.disconnected = true;
    });
    int tick = 0;
    while(true){
        try {
          final String s = "ping ${tick}";
          print("send $s");
          con.send_to_echo(s);
        } catch (e,s) {
          print('loop:$e $s');
          await con.Close();
          break;  // Connection broken and so exiting the loop.
        }
        tick += 1;
        await Future.delayed(Duration(milliseconds: 1000));
    }
  } catch (e,s) {
    print("main:$e $s");
  }
  print("done start");
  await Future.delayed(Duration(milliseconds: 1000));
  print("done done");
}

and run the modified 'main' method I have above then it does result in the unhandled exception
SocketException: OS Error: Broken pipe, errno = 32, address = localhost, port = 44122

We seem to have an issue propogating exceptions correctly when a write is done to a socket that has been disconnected.

@a-siva
Copy link
Contributor

a-siva commented Nov 3, 2021

The reason I was getting unhandled SocketExceptions in the above code was because I did not register a listener for the socket stream. I modified the code as follows and things work just fine

class Connection {
  static final UTF8 = const Utf8Codec();
  Socket _socket;
  Future _future = Future.value();

  Connection._create(this._socket);

  static Future<Connection> connect(host, port) {
    return Socket.connect(host, port).then((Socket sock) {
        sock.listen(
          null,
          onError : (obj) {
            print("Error event received");
            sock.close();
          },
          onDone : () {
            print("onDone event received");
            sock.close();
          }
        );
        return Connection._create(sock);
      },
      onError: (e, s) {
        print("Connection Failed : $e");
        exit(0);
      });
  }

  Future Close() async {
    return _socket.close();
  }

  void send_to_echo(String s) {
    List<int> raw = UTF8.encode(s);
    print("socket add");
    _socket.add(raw);
    print("socket added");
  }
}

Future main(List<String> args) async {
  try {
    Connection con = await Connection.connect('localhost', 3333);
    int tick = 0;
    while(true){
        try {
          final String s = "ping ${tick}";
          print("send $s");
          con.send_to_echo(s);
        } catch (e,s) {
          print('loop:$e');
          break;  // Connection broken and so exiting the loop.
        }
        tick += 1;
        await Future.delayed(Duration(milliseconds: 1000));
    }
  } catch (e,s) {
    print("main:$e $s");
  }
  print("done start");
  await Future.delayed(Duration(milliseconds: 1000));
  print("done done");
}

Output from client running the above code

send ping 0
socket add
socket added
send ping 1
socket add
socket added
send ping 2
socket add
socket added
send ping 3
socket add
socket added
send ping 4
socket add
socket added
onDone event received
send ping 5
socket add
loop:Bad state: StreamSink is closed
done start
done done


@a-siva
Copy link
Contributor

a-siva commented Nov 3, 2021

Things appear to work as expected, I am closing this issue, if you think there are more issues please do not hesitate to reopen it.

@a-siva a-siva closed this as completed Nov 3, 2021
@ra1u
Copy link
Author

ra1u commented Nov 4, 2021

I think that current approach is similar to initial workaround and that issue remains.

One need to close socket to stop propagating exceptions. It is also not clear how to catch generated exception.

For example if we comment out socket.close() in onDone we would expect to catch error in onError.
Particularly we expect to get error: "SocketException: OS Error: Broken pipe, errno = 32, address = localhost, port = 53946" , That is not the case and application terminates.

@ogios
Copy link

ogios commented Oct 13, 2023

i find that we have to call socket.close() in listen(onError:(err){}) then it will not raise that Unhandled Exception.

before this i couldn't even get the stacktrace of where it was raised even if i called socket.destory()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. library-io
Projects
None yet
Development

No branches or pull requests

4 participants