Skip to content

Commit

Permalink
Revert "server: drop graceful handling (#546)" (#560)
Browse files Browse the repository at this point in the history
This reverts commit 8223ae8.
  • Loading branch information
miekg authored Nov 9, 2017
1 parent 8223ae8 commit 4bb60ce
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 18 deletions.
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestTimeout(t *testing.T) {
length := time.Since(start)

if length > allowable {
t.Errorf("exchange took longer (%v) than specified Timeout (%v)", length, allowable)
t.Errorf("exchange took longer (%v) than specified Timeout (%v)", length, timeout)
}
}

Expand Down
102 changes: 85 additions & 17 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,22 @@ type Server struct {
DecorateReader DecorateReader
// DecorateWriter is optional, allows customization of the process that writes raw DNS messages.
DecorateWriter DecorateWriter

// Graceful shutdown handling

inFlight sync.WaitGroup

lock sync.RWMutex
started bool
}

// ListenAndServe starts a nameserver on the configured address in *Server.
func (srv *Server) ListenAndServe() error {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.started {
return &Error{err: "server already started"}
}
addr := srv.Addr
if addr == "" {
addr = ":domain"
Expand All @@ -318,7 +330,10 @@ func (srv *Server) ListenAndServe() error {
return err
}
srv.Listener = l
srv.started = true
srv.lock.Unlock()
err = srv.serveTCP(l)
srv.lock.Lock() // to satisfy the defer at the top
return err
case "tcp-tls", "tcp4-tls", "tcp6-tls":
network := "tcp"
Expand All @@ -333,7 +348,10 @@ func (srv *Server) ListenAndServe() error {
return err
}
srv.Listener = l
srv.started = true
srv.lock.Unlock()
err = srv.serveTCP(l)
srv.lock.Lock() // to satisfy the defer at the top
return err
case "udp", "udp4", "udp6":
a, err := net.ResolveUDPAddr(srv.Net, addr)
Expand All @@ -348,7 +366,10 @@ func (srv *Server) ListenAndServe() error {
return e
}
srv.PacketConn = l
srv.started = true
srv.lock.Unlock()
err = srv.serveUDP(l)
srv.lock.Lock() // to satisfy the defer at the top
return err
}
return &Error{err: "bad network"}
Expand All @@ -357,38 +378,72 @@ func (srv *Server) ListenAndServe() error {
// ActivateAndServe starts a nameserver with the PacketConn or Listener
// configured in *Server. Its main use is to start a server from systemd.
func (srv *Server) ActivateAndServe() error {
srv.lock.Lock()
defer srv.lock.Unlock()
if srv.started {
return &Error{err: "server already started"}
}
pConn := srv.PacketConn
l := srv.Listener

if srv.UDPSize == 0 {
srv.UDPSize = MinMsgSize
}
// Check PacketConn interface's type is valid and value
// is not nil
if t, ok := pConn.(*net.UDPConn); ok && t != nil {
if e := setUDPSocketOptions(t); e != nil {
if pConn != nil {
if srv.UDPSize == 0 {
srv.UDPSize = MinMsgSize
}
// Check PacketConn interface's type is valid and value
// is not nil
if t, ok := pConn.(*net.UDPConn); ok && t != nil {
if e := setUDPSocketOptions(t); e != nil {
return e
}
srv.started = true
srv.lock.Unlock()
e := srv.serveUDP(t)
srv.lock.Lock() // to satisfy the defer at the top
return e
}
e := srv.serveUDP(t)
return e
}

if l != nil {
srv.started = true
srv.lock.Unlock()
e := srv.serveTCP(l)
srv.lock.Lock() // to satisfy the defer at the top
return e
}
return &Error{err: "bad listeners"}
}

// Shutdown shuts down a server. After a call to Shutdown, ListenAndServe and ActivateAndServe will return.
// Shutdown gracefully shuts down a server. After a call to Shutdown, ListenAndServe and
// ActivateAndServe will return. All in progress queries are completed before the server
// is taken down. If the Shutdown is taking longer than the reading timeout an error
// is returned.
func (srv *Server) Shutdown() error {
srv.lock.Lock()
if !srv.started {
srv.lock.Unlock()
return &Error{err: "server not started"}
}
srv.started = false
srv.lock.Unlock()

if srv.PacketConn != nil {
srv.PacketConn.Close()
}
if srv.Listener != nil {
srv.Listener.Close()
}
return nil

fin := make(chan bool)
go func() {
srv.inFlight.Wait()
fin <- true
}()

select {
case <-time.After(srv.getReadTimeout()):
return &Error{err: "server shutdown is pending"}
case <-fin:
return nil
}
}

// getReadTimeout is a helper func to use system timeout if server did not intend to change it.
Expand Down Expand Up @@ -429,9 +484,16 @@ func (srv *Server) serveTCP(l net.Listener) error {
return err
}
m, err := reader.ReadTCP(rw, rtimeout)
srv.lock.RLock()
if !srv.started {
srv.lock.RUnlock()
return nil
}
srv.lock.RUnlock()
if err != nil {
continue
}
srv.inFlight.Add(1)
go srv.serve(rw.RemoteAddr(), handler, m, nil, nil, rw)
}
}
Expand All @@ -458,18 +520,24 @@ func (srv *Server) serveUDP(l *net.UDPConn) error {
// deadline is not used here
for {
m, s, err := reader.ReadUDP(l, rtimeout)
srv.lock.RLock()
if !srv.started {
srv.lock.RUnlock()
return nil
}
srv.lock.RUnlock()
if err != nil {
if neterr, ok := err.(net.Error); ok && neterr.Temporary() {
continue
}
return err
continue
}
srv.inFlight.Add(1)
go srv.serve(s.RemoteAddr(), handler, m, l, s, nil)
}
}

// Serve a new connection.
func (srv *Server) serve(a net.Addr, h Handler, m []byte, u *net.UDPConn, s *SessionUDP, t net.Conn) {
defer srv.inFlight.Done()

w := &response{tsigSecret: srv.TsigSecret, udp: u, tcp: t, remoteAddr: a, udpSession: s}
if srv.DecorateWriter != nil {
w.writer = srv.DecorateWriter(w)
Expand Down

0 comments on commit 4bb60ce

Please sign in to comment.