|
@@ -541,7 +541,6 @@ func recvLoop(domain dns.Name, dnsConn net.PacketConn, ttConn *turbotunnel.Queue
|
|
|
// fit while keeping the total size under maxEncodedPayload, then sends it.
|
|
// fit while keeping the total size under maxEncodedPayload, then sends it.
|
|
|
func sendLoop(dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch <-chan *record, maxEncodedPayload int) error {
|
|
func sendLoop(dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch <-chan *record, maxEncodedPayload int) error {
|
|
|
var nextRec *record
|
|
var nextRec *record
|
|
|
- var nextP []byte
|
|
|
|
|
for {
|
|
for {
|
|
|
rec := nextRec
|
|
rec := nextRec
|
|
|
nextRec = nil
|
|
nextRec = nil
|
|
@@ -571,67 +570,62 @@ func sendLoop(dnsConn net.PacketConn, ttConn *turbotunnel.QueuePacketConn, ch <-
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var payload bytes.Buffer
|
|
var payload bytes.Buffer
|
|
|
-
|
|
|
|
|
limit := maxEncodedPayload
|
|
limit := maxEncodedPayload
|
|
|
- if len(nextP) > 0 {
|
|
|
|
|
- // No length check on any packet left over from
|
|
|
|
|
- // the previous bundle -- if it's too large, we
|
|
|
|
|
- // allow it to be truncated and dropped.
|
|
|
|
|
- limit -= 2 + len(nextP)
|
|
|
|
|
- binary.Write(&payload, binary.BigEndian, uint16(len(nextP)))
|
|
|
|
|
- payload.Write(nextP)
|
|
|
|
|
- }
|
|
|
|
|
- nextP = nil
|
|
|
|
|
-
|
|
|
|
|
- // We loop and write as many packets from OutgoingQueue
|
|
|
|
|
|
|
+ // We loop and bundle as many packets from OutgoingQueue
|
|
|
// into the response as will fit. Any packet that would
|
|
// into the response as will fit. Any packet that would
|
|
|
- // overflow the capacity of the DNS response, we save in
|
|
|
|
|
- // nextP to be included in a future response.
|
|
|
|
|
|
|
+ // overflow the capacity of the DNS response, we stash
|
|
|
|
|
+ // to be bundled into a future response.
|
|
|
timer := time.NewTimer(maxResponseDelay)
|
|
timer := time.NewTimer(maxResponseDelay)
|
|
|
loop:
|
|
loop:
|
|
|
for {
|
|
for {
|
|
|
|
|
+ var p []byte
|
|
|
select {
|
|
select {
|
|
|
- // Prioritize the first two cases over the
|
|
|
|
|
- // OutgoingQueue case. The first two cases are
|
|
|
|
|
- // duplicated under the default case.
|
|
|
|
|
|
|
+ // Check the nextRec, timer, and stash cases
|
|
|
|
|
+ // before considering the OutgoingQueue case.
|
|
|
|
|
+ // Only if all these cases fail do we enter the
|
|
|
|
|
+ // default arm, where they are checked again in
|
|
|
|
|
+ // addition to OutgoingQueue.
|
|
|
case nextRec = <-ch:
|
|
case nextRec = <-ch:
|
|
|
- // If there's another response
|
|
|
|
|
- // waiting to be sent, wait no
|
|
|
|
|
- // longer for a payload for this
|
|
|
|
|
- // one.
|
|
|
|
|
|
|
+ // If there's another response waiting
|
|
|
|
|
+ // to be sent, wait no longer for a
|
|
|
|
|
+ // payload for this one.
|
|
|
break loop
|
|
break loop
|
|
|
case <-timer.C:
|
|
case <-timer.C:
|
|
|
break loop
|
|
break loop
|
|
|
|
|
+ case p = <-ttConn.Unstash(rec.ClientID):
|
|
|
default:
|
|
default:
|
|
|
select {
|
|
select {
|
|
|
case nextRec = <-ch:
|
|
case nextRec = <-ch:
|
|
|
break loop
|
|
break loop
|
|
|
case <-timer.C:
|
|
case <-timer.C:
|
|
|
break loop
|
|
break loop
|
|
|
- case p := <-ttConn.OutgoingQueue(rec.ClientID):
|
|
|
|
|
- // We wait for the first packet
|
|
|
|
|
- // in a bundle only. The second
|
|
|
|
|
- // and later packets must be
|
|
|
|
|
- // immediately available or they
|
|
|
|
|
- // will be omitted from this
|
|
|
|
|
- // send.
|
|
|
|
|
- timer.Reset(0)
|
|
|
|
|
-
|
|
|
|
|
- if int(uint16(len(p))) != len(p) {
|
|
|
|
|
- panic(len(p))
|
|
|
|
|
- }
|
|
|
|
|
- if 2+len(p) > limit {
|
|
|
|
|
- // Save this packet to
|
|
|
|
|
- // send in the next
|
|
|
|
|
- // response.
|
|
|
|
|
- nextP = p
|
|
|
|
|
- break loop
|
|
|
|
|
- }
|
|
|
|
|
- limit -= 2 + len(p)
|
|
|
|
|
- binary.Write(&payload, binary.BigEndian, uint16(len(p)))
|
|
|
|
|
- payload.Write(p)
|
|
|
|
|
|
|
+ case p = <-ttConn.Unstash(rec.ClientID):
|
|
|
|
|
+ case p = <-ttConn.OutgoingQueue(rec.ClientID):
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ // We wait for the first packet in a bundle
|
|
|
|
|
+ // only. The second and later packets must be
|
|
|
|
|
+ // immediately available or they will be omitted
|
|
|
|
|
+ // from this bundle.
|
|
|
|
|
+ timer.Reset(0)
|
|
|
|
|
+
|
|
|
|
|
+ limit -= 2 + len(p)
|
|
|
|
|
+ if payload.Len() == 0 {
|
|
|
|
|
+ // No packet length check for the first
|
|
|
|
|
+ // packet; if it's too large, we allow
|
|
|
|
|
+ // it to be truncated and dropped by the
|
|
|
|
|
+ // receiver.
|
|
|
|
|
+ } else if limit < 0 {
|
|
|
|
|
+ // Stash this packet to send in the next
|
|
|
|
|
+ // response.
|
|
|
|
|
+ ttConn.Stash(p, rec.ClientID)
|
|
|
|
|
+ break loop
|
|
|
|
|
+ }
|
|
|
|
|
+ if int(uint16(len(p))) != len(p) {
|
|
|
|
|
+ panic(len(p))
|
|
|
|
|
+ }
|
|
|
|
|
+ binary.Write(&payload, binary.BigEndian, uint16(len(p)))
|
|
|
|
|
+ payload.Write(p)
|
|
|
}
|
|
}
|
|
|
timer.Stop()
|
|
timer.Stop()
|
|
|
|
|
|