|
|
@@ -30,6 +30,7 @@ import (
|
|
|
"net/http"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
+ "sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
|
|
|
@@ -100,6 +101,7 @@ func NewMeekServer(
|
|
|
meekServer := &MeekServer{
|
|
|
config: config,
|
|
|
listener: listener,
|
|
|
+ clientHandler: clientHandler,
|
|
|
openConns: new(psiphon.Conns),
|
|
|
stopBroadcast: stopBroadcast,
|
|
|
sessions: make(map[string]*meekSession),
|
|
|
@@ -230,7 +232,9 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
|
|
|
|
|
|
err = session.clientConn.PumpReads(request.Body)
|
|
|
if err != nil {
|
|
|
- log.WithContextFields(LogFields{"error": err}).Warning("pump reads failed")
|
|
|
+ if err != io.EOF {
|
|
|
+ log.WithContextFields(LogFields{"error": err}).Warning("pump reads failed")
|
|
|
+ }
|
|
|
server.terminateConnection(responseWriter, request)
|
|
|
server.closeSession(sessionID)
|
|
|
return
|
|
|
@@ -251,7 +255,9 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
|
|
|
|
|
|
err = session.clientConn.PumpWrites(responseWriter)
|
|
|
if err != nil {
|
|
|
- log.WithContextFields(LogFields{"error": err}).Warning("pump writes failed")
|
|
|
+ if err != io.EOF {
|
|
|
+ log.WithContextFields(LogFields{"error": err}).Warning("pump writes failed")
|
|
|
+ }
|
|
|
server.terminateConnection(responseWriter, request)
|
|
|
server.closeSession(sessionID)
|
|
|
return
|
|
|
@@ -572,6 +578,7 @@ type meekConn struct {
|
|
|
remoteAddr net.Addr
|
|
|
protocolVersion int
|
|
|
closeBroadcast chan struct{}
|
|
|
+ closed int32
|
|
|
readLock sync.Mutex
|
|
|
readyReader chan io.Reader
|
|
|
readResult chan error
|
|
|
@@ -585,6 +592,7 @@ func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
|
|
|
remoteAddr: remoteAddr,
|
|
|
protocolVersion: protocolVersion,
|
|
|
closeBroadcast: make(chan struct{}),
|
|
|
+ closed: 0,
|
|
|
readyReader: make(chan io.Reader, 1),
|
|
|
readResult: make(chan error, 1),
|
|
|
nextWriteBuffer: make(chan []byte, 1),
|
|
|
@@ -716,7 +724,9 @@ func (conn *meekConn) Write(buffer []byte) (int, error) {
|
|
|
// Close closes the meekConn. This will interrupt any blocked
|
|
|
// Read, Write, PumpReads, and PumpWrites.
|
|
|
func (conn *meekConn) Close() error {
|
|
|
- close(conn.closeBroadcast)
|
|
|
+ if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
|
|
|
+ close(conn.closeBroadcast)
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|