bsbuffer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. // Copyright 2017 Sergey Frolov
  2. // Use of this source code is governed by a LGPL-style
  3. // license that can be found in the LICENSE file.
  4. package bsbuffer
  5. import (
  6. "bytes"
  7. "io"
  8. "io/ioutil"
  9. "sync"
  10. )
  11. // BSBuffer:
  12. // B - Blocking - Read() calls are blocking.
  13. // S - Safe - Supports arbitrary amount of readers and writers.
  14. // Could be unblocked and turned into SBuffer.
  15. type BSBuffer struct {
  16. mu sync.Mutex
  17. bufBlocked bytes.Buffer // used before Unblock() is called
  18. bufUnblocked bytes.Buffer // used after Unblock() is called
  19. r *io.PipeReader
  20. w *io.PipeWriter
  21. unblocked chan struct{} // closed on unblocking
  22. engineExit chan struct{} // after unblocking, engine will wrap up, close this and exit
  23. hasData chan struct{} // never closed
  24. unblockOnce sync.Once
  25. }
  26. // Creates new BSBuffer
  27. func NewBSBuffer() *BSBuffer {
  28. bsb := new(BSBuffer)
  29. bsb.r, bsb.w = io.Pipe()
  30. bsb.hasData = make(chan struct{}, 1)
  31. bsb.unblocked = make(chan struct{})
  32. bsb.engineExit = make(chan struct{})
  33. go bsb.engine()
  34. return bsb
  35. }
  36. // # How this is supposed to work #
  37. // (all operations, except piped ones, are locked)
  38. //
  39. // before Unblock:
  40. // Write stores data to bufBlocked
  41. // engine copies data from bufBlocked, writes to pipe
  42. // Read reads from pipe
  43. // after Unblock:
  44. // Write still writes data to bufBlocked
  45. // engine will copy data from bufBlocked to bufUnblocked and close `engineExit`
  46. // Read reads from pipe
  47. // after engineExit is closed:
  48. // Write writes to bufUnblocked
  49. // Read reads from bufUnblocked
  50. func (b *BSBuffer) engine() {
  51. for {
  52. select {
  53. case _ = <-b.hasData:
  54. b.mu.Lock()
  55. buf, _ := ioutil.ReadAll(&b.bufBlocked)
  56. b.mu.Unlock()
  57. n, _ := b.w.Write(buf) // blocking, unless Unblock was called
  58. select {
  59. case _ = <-b.unblocked:
  60. b.mu.Lock()
  61. // copy from buf whatever wasn't written to the pipe
  62. b.bufUnblocked.Write(buf[n:])
  63. // copy everything from bufBlocked to bufUnblocked
  64. // bufBlocked shouldn't be touched after engineExit is closed
  65. // and we have the Lock.
  66. b.bufUnblocked.Write(b.bufBlocked.Bytes())
  67. close(b.engineExit)
  68. b.mu.Unlock()
  69. return
  70. default:
  71. }
  72. }
  73. }
  74. }
  75. // Reads data from the BSBuffer, blocking until a writer arrives or the BSBuffer is unblocked.
  76. // If the write end is closed with an error, that error is returned as err; otherwise err is EOF.
  77. // Supports multiple concurrent goroutines and p is valid forever.
  78. func (b *BSBuffer) Read(p []byte) (n int, err error) {
  79. n, err = b.r.Read(p) // blocking, unless Unblock was called
  80. if err != nil {
  81. if n != 0 {
  82. // There might be remaining data in underlying buffer, and we want user to
  83. // come back for it, so we clean the error and push data we have upwards
  84. err = nil
  85. } else {
  86. // Unblocked and no data in engine.
  87. // Operate as SafeBuffer
  88. b.mu.Lock()
  89. n, err = b.bufUnblocked.Read(p)
  90. b.mu.Unlock()
  91. }
  92. }
  93. return
  94. }
  95. // Non-blocking write appends the contents of p to the buffer, growing the buffer as needed.
  96. // The return value n is the length of p; err is always nil.
  97. // If the buffer becomes too large, Write will panic with ErrTooLarge.
  98. // Supports multiple concurrent goroutines and p is safe for reuse right away.
  99. func (b *BSBuffer) Write(p []byte) (n int, err error) {
  100. if len(p) == 0 {
  101. return 0, nil
  102. }
  103. b.mu.Lock()
  104. select {
  105. case _ = <-b.engineExit:
  106. n, err = b.bufUnblocked.Write(p)
  107. b.mu.Unlock()
  108. default:
  109. // Push data to engine and wake it up, if needed.
  110. n, err = b.bufBlocked.Write(p)
  111. select {
  112. case b.hasData <- struct{}{}:
  113. default:
  114. }
  115. b.mu.Unlock()
  116. }
  117. return
  118. }
  119. // Turns BSBuffer into SBuffer: Read() is no longer blocking, but still safe.
  120. // Unblock() is safe to call multiple times.
  121. func (b *BSBuffer) Unblock() {
  122. b.unblockOnce.Do(func() {
  123. // closing the pipes will make engine and reads non-blocking
  124. b.w.Close()
  125. b.r.Close()
  126. b.mu.Lock()
  127. close(b.unblocked)
  128. select {
  129. case b.hasData <- struct{}{}:
  130. default:
  131. }
  132. b.mu.Unlock()
  133. })
  134. }