conn_flow.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. /*
  2. TODO: It probably should have read flow that reads messages and says STAAAHP to channel when read
  3. TODO: here we actually can avoid reconnecting if idle for too long
  4. TODO: confirm that all writes are recorded towards data limit
  5. */
  6. package tapdance
  7. import (
  8. "context"
  9. "crypto/rand"
  10. "encoding/binary"
  11. "encoding/hex"
  12. "errors"
  13. "io"
  14. "net"
  15. "sync"
  16. "time"
  17. "github.com/golang/protobuf/proto"
  18. pb "github.com/refraction-networking/gotapdance/protobuf"
  19. "github.com/sergeyfrolov/bsbuffer"
  20. )
  21. // TapdanceFlowConn represents single TapDance flow.
  22. type TapdanceFlowConn struct {
  23. tdRaw *tdRawConn
  24. bsbuf *bsbuffer.BSBuffer
  25. recvbuf []byte
  26. headerBuf [6]byte
  27. writeSliceChan chan []byte
  28. writeResultChan chan ioOpResult
  29. writtenBytesTotal int
  30. yieldConfirmed chan struct{} // used by flowConn to signal that flow was picked up
  31. readOnly bool // if readOnly -- we don't need to wait for write engine to stop
  32. reconnectSuccess chan bool
  33. reconnectStarted chan struct{}
  34. finSent bool // used only by reader to know if it has already scheduled reconnect
  35. closed chan struct{}
  36. closeOnce sync.Once
  37. closeErr error
  38. flowType flowType
  39. }
  40. // NewTapDanceConn returns TapDance connection, that is ready to be Dial'd
  41. func NewTapDanceConn() (net.Conn, error) {
  42. return makeTdFlow(flowBidirectional, nil, "")
  43. }
  44. // Prepares TD flow: does not make any network calls nor sets up engines
  45. func makeTdFlow(flow flowType, tdRaw *tdRawConn, covert string) (*TapdanceFlowConn, error) {
  46. if tdRaw == nil {
  47. // raw TapDance connection is not given, make a new one
  48. stationPubkey := Assets().GetPubkey()
  49. remoteConnId := make([]byte, 16)
  50. rand.Read(remoteConnId[:])
  51. tdRaw = makeTdRaw(tagHttpGetIncomplete,
  52. stationPubkey[:])
  53. tdRaw.covert = covert
  54. tdRaw.sessionId = sessionsTotal.GetAndInc()
  55. }
  56. flowConn := &TapdanceFlowConn{tdRaw: tdRaw}
  57. flowConn.bsbuf = bsbuffer.NewBSBuffer()
  58. flowConn.closed = make(chan struct{})
  59. flowConn.flowType = flow
  60. return flowConn, nil
  61. }
  62. // Dial establishes direct connection to TapDance station proxy.
  63. // Users are expected to send HTTP CONNECT request next.
  64. func (flowConn *TapdanceFlowConn) DialContext(ctx context.Context) error {
  65. if flowConn.tdRaw.tlsConn == nil {
  66. // if still hasn't dialed
  67. err := flowConn.tdRaw.DialContext(ctx)
  68. if err != nil {
  69. return err
  70. }
  71. }
  72. // don't lose initial msg from station
  73. // strip off state transition and push protobuf up for processing
  74. flowConn.tdRaw.initialMsg.StateTransition = nil
  75. err := flowConn.processProto(flowConn.tdRaw.initialMsg)
  76. if err != nil {
  77. flowConn.closeWithErrorOnce(err)
  78. return err
  79. }
  80. switch flowConn.flowType {
  81. case flowUpload:
  82. fallthrough
  83. case flowBidirectional:
  84. flowConn.reconnectSuccess = make(chan bool, 1)
  85. flowConn.reconnectStarted = make(chan struct{})
  86. flowConn.writeSliceChan = make(chan []byte)
  87. flowConn.writeResultChan = make(chan ioOpResult)
  88. go flowConn.spawnReaderEngine()
  89. go flowConn.spawnWriterEngine()
  90. case flowReadOnly:
  91. go flowConn.spawnReaderEngine()
  92. case flowRendezvous:
  93. default:
  94. panic("Not implemented")
  95. }
  96. return nil
  97. }
  98. type ioOpResult struct {
  99. err error
  100. n int
  101. }
  102. func (flowConn *TapdanceFlowConn) schedReconnectNow() {
  103. flowConn.tdRaw.tlsConn.SetReadDeadline(time.Now())
  104. }
  105. // returns bool indicating success of reconnect
  106. func (flowConn *TapdanceFlowConn) awaitReconnect() bool {
  107. defer func() { flowConn.writtenBytesTotal = 0 }()
  108. for {
  109. select {
  110. case <-flowConn.reconnectStarted:
  111. case <-flowConn.closed:
  112. return false
  113. case reconnectOk := <-flowConn.reconnectSuccess:
  114. return reconnectOk
  115. }
  116. }
  117. }
  118. // Write writes data to the connection.
  119. // Write can be made to time out and return an Error with Timeout() == true
  120. // after a fixed time limit; see SetDeadline and SetWriteDeadline.
  121. func (flowConn *TapdanceFlowConn) spawnWriterEngine() {
  122. defer close(flowConn.writeResultChan)
  123. for {
  124. select {
  125. case <-flowConn.reconnectStarted:
  126. if !flowConn.awaitReconnect() {
  127. return
  128. }
  129. case <-flowConn.closed:
  130. return
  131. case b := <-flowConn.writeSliceChan:
  132. ioResult := ioOpResult{}
  133. bytesSent := 0
  134. canSend := func() int {
  135. // checks the upload limit
  136. // 6 is max header size (protobufs aren't sent here though)
  137. // 1024 is max transition message size
  138. return flowConn.tdRaw.UploadLimit -
  139. flowConn.writtenBytesTotal - 6 - 1024
  140. }
  141. for bytesSent < len(b) {
  142. idxToSend := len(b)
  143. if idxToSend-bytesSent > canSend() {
  144. Logger().Infof("%s reconnecting due to upload limit: "+
  145. "idxToSend (%d) - bytesSent(%d) > UploadLimit(%d) - "+
  146. "writtenBytesTotal(%d) - 6 - 1024 \n",
  147. flowConn.idStr(), idxToSend, bytesSent,
  148. flowConn.tdRaw.UploadLimit, flowConn.writtenBytesTotal)
  149. flowConn.schedReconnectNow()
  150. if !flowConn.awaitReconnect() {
  151. return
  152. }
  153. }
  154. Logger().Debugf("%s WriterEngine: writing\n%s", flowConn.idStr(), hex.Dump(b))
  155. if cs := minInt(canSend(), int(maxInt16)); idxToSend-bytesSent > cs {
  156. // just reconnected and still can't send: time to chunk
  157. idxToSend = bytesSent + cs
  158. }
  159. // TODO: outerProto limit on data size
  160. bufToSend := b[bytesSent:idxToSend]
  161. bufToSendWithHeader := getMsgWithHeader(msgRawData, bufToSend) // TODO: optimize!
  162. headerSize := len(bufToSendWithHeader) - len(bufToSend)
  163. n, err := flowConn.tdRaw.tlsConn.Write(bufToSendWithHeader)
  164. if n >= headerSize {
  165. // TODO: that's kinda hacky
  166. n -= headerSize
  167. }
  168. ioResult.n += n
  169. bytesSent += n
  170. flowConn.writtenBytesTotal += len(bufToSendWithHeader)
  171. if err != nil {
  172. ioResult.err = err
  173. break
  174. }
  175. }
  176. select {
  177. case flowConn.writeResultChan <- ioResult:
  178. case <-flowConn.closed:
  179. return
  180. }
  181. }
  182. }
  183. }
  184. func (flowConn *TapdanceFlowConn) spawnReaderEngine() {
  185. flowConn.updateReadDeadline()
  186. flowConn.recvbuf = make([]byte, 1500)
  187. for {
  188. msgType, msgLen, err := flowConn.readHeader()
  189. if err != nil {
  190. flowConn.closeWithErrorOnce(err)
  191. return
  192. }
  193. if msgLen == 0 {
  194. continue // wtf?
  195. }
  196. switch msgType {
  197. case msgRawData:
  198. buf, err := flowConn.readRawData(msgLen)
  199. if err != nil {
  200. flowConn.closeWithErrorOnce(err)
  201. return
  202. }
  203. Logger().Debugf("%s ReaderEngine: read\n%s",
  204. flowConn.idStr(), hex.Dump(buf))
  205. _, err = flowConn.bsbuf.Write(buf)
  206. if err != nil {
  207. flowConn.closeWithErrorOnce(err)
  208. return
  209. }
  210. case msgProtobuf:
  211. msg, err := flowConn.readProtobuf(msgLen)
  212. if err != nil {
  213. flowConn.closeWithErrorOnce(err)
  214. return
  215. }
  216. err = flowConn.processProto(msg)
  217. if err != nil {
  218. flowConn.closeWithErrorOnce(err)
  219. return
  220. }
  221. default:
  222. flowConn.closeWithErrorOnce(errors.New("Corrupted outer protocol header: " +
  223. msgType.Str()))
  224. return
  225. }
  226. }
  227. }
  228. // Write writes data to the connection.
  229. // Write can be made to time out and return an Error with Timeout() == true
  230. // after a fixed time limit; see SetDeadline and SetWriteDeadline.
  231. func (flowConn *TapdanceFlowConn) Write(b []byte) (int, error) {
  232. select {
  233. case flowConn.writeSliceChan <- b:
  234. case <-flowConn.closed:
  235. return 0, flowConn.closeErr
  236. }
  237. select {
  238. case r := <-flowConn.writeResultChan:
  239. return r.n, r.err
  240. case <-flowConn.closed:
  241. return 0, flowConn.closeErr
  242. }
  243. }
  244. func (flowConn *TapdanceFlowConn) Read(b []byte) (int, error) {
  245. return flowConn.bsbuf.Read(b)
  246. }
  247. func (flowConn *TapdanceFlowConn) readRawData(msgLen int) ([]byte, error) {
  248. if cap(flowConn.recvbuf) < msgLen {
  249. flowConn.recvbuf = make([]byte, msgLen)
  250. }
  251. var err error
  252. var readBytes int
  253. var readBytesTotal int // both header and body
  254. // Get the message itself
  255. for readBytesTotal < msgLen {
  256. readBytes, err = flowConn.tdRaw.tlsConn.Read(flowConn.recvbuf[readBytesTotal:])
  257. readBytesTotal += int(readBytes)
  258. if err != nil {
  259. err = flowConn.actOnReadError(err)
  260. if err != nil {
  261. return flowConn.recvbuf[:readBytesTotal], err
  262. }
  263. }
  264. }
  265. return flowConn.recvbuf[:readBytesTotal], err
  266. }
  267. func (flowConn *TapdanceFlowConn) readProtobuf(msgLen int) (msg *pb.StationToClient, err error) {
  268. rbuf := make([]byte, msgLen)
  269. var readBytes int
  270. var readBytesTotal int // both header and body
  271. // Get the message itself
  272. for readBytesTotal < msgLen {
  273. readBytes, err = flowConn.tdRaw.tlsConn.Read(rbuf[readBytesTotal:])
  274. readBytesTotal += readBytes
  275. if err != nil {
  276. err = flowConn.actOnReadError(err)
  277. if err != nil {
  278. return
  279. }
  280. }
  281. }
  282. msg = &pb.StationToClient{}
  283. err = proto.Unmarshal(rbuf[:], msg)
  284. return
  285. }
  286. func (flowConn *TapdanceFlowConn) readHeader() (msgType msgType, msgLen int, err error) {
  287. // For each message we first read outer protocol header to see if it's protobuf or data
  288. var readBytes int
  289. var readBytesTotal uint32 // both header and body
  290. headerSize := uint32(2)
  291. //TODO: check FIN+last data case
  292. for readBytesTotal < headerSize {
  293. readBytes, err = flowConn.tdRaw.tlsConn.Read(flowConn.headerBuf[readBytesTotal:headerSize])
  294. readBytesTotal += uint32(readBytes)
  295. if err != nil {
  296. err = flowConn.actOnReadError(err)
  297. if err != nil {
  298. return
  299. }
  300. }
  301. }
  302. // Get TIL
  303. typeLen := uint16toInt16(binary.BigEndian.Uint16(flowConn.headerBuf[0:2]))
  304. if typeLen < 0 {
  305. msgType = msgRawData
  306. msgLen = int(-typeLen)
  307. } else if typeLen > 0 {
  308. msgType = msgProtobuf
  309. msgLen = int(typeLen)
  310. } else {
  311. // protobuf with size over 32KB, not fitting into 2-byte TL
  312. msgType = msgProtobuf
  313. headerSize += 4
  314. for readBytesTotal < headerSize {
  315. readBytes, err = flowConn.tdRaw.tlsConn.Read(flowConn.headerBuf[readBytesTotal:headerSize])
  316. readBytesTotal += uint32(readBytes)
  317. if err != nil {
  318. err = flowConn.actOnReadError(err)
  319. if err != nil {
  320. return
  321. }
  322. }
  323. }
  324. msgLen = int(binary.BigEndian.Uint32(flowConn.headerBuf[2:6]))
  325. }
  326. return
  327. }
  328. // Allows scheduling/doing reconnects in the middle of reads
  329. func (flowConn *TapdanceFlowConn) actOnReadError(err error) error {
  330. if err == nil {
  331. return nil
  332. }
  333. willScheduleReconnect := false
  334. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  335. // Timeout is used as a signal to schedule reconnect, as reconnect is indeed time dependent.
  336. // One can also SetDeadline(NOW) to schedule deadline NOW.
  337. // After EXPECT_RECONNECT and FIN are sent, deadline is used to signal that flow timed out
  338. // waiting for FIN back.
  339. willScheduleReconnect = true
  340. }
  341. // "EOF is the error returned by Read when no more input is available. Functions should
  342. // return EOF only to signal a graceful end of input." (e.g. FIN was received)
  343. // "ErrUnexpectedEOF means that EOF was encountered in the middle of reading a fixed-size
  344. // block or data structure."
  345. willReconnect := (err == io.EOF || err == io.ErrUnexpectedEOF)
  346. if willScheduleReconnect {
  347. Logger().Infoln(flowConn.tdRaw.idStr() + " scheduling reconnect")
  348. if flowConn.finSent {
  349. // timeout is hit another time before reconnect
  350. return errors.New("reconnect scheduling: timed out waiting for FIN back")
  351. }
  352. if flowConn.flowType != flowReadOnly {
  353. // notify writer, if there is a writer
  354. select {
  355. case <-flowConn.closed:
  356. return errors.New("reconnect scheduling: closed while notifiyng writer")
  357. case flowConn.reconnectStarted <- struct{}{}:
  358. }
  359. }
  360. transition := pb.C2S_Transition_C2S_EXPECT_RECONNECT
  361. if flowConn.flowType == flowUpload {
  362. transition = pb.C2S_Transition_C2S_EXPECT_UPLOADONLY_RECONN
  363. }
  364. _, err = flowConn.tdRaw.writeTransition(transition)
  365. if err != nil {
  366. return errors.New("reconnect scheduling: failed to send " +
  367. transition.String() + ": " + err.Error())
  368. }
  369. if flowConn.flowType == flowUpload {
  370. // for upload-only flows we reconnect right away
  371. willReconnect = true
  372. } else {
  373. flowConn.tdRaw.tlsConn.SetReadDeadline(time.Now().Add(
  374. getRandomDuration(waitForFINDieMin, waitForFINDieMax)))
  375. err = flowConn.tdRaw.closeWrite()
  376. if err != nil {
  377. Logger().Infoln(flowConn.tdRaw.idStr() + " reconnect scheduling:" +
  378. "failed to send FIN: " + err.Error() +
  379. ". Closing roughly and moving on.")
  380. flowConn.tdRaw.Close()
  381. }
  382. flowConn.finSent = true
  383. return nil
  384. }
  385. }
  386. if willReconnect {
  387. if flowConn.flowType != flowReadOnly {
  388. // notify writer, if there is a writer
  389. select {
  390. case <-flowConn.closed:
  391. return errors.New("reconnect scheduling: closed while notifiyng writer")
  392. case flowConn.reconnectStarted <- struct{}{}:
  393. }
  394. }
  395. if (flowConn.flowType != flowUpload && !flowConn.finSent) ||
  396. err == io.ErrUnexpectedEOF {
  397. Logger().Infoln(flowConn.tdRaw.idStr() + " reconnect: FIN is unexpected")
  398. }
  399. err = flowConn.tdRaw.RedialContext(context.Background())
  400. if flowConn.flowType != flowReadOnly {
  401. // wake up writer engine
  402. select {
  403. case <-flowConn.closed:
  404. case flowConn.reconnectSuccess <- (err == nil):
  405. }
  406. }
  407. if err != nil {
  408. return errors.New("reconnect: failed to Redial: " + err.Error())
  409. }
  410. flowConn.finSent = false
  411. // strip off state transition and push protobuf up for processing
  412. flowConn.tdRaw.initialMsg.StateTransition = nil
  413. err = flowConn.processProto(flowConn.tdRaw.initialMsg)
  414. if err == nil {
  415. flowConn.updateReadDeadline()
  416. return nil
  417. } else if err == errMsgClose {
  418. // errMsgClose actually won't show up here
  419. Logger().Infoln(flowConn.tdRaw.idStr() + " closing cleanly with MSG_CLOSE")
  420. return io.EOF
  421. } // else: proceed and exit as a crash
  422. }
  423. return flowConn.closeWithErrorOnce(err)
  424. }
  425. // Sets read deadline to {when raw connection was establihsed} + {timeout} - {small random value}
  426. func (flowConn *TapdanceFlowConn) updateReadDeadline() {
  427. amortizationVal := 0.9
  428. const minSubtrahend = 50
  429. const maxSubtrahend = 9500
  430. deadline := flowConn.tdRaw.establishedAt.Add(time.Millisecond *
  431. time.Duration(int(float64(flowConn.tdRaw.decoySpec.GetTimeout())*amortizationVal)-
  432. getRandInt(minSubtrahend, maxSubtrahend)))
  433. flowConn.tdRaw.tlsConn.SetReadDeadline(deadline)
  434. }
  435. func (flowConn *TapdanceFlowConn) acquireUpload() error {
  436. _, err := flowConn.tdRaw.writeTransition(pb.C2S_Transition_C2S_ACQUIRE_UPLOAD)
  437. if err != nil {
  438. Logger().Infoln(flowConn.idStr() + " Failed attempt to acquire upload:" + err.Error())
  439. } else {
  440. Logger().Infoln(flowConn.idStr() + " Sent acquire upload request")
  441. }
  442. return err
  443. }
  444. func (flowConn *TapdanceFlowConn) yieldUpload() error {
  445. _, err := flowConn.tdRaw.writeTransition(pb.C2S_Transition_C2S_YIELD_UPLOAD)
  446. if err != nil {
  447. Logger().Infoln(flowConn.idStr() + " Failed attempt to yield upload:" + err.Error())
  448. } else {
  449. Logger().Infoln(flowConn.idStr() + " Sent yield upload request")
  450. }
  451. return err
  452. }
  453. // TODO: implement on station, currently unused
  454. // wait for flowConn to confirm that flow was noticed
  455. func (flowConn *TapdanceFlowConn) waitForYieldConfirmation() error {
  456. // camouflage issue
  457. timeout := time.After(20 * time.Second)
  458. select {
  459. case <-timeout:
  460. return errors.New("yield confirmation timeout")
  461. case <-flowConn.yieldConfirmed:
  462. Logger().Infoln(flowConn.idStr() +
  463. " Successfully received yield confirmation from reader flow!")
  464. return nil
  465. case <-flowConn.closed:
  466. return flowConn.closeErr
  467. }
  468. }
  469. // Closes connection, channel and sets error ONCE, e.g. error won't be overwritten
  470. func (flowConn *TapdanceFlowConn) closeWithErrorOnce(err error) error {
  471. if err == nil {
  472. // safeguard, shouldn't happen
  473. err = errors.New("closed with nil error!")
  474. }
  475. flowConn.closeOnce.Do(func() {
  476. flowConn.closeErr = errors.New(flowConn.idStr() + " " + err.Error())
  477. flowConn.bsbuf.Unblock()
  478. close(flowConn.closed)
  479. flowConn.tdRaw.Close()
  480. })
  481. return flowConn.closeErr
  482. }
  483. // Close closes the connection.
  484. // Any blocked Read or Write operations will be unblocked and return errors.
  485. func (flowConn *TapdanceFlowConn) Close() error {
  486. return flowConn.closeWithErrorOnce(errors.New("closed by application layer"))
  487. }
  488. func (flowConn *TapdanceFlowConn) idStr() string {
  489. return flowConn.tdRaw.idStr()
  490. }
  491. func (flowConn *TapdanceFlowConn) processProto(msg *pb.StationToClient) error {
  492. handleConfigInfo := func(conf *pb.ClientConf) {
  493. currGen := Assets().GetGeneration()
  494. if conf.GetGeneration() < currGen {
  495. Logger().Infoln(flowConn.idStr()+" not appliying new config due"+
  496. " to lower generation: ", conf.GetGeneration(), " "+
  497. "(have:", currGen, ")")
  498. return
  499. } else if conf.GetGeneration() < currGen {
  500. Logger().Infoln(flowConn.idStr()+" not appliying new config due"+
  501. " to currently having same generation: ", currGen)
  502. return
  503. }
  504. _err := Assets().SetClientConf(conf)
  505. if _err != nil {
  506. Logger().Warningln(flowConn.idStr() +
  507. " could not persistently set ClientConf: " + _err.Error())
  508. }
  509. }
  510. Logger().Debugln(flowConn.idStr() + " processing incoming protobuf: " + msg.String())
  511. // handle ConfigInfo
  512. if confInfo := msg.ConfigInfo; confInfo != nil {
  513. handleConfigInfo(confInfo)
  514. // TODO: if we ever get a ``safe'' decoy rotation - code below has to be rewritten
  515. if !Assets().IsDecoyInList(flowConn.tdRaw.decoySpec) {
  516. Logger().Warningln(flowConn.idStr() + " current decoy is no " +
  517. "longer in the list, changing it! Read flow probably will break!")
  518. // if current decoy is no longer in the list
  519. flowConn.tdRaw.decoySpec = Assets().GetDecoy()
  520. }
  521. if !Assets().IsDecoyInList(flowConn.tdRaw.decoySpec) {
  522. Logger().Warningln(flowConn.idStr() + " current decoy is no " +
  523. "longer in the list, changing it! Write flow probably will break!")
  524. // if current decoy is no longer in the list
  525. flowConn.tdRaw.decoySpec = Assets().GetDecoy()
  526. }
  527. }
  528. // note that flowConn don't see first-message transitions, such as INIT or RECONNECT
  529. stateTransition := msg.GetStateTransition()
  530. switch stateTransition {
  531. case pb.S2C_Transition_S2C_NO_CHANGE:
  532. // carry on
  533. case pb.S2C_Transition_S2C_SESSION_CLOSE:
  534. Logger().Infof(flowConn.idStr() + " received MSG_CLOSE")
  535. return errMsgClose
  536. case pb.S2C_Transition_S2C_ERROR:
  537. err := errors.New("message from station:" +
  538. msg.GetErrReason().String())
  539. Logger().Errorln(flowConn.idStr() + " " + err.Error())
  540. flowConn.closeWithErrorOnce(err)
  541. return err
  542. case pb.S2C_Transition_S2C_CONFIRM_RECONNECT:
  543. fallthrough
  544. case pb.S2C_Transition_S2C_SESSION_INIT:
  545. fallthrough
  546. default:
  547. err := errors.New("Unexpected StateTransition " +
  548. "in initialized Conn:" + stateTransition.String())
  549. Logger().Errorln(flowConn.idStr() + " " + err.Error())
  550. flowConn.closeWithErrorOnce(err)
  551. return err
  552. }
  553. return nil
  554. }
  555. // LocalAddr returns the local network address.
  556. func (flowConn *TapdanceFlowConn) LocalAddr() net.Addr {
  557. return flowConn.tdRaw.tlsConn.LocalAddr()
  558. }
  559. // RemoteAddr returns the address of current decoy.
  560. // Not goroutine-safe, mostly here to satisfy net.Conn
  561. func (flowConn *TapdanceFlowConn) RemoteAddr() net.Addr {
  562. return flowConn.tdRaw.tlsConn.RemoteAddr()
  563. }
  564. // SetDeadline is supposed to set the read and write deadlines
  565. // associated with the connection. It is equivalent to calling
  566. // both SetReadDeadline and SetWriteDeadline.
  567. //
  568. // TODO: In reality, SetDeadline doesn't do that yet, but
  569. // existence of this function is mandatory to implement net.Conn
  570. //
  571. // A deadline is an absolute time after which I/O operations
  572. // fail with a timeout (see type Error) instead of
  573. // blocking. The deadline applies to all future I/O, not just
  574. // the immediately following call to Read or Write.
  575. //
  576. // An idle timeout can be implemented by repeatedly extending
  577. // the deadline after successful Read or Write calls.
  578. //
  579. // A zero value for t means I/O operations will not time out.
  580. //
  581. func (flowConn *TapdanceFlowConn) SetDeadline(t time.Time) error {
  582. return errNotImplemented
  583. }
  584. // SetReadDeadline sets the deadline for future Read calls.
  585. // A zero value for t means Read will not time out.
  586. func (flowConn *TapdanceFlowConn) SetReadDeadline(t time.Time) error {
  587. return errNotImplemented
  588. }
  589. // SetWriteDeadline sets the deadline for future Write calls.
  590. // Even if write times out, it may return n > 0, indicating that
  591. // some of the data was successfully written.
  592. // A zero value for t means Write will not time out.
  593. func (flowConn *TapdanceFlowConn) SetWriteDeadline(t time.Time) error {
  594. return errNotImplemented
  595. }