nonblock.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // +build darwin linux
  2. /*
  3. * Copyright (c) 2017, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package tun
  21. import (
  22. "io"
  23. "sync"
  24. "sync/atomic"
  25. "syscall"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  27. "github.com/creack/goselect"
  28. )
  29. // NonblockingIO provides interruptible I/O for non-pollable
  30. // and/or foreign file descriptors that can't use the netpoller
  31. // available in os.OpenFile as of Go 1.9.
  32. //
  33. // A NonblockingIO wraps a file descriptor in an
  34. // io.ReadWriteCloser interface. The underlying implementation
  35. // uses select and a pipe to interrupt Read and Write calls that
  36. // are blocked when Close is called.
  37. //
  38. // Read and write mutexes allow, for each operation, only one
  39. // concurrent goroutine to call syscalls, preventing an unbounded
  40. // number of OS threads from being created by blocked select
  41. // syscalls.
  42. type NonblockingIO struct {
  43. closed int32
  44. ioFD int
  45. controlFDs [2]int
  46. readMutex sync.Mutex
  47. readFDSet *goselect.FDSet
  48. writeMutex sync.Mutex
  49. writeFDSets []*goselect.FDSet
  50. }
  51. // NewNonblockingIO creates a new NonblockingIO with the specified
  52. // file descriptor, which is duplicated and set to nonblocking and
  53. // close-on-exec.
  54. func NewNonblockingIO(ioFD int) (*NonblockingIO, error) {
  55. syscall.ForkLock.RLock()
  56. defer syscall.ForkLock.RUnlock()
  57. newFD, err := syscall.Dup(ioFD)
  58. if err != nil {
  59. return nil, errors.Trace(err)
  60. }
  61. init := func(fd int) error {
  62. syscall.CloseOnExec(fd)
  63. return syscall.SetNonblock(fd, true)
  64. }
  65. err = init(newFD)
  66. if err != nil {
  67. return nil, errors.Trace(err)
  68. }
  69. var controlFDs [2]int
  70. err = syscall.Pipe(controlFDs[:])
  71. if err != nil {
  72. return nil, errors.Trace(err)
  73. }
  74. for _, fd := range controlFDs {
  75. err = init(fd)
  76. if err != nil {
  77. return nil, errors.Trace(err)
  78. }
  79. }
  80. return &NonblockingIO{
  81. ioFD: newFD,
  82. controlFDs: controlFDs,
  83. readFDSet: new(goselect.FDSet),
  84. writeFDSets: []*goselect.FDSet{
  85. new(goselect.FDSet), new(goselect.FDSet)},
  86. }, nil
  87. }
  88. // Read implements the io.Reader interface.
  89. func (nio *NonblockingIO) Read(p []byte) (int, error) {
  90. nio.readMutex.Lock()
  91. defer nio.readMutex.Unlock()
  92. if atomic.LoadInt32(&nio.closed) != 0 {
  93. return 0, io.EOF
  94. }
  95. for {
  96. nio.readFDSet.Zero()
  97. nio.readFDSet.Set(uintptr(nio.controlFDs[0]))
  98. nio.readFDSet.Set(uintptr(nio.ioFD))
  99. max := nio.ioFD
  100. if nio.controlFDs[0] > max {
  101. max = nio.controlFDs[0]
  102. }
  103. err := goselect.Select(max+1, nio.readFDSet, nil, nil, -1)
  104. if err == syscall.EINTR {
  105. continue
  106. } else if err != nil {
  107. return 0, errors.Trace(err)
  108. }
  109. if nio.readFDSet.IsSet(uintptr(nio.controlFDs[0])) {
  110. return 0, io.EOF
  111. }
  112. n, err := syscall.Read(nio.ioFD, p)
  113. if err != nil && err != io.EOF {
  114. return n, errors.Trace(err)
  115. }
  116. if n == 0 && err == nil {
  117. // https://godoc.org/io#Reader:
  118. // "Implementations of Read are discouraged from
  119. // returning a zero byte count with a nil error".
  120. continue
  121. }
  122. return n, err
  123. }
  124. }
  125. // Write implements the io.Writer interface.
  126. func (nio *NonblockingIO) Write(p []byte) (int, error) {
  127. nio.writeMutex.Lock()
  128. defer nio.writeMutex.Unlock()
  129. if atomic.LoadInt32(&nio.closed) != 0 {
  130. return 0, errors.TraceNew("file already closed")
  131. }
  132. n := 0
  133. t := len(p)
  134. for n < t {
  135. nio.writeFDSets[0].Zero()
  136. nio.writeFDSets[0].Set(uintptr(nio.controlFDs[0]))
  137. nio.writeFDSets[1].Zero()
  138. nio.writeFDSets[1].Set(uintptr(nio.ioFD))
  139. max := nio.ioFD
  140. if nio.controlFDs[0] > max {
  141. max = nio.controlFDs[0]
  142. }
  143. err := goselect.Select(max+1, nio.writeFDSets[0], nio.writeFDSets[1], nil, -1)
  144. if err == syscall.EINTR {
  145. continue
  146. } else if err != nil {
  147. return 0, errors.Trace(err)
  148. }
  149. if nio.writeFDSets[0].IsSet(uintptr(nio.controlFDs[0])) {
  150. return 0, errors.TraceNew("file has closed")
  151. }
  152. m, err := syscall.Write(nio.ioFD, p)
  153. n += m
  154. if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
  155. return n, errors.Trace(err)
  156. }
  157. if n < t {
  158. p = p[m:]
  159. }
  160. }
  161. return n, nil
  162. }
  163. // IsClosed indicates whether the NonblockingIO is closed.
  164. func (nio *NonblockingIO) IsClosed() bool {
  165. return atomic.LoadInt32(&nio.closed) != 0
  166. }
  167. // Close implements the io.Closer interface.
  168. func (nio *NonblockingIO) Close() error {
  169. if !atomic.CompareAndSwapInt32(&nio.closed, 0, 1) {
  170. return nil
  171. }
  172. // Interrupt any Reads/Writes blocked in Select.
  173. var b [1]byte
  174. _, err := syscall.Write(nio.controlFDs[1], b[:])
  175. if err != nil {
  176. return errors.Trace(err)
  177. }
  178. // Lock to ensure concurrent Read/Writes have
  179. // exited and are no longer using the file
  180. // descriptors before closing the file descriptors.
  181. nio.readMutex.Lock()
  182. defer nio.readMutex.Unlock()
  183. nio.writeMutex.Lock()
  184. defer nio.writeMutex.Unlock()
  185. syscall.Close(nio.controlFDs[0])
  186. syscall.Close(nio.controlFDs[1])
  187. syscall.Close(nio.ioFD)
  188. return nil
  189. }