nonblock.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. // +build darwin linux
  2. /*
  3. * Copyright (c) 2017, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package tun
  21. import (
  22. "errors"
  23. "io"
  24. "sync"
  25. "sync/atomic"
  26. "syscall"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/creack/goselect"
  29. )
  30. // NonblockingIO provides interruptible I/O for non-pollable
  31. // and/or foreign file descriptors that can't use the netpoller
  32. // available in os.OpenFile as of Go 1.9.
  33. //
  34. // A NonblockingIO wraps a file descriptor in an
  35. // io.ReadWriteCloser interface. The underlying implementation
  36. // uses select and a pipe to interrupt Read and Write calls that
  37. // are blocked when Close is called.
  38. //
  39. // Read and write mutexes allow, for each operation, only one
  40. // concurrent goroutine to call syscalls, preventing an unbounded
  41. // number of OS threads from being created by blocked select
  42. // syscalls.
  43. type NonblockingIO struct {
  44. closed int32
  45. ioFD int
  46. controlFDs [2]int
  47. readMutex sync.Mutex
  48. readFDSet *goselect.FDSet
  49. writeMutex sync.Mutex
  50. writeFDSets []*goselect.FDSet
  51. }
  52. // NewNonblockingIO creates a new NonblockingIO with the specified
  53. // file descriptor, which is duplicated and set to nonblocking and
  54. // close-on-exec.
  55. func NewNonblockingIO(ioFD int) (*NonblockingIO, error) {
  56. syscall.ForkLock.RLock()
  57. defer syscall.ForkLock.RUnlock()
  58. newFD, err := syscall.Dup(ioFD)
  59. if err != nil {
  60. return nil, common.ContextError(err)
  61. }
  62. init := func(fd int) error {
  63. syscall.CloseOnExec(fd)
  64. return syscall.SetNonblock(fd, true)
  65. }
  66. err = init(newFD)
  67. if err != nil {
  68. return nil, common.ContextError(err)
  69. }
  70. var controlFDs [2]int
  71. err = syscall.Pipe(controlFDs[:])
  72. if err != nil {
  73. return nil, common.ContextError(err)
  74. }
  75. for _, fd := range controlFDs {
  76. err = init(fd)
  77. if err != nil {
  78. return nil, common.ContextError(err)
  79. }
  80. }
  81. return &NonblockingIO{
  82. ioFD: newFD,
  83. controlFDs: controlFDs,
  84. readFDSet: new(goselect.FDSet),
  85. writeFDSets: []*goselect.FDSet{
  86. new(goselect.FDSet), new(goselect.FDSet)},
  87. }, nil
  88. }
  89. // Read implements the io.Reader interface.
  90. func (nio *NonblockingIO) Read(p []byte) (int, error) {
  91. nio.readMutex.Lock()
  92. defer nio.readMutex.Unlock()
  93. if atomic.LoadInt32(&nio.closed) != 0 {
  94. return 0, io.EOF
  95. }
  96. for {
  97. nio.readFDSet.Zero()
  98. nio.readFDSet.Set(uintptr(nio.controlFDs[0]))
  99. nio.readFDSet.Set(uintptr(nio.ioFD))
  100. max := nio.ioFD
  101. if nio.controlFDs[0] > max {
  102. max = nio.controlFDs[0]
  103. }
  104. err := goselect.Select(max+1, nio.readFDSet, nil, nil, -1)
  105. if err != nil {
  106. return 0, common.ContextError(err)
  107. }
  108. if nio.readFDSet.IsSet(uintptr(nio.controlFDs[0])) {
  109. return 0, io.EOF
  110. }
  111. n, err := syscall.Read(nio.ioFD, p)
  112. if err != nil && err != io.EOF {
  113. return n, common.ContextError(err)
  114. }
  115. return n, err
  116. }
  117. }
  118. // Write implements the io.Writer interface.
  119. func (nio *NonblockingIO) Write(p []byte) (int, error) {
  120. nio.writeMutex.Lock()
  121. defer nio.writeMutex.Unlock()
  122. if atomic.LoadInt32(&nio.closed) != 0 {
  123. return 0, common.ContextError(errors.New("file already closed"))
  124. }
  125. n := 0
  126. t := len(p)
  127. for n < t {
  128. nio.writeFDSets[0].Zero()
  129. nio.writeFDSets[0].Set(uintptr(nio.controlFDs[0]))
  130. nio.writeFDSets[1].Zero()
  131. nio.writeFDSets[1].Set(uintptr(nio.ioFD))
  132. max := nio.ioFD
  133. if nio.controlFDs[0] > max {
  134. max = nio.controlFDs[0]
  135. }
  136. err := goselect.Select(max+1, nio.writeFDSets[0], nio.writeFDSets[1], nil, -1)
  137. if err != nil {
  138. return 0, common.ContextError(err)
  139. }
  140. if nio.writeFDSets[0].IsSet(uintptr(nio.controlFDs[0])) {
  141. return 0, common.ContextError(errors.New("file has closed"))
  142. }
  143. m, err := syscall.Write(nio.ioFD, p)
  144. n += m
  145. if err != nil && err != syscall.EAGAIN && err != syscall.EWOULDBLOCK {
  146. return n, common.ContextError(err)
  147. }
  148. if n < t {
  149. p = p[m:]
  150. }
  151. }
  152. return n, nil
  153. }
  154. // IsClosed indicates whether the NonblockingIO is closed.
  155. func (nio *NonblockingIO) IsClosed() bool {
  156. return atomic.LoadInt32(&nio.closed) != 0
  157. }
  158. // Close implements the io.Closer interface.
  159. func (nio *NonblockingIO) Close() error {
  160. if !atomic.CompareAndSwapInt32(&nio.closed, 0, 1) {
  161. return nil
  162. }
  163. // Interrupt any Reads/Writes blocked in Select.
  164. var b [1]byte
  165. _, err := syscall.Write(nio.controlFDs[1], b[:])
  166. if err != nil {
  167. return common.ContextError(err)
  168. }
  169. // Lock to ensure concurrent Read/Writes have
  170. // exited and are no longer using the file
  171. // descriptors before closing the file descriptors.
  172. nio.readMutex.Lock()
  173. defer nio.readMutex.Unlock()
  174. nio.writeMutex.Lock()
  175. defer nio.writeMutex.Unlock()
  176. syscall.Close(nio.controlFDs[0])
  177. syscall.Close(nio.controlFDs[1])
  178. syscall.Close(nio.ioFD)
  179. return nil
  180. }