srtp_writer_future.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "io"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/pion/rtp"
  12. "github.com/pion/srtp/v2"
  13. )
  14. // srtpWriterFuture blocks Read/Write calls until
  15. // the SRTP Session is available
  16. type srtpWriterFuture struct {
  17. ssrc SSRC
  18. rtpSender *RTPSender
  19. rtcpReadStream atomic.Value // *srtp.ReadStreamSRTCP
  20. rtpWriteStream atomic.Value // *srtp.WriteStreamSRTP
  21. mu sync.Mutex
  22. closed bool
  23. }
  24. func (s *srtpWriterFuture) init(returnWhenNoSRTP bool) error {
  25. if returnWhenNoSRTP {
  26. select {
  27. case <-s.rtpSender.stopCalled:
  28. return io.ErrClosedPipe
  29. case <-s.rtpSender.transport.srtpReady:
  30. default:
  31. return nil
  32. }
  33. } else {
  34. select {
  35. case <-s.rtpSender.stopCalled:
  36. return io.ErrClosedPipe
  37. case <-s.rtpSender.transport.srtpReady:
  38. }
  39. }
  40. s.mu.Lock()
  41. defer s.mu.Unlock()
  42. if s.closed {
  43. return io.ErrClosedPipe
  44. }
  45. srtcpSession, err := s.rtpSender.transport.getSRTCPSession()
  46. if err != nil {
  47. return err
  48. }
  49. rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(s.ssrc))
  50. if err != nil {
  51. return err
  52. }
  53. srtpSession, err := s.rtpSender.transport.getSRTPSession()
  54. if err != nil {
  55. return err
  56. }
  57. rtpWriteStream, err := srtpSession.OpenWriteStream()
  58. if err != nil {
  59. return err
  60. }
  61. s.rtcpReadStream.Store(rtcpReadStream)
  62. s.rtpWriteStream.Store(rtpWriteStream)
  63. return nil
  64. }
  65. func (s *srtpWriterFuture) Close() error {
  66. s.mu.Lock()
  67. defer s.mu.Unlock()
  68. if s.closed {
  69. return nil
  70. }
  71. s.closed = true
  72. if value, ok := s.rtcpReadStream.Load().(*srtp.ReadStreamSRTCP); ok {
  73. return value.Close()
  74. }
  75. return nil
  76. }
  77. func (s *srtpWriterFuture) Read(b []byte) (n int, err error) {
  78. if value, ok := s.rtcpReadStream.Load().(*srtp.ReadStreamSRTCP); ok {
  79. return value.Read(b)
  80. }
  81. if err := s.init(false); err != nil || s.rtcpReadStream.Load() == nil {
  82. return 0, err
  83. }
  84. return s.Read(b)
  85. }
  86. func (s *srtpWriterFuture) SetReadDeadline(t time.Time) error {
  87. if value, ok := s.rtcpReadStream.Load().(*srtp.ReadStreamSRTCP); ok {
  88. return value.SetReadDeadline(t)
  89. }
  90. if err := s.init(false); err != nil || s.rtcpReadStream.Load() == nil {
  91. return err
  92. }
  93. return s.SetReadDeadline(t)
  94. }
  95. func (s *srtpWriterFuture) WriteRTP(header *rtp.Header, payload []byte) (int, error) {
  96. if value, ok := s.rtpWriteStream.Load().(*srtp.WriteStreamSRTP); ok {
  97. return value.WriteRTP(header, payload)
  98. }
  99. if err := s.init(true); err != nil || s.rtpWriteStream.Load() == nil {
  100. return 0, err
  101. }
  102. return s.WriteRTP(header, payload)
  103. }
  104. func (s *srtpWriterFuture) Write(b []byte) (int, error) {
  105. if value, ok := s.rtpWriteStream.Load().(*srtp.WriteStreamSRTP); ok {
  106. return value.Write(b)
  107. }
  108. if err := s.init(true); err != nil || s.rtpWriteStream.Load() == nil {
  109. return 0, err
  110. }
  111. return s.Write(b)
  112. }