doc.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. Copyright 2025 Psiphon Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package udsipc provides high-performance Unix Domain Socket (UDS) inter-process communication.
  14. //
  15. // This package implements a client-server communication system over UDS
  16. // with built-in (but optional) support for systemd socket activation,
  17. // automatic reconnection with exponential backoff, write retry logic,
  18. // and comprehensive error reporting.
  19. //
  20. // # Basic Usage
  21. //
  22. // Create a reader to receive messages using functional options:
  23. //
  24. // reader, err := udsipc.NewReader(
  25. // func(data []byte) error {
  26. // // IMPORTANT: Do not retain references to data slice!
  27. // // Copy data if you need to store it beyond this function.
  28. // fmt.Printf("Received: %s\n", data)
  29. // return nil
  30. // },
  31. // "/tmp/ipc.sock", // fallback socket path
  32. // udsipc.WithMaxMessageSize(1024*1024), // max message size (1MB)
  33. // udsipc.WithInactivityTimeout(30*time.Second), // close idle connections
  34. // udsipc.WithReaderErrorCallback(errorHandler), // error callback
  35. // udsipc.WithMaxAcceptErrors(10), // max consecutive accept errors
  36. // )
  37. // if err != nil {
  38. // log.Fatal(err)
  39. // }
  40. //
  41. // if err := reader.Start(); err != nil {
  42. // log.Fatal(err)
  43. // }
  44. // defer reader.Stop(context.Background())
  45. //
  46. // Create a writer to send messages using functional options:
  47. //
  48. // writer, err := udsipc.NewWriter(
  49. // "/tmp/ipc.sock", // socket path
  50. // udsipc.WithWriterErrorCallback(errorHandler), // error callback
  51. // udsipc.WithWriteTimeout(5*time.Second), // write timeout
  52. // udsipc.WithDialTimeout(2*time.Second), // dial timeout
  53. // udsipc.WithMaxBackoff(30*time.Second), // max backoff
  54. // udsipc.WithMaxBufferedWrites(1000), // max buffered writes
  55. // )
  56. // if err != nil {
  57. // log.Fatal(err)
  58. // }
  59. //
  60. // writer.Start()
  61. // defer writer.Stop(context.Background())
  62. //
  63. // // Send messages (non-blocking, returns error if queue is full)
  64. // if err := writer.WriteMessage([]byte("hello world")); err != nil {
  65. // log.Printf("Failed to queue message: %v", err)
  66. // }
  67. //
  68. // # API Design
  69. //
  70. // Both Reader and Writer use a simple Start()/Stop() lifecycle pattern:
  71. //
  72. // - Start() begins operation (non-blocking for Writer, may return error for Reader)
  73. // - Stop(ctx) gracefully shuts down and waits for cleanup or context cancellation/expiration
  74. // - Both methods are idempotent and safe to call multiple times
  75. // - Context controls shutdown timeout - graceful drain until context is cancelled/expires, then forced
  76. //
  77. // # Graceful Shutdown
  78. //
  79. // Both Reader and Writer support context-controlled graceful shutdown:
  80. //
  81. // - Reader.Stop(ctx): Stops accepting new connections and allows in-flight message
  82. // handlers to complete until the context is cancelled or expires. When the context
  83. // is done, forces immediate termination (though already-executing handlers will complete).
  84. //
  85. // - Writer.Stop(ctx): Stops accepting new writes and drains buffered messages
  86. // until the context is cancelled or expires. When the context is done, discards
  87. // remaining buffered messages and terminates immediately.
  88. //
  89. // For immediate shutdown, use a short timeout or cancellation:
  90. //
  91. // ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
  92. // defer cancel()
  93. // reader.Stop(ctx)
  94. // writer.Stop(ctx)
  95. //
  96. // For indefinite graceful drain, use context.Background() or a long timeout.
  97. //
  98. // # Functional Options
  99. //
  100. // Both constructors use functional options for configuration flexibility:
  101. //
  102. // Reader options:
  103. //
  104. // - [WithMaxMessageSize](size uint64): Set maximum message size (default: 10MB)
  105. // - [WithInactivityTimeout](timeout time.Duration): Close idle connections (default: 10s)
  106. // - [WithReaderErrorCallback](callback ErrorCallback): Set error callback
  107. // - [WithMaxAcceptErrors](maxErrors int): Set max consecutive accept errors (default: 10)
  108. // - [WithReadBufferSize](size uint32): Set socket read buffer size (default: 256KB)
  109. //
  110. // Writer options:
  111. //
  112. // - [WithMaxBufferedWrites](size uint32): Set message channel buffer size (default: 10,000)
  113. // - [WithWriteTimeout](timeout time.Duration): Set write timeout (default: 1s)
  114. // - [WithDialTimeout](timeout time.Duration): Set connection timeout (default: 1s)
  115. // - [WithMaxBackoff](maxBackoff time.Duration): Set max retry backoff (default: 10s)
  116. // - [WithWriterErrorCallback](callback ErrorCallback): Set error callback
  117. // - [WithWriteBufferSize](size uint32): Set socket write buffer size (default: 256KB)
  118. //
  119. // # Systemd Integration
  120. //
  121. // The package automatically detects systemd environments and uses socket
  122. // activation when available. When running under systemd with socket
  123. // activation, the reader will use the pre-configured listener instead
  124. // of creating its own socket.
  125. //
  126. // Environment variables used for systemd detection:
  127. //
  128. // - RUNTIME_DIRECTORY: systemd runtime directory
  129. // - STATE_DIRECTORY: systemd state directory
  130. // - LISTEN_FDS: number of file descriptors passed by systemd
  131. // - LISTEN_PID: process ID that should receive the file descriptors
  132. // - NOTIFY_SOCKET: socket for systemd notifications
  133. //
  134. // # Error Handling
  135. //
  136. // Both Reader and Writer accept optional ErrorCallback functions
  137. // to handle various error conditions:
  138. //
  139. // errorHandler := func(err error, context string) {
  140. // log.Printf("Error in %s: %v", context, err)
  141. // // Implement custom error handling logic.
  142. // }
  143. //
  144. // # Protocol
  145. //
  146. // Messages are sent using a length-prefixed protocol:
  147. // 1. Variable-length integer (varint) indicating message length
  148. // 2. Message bytes of the specified length
  149. //
  150. // This ensures reliable message framing and supports messages up to the
  151. // configured maximum size. Protocol overhead is minimal (~0.1% for 1KB+ messages).
  152. //
  153. // # MessageHandler Safety
  154. //
  155. // MessageHandler implementations must NOT retain references to the data slice passed to them.
  156. // The slice is backed by pooled buffers that are reused after the handler returns.
  157. // If you need to retain the data, make a copy:
  158. //
  159. // func handler(data []byte) error {
  160. // // GOOD: Copy if you need to retain.
  161. // msg := make([]byte, len(data))
  162. // copy(msg, data)
  163. //
  164. // // GOOD: Process immediately.
  165. // return process(data)
  166. //
  167. // // BAD: Don't store references.
  168. // // global = data // This risks message corruption via buffer reuse
  169. // }
  170. //
  171. // # Thread Safety
  172. //
  173. // All types in this package are safe for concurrent use. Multiple goroutines
  174. // can safely call WriteMessage() concurrently, and all methods are protected
  175. // by appropriate synchronization mechanisms.
  176. //
  177. // # Metrics and Monitoring
  178. //
  179. // Both Reader and Writer provide comprehensive metrics:
  180. //
  181. // // Reader metrics
  182. // received, connections, errors := reader.GetMetrics()
  183. //
  184. // // Writer metrics
  185. // sent, dropped, failed, queueDepth := writer.GetMetrics()
  186. //
  187. // Use these metrics for health monitoring, alerting, and performance analysis.
  188. package udsipc