channel.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package ssh
  5. import (
  6. "encoding/binary"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "log"
  11. "sync"
  12. )
  13. const (
  14. minPacketLength = 9
  15. // channelMaxPacket contains the maximum number of bytes that will be
  16. // sent in a single packet. As per RFC 4253, section 6.1, 32k is also
  17. // the minimum.
  18. channelMaxPacket = 1 << 15
  19. // [Psiphon]
  20. //
  21. // Use getChannelWindowSize() instead of channelWindowSize.
  22. //
  23. // We follow OpenSSH here.
  24. //channelWindowSize = 64 * channelMaxPacket
  25. )
  26. // [Psiphon]
  27. //
  28. // - Use a smaller initial/max channel window size.
  29. // - Testing with the full Psiphon stack shows that
  30. // this smaller channel window size is more performant
  31. // for low bandwidth connections while still adequate for
  32. // higher bandwidth connections.
  33. // - In Psiphon, a single SSH connection is used for all
  34. // client port forwards. Bulk data transfers with large
  35. // channel windows can immediately backlog the connection
  36. // with many large SSH packets, introducing large latency
  37. // for opening new channels. For Psiphon, we don't wish to
  38. // optimize for a single bulk transfer throughput.
  39. // - TODO: can we implement some sort of adaptive max
  40. // channel window size, starting with this small initial
  41. // value and only growing based on connection properties?
  42. // - channelWindowSize directly defines the local channel
  43. // window initial and max size. We also cap remote channel
  44. // window sizes via an extra customization in the
  45. // channelOpenConfirmMsg handler. Both upstream and
  46. // downstream bulk data transfers have the same latency
  47. // issue.
  48. // - For packet tunnel, use a larger channel window size,
  49. // since all tunneled traffic flows through a single
  50. // channel; we still select a size smaller than the stock
  51. // channelWindowSize due to client memory constraints.
  52. func getChannelWindowSize(chanType string) int {
  53. // From "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol".
  54. // Copied here to avoid import cycle.
  55. packetTunnelChannelType := "tun@psiphon.ca"
  56. if chanType == packetTunnelChannelType {
  57. return 16 * channelMaxPacket
  58. }
  59. return 4 * channelMaxPacket
  60. }
  61. // NewChannel represents an incoming request to a channel. It must either be
  62. // accepted for use by calling Accept, or rejected by calling Reject.
  63. type NewChannel interface {
  64. // Accept accepts the channel creation request. It returns the Channel
  65. // and a Go channel containing SSH requests. The Go channel must be
  66. // serviced otherwise the Channel will hang.
  67. Accept() (Channel, <-chan *Request, error)
  68. // Reject rejects the channel creation request. After calling
  69. // this, no other methods on the Channel may be called.
  70. Reject(reason RejectionReason, message string) error
  71. // ChannelType returns the type of the channel, as supplied by the
  72. // client.
  73. ChannelType() string
  74. // ExtraData returns the arbitrary payload for this channel, as supplied
  75. // by the client. This data is specific to the channel type.
  76. ExtraData() []byte
  77. }
  78. // A Channel is an ordered, reliable, flow-controlled, duplex stream
  79. // that is multiplexed over an SSH connection.
  80. type Channel interface {
  81. // Read reads up to len(data) bytes from the channel.
  82. Read(data []byte) (int, error)
  83. // Write writes len(data) bytes to the channel.
  84. Write(data []byte) (int, error)
  85. // Close signals end of channel use. No data may be sent after this
  86. // call.
  87. Close() error
  88. // CloseWrite signals the end of sending in-band
  89. // data. Requests may still be sent, and the other side may
  90. // still send data
  91. CloseWrite() error
  92. // SendRequest sends a channel request. If wantReply is true,
  93. // it will wait for a reply and return the result as a
  94. // boolean, otherwise the return value will be false. Channel
  95. // requests are out-of-band messages so they may be sent even
  96. // if the data stream is closed or blocked by flow control.
  97. // If the channel is closed before a reply is returned, io.EOF
  98. // is returned.
  99. SendRequest(name string, wantReply bool, payload []byte) (bool, error)
  100. // Stderr returns an io.ReadWriter that writes to this channel
  101. // with the extended data type set to stderr. Stderr may
  102. // safely be read and written from a different goroutine than
  103. // Read and Write respectively.
  104. Stderr() io.ReadWriter
  105. }
  106. // Request is a request sent outside of the normal stream of
  107. // data. Requests can either be specific to an SSH channel, or they
  108. // can be global.
  109. type Request struct {
  110. Type string
  111. WantReply bool
  112. Payload []byte
  113. ch *channel
  114. mux *mux
  115. }
  116. // Reply sends a response to a request. It must be called for all requests
  117. // where WantReply is true and is a no-op otherwise. The payload argument is
  118. // ignored for replies to channel-specific requests.
  119. func (r *Request) Reply(ok bool, payload []byte) error {
  120. if !r.WantReply {
  121. return nil
  122. }
  123. if r.ch == nil {
  124. return r.mux.ackRequest(ok, payload)
  125. }
  126. return r.ch.ackRequest(ok)
  127. }
  128. // RejectionReason is an enumeration used when rejecting channel creation
  129. // requests. See RFC 4254, section 5.1.
  130. type RejectionReason uint32
  131. const (
  132. Prohibited RejectionReason = iota + 1
  133. ConnectionFailed
  134. UnknownChannelType
  135. ResourceShortage
  136. )
  137. // String converts the rejection reason to human readable form.
  138. func (r RejectionReason) String() string {
  139. switch r {
  140. case Prohibited:
  141. return "administratively prohibited"
  142. case ConnectionFailed:
  143. return "connect failed"
  144. case UnknownChannelType:
  145. return "unknown channel type"
  146. case ResourceShortage:
  147. return "resource shortage"
  148. }
  149. return fmt.Sprintf("unknown reason %d", int(r))
  150. }
  151. func min(a uint32, b int) uint32 {
  152. if a < uint32(b) {
  153. return a
  154. }
  155. return uint32(b)
  156. }
  157. type channelDirection uint8
  158. const (
  159. channelInbound channelDirection = iota
  160. channelOutbound
  161. )
  162. // channel is an implementation of the Channel interface that works
  163. // with the mux class.
  164. type channel struct {
  165. // R/O after creation
  166. chanType string
  167. extraData []byte
  168. localId, remoteId uint32
  169. // maxIncomingPayload and maxRemotePayload are the maximum
  170. // payload sizes of normal and extended data packets for
  171. // receiving and sending, respectively. The wire packet will
  172. // be 9 or 13 bytes larger (excluding encryption overhead).
  173. maxIncomingPayload uint32
  174. maxRemotePayload uint32
  175. mux *mux
  176. // decided is set to true if an accept or reject message has been sent
  177. // (for outbound channels) or received (for inbound channels).
  178. decided bool
  179. // direction contains either channelOutbound, for channels created
  180. // locally, or channelInbound, for channels created by the peer.
  181. direction channelDirection
  182. // Pending internal channel messages.
  183. msg chan interface{}
  184. // Since requests have no ID, there can be only one request
  185. // with WantReply=true outstanding. This lock is held by a
  186. // goroutine that has such an outgoing request pending.
  187. sentRequestMu sync.Mutex
  188. incomingRequests chan *Request
  189. sentEOF bool
  190. // thread-safe data
  191. remoteWin window
  192. pending *buffer
  193. extPending *buffer
  194. // windowMu protects myWindow, the flow-control window, and myConsumed,
  195. // the number of bytes consumed since we last increased myWindow
  196. windowMu sync.Mutex
  197. myWindow uint32
  198. myConsumed uint32
  199. // writeMu serializes calls to mux.conn.writePacket() and
  200. // protects sentClose and packetPool. This mutex must be
  201. // different from windowMu, as writePacket can block if there
  202. // is a key exchange pending.
  203. writeMu sync.Mutex
  204. sentClose bool
  205. // packetPool has a buffer for each extended channel ID to
  206. // save allocations during writes.
  207. packetPool map[uint32][]byte
  208. }
  209. // writePacket sends a packet. If the packet is a channel close, it updates
  210. // sentClose. This method takes the lock c.writeMu.
  211. func (ch *channel) writePacket(packet []byte) error {
  212. ch.writeMu.Lock()
  213. if ch.sentClose {
  214. ch.writeMu.Unlock()
  215. return io.EOF
  216. }
  217. ch.sentClose = (packet[0] == msgChannelClose)
  218. err := ch.mux.conn.writePacket(packet)
  219. ch.writeMu.Unlock()
  220. return err
  221. }
  222. func (ch *channel) sendMessage(msg interface{}) error {
  223. if debugMux {
  224. log.Printf("send(%d): %#v", ch.mux.chanList.offset, msg)
  225. }
  226. p := Marshal(msg)
  227. binary.BigEndian.PutUint32(p[1:], ch.remoteId)
  228. return ch.writePacket(p)
  229. }
  230. // WriteExtended writes data to a specific extended stream. These streams are
  231. // used, for example, for stderr.
  232. func (ch *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
  233. if ch.sentEOF {
  234. return 0, io.EOF
  235. }
  236. // 1 byte message type, 4 bytes remoteId, 4 bytes data length
  237. opCode := byte(msgChannelData)
  238. headerLength := uint32(9)
  239. if extendedCode > 0 {
  240. headerLength += 4
  241. opCode = msgChannelExtendedData
  242. }
  243. ch.writeMu.Lock()
  244. packet := ch.packetPool[extendedCode]
  245. // We don't remove the buffer from packetPool, so
  246. // WriteExtended calls from different goroutines will be
  247. // flagged as errors by the race detector.
  248. ch.writeMu.Unlock()
  249. for len(data) > 0 {
  250. space := min(ch.maxRemotePayload, len(data))
  251. if space, err = ch.remoteWin.reserve(space); err != nil {
  252. return n, err
  253. }
  254. if want := headerLength + space; uint32(cap(packet)) < want {
  255. packet = make([]byte, want)
  256. } else {
  257. packet = packet[:want]
  258. }
  259. todo := data[:space]
  260. packet[0] = opCode
  261. binary.BigEndian.PutUint32(packet[1:], ch.remoteId)
  262. if extendedCode > 0 {
  263. binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode))
  264. }
  265. binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo)))
  266. copy(packet[headerLength:], todo)
  267. if err = ch.writePacket(packet); err != nil {
  268. return n, err
  269. }
  270. n += len(todo)
  271. data = data[len(todo):]
  272. }
  273. ch.writeMu.Lock()
  274. ch.packetPool[extendedCode] = packet
  275. ch.writeMu.Unlock()
  276. return n, err
  277. }
  278. func (ch *channel) handleData(packet []byte) error {
  279. headerLen := 9
  280. isExtendedData := packet[0] == msgChannelExtendedData
  281. if isExtendedData {
  282. headerLen = 13
  283. }
  284. if len(packet) < headerLen {
  285. // malformed data packet
  286. return parseError(packet[0])
  287. }
  288. var extended uint32
  289. if isExtendedData {
  290. extended = binary.BigEndian.Uint32(packet[5:])
  291. }
  292. length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen])
  293. if length == 0 {
  294. return nil
  295. }
  296. if length > ch.maxIncomingPayload {
  297. // TODO(hanwen): should send Disconnect?
  298. return errors.New("ssh: incoming packet exceeds maximum payload size")
  299. }
  300. data := packet[headerLen:]
  301. if length != uint32(len(data)) {
  302. return errors.New("ssh: wrong packet length")
  303. }
  304. ch.windowMu.Lock()
  305. if ch.myWindow < length {
  306. ch.windowMu.Unlock()
  307. // TODO(hanwen): should send Disconnect with reason?
  308. return errors.New("ssh: remote side wrote too much")
  309. }
  310. ch.myWindow -= length
  311. ch.windowMu.Unlock()
  312. if extended == 1 {
  313. ch.extPending.write(data)
  314. } else if extended > 0 {
  315. // discard other extended data.
  316. } else {
  317. ch.pending.write(data)
  318. }
  319. return nil
  320. }
  321. func (c *channel) adjustWindow(adj uint32) error {
  322. c.windowMu.Lock()
  323. // Since myConsumed and myWindow are managed on our side, and can never
  324. // exceed the initial window setting, we don't worry about overflow.
  325. c.myConsumed += adj
  326. var sendAdj uint32
  327. channelWindowSize := uint32(getChannelWindowSize(c.chanType))
  328. if (channelWindowSize-c.myWindow > 3*c.maxIncomingPayload) ||
  329. (c.myWindow < channelWindowSize/2) {
  330. sendAdj = c.myConsumed
  331. c.myConsumed = 0
  332. c.myWindow += sendAdj
  333. }
  334. c.windowMu.Unlock()
  335. if sendAdj == 0 {
  336. return nil
  337. }
  338. return c.sendMessage(windowAdjustMsg{
  339. AdditionalBytes: sendAdj,
  340. })
  341. }
  342. func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
  343. switch extended {
  344. case 1:
  345. n, err = c.extPending.Read(data)
  346. case 0:
  347. n, err = c.pending.Read(data)
  348. default:
  349. return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended)
  350. }
  351. if n > 0 {
  352. err = c.adjustWindow(uint32(n))
  353. // sendWindowAdjust can return io.EOF if the remote
  354. // peer has closed the connection, however we want to
  355. // defer forwarding io.EOF to the caller of Read until
  356. // the buffer has been drained.
  357. if n > 0 && err == io.EOF {
  358. err = nil
  359. }
  360. }
  361. return n, err
  362. }
  363. func (c *channel) close() {
  364. c.pending.eof()
  365. c.extPending.eof()
  366. close(c.msg)
  367. close(c.incomingRequests)
  368. c.writeMu.Lock()
  369. // This is not necessary for a normal channel teardown, but if
  370. // there was another error, it is.
  371. c.sentClose = true
  372. c.writeMu.Unlock()
  373. // Unblock writers.
  374. c.remoteWin.close()
  375. }
  376. // responseMessageReceived is called when a success or failure message is
  377. // received on a channel to check that such a message is reasonable for the
  378. // given channel.
  379. func (ch *channel) responseMessageReceived() error {
  380. if ch.direction == channelInbound {
  381. return errors.New("ssh: channel response message received on inbound channel")
  382. }
  383. if ch.decided {
  384. return errors.New("ssh: duplicate response received for channel")
  385. }
  386. ch.decided = true
  387. return nil
  388. }
  389. func (ch *channel) handlePacket(packet []byte) error {
  390. switch packet[0] {
  391. case msgChannelData, msgChannelExtendedData:
  392. return ch.handleData(packet)
  393. case msgChannelClose:
  394. ch.sendMessage(channelCloseMsg{PeersID: ch.remoteId})
  395. ch.mux.chanList.remove(ch.localId)
  396. ch.close()
  397. return nil
  398. case msgChannelEOF:
  399. // RFC 4254 is mute on how EOF affects dataExt messages but
  400. // it is logical to signal EOF at the same time.
  401. ch.extPending.eof()
  402. ch.pending.eof()
  403. return nil
  404. }
  405. decoded, err := decode(packet)
  406. if err != nil {
  407. return err
  408. }
  409. switch msg := decoded.(type) {
  410. case *channelOpenFailureMsg:
  411. if err := ch.responseMessageReceived(); err != nil {
  412. return err
  413. }
  414. ch.mux.chanList.remove(msg.PeersID)
  415. ch.msg <- msg
  416. case *channelOpenConfirmMsg:
  417. if err := ch.responseMessageReceived(); err != nil {
  418. return err
  419. }
  420. if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
  421. return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize)
  422. }
  423. ch.remoteId = msg.MyID
  424. ch.maxRemotePayload = msg.MaxPacketSize
  425. // [Psiphon]
  426. //
  427. // - Use a smaller initial/max channel window size.
  428. // - See comments above channelWindowSize definition.
  429. //ch.remoteWin.add(msg.MyWindow)
  430. ch.remoteWin.add(min(msg.MyWindow, getChannelWindowSize(ch.chanType)))
  431. ch.msg <- msg
  432. case *windowAdjustMsg:
  433. if !ch.remoteWin.add(msg.AdditionalBytes) {
  434. return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes)
  435. }
  436. case *channelRequestMsg:
  437. req := Request{
  438. Type: msg.Request,
  439. WantReply: msg.WantReply,
  440. Payload: msg.RequestSpecificData,
  441. ch: ch,
  442. }
  443. ch.incomingRequests <- &req
  444. default:
  445. ch.msg <- msg
  446. }
  447. return nil
  448. }
  449. func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel {
  450. ch := &channel{
  451. remoteWin: window{Cond: newCond()},
  452. myWindow: uint32(getChannelWindowSize(chanType)),
  453. pending: newBuffer(),
  454. extPending: newBuffer(),
  455. direction: direction,
  456. incomingRequests: make(chan *Request, chanSize),
  457. msg: make(chan interface{}, chanSize),
  458. chanType: chanType,
  459. extraData: extraData,
  460. mux: m,
  461. packetPool: make(map[uint32][]byte),
  462. }
  463. ch.localId = m.chanList.add(ch)
  464. return ch
  465. }
  466. var errUndecided = errors.New("ssh: must Accept or Reject channel")
  467. var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
  468. type extChannel struct {
  469. code uint32
  470. ch *channel
  471. }
  472. func (e *extChannel) Write(data []byte) (n int, err error) {
  473. return e.ch.WriteExtended(data, e.code)
  474. }
  475. func (e *extChannel) Read(data []byte) (n int, err error) {
  476. return e.ch.ReadExtended(data, e.code)
  477. }
  478. func (ch *channel) Accept() (Channel, <-chan *Request, error) {
  479. if ch.decided {
  480. return nil, nil, errDecidedAlready
  481. }
  482. ch.maxIncomingPayload = channelMaxPacket
  483. confirm := channelOpenConfirmMsg{
  484. PeersID: ch.remoteId,
  485. MyID: ch.localId,
  486. MyWindow: ch.myWindow,
  487. MaxPacketSize: ch.maxIncomingPayload,
  488. }
  489. ch.decided = true
  490. if err := ch.sendMessage(confirm); err != nil {
  491. return nil, nil, err
  492. }
  493. return ch, ch.incomingRequests, nil
  494. }
  495. func (ch *channel) Reject(reason RejectionReason, message string) error {
  496. if ch.decided {
  497. return errDecidedAlready
  498. }
  499. reject := channelOpenFailureMsg{
  500. PeersID: ch.remoteId,
  501. Reason: reason,
  502. Message: message,
  503. Language: "en",
  504. }
  505. ch.decided = true
  506. // [Psiphon]
  507. // Don't leave reference to channel in chanList
  508. ch.mux.chanList.remove(ch.localId)
  509. return ch.sendMessage(reject)
  510. }
  511. func (ch *channel) Read(data []byte) (int, error) {
  512. if !ch.decided {
  513. return 0, errUndecided
  514. }
  515. return ch.ReadExtended(data, 0)
  516. }
  517. func (ch *channel) Write(data []byte) (int, error) {
  518. if !ch.decided {
  519. return 0, errUndecided
  520. }
  521. return ch.WriteExtended(data, 0)
  522. }
  523. func (ch *channel) CloseWrite() error {
  524. if !ch.decided {
  525. return errUndecided
  526. }
  527. ch.sentEOF = true
  528. return ch.sendMessage(channelEOFMsg{
  529. PeersID: ch.remoteId})
  530. }
  531. func (ch *channel) Close() error {
  532. if !ch.decided {
  533. return errUndecided
  534. }
  535. return ch.sendMessage(channelCloseMsg{
  536. PeersID: ch.remoteId})
  537. }
  538. // Extended returns an io.ReadWriter that sends and receives data on the given,
  539. // SSH extended stream. Such streams are used, for example, for stderr.
  540. func (ch *channel) Extended(code uint32) io.ReadWriter {
  541. if !ch.decided {
  542. return nil
  543. }
  544. return &extChannel{code, ch}
  545. }
  546. func (ch *channel) Stderr() io.ReadWriter {
  547. return ch.Extended(1)
  548. }
  549. func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
  550. if !ch.decided {
  551. return false, errUndecided
  552. }
  553. if wantReply {
  554. ch.sentRequestMu.Lock()
  555. defer ch.sentRequestMu.Unlock()
  556. }
  557. msg := channelRequestMsg{
  558. PeersID: ch.remoteId,
  559. Request: name,
  560. WantReply: wantReply,
  561. RequestSpecificData: payload,
  562. }
  563. if err := ch.sendMessage(msg); err != nil {
  564. return false, err
  565. }
  566. if wantReply {
  567. m, ok := (<-ch.msg)
  568. if !ok {
  569. return false, io.EOF
  570. }
  571. switch m.(type) {
  572. case *channelRequestFailureMsg:
  573. return false, nil
  574. case *channelRequestSuccessMsg:
  575. return true, nil
  576. default:
  577. return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m)
  578. }
  579. }
  580. return false, nil
  581. }
  582. // ackRequest either sends an ack or nack to the channel request.
  583. func (ch *channel) ackRequest(ok bool) error {
  584. if !ch.decided {
  585. return errUndecided
  586. }
  587. var msg interface{}
  588. if !ok {
  589. msg = channelRequestFailureMsg{
  590. PeersID: ch.remoteId,
  591. }
  592. } else {
  593. msg = channelRequestSuccessMsg{
  594. PeersID: ch.remoteId,
  595. }
  596. }
  597. return ch.sendMessage(msg)
  598. }
  599. func (ch *channel) ChannelType() string {
  600. return ch.chanType
  601. }
  602. func (ch *channel) ExtraData() []byte {
  603. return ch.extraData
  604. }