nonblock.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. // +build darwin linux
  2. /*
  3. * Copyright (c) 2017, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package tun
  21. import (
  22. "errors"
  23. "io"
  24. "sync"
  25. "sync/atomic"
  26. "syscall"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/creack/goselect"
  29. )
  30. // NonblockingIO provides interruptible I/O for non-pollable
  31. // and/or foreign file descriptors that can't use the netpoller
  32. // available in os.OpenFile as of Go 1.9.
  33. //
  34. // A NonblockingIO wraps a file descriptor in an
  35. // io.ReadWriteCloser interface. The underlying implementation
  36. // uses select and a pipe to interrupt Read and Write calls that
  37. // are blocked when Close is called.
  38. //
  39. // Read and write mutexes allow, for each operation, only one
  40. // concurrent goroutine to call syscalls, preventing an unbounded
  41. // number of OS threads from being created by blocked select
  42. // syscalls.
  43. type NonblockingIO struct {
  44. closed int32
  45. ioFD int
  46. controlFDs [2]int
  47. readMutex sync.Mutex
  48. readFDSet *goselect.FDSet
  49. writeMutex sync.Mutex
  50. writeFDSets []*goselect.FDSet
  51. }
  52. // NewNonblockingIO creates a new NonblockingIO with the specified
  53. // file descriptor, which is duplicated and set to nonblocking and
  54. // close-on-exec.
  55. func NewNonblockingIO(ioFD int) (*NonblockingIO, error) {
  56. syscall.ForkLock.RLock()
  57. defer syscall.ForkLock.RUnlock()
  58. newFD, err := syscall.Dup(ioFD)
  59. if err != nil {
  60. return nil, common.ContextError(err)
  61. }
  62. init := func(fd int) error {
  63. syscall.CloseOnExec(fd)
  64. return syscall.SetNonblock(fd, true)
  65. }
  66. err = init(newFD)
  67. if err != nil {
  68. return nil, common.ContextError(err)
  69. }
  70. var controlFDs [2]int
  71. err = syscall.Pipe(controlFDs[:])
  72. if err != nil {
  73. return nil, common.ContextError(err)
  74. }
  75. for _, fd := range controlFDs {
  76. err = init(fd)
  77. if err != nil {
  78. return nil, common.ContextError(err)
  79. }
  80. }
  81. return &NonblockingIO{
  82. ioFD: newFD,
  83. controlFDs: controlFDs,
  84. readFDSet: new(goselect.FDSet),
  85. writeFDSets: []*goselect.FDSet{
  86. new(goselect.FDSet), new(goselect.FDSet)},
  87. }, nil
  88. }
  89. // Read implements the io.Reader interface.
  90. func (nio *NonblockingIO) Read(p []byte) (int, error) {
  91. nio.readMutex.Lock()
  92. defer nio.readMutex.Unlock()
  93. if atomic.LoadInt32(&nio.closed) != 0 {
  94. return 0, io.EOF
  95. }
  96. for {
  97. nio.readFDSet.Zero()
  98. nio.readFDSet.Set(uintptr(nio.controlFDs[0]))
  99. nio.readFDSet.Set(uintptr(nio.ioFD))
  100. max := nio.ioFD
  101. if nio.controlFDs[0] > max {
  102. max = nio.controlFDs[0]
  103. }
  104. err := goselect.Select(max+1, nio.readFDSet, nil, nil, -1)
  105. if err == syscall.EINTR {
  106. continue
  107. } else if err != nil {
  108. return 0, common.ContextError(err)
  109. }
  110. if nio.readFDSet.IsSet(uintptr(nio.controlFDs[0])) {
  111. return 0, io.EOF
  112. }
  113. n, err := syscall.Read(nio.ioFD, p)
  114. if err != nil && err != io.EOF {
  115. return n, common.ContextError(err)
  116. }
  117. if n == 0 && err == nil {
  118. // https://godoc.org/io#Reader:
  119. // "Implementations of Read are discouraged from
  120. // returning a zero byte count with a nil error".
  121. continue
  122. }
  123. return n, err
  124. }
  125. }
  126. // Write implements the io.Writer interface.
  127. func (nio *NonblockingIO) Write(p []byte) (int, error) {
  128. nio.writeMutex.Lock()
  129. defer nio.writeMutex.Unlock()
  130. if atomic.LoadInt32(&nio.closed) != 0 {
  131. return 0, common.ContextError(errors.New("file already closed"))
  132. }
  133. n := 0
  134. t := len(p)
  135. for n < t {
  136. nio.writeFDSets[0].Zero()
  137. nio.writeFDSets[0].Set(uintptr(nio.controlFDs[0]))
  138. nio.writeFDSets[1].Zero()
  139. nio.writeFDSets[1].Set(uintptr(nio.ioFD))
  140. max := nio.ioFD
  141. if nio.controlFDs[0] > max {
  142. max = nio.controlFDs[0]
  143. }
  144. err := goselect.Select(max+1, nio.writeFDSets[0], nio.writeFDSets[1], nil, -1)
  145. if err == syscall.EINTR {
  146. continue
  147. } else if err != nil {
  148. return 0, common.ContextError(err)
  149. }
  150. if nio.writeFDSets[0].IsSet(uintptr(nio.controlFDs[0])) {
  151. return 0, common.ContextError(errors.New("file has closed"))
  152. }
  153. m, err := syscall.Write(nio.ioFD, p)
  154. n += m
  155. if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
  156. return n, common.ContextError(err)
  157. }
  158. if n < t {
  159. p = p[m:]
  160. }
  161. }
  162. return n, nil
  163. }
  164. // IsClosed indicates whether the NonblockingIO is closed.
  165. func (nio *NonblockingIO) IsClosed() bool {
  166. return atomic.LoadInt32(&nio.closed) != 0
  167. }
  168. // Close implements the io.Closer interface.
  169. func (nio *NonblockingIO) Close() error {
  170. if !atomic.CompareAndSwapInt32(&nio.closed, 0, 1) {
  171. return nil
  172. }
  173. // Interrupt any Reads/Writes blocked in Select.
  174. var b [1]byte
  175. _, err := syscall.Write(nio.controlFDs[1], b[:])
  176. if err != nil {
  177. return common.ContextError(err)
  178. }
  179. // Lock to ensure concurrent Read/Writes have
  180. // exited and are no longer using the file
  181. // descriptors before closing the file descriptors.
  182. nio.readMutex.Lock()
  183. defer nio.readMutex.Unlock()
  184. nio.writeMutex.Lock()
  185. defer nio.writeMutex.Unlock()
  186. syscall.Close(nio.controlFDs[0])
  187. syscall.Close(nio.controlFDs[1])
  188. syscall.Close(nio.ioFD)
  189. return nil
  190. }