serverEntry.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package protocol
  20. import (
  21. "bufio"
  22. "bytes"
  23. "crypto/ed25519"
  24. "crypto/hmac"
  25. "crypto/rand"
  26. "crypto/sha256"
  27. "encoding/base64"
  28. "encoding/hex"
  29. "encoding/json"
  30. "fmt"
  31. "io"
  32. "net"
  33. "regexp"
  34. "strings"
  35. "sync/atomic"
  36. "time"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  39. )
  40. // ServerEntry represents a Psiphon server. It contains information
  41. // about how to establish a tunnel connection to the server through
  42. // several protocols. Server entries are JSON records downloaded from
  43. // various sources.
  44. type ServerEntry struct {
  45. Tag string `json:"tag,omitempty"`
  46. IpAddress string `json:"ipAddress,omitempty"`
  47. WebServerPort string `json:"webServerPort,omitempty"` // not an int
  48. WebServerSecret string `json:"webServerSecret,omitempty"`
  49. WebServerCertificate string `json:"webServerCertificate,omitempty"`
  50. SshPort int `json:"sshPort,omitempty"`
  51. SshUsername string `json:"sshUsername,omitempty"`
  52. SshPassword string `json:"sshPassword,omitempty"`
  53. SshHostKey string `json:"sshHostKey,omitempty"`
  54. SshObfuscatedPort int `json:"sshObfuscatedPort,omitempty"`
  55. SshObfuscatedQUICPort int `json:"sshObfuscatedQUICPort,omitempty"`
  56. LimitQUICVersions []string `json:"limitQUICVersions,omitempty"`
  57. SshObfuscatedTapDancePort int `json:"sshObfuscatedTapdancePort,omitempty"`
  58. SshObfuscatedConjurePort int `json:"sshObfuscatedConjurePort,omitempty"`
  59. SshObfuscatedKey string `json:"sshObfuscatedKey,omitempty"`
  60. Capabilities []string `json:"capabilities,omitempty"`
  61. Region string `json:"region,omitempty"`
  62. ProviderID string `json:"providerID,omitempty"`
  63. FrontingProviderID string `json:"frontingProviderID,omitempty"`
  64. TlsOSSHPort int `json:"tlsOSSHPort,omitempty"`
  65. MeekServerPort int `json:"meekServerPort,omitempty"`
  66. MeekCookieEncryptionPublicKey string `json:"meekCookieEncryptionPublicKey,omitempty"`
  67. MeekObfuscatedKey string `json:"meekObfuscatedKey,omitempty"`
  68. MeekFrontingHost string `json:"meekFrontingHost,omitempty"`
  69. MeekFrontingHosts []string `json:"meekFrontingHosts,omitempty"`
  70. MeekFrontingDomain string `json:"meekFrontingDomain,omitempty"`
  71. MeekFrontingAddresses []string `json:"meekFrontingAddresses,omitempty"`
  72. MeekFrontingAddressesRegex string `json:"meekFrontingAddressesRegex,omitempty"`
  73. MeekFrontingDisableSNI bool `json:"meekFrontingDisableSNI,omitempty"`
  74. TacticsRequestPublicKey string `json:"tacticsRequestPublicKey,omitempty"`
  75. TacticsRequestObfuscatedKey string `json:"tacticsRequestObfuscatedKey,omitempty"`
  76. ConfigurationVersion int `json:"configurationVersion,omitempty"`
  77. Signature string `json:"signature,omitempty"`
  78. DisableHTTPTransforms bool `json:"disableHTTPTransforms,omitempty"`
  79. DisableObfuscatedQUICTransforms bool `json:"disableObfuscatedQUICTransforms,omitempty"`
  80. DisableOSSHTransforms bool `json:"disableOSSHTransforms,omitempty"`
  81. DisableOSSHPrefix bool `json:"disableOSSHPrefix,omitempty"`
  82. InproxySessionPublicKey string `json:"inproxySessionPublicKey,omitempty"`
  83. InproxySessionRootObfuscationSecret string `json:"inproxySessionRootObfuscationSecret,omitempty"`
  84. InproxySSHPort int `json:"inproxySSHPort,omitempty"`
  85. InproxyOSSHPort int `json:"inproxyOSSHPort,omitempty"`
  86. InproxyQUICPort int `json:"inproxyQUICPort,omitempty"`
  87. InproxyMeekPort int `json:"inproxyMeekPort,omitempty"`
  88. InproxyTlsOSSHPort int `json:"inproxyTlsOSSHPort,omitempty"`
  89. // These local fields are not expected to be present in downloaded server
  90. // entries. They are added by the client to record and report stats about
  91. // how and when server entries are obtained.
  92. // All local fields should be included the list of fields in RemoveUnsignedFields.
  93. LocalSource string `json:"localSource,omitempty"`
  94. LocalTimestamp string `json:"localTimestamp,omitempty"`
  95. IsLocalDerivedTag bool `json:"isLocalDerivedTag,omitempty"`
  96. }
  97. // ServerEntryFields is an alternate representation of ServerEntry which
  98. // enables future compatibility when unmarshaling and persisting new server
  99. // entries which may contain new, unrecognized fields not in the ServerEntry
  100. // type for a particular client version.
  101. //
  102. // When new JSON server entries with new fields are unmarshaled to ServerEntry
  103. // types, unrecognized fields are discarded. When unmarshaled to
  104. // ServerEntryFields, unrecognized fields are retained and may be persisted
  105. // and available when the client is upgraded and unmarshals to an updated
  106. // ServerEntry type.
  107. type ServerEntryFields map[string]interface{}
  108. // GetServerEntry converts a ServerEntryFields into a ServerEntry.
  109. func (fields ServerEntryFields) GetServerEntry() (*ServerEntry, error) {
  110. marshaledServerEntry, err := json.Marshal(fields)
  111. if err != nil {
  112. return nil, errors.Trace(err)
  113. }
  114. var serverEntry *ServerEntry
  115. err = json.Unmarshal(marshaledServerEntry, &serverEntry)
  116. if err != nil {
  117. return nil, errors.Trace(err)
  118. }
  119. return serverEntry, nil
  120. }
  121. func (fields ServerEntryFields) GetTag() string {
  122. tag, ok := fields["tag"]
  123. if !ok {
  124. return ""
  125. }
  126. tagStr, ok := tag.(string)
  127. if !ok {
  128. return ""
  129. }
  130. return tagStr
  131. }
  132. // SetTag sets a local, derived server entry tag. A tag is an identifier used
  133. // in server entry pruning and potentially other use cases. An explict tag,
  134. // set by the Psiphon Network, may be present in a server entry that is
  135. // imported; otherwise, the client will set a derived tag. The tag should be
  136. // generated using GenerateServerEntryTag. When SetTag finds a explicit tag,
  137. // the new, derived tag is ignored. The isLocalTag local field is set to
  138. // distinguish explict and derived tags and is used in signature verification
  139. // to determine if the tag field is part of the signature.
  140. func (fields ServerEntryFields) SetTag(tag string) {
  141. // Don't replace explicit tag
  142. if tag, ok := fields["tag"]; ok {
  143. tagStr, ok := tag.(string)
  144. if ok && tagStr != "" {
  145. isLocalDerivedTag, ok := fields["isLocalDerivedTag"]
  146. if !ok {
  147. return
  148. }
  149. isLocalDerivedTagBool, ok := isLocalDerivedTag.(bool)
  150. if ok && !isLocalDerivedTagBool {
  151. return
  152. }
  153. }
  154. }
  155. fields["tag"] = tag
  156. // Mark this tag as local
  157. fields["isLocalDerivedTag"] = true
  158. }
  159. func (fields ServerEntryFields) GetDiagnosticID() string {
  160. tag, ok := fields["tag"]
  161. if !ok {
  162. return ""
  163. }
  164. tagStr, ok := tag.(string)
  165. if !ok {
  166. return ""
  167. }
  168. return TagToDiagnosticID(tagStr)
  169. }
  170. func (fields ServerEntryFields) GetIPAddress() string {
  171. ipAddress, ok := fields["ipAddress"]
  172. if !ok {
  173. return ""
  174. }
  175. ipAddressStr, ok := ipAddress.(string)
  176. if !ok {
  177. return ""
  178. }
  179. return ipAddressStr
  180. }
  181. func (fields ServerEntryFields) GetWebServerPort() string {
  182. webServerPort, ok := fields["webServerPort"]
  183. if !ok {
  184. return ""
  185. }
  186. webServerPortStr, ok := webServerPort.(string)
  187. if !ok {
  188. return ""
  189. }
  190. return webServerPortStr
  191. }
  192. func (fields ServerEntryFields) GetWebServerSecret() string {
  193. webServerSecret, ok := fields["webServerSecret"]
  194. if !ok {
  195. return ""
  196. }
  197. webServerSecretStr, ok := webServerSecret.(string)
  198. if !ok {
  199. return ""
  200. }
  201. return webServerSecretStr
  202. }
  203. func (fields ServerEntryFields) GetWebServerCertificate() string {
  204. webServerCertificate, ok := fields["webServerCertificate"]
  205. if !ok {
  206. return ""
  207. }
  208. webServerCertificateStr, ok := webServerCertificate.(string)
  209. if !ok {
  210. return ""
  211. }
  212. return webServerCertificateStr
  213. }
  214. func (fields ServerEntryFields) GetConfigurationVersion() int {
  215. configurationVersion, ok := fields["configurationVersion"]
  216. if !ok {
  217. return 0
  218. }
  219. configurationVersionFloat, ok := configurationVersion.(float64)
  220. if !ok {
  221. return 0
  222. }
  223. return int(configurationVersionFloat)
  224. }
  225. func (fields ServerEntryFields) GetLocalSource() string {
  226. localSource, ok := fields["localSource"]
  227. if !ok {
  228. return ""
  229. }
  230. localSourceStr, ok := localSource.(string)
  231. if !ok {
  232. return ""
  233. }
  234. return localSourceStr
  235. }
  236. func (fields ServerEntryFields) SetLocalSource(source string) {
  237. fields["localSource"] = source
  238. }
  239. func (fields ServerEntryFields) GetLocalTimestamp() string {
  240. localTimestamp, ok := fields["localTimestamp"]
  241. if !ok {
  242. return ""
  243. }
  244. localTimestampStr, ok := localTimestamp.(string)
  245. if !ok {
  246. return ""
  247. }
  248. return localTimestampStr
  249. }
  250. func (fields ServerEntryFields) SetLocalTimestamp(timestamp string) {
  251. fields["localTimestamp"] = timestamp
  252. }
  253. func (fields ServerEntryFields) HasSignature() bool {
  254. signature, ok := fields["signature"]
  255. if !ok {
  256. return false
  257. }
  258. signatureStr, ok := signature.(string)
  259. if !ok {
  260. return false
  261. }
  262. return signatureStr != ""
  263. }
  264. const signaturePublicKeyDigestSize = 8
  265. // AddSignature signs a server entry and attaches a new field containing the
  266. // signature. Any existing "signature" field will be replaced.
  267. //
  268. // The signature incudes a public key ID that is derived from a digest of the
  269. // public key value. This ID is intended for future use when multiple signing
  270. // keys may be deployed.
  271. func (fields ServerEntryFields) AddSignature(publicKey, privateKey string) error {
  272. // Make a copy so that removing unsigned fields will have no side effects
  273. copyFields := make(ServerEntryFields)
  274. for k, v := range fields {
  275. copyFields[k] = v
  276. }
  277. copyFields.RemoveUnsignedFields()
  278. delete(copyFields, "signature")
  279. // Best practise would be to sign the JSON encoded server entry bytes and
  280. // append the signature to those bytes. However, due to backwards
  281. // compatibility requirements, we must retain the outer server entry encoding
  282. // as-is and insert the signature.
  283. //
  284. // Limitation: since the verifyier must remarshal its server entry before
  285. // verifying, the JSON produced there must be a byte-for-byte match to the
  286. // JSON signed here. The precise output of the JSON encoder that is used,
  287. // "encoding/json", with default formatting, as of Go 1.11.5, is therefore
  288. // part of the signature protocol.
  289. //
  290. // TODO: use a standard, canonical encoding, such as JCS:
  291. // https://tools.ietf.org/id/draft-rundgren-json-canonicalization-scheme-05.html
  292. marshaledFields, err := json.Marshal(copyFields)
  293. if err != nil {
  294. return errors.Trace(err)
  295. }
  296. decodedPublicKey, err := base64.StdEncoding.DecodeString(publicKey)
  297. if err != nil {
  298. return errors.Trace(err)
  299. }
  300. publicKeyDigest := sha256.Sum256(decodedPublicKey)
  301. publicKeyID := publicKeyDigest[:signaturePublicKeyDigestSize]
  302. decodedPrivateKey, err := base64.StdEncoding.DecodeString(privateKey)
  303. if err != nil {
  304. return errors.Trace(err)
  305. }
  306. signature := ed25519.Sign(decodedPrivateKey, marshaledFields)
  307. fields["signature"] = base64.StdEncoding.EncodeToString(
  308. append(publicKeyID, signature...))
  309. return nil
  310. }
  311. // VerifySignature verifies the signature set by AddSignature.
  312. //
  313. // VerifySignature must be called before using any server entry that is
  314. // imported from an untrusted source, such as client-to-client exchange.
  315. func (fields ServerEntryFields) VerifySignature(publicKey string) error {
  316. if publicKey == "" {
  317. return errors.TraceNew("missing public key")
  318. }
  319. // Make a copy so that removing unsigned fields will have no side effects
  320. copyFields := make(ServerEntryFields)
  321. for k, v := range fields {
  322. copyFields[k] = v
  323. }
  324. signatureField, ok := copyFields["signature"]
  325. if !ok {
  326. return errors.TraceNew("missing signature field")
  327. }
  328. signatureFieldStr, ok := signatureField.(string)
  329. if !ok {
  330. return errors.TraceNew("invalid signature field")
  331. }
  332. decodedSignatureField, err := base64.StdEncoding.DecodeString(signatureFieldStr)
  333. if err != nil {
  334. return errors.Trace(err)
  335. }
  336. if len(decodedSignatureField) < signaturePublicKeyDigestSize {
  337. return errors.TraceNew("invalid signature field length")
  338. }
  339. publicKeyID := decodedSignatureField[:signaturePublicKeyDigestSize]
  340. signature := decodedSignatureField[signaturePublicKeyDigestSize:]
  341. if len(signature) != ed25519.SignatureSize {
  342. return errors.TraceNew("invalid signature length")
  343. }
  344. decodedPublicKey, err := base64.StdEncoding.DecodeString(publicKey)
  345. if err != nil {
  346. return errors.Trace(err)
  347. }
  348. publicKeyDigest := sha256.Sum256(decodedPublicKey)
  349. expectedPublicKeyID := publicKeyDigest[:signaturePublicKeyDigestSize]
  350. if !bytes.Equal(expectedPublicKeyID, publicKeyID) {
  351. return errors.TraceNew("unexpected public key ID")
  352. }
  353. copyFields.RemoveUnsignedFields()
  354. delete(copyFields, "signature")
  355. marshaledFields, err := json.Marshal(copyFields)
  356. if err != nil {
  357. return errors.Trace(err)
  358. }
  359. if !ed25519.Verify(decodedPublicKey, marshaledFields, signature) {
  360. return errors.TraceNew("invalid signature")
  361. }
  362. return nil
  363. }
  364. // RemoveUnsignedFields prepares a server entry for signing or signature
  365. // verification by removing unsigned fields. The JSON marshalling of the
  366. // remaining fields is the data that is signed.
  367. func (fields ServerEntryFields) RemoveUnsignedFields() {
  368. delete(fields, "localSource")
  369. delete(fields, "localTimestamp")
  370. // Only non-local, explicit tags are part of the signature
  371. isLocalDerivedTag := fields["isLocalDerivedTag"]
  372. isLocalDerivedTagBool, ok := isLocalDerivedTag.(bool)
  373. if ok && isLocalDerivedTagBool {
  374. delete(fields, "tag")
  375. }
  376. delete(fields, "isLocalDerivedTag")
  377. }
  378. // NewServerEntrySignatureKeyPair creates an ed25519 key pair for use in
  379. // server entry signing and verification.
  380. func NewServerEntrySignatureKeyPair() (string, string, error) {
  381. publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
  382. if err != nil {
  383. return "", "", errors.Trace(err)
  384. }
  385. return base64.StdEncoding.EncodeToString(publicKey),
  386. base64.StdEncoding.EncodeToString(privateKey),
  387. nil
  388. }
  389. // GetCapability returns the server capability corresponding
  390. // to the tunnel protocol.
  391. func GetCapability(protocol string) string {
  392. // The "-OSSH" suffix drop is for legacy compatibility. Newer protocols,
  393. // including in-proxy protocols, use the full protocol name as the
  394. // capability. This avoids ambiguities such as in the case
  395. // of "INPROXY-WEBRTC-OSSH", where a truncated "INPROXY-WEBRTC" is
  396. // ambiguous.
  397. if TunnelProtocolUsesInproxy(protocol) {
  398. return protocol
  399. }
  400. return strings.TrimSuffix(protocol, "-OSSH")
  401. }
  402. // GetTacticsCapability returns the server tactics capability
  403. // corresponding to the tunnel protocol.
  404. func GetTacticsCapability(protocol string) string {
  405. return GetCapability(protocol) + "-TACTICS"
  406. }
  407. // hasCapability indicates if the server entry has the specified capability.
  408. //
  409. // Any internal "PASSTHROUGH-v2 or "PASSTHROUGH" component in the server
  410. // entry's capabilities is ignored. These PASSTHROUGH components are used to
  411. // mask protocols which are running the passthrough mechanisms from older
  412. // clients which do not implement the passthrough messages. Older clients
  413. // will treat these capabilities as unknown protocols and skip them.
  414. //
  415. // Any "QUICv1" capability is treated as "QUIC". "QUICv1" is used to mask the
  416. // QUIC-OSSH capability from older clients to ensure that older clients do
  417. // not send gQUIC packets to second generation QUICv1-only QUIC-OSSH servers.
  418. // New clients must check SupportsOnlyQUICv1 before selecting a QUIC version;
  419. // for "QUICv1", this ensures that new clients also do not select gQUIC to
  420. // QUICv1-only servers.
  421. //
  422. // In-proxy tunnel protocols omit the "v1" and "PASSTHROUGH" suffixes. For
  423. // in-proxy QUIC, gQUIC is never used; and for in-proxy HTTPS/TLS, clients
  424. // always apply PASSTHROUGH-v2.
  425. func (serverEntry *ServerEntry) hasCapability(requiredCapability string) bool {
  426. for _, capability := range serverEntry.Capabilities {
  427. originalCapability := capability
  428. capability = strings.ReplaceAll(capability, "-PASSTHROUGH-v2", "")
  429. capability = strings.ReplaceAll(capability, "-PASSTHROUGH", "")
  430. quicCapability := GetCapability(TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH)
  431. if capability == quicCapability+"v1" {
  432. capability = quicCapability
  433. }
  434. if capability == requiredCapability {
  435. return true
  436. }
  437. // Special case: some capabilities may additionally support TLS-OSSH.
  438. // This does not apply to in-proxy TLS-OSSH.
  439. if requiredCapability == GetCapability(TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH) &&
  440. capabilitySupportsTLSOSSH(originalCapability) {
  441. return true
  442. }
  443. }
  444. return false
  445. }
  446. // capabilitySupportsTLSOSSH returns true if and only if the given capability
  447. // supports TLS-OSSH in addition to its primary protocol.
  448. func capabilitySupportsTLSOSSH(capability string) bool {
  449. tlsCapabilities := []string{
  450. GetCapability(TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS),
  451. GetCapability(TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET),
  452. }
  453. for _, tlsCapability := range tlsCapabilities {
  454. // The TLS capability is additionally supported by UNFRONTED-MEEK-HTTPS
  455. // and UNFRONTED-MEEK-SESSION-TICKET capabilities with passthrough.
  456. if capability == tlsCapability+"-PASSTHROUGH-v2" {
  457. return true
  458. }
  459. }
  460. return false
  461. }
  462. // SupportsProtocol returns true if and only if the ServerEntry has
  463. // the necessary capability to support the specified tunnel protocol.
  464. func (serverEntry *ServerEntry) SupportsProtocol(protocol string) bool {
  465. requiredCapability := GetCapability(protocol)
  466. return serverEntry.hasCapability(requiredCapability)
  467. }
  468. // ProtocolUsesLegacyPassthrough indicates whether the ServerEntry supports
  469. // the specified protocol using legacy passthrough messages.
  470. //
  471. // There is no corresponding check for v2 passthrough, as clients send v2
  472. // passthrough messages unconditionally, by default, for passthrough
  473. // protocols.
  474. func (serverEntry *ServerEntry) ProtocolUsesLegacyPassthrough(protocol string) bool {
  475. legacyCapability := GetCapability(protocol) + "-PASSTHROUGH"
  476. for _, capability := range serverEntry.Capabilities {
  477. if capability == legacyCapability {
  478. return true
  479. }
  480. }
  481. return false
  482. }
  483. // SupportsOnlyQUICv1 indicates that the QUIC-OSSH server supports only QUICv1
  484. // and gQUIC versions should not be selected, as they will fail to connect
  485. // while sending atypical traffic to the server.
  486. //
  487. // SupportsOnlyQUICv1 strictly applies to QUIC-OSSH and not the in-proxy
  488. // variant.
  489. func (serverEntry *ServerEntry) SupportsOnlyQUICv1() bool {
  490. quicCapability := GetCapability(TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH)
  491. return common.Contains(serverEntry.Capabilities, quicCapability+"v1") &&
  492. !common.Contains(serverEntry.Capabilities, quicCapability)
  493. }
  494. // ConditionallyEnabledComponents defines an interface which can be queried to
  495. // determine which conditionally compiled protocol components are present.
  496. type ConditionallyEnabledComponents interface {
  497. QUICEnabled() bool
  498. RefractionNetworkingEnabled() bool
  499. }
  500. // TunnelProtocolPortLists is a map from tunnel protocol names (or "All") to a
  501. // list of port number ranges.
  502. type TunnelProtocolPortLists map[string]*common.PortList
  503. // GetSupportedProtocols returns a list of tunnel protocols supported by the
  504. // ServerEntry's capabilities and allowed by various constraints.
  505. func (serverEntry *ServerEntry) GetSupportedProtocols(
  506. conditionallyEnabled ConditionallyEnabledComponents,
  507. useUpstreamProxy bool,
  508. limitTunnelProtocols TunnelProtocols,
  509. limitTunnelDialPortNumbers TunnelProtocolPortLists,
  510. limitQUICVersions QUICVersions,
  511. excludeIntensive bool,
  512. excludeInproxy bool) TunnelProtocols {
  513. supportedProtocols := make(TunnelProtocols, 0)
  514. for _, tunnelProtocol := range SupportedTunnelProtocols {
  515. if useUpstreamProxy && !TunnelProtocolSupportsUpstreamProxy(tunnelProtocol) {
  516. continue
  517. }
  518. if common.Contains(DisabledTunnelProtocols, tunnelProtocol) {
  519. continue
  520. }
  521. if len(limitTunnelProtocols) > 0 {
  522. if !common.Contains(limitTunnelProtocols, tunnelProtocol) {
  523. continue
  524. }
  525. } else {
  526. if common.Contains(DefaultDisabledTunnelProtocols, tunnelProtocol) {
  527. continue
  528. }
  529. }
  530. if excludeIntensive && TunnelProtocolIsResourceIntensive(tunnelProtocol) {
  531. continue
  532. }
  533. // While in-proxy protocols are TunnelProtocolIsResourceIntensive,
  534. // there's an additional use case for excluding in-proxy protocols as
  535. // controlled by InproxyTunnelProtocolSelectionProbability.
  536. if excludeInproxy && TunnelProtocolUsesInproxy(tunnelProtocol) {
  537. continue
  538. }
  539. if (TunnelProtocolUsesQUIC(tunnelProtocol) && !conditionallyEnabled.QUICEnabled()) ||
  540. (TunnelProtocolUsesRefractionNetworking(tunnelProtocol) &&
  541. !conditionallyEnabled.RefractionNetworkingEnabled()) {
  542. continue
  543. }
  544. if !serverEntry.SupportsProtocol(tunnelProtocol) {
  545. continue
  546. }
  547. // If the server is limiting QUIC versions, at least one must be
  548. // supported. And if tactics is also limiting QUIC versions, there
  549. // must be a common version in both limit lists for this server entry
  550. // to support QUIC-OSSH.
  551. //
  552. // Limitation: to avoid additional complexity, we do not consider
  553. // DisableFrontingProviderQUICVersion here, as fronting providers are
  554. // expected to support QUICv1 and gQUIC is expected to become
  555. // obsolete in general.
  556. if TunnelProtocolUsesQUIC(tunnelProtocol) && len(serverEntry.LimitQUICVersions) > 0 {
  557. if !common.ContainsAny(serverEntry.LimitQUICVersions, SupportedQUICVersions) {
  558. continue
  559. }
  560. if len(limitQUICVersions) > 0 &&
  561. !common.ContainsAny(serverEntry.LimitQUICVersions, limitQUICVersions) {
  562. continue
  563. }
  564. }
  565. dialPortNumber, err := serverEntry.GetDialPortNumber(tunnelProtocol)
  566. if err != nil {
  567. continue
  568. }
  569. if len(limitTunnelDialPortNumbers) > 0 {
  570. if portList, ok := limitTunnelDialPortNumbers[tunnelProtocol]; ok {
  571. if !portList.Lookup(dialPortNumber) {
  572. continue
  573. }
  574. } else if portList, ok := limitTunnelDialPortNumbers[TUNNEL_PROTOCOLS_ALL]; ok {
  575. if !portList.Lookup(dialPortNumber) {
  576. continue
  577. }
  578. }
  579. }
  580. supportedProtocols = append(supportedProtocols, tunnelProtocol)
  581. }
  582. return supportedProtocols
  583. }
  584. var frontedMeekHTTPSDialPortNumber = int32(443)
  585. // SetFrontedMeekHTTPDialPortNumber sets the FRONTED-MEEK-OSSH dial port
  586. // number, which defaults to 443. Overriding the port number enables running
  587. // test servers where binding to port 443 is not possible.
  588. func SetFrontedMeekHTTPDialPortNumber(port int) {
  589. atomic.StoreInt32(&frontedMeekHTTPSDialPortNumber, int32(port))
  590. }
  591. func (serverEntry *ServerEntry) GetDialPortNumber(tunnelProtocol string) (int, error) {
  592. if !serverEntry.SupportsProtocol(tunnelProtocol) {
  593. return 0, errors.TraceNew("protocol not supported")
  594. }
  595. if !TunnelProtocolUsesInproxy(tunnelProtocol) {
  596. switch tunnelProtocol {
  597. case TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH:
  598. if serverEntry.TlsOSSHPort == 0 {
  599. // Special case: a server which supports UNFRONTED-MEEK-HTTPS-OSSH
  600. // or UNFRONTED-MEEK-SESSION-TICKET-OSSH also supports TLS-OSSH
  601. // over the same port.
  602. return serverEntry.MeekServerPort, nil
  603. }
  604. return serverEntry.TlsOSSHPort, nil
  605. case TUNNEL_PROTOCOL_SSH:
  606. return serverEntry.SshPort, nil
  607. case TUNNEL_PROTOCOL_OBFUSCATED_SSH:
  608. return serverEntry.SshObfuscatedPort, nil
  609. case TUNNEL_PROTOCOL_TAPDANCE_OBFUSCATED_SSH:
  610. return serverEntry.SshObfuscatedTapDancePort, nil
  611. case TUNNEL_PROTOCOL_CONJURE_OBFUSCATED_SSH:
  612. return serverEntry.SshObfuscatedConjurePort, nil
  613. case TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH:
  614. return serverEntry.SshObfuscatedQUICPort, nil
  615. case TUNNEL_PROTOCOL_FRONTED_MEEK,
  616. TUNNEL_PROTOCOL_FRONTED_MEEK_QUIC_OBFUSCATED_SSH:
  617. return int(atomic.LoadInt32(&frontedMeekHTTPSDialPortNumber)), nil
  618. case TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP:
  619. return 80, nil
  620. case TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  621. TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET,
  622. TUNNEL_PROTOCOL_UNFRONTED_MEEK:
  623. return serverEntry.MeekServerPort, nil
  624. }
  625. } else {
  626. // Distinct dial/listening ports are used for tunnel protocols when
  627. // used as an in-proxy 2nd hop, as the server will require a relayed
  628. // in-proxy broker report for in-proxy 2nd hops.
  629. switch TunnelProtocolMinusInproxy(tunnelProtocol) {
  630. case TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH:
  631. if serverEntry.InproxyTlsOSSHPort == 0 {
  632. return serverEntry.InproxyMeekPort, nil
  633. }
  634. return serverEntry.InproxyTlsOSSHPort, nil
  635. case TUNNEL_PROTOCOL_SSH:
  636. return serverEntry.InproxySSHPort, nil
  637. case TUNNEL_PROTOCOL_OBFUSCATED_SSH:
  638. return serverEntry.InproxyOSSHPort, nil
  639. case TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH:
  640. return serverEntry.InproxyQUICPort, nil
  641. case TUNNEL_PROTOCOL_FRONTED_MEEK,
  642. TUNNEL_PROTOCOL_FRONTED_MEEK_QUIC_OBFUSCATED_SSH:
  643. return int(atomic.LoadInt32(&frontedMeekHTTPSDialPortNumber)), nil
  644. case TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP:
  645. return 80, nil
  646. case TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  647. TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET,
  648. TUNNEL_PROTOCOL_UNFRONTED_MEEK:
  649. return serverEntry.InproxyMeekPort, nil
  650. }
  651. }
  652. return 0, errors.TraceNew("unknown protocol")
  653. }
  654. // IsValidInproxyDialAddress indicates whether the dial destination
  655. // network/host/port matches the dial parameters for any of the tunnel
  656. // protocols supported by the server entry.
  657. //
  658. // Limitations:
  659. // - TAPDANCE-OSSH and CONJURE-OSSH are not supported.
  660. // - The host header is not considered in the case of fronted protocols.
  661. func (serverEntry *ServerEntry) IsValidInproxyDialAddress(
  662. networkProtocol string, dialHost string, dialPortNumber int) bool {
  663. // The TapDance and Conjure destination addresses are not included
  664. // in the server entry, so TAPDANCE-OSSH and CONJURE-OSSH dial
  665. // destinations cannot be validated for in-proxy use.
  666. for _, tunnelProtocol := range SupportedTunnelProtocols {
  667. if !TunnelProtocolUsesInproxy(tunnelProtocol) {
  668. continue
  669. }
  670. if !serverEntry.SupportsProtocol(tunnelProtocol) {
  671. continue
  672. }
  673. usesTCP := TunnelProtocolUsesTCP(tunnelProtocol)
  674. if (usesTCP && networkProtocol != "tcp") || (!usesTCP && networkProtocol != "udp") {
  675. continue
  676. }
  677. tunnelPortNumber, err := serverEntry.GetDialPortNumber(tunnelProtocol)
  678. if err != nil || tunnelPortNumber != dialPortNumber {
  679. // Silently fail on error as the server entry should be well-formed.
  680. continue
  681. }
  682. if !TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  683. // For all direct protocols, the destination host must be the
  684. // server IP address.
  685. if serverEntry.IpAddress != dialHost {
  686. continue
  687. }
  688. } else {
  689. // For fronted protocols, the destination host may be domain and
  690. // must match either MeekFrontingAddressesRegex or
  691. // MeekFrontingAddresses. As in psiphon.selectFrontingParameters,
  692. // MeekFrontingAddressesRegex takes precedence when not empty.
  693. //
  694. // As the host header value is not checked here, additional
  695. // measures must be taken to ensure the destination is a Psiphon server.
  696. if len(serverEntry.MeekFrontingAddressesRegex) > 0 {
  697. re, err := regexp.Compile(serverEntry.MeekFrontingAddressesRegex)
  698. if err != nil {
  699. continue
  700. }
  701. // The entire dialHost string must match the regex.
  702. re.Longest()
  703. match := re.FindString(dialHost)
  704. if match == "" || match != dialHost {
  705. continue
  706. }
  707. } else {
  708. if !common.Contains(serverEntry.MeekFrontingAddresses, dialHost) {
  709. continue
  710. }
  711. }
  712. }
  713. // When all of the checks pass for this protocol, the input is a valid
  714. // dial destination.
  715. return true
  716. }
  717. return false
  718. }
  719. // GetSupportedTacticsProtocols returns a list of tunnel protocols,
  720. // supported by the ServerEntry's capabilities, that may be used
  721. // for tactics requests.
  722. func (serverEntry *ServerEntry) GetSupportedTacticsProtocols() []string {
  723. supportedProtocols := make([]string, 0)
  724. for _, protocol := range SupportedTunnelProtocols {
  725. if !TunnelProtocolUsesMeek(protocol) {
  726. continue
  727. }
  728. requiredCapability := GetTacticsCapability(protocol)
  729. if !serverEntry.hasCapability(requiredCapability) {
  730. continue
  731. }
  732. supportedProtocols = append(supportedProtocols, protocol)
  733. }
  734. return supportedProtocols
  735. }
  736. // SupportsSSHAPIRequests returns true when the server supports
  737. // SSH API requests.
  738. func (serverEntry *ServerEntry) SupportsSSHAPIRequests() bool {
  739. return serverEntry.hasCapability(CAPABILITY_SSH_API_REQUESTS)
  740. }
  741. func (serverEntry *ServerEntry) HasProviderID() bool {
  742. return serverEntry.ProviderID != ""
  743. }
  744. func (serverEntry *ServerEntry) HasSignature() bool {
  745. return serverEntry.Signature != ""
  746. }
  747. // GetSignedServerEntryFields converts a signed ServerEntry into
  748. // ServerEntryFields.
  749. //
  750. // Limitation: a ServerEntry loaded from the local datastore may be missing
  751. // unmarshaled fields in the case where the server entry JSON contains fields
  752. // introduced in newer code; the ServerEntryFields in the db will contain
  753. // these fields, but not the ServerEntry. Prefer loading the
  754. // ServerEntryFields from psiphon.GetSignedServerEntryFields, which will
  755. // include all these fields, which may be included in the signature.
  756. //
  757. // GetSignedServerEntryFields is intended for use with limited cases such as
  758. // testing with TargetServerEntry.
  759. func (serverEntry *ServerEntry) GetSignedServerEntryFields() (ServerEntryFields, error) {
  760. if !serverEntry.HasSignature() {
  761. return nil, errors.TraceNew("missing signature")
  762. }
  763. serverEntryJSON, err := json.Marshal(serverEntry)
  764. if err != nil {
  765. return nil, errors.Trace(err)
  766. }
  767. var serverEntryFields ServerEntryFields
  768. err = json.Unmarshal(serverEntryJSON, &serverEntryFields)
  769. if err != nil {
  770. return nil, errors.Trace(err)
  771. }
  772. serverEntryFields.RemoveUnsignedFields()
  773. return serverEntryFields, nil
  774. }
  775. func (serverEntry *ServerEntry) GetDiagnosticID() string {
  776. return TagToDiagnosticID(serverEntry.Tag)
  777. }
  778. // GenerateServerEntryTag creates a server entry tag value that is
  779. // cryptographically derived from the IP address and web server secret in a
  780. // way that is difficult to reverse the IP address value from the tag or
  781. // compute the tag without having the web server secret, a 256-bit random
  782. // value which is unique per server, in addition to the IP address. A database
  783. // consisting only of server entry tags should be resistent to an attack that
  784. // attempts to reverse all the server IPs, even given a small IP space (IPv4),
  785. // or some subset of the web server secrets.
  786. func GenerateServerEntryTag(ipAddress, webServerSecret string) string {
  787. h := hmac.New(sha256.New, []byte(webServerSecret))
  788. h.Write([]byte(ipAddress))
  789. return base64.StdEncoding.EncodeToString(h.Sum(nil))
  790. }
  791. // TagToDiagnosticID returns a prefix of the server entry tag that should be
  792. // sufficient to uniquely identify servers in diagnostics, while also being
  793. // more human readable than emitting the full tag. The tag is used as the base
  794. // of the diagnostic ID as it doesn't leak the server IP address in diagnostic
  795. // output.
  796. func TagToDiagnosticID(tag string) string {
  797. if len(tag) < 8 {
  798. return "<unknown>"
  799. }
  800. return tag[:8]
  801. }
  802. // EncodeServerEntry returns a string containing the encoding of
  803. // a ServerEntry following Psiphon conventions.
  804. func EncodeServerEntry(serverEntry *ServerEntry) (string, error) {
  805. encodedServerEntry, err := encodeServerEntry(
  806. serverEntry.IpAddress,
  807. serverEntry.WebServerPort,
  808. serverEntry.WebServerSecret,
  809. serverEntry.WebServerCertificate,
  810. serverEntry)
  811. if err != nil {
  812. return "", errors.Trace(err)
  813. }
  814. return encodedServerEntry, nil
  815. }
  816. // EncodeServerEntryFields returns a string containing the encoding of
  817. // ServerEntryFields following Psiphon conventions.
  818. func EncodeServerEntryFields(serverEntryFields ServerEntryFields) (string, error) {
  819. encodedServerEntry, err := encodeServerEntry(
  820. serverEntryFields.GetIPAddress(),
  821. serverEntryFields.GetWebServerPort(),
  822. serverEntryFields.GetWebServerSecret(),
  823. serverEntryFields.GetWebServerCertificate(),
  824. serverEntryFields)
  825. if err != nil {
  826. return "", errors.Trace(err)
  827. }
  828. return encodedServerEntry, nil
  829. }
  830. func encodeServerEntry(
  831. prefixIPAddress string,
  832. prefixWebServerPort string,
  833. prefixWebServerSecret string,
  834. prefixWebServerCertificate string,
  835. serverEntry interface{}) (string, error) {
  836. serverEntryJSON, err := json.Marshal(serverEntry)
  837. if err != nil {
  838. return "", errors.Trace(err)
  839. }
  840. // Legacy clients use a space-delimited fields prefix, and all clients expect
  841. // to at least parse the prefix in order to skip over it.
  842. //
  843. // When the server entry has no web API server certificate, the entire prefix
  844. // can be compacted down to single character placeholders. Clients that can
  845. // use the ssh API always prefer it over the web API and won't use the prefix
  846. // values.
  847. if len(prefixWebServerCertificate) == 0 {
  848. prefixIPAddress = "0"
  849. prefixWebServerPort = "0"
  850. prefixWebServerSecret = "0"
  851. prefixWebServerCertificate = "0"
  852. }
  853. return hex.EncodeToString([]byte(fmt.Sprintf(
  854. "%s %s %s %s %s",
  855. prefixIPAddress,
  856. prefixWebServerPort,
  857. prefixWebServerSecret,
  858. prefixWebServerCertificate,
  859. serverEntryJSON))), nil
  860. }
  861. // DecodeServerEntry extracts a server entry from the encoding
  862. // used by remote server lists and Psiphon server handshake requests.
  863. //
  864. // The resulting ServerEntry.LocalSource is populated with serverEntrySource,
  865. // which should be one of SERVER_ENTRY_SOURCE_EMBEDDED, SERVER_ENTRY_SOURCE_REMOTE,
  866. // SERVER_ENTRY_SOURCE_DISCOVERY, SERVER_ENTRY_SOURCE_TARGET,
  867. // SERVER_ENTRY_SOURCE_OBFUSCATED.
  868. // ServerEntry.LocalTimestamp is populated with the provided timestamp, which
  869. // should be a RFC 3339 formatted string. These local fields are stored with the
  870. // server entry and reported to the server as stats (a coarse granularity timestamp
  871. // is reported).
  872. func DecodeServerEntry(
  873. encodedServerEntry, timestamp, serverEntrySource string) (*ServerEntry, error) {
  874. serverEntry := new(ServerEntry)
  875. err := decodeServerEntry(encodedServerEntry, timestamp, serverEntrySource, serverEntry)
  876. if err != nil {
  877. return nil, errors.Trace(err)
  878. }
  879. // NOTE: if the source JSON happens to have values in these fields, they get clobbered.
  880. serverEntry.LocalSource = serverEntrySource
  881. serverEntry.LocalTimestamp = timestamp
  882. return serverEntry, nil
  883. }
  884. // DecodeServerEntryFields extracts an encoded server entry into a
  885. // ServerEntryFields type, much like DecodeServerEntry. Unrecognized fields
  886. // not in ServerEntry are retained in the ServerEntryFields.
  887. //
  888. // LocalSource/LocalTimestamp map entries are set only when the corresponding
  889. // inputs are non-blank.
  890. func DecodeServerEntryFields(
  891. encodedServerEntry, timestamp, serverEntrySource string) (ServerEntryFields, error) {
  892. serverEntryFields := make(ServerEntryFields)
  893. err := decodeServerEntry(encodedServerEntry, timestamp, serverEntrySource, &serverEntryFields)
  894. if err != nil {
  895. return nil, errors.Trace(err)
  896. }
  897. // NOTE: if the source JSON happens to have values in these fields, they get clobbered.
  898. if serverEntrySource != "" {
  899. serverEntryFields.SetLocalSource(serverEntrySource)
  900. }
  901. if timestamp != "" {
  902. serverEntryFields.SetLocalTimestamp(timestamp)
  903. }
  904. return serverEntryFields, nil
  905. }
  906. func decodeServerEntry(
  907. encodedServerEntry, timestamp, serverEntrySource string,
  908. target interface{}) error {
  909. hexDecodedServerEntry, err := hex.DecodeString(encodedServerEntry)
  910. if err != nil {
  911. return errors.Trace(err)
  912. }
  913. // Skip past legacy format (4 space delimited fields) and just parse the JSON config
  914. fields := bytes.SplitN(hexDecodedServerEntry, []byte(" "), 5)
  915. if len(fields) != 5 {
  916. return errors.TraceNew("invalid encoded server entry")
  917. }
  918. err = json.Unmarshal(fields[4], target)
  919. if err != nil {
  920. return errors.Trace(err)
  921. }
  922. return nil
  923. }
  924. // ValidateServerEntryFields checks for malformed server entries.
  925. func ValidateServerEntryFields(serverEntryFields ServerEntryFields) error {
  926. // Checks for a valid ipAddress. This is important since the IP
  927. // address is the key used to store/lookup the server entry.
  928. ipAddress := serverEntryFields.GetIPAddress()
  929. if net.ParseIP(ipAddress) == nil {
  930. return errors.Tracef("server entry has invalid ipAddress: %s", ipAddress)
  931. }
  932. // TODO: validate more fields?
  933. // Ensure locally initialized fields have been set.
  934. source := serverEntryFields.GetLocalSource()
  935. if !common.Contains(
  936. SupportedServerEntrySources, source) {
  937. return errors.Tracef("server entry has invalid source: %s", source)
  938. }
  939. timestamp := serverEntryFields.GetLocalTimestamp()
  940. _, err := time.Parse(time.RFC3339, timestamp)
  941. if err != nil {
  942. return errors.Tracef("server entry has invalid timestamp: %s", err)
  943. }
  944. return nil
  945. }
  946. // DecodeServerEntryList extracts server entries from the list encoding
  947. // used by remote server lists and Psiphon server handshake requests.
  948. // Each server entry is validated and invalid entries are skipped.
  949. // See DecodeServerEntry for note on serverEntrySource/timestamp.
  950. func DecodeServerEntryList(
  951. encodedServerEntryList, timestamp,
  952. serverEntrySource string) ([]ServerEntryFields, error) {
  953. serverEntries := make([]ServerEntryFields, 0)
  954. for _, encodedServerEntry := range strings.Split(encodedServerEntryList, "\n") {
  955. if len(encodedServerEntry) == 0 {
  956. continue
  957. }
  958. // TODO: skip this entry and continue if can't decode?
  959. serverEntryFields, err := DecodeServerEntryFields(encodedServerEntry, timestamp, serverEntrySource)
  960. if err != nil {
  961. return nil, errors.Trace(err)
  962. }
  963. if ValidateServerEntryFields(serverEntryFields) != nil {
  964. // Skip this entry and continue with the next one
  965. // TODO: invoke a logging callback
  966. continue
  967. }
  968. serverEntries = append(serverEntries, serverEntryFields)
  969. }
  970. return serverEntries, nil
  971. }
  972. // StreamingServerEntryDecoder performs the DecodeServerEntryList
  973. // operation, loading only one server entry into memory at a time.
  974. type StreamingServerEntryDecoder struct {
  975. scanner *bufio.Scanner
  976. timestamp string
  977. serverEntrySource string
  978. }
  979. // NewStreamingServerEntryDecoder creates a new StreamingServerEntryDecoder.
  980. func NewStreamingServerEntryDecoder(
  981. encodedServerEntryListReader io.Reader,
  982. timestamp, serverEntrySource string) *StreamingServerEntryDecoder {
  983. return &StreamingServerEntryDecoder{
  984. scanner: bufio.NewScanner(encodedServerEntryListReader),
  985. timestamp: timestamp,
  986. serverEntrySource: serverEntrySource,
  987. }
  988. }
  989. // Next reads and decodes, and validates the next server entry from the
  990. // input stream, returning a nil server entry when the stream is complete.
  991. //
  992. // Limitations:
  993. // - Each encoded server entry line cannot exceed bufio.MaxScanTokenSize,
  994. // the default buffer size which this decoder uses. This is 64K.
  995. // - DecodeServerEntry is called on each encoded server entry line, which
  996. // will allocate memory to hex decode and JSON deserialze the server
  997. // entry. As this is not presently reusing a fixed buffer, each call
  998. // will allocate additional memory; garbage collection is necessary to
  999. // reclaim that memory for reuse for the next server entry.
  1000. func (decoder *StreamingServerEntryDecoder) Next() (ServerEntryFields, error) {
  1001. for {
  1002. if !decoder.scanner.Scan() {
  1003. return nil, errors.Trace(decoder.scanner.Err())
  1004. }
  1005. // TODO: use scanner.Bytes which doesn't allocate, instead of scanner.Text
  1006. // TODO: skip this entry and continue if can't decode?
  1007. serverEntryFields, err := DecodeServerEntryFields(
  1008. decoder.scanner.Text(), decoder.timestamp, decoder.serverEntrySource)
  1009. if err != nil {
  1010. return nil, errors.Trace(err)
  1011. }
  1012. if ValidateServerEntryFields(serverEntryFields) != nil {
  1013. // Skip this entry and continue with the next one
  1014. // TODO: invoke a logging callback
  1015. continue
  1016. }
  1017. return serverEntryFields, nil
  1018. }
  1019. }