session.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package srtp
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "github.com/pion/logging"
  8. "github.com/pion/transport/v2/packetio"
  9. )
  10. type streamSession interface {
  11. Close() error
  12. write([]byte) (int, error)
  13. decrypt([]byte) error
  14. }
  15. type session struct {
  16. localContextMutex sync.Mutex
  17. localContext, remoteContext *Context
  18. localOptions, remoteOptions []ContextOption
  19. newStream chan readStream
  20. started chan interface{}
  21. closed chan interface{}
  22. readStreamsClosed bool
  23. readStreams map[uint32]readStream
  24. readStreamsLock sync.Mutex
  25. log logging.LeveledLogger
  26. bufferFactory func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
  27. nextConn net.Conn
  28. }
  29. // Config is used to configure a session.
  30. // You can provide either a KeyingMaterialExporter to export keys
  31. // or directly pass the keys themselves.
  32. // After a Config is passed to a session it must not be modified.
  33. type Config struct {
  34. Keys SessionKeys
  35. Profile ProtectionProfile
  36. BufferFactory func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
  37. LoggerFactory logging.LoggerFactory
  38. // List of local/remote context options.
  39. // ReplayProtection is enabled on remote context by default.
  40. // Default replay protection window size is 64.
  41. LocalOptions, RemoteOptions []ContextOption
  42. }
  43. // SessionKeys bundles the keys required to setup an SRTP session
  44. type SessionKeys struct {
  45. LocalMasterKey []byte
  46. LocalMasterSalt []byte
  47. RemoteMasterKey []byte
  48. RemoteMasterSalt []byte
  49. }
  50. func (s *session) getOrCreateReadStream(ssrc uint32, child streamSession, proto func() readStream) (readStream, bool) {
  51. s.readStreamsLock.Lock()
  52. defer s.readStreamsLock.Unlock()
  53. if s.readStreamsClosed {
  54. return nil, false
  55. }
  56. r, ok := s.readStreams[ssrc]
  57. if ok {
  58. return r, false
  59. }
  60. // Create the readStream.
  61. r = proto()
  62. if err := r.init(child, ssrc); err != nil {
  63. return nil, false
  64. }
  65. s.readStreams[ssrc] = r
  66. return r, true
  67. }
  68. func (s *session) removeReadStream(ssrc uint32) {
  69. s.readStreamsLock.Lock()
  70. defer s.readStreamsLock.Unlock()
  71. if s.readStreamsClosed {
  72. return
  73. }
  74. delete(s.readStreams, ssrc)
  75. }
  76. func (s *session) close() error {
  77. if s.nextConn == nil {
  78. return nil
  79. } else if err := s.nextConn.Close(); err != nil {
  80. return err
  81. }
  82. <-s.closed
  83. return nil
  84. }
  85. func (s *session) start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, child streamSession) error {
  86. var err error
  87. s.localContext, err = CreateContext(localMasterKey, localMasterSalt, profile, s.localOptions...)
  88. if err != nil {
  89. return err
  90. }
  91. s.remoteContext, err = CreateContext(remoteMasterKey, remoteMasterSalt, profile, s.remoteOptions...)
  92. if err != nil {
  93. return err
  94. }
  95. go func() {
  96. defer func() {
  97. close(s.newStream)
  98. s.readStreamsLock.Lock()
  99. s.readStreamsClosed = true
  100. s.readStreamsLock.Unlock()
  101. close(s.closed)
  102. }()
  103. b := make([]byte, 8192)
  104. for {
  105. var i int
  106. i, err = s.nextConn.Read(b)
  107. if err != nil {
  108. if !errors.Is(err, io.EOF) {
  109. s.log.Error(err.Error())
  110. }
  111. return
  112. }
  113. if err = child.decrypt(b[:i]); err != nil {
  114. s.log.Info(err.Error())
  115. }
  116. }
  117. }()
  118. close(s.started)
  119. return nil
  120. }