WebSocket.js 26 KB


  1. 'use strict';
  2. /*!
  3. * ws: a node.js websocket client
  4. * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  5. * MIT Licensed
  6. */
  7. var url = require('url')
  8. , util = require('util')
  9. , http = require('http')
  10. , https = require('https')
  11. , crypto = require('crypto')
  12. , stream = require('stream')
  13. , Ultron = require('ultron')
  14. , Options = require('options')
  15. , Sender = require('./Sender')
  16. , Receiver = require('./Receiver')
  17. , SenderHixie = require('./Sender.hixie')
  18. , ReceiverHixie = require('./Receiver.hixie')
  19. , Extensions = require('./Extensions')
  20. , PerMessageDeflate = require('./PerMessageDeflate')
  21. , EventEmitter = require('events').EventEmitter;
  /**
   * Module-level defaults
   */

  // protocolVersion: protocol version used when the caller does not pick one.
  // closeTimeout: grace period in milliseconds (30 seconds) that the closing
  // handshake gets before the connection is cleaned up forcibly.
  var protocolVersion = 13
    , closeTimeout = 30 * 1000;
  /**
   * WebSocket implementation.
   *
   * Works with or without `new`. When `address` is an array its elements are
   * spread (followed by `options`) into `initAsServerClient`; otherwise the
   * instance is initialised as an outgoing client via `initAsClient`.
   *
   * @constructor
   * @param {String|Array} address Connection address, or the argument list for a server-side client.
   * @param {String|Array} protocols WebSocket protocols. A plain object here is treated as `options`.
   * @param {Object} options Additional connection options.
   * @api public
   */
  function WebSocket(address, protocols, options) {
    if (!(this instanceof WebSocket)) {
      return new WebSocket(address, protocols, options);
    }

    EventEmitter.call(this);

    // accept the "options" Object as the 2nd argument
    var optionsInProtocolSlot = protocols
      && !Array.isArray(protocols)
      && 'object' === typeof protocols;

    if (optionsInProtocolSlot) {
      options = protocols;
      protocols = null;
    }

    // normalise the protocol list: a single name becomes a one-element list,
    // anything that is not an array becomes an empty list
    if ('string' === typeof protocols) {
      protocols = [ protocols ];
    } else if (!Array.isArray(protocols)) {
      protocols = [];
    }

    // initial instance state
    this._socket = null;
    this._ultron = null;
    this._closeReceived = false;
    this.bytesReceived = 0;
    this.readyState = null;
    this.supports = {};
    this.extensions = {};

    if (Array.isArray(address)) {
      initAsServerClient.apply(this, address.concat(options));
      return;
    }

    initAsClient.call(this, address, protocols, options);
  }
  /**
   * Inherits from EventEmitter.
   */
  util.inherits(WebSocket, EventEmitter);

  /**
   * Ready States
   *
   * Each state name is exposed both on the constructor and on the prototype,
   * valued by its position in the list (CONNECTING = 0 ... CLOSED = 3).
   */
  ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'].forEach(function (name, value) {
    WebSocket[name] = value;
    WebSocket.prototype[name] = value;
  });
  /**
   * Gracefully closes the connection, after sending a close message to the peer.
   *
   * Does nothing when already CLOSED. While CONNECTING the instance is only
   * marked CLOSED. While CLOSING, a server-side socket that has already
   * received the peer's close frame is terminated.
   *
   * @param {Number} code close code handed to the sender
   * @param {Object} data to be sent along with the close message
   * @api public
   */
  WebSocket.prototype.close = function close(code, data) {
    var self = this;

    switch (this.readyState) {
      case WebSocket.CLOSED:
        return;

      case WebSocket.CONNECTING:
        this.readyState = WebSocket.CLOSED;
        return;

      case WebSocket.CLOSING:
        if (this._closeReceived && this._isServer) this.terminate();
        return;
    }

    try {
      this.readyState = WebSocket.CLOSING;
      this._closeCode = code;
      this._closeMessage = data;

      // frames are masked unless this is the server side of the connection
      this._sender.close(code, data, !this._isServer, function onCloseSent(err) {
        if (err) self.emit('error', err);

        if (self._closeReceived && self._isServer) {
          self.terminate();
          return;
        }

        // ensure that the connection is cleaned up even when the closing
        // handshake never gets a response
        clearTimeout(self._closeTimer);
        self._closeTimer = setTimeout(cleanupWebsocketResources.bind(self, true), closeTimeout);
      });
    } catch (e) {
      this.emit('error', e);
    }
  };
  /**
   * Pause the client stream by pausing the underlying socket.
   *
   * @returns {Mixed} whatever the socket's `pause()` returns
   * @throws {Error} 'not opened' unless the connection is OPEN
   * @api public
   */
  WebSocket.prototype.pause = function pauser() {
    if (this.readyState === WebSocket.OPEN) {
      return this._socket.pause();
    }

    throw new Error('not opened');
  };
  /**
   * Sends a ping
   *
   * The caller's `options` object is never modified: defaults are applied to a
   * private copy, so one options object can safely be shared between sockets
   * (a server-side and a client-side socket need different `mask` defaults).
   *
   * @param {Object} data to be sent to the server
   * @param {Object} options Members - mask: boolean, binary: boolean
   * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
   * @throws {Error} 'not opened' when the connection is not OPEN and dontFailWhenClosed is not true
   * @api public
   */
  WebSocket.prototype.ping = function ping(data, options, dontFailWhenClosed) {
    if (this.readyState !== WebSocket.OPEN) {
      if (dontFailWhenClosed === true) return;
      throw new Error('not opened');
    }

    // copy instead of mutating the caller's object
    var frameOptions = {};
    for (var key in options) frameOptions[key] = options[key];

    // default: mask unless this is the server side of the connection
    if (typeof frameOptions.mask === 'undefined') frameOptions.mask = !this._isServer;

    this._sender.ping(data, frameOptions);
  };
  /**
   * Sends a pong
   *
   * The caller's `options` object is never modified: defaults are applied to a
   * private copy, so one options object can safely be shared between sockets
   * (a server-side and a client-side socket need different `mask` defaults).
   *
   * @param {Object} data to be sent to the server
   * @param {Object} options Members - mask: boolean, binary: boolean
   * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
   * @throws {Error} 'not opened' when the connection is not OPEN and dontFailWhenClosed is not true
   * @api public
   */
  WebSocket.prototype.pong = function pong(data, options, dontFailWhenClosed) {
    if (this.readyState !== WebSocket.OPEN) {
      if (dontFailWhenClosed === true) return;
      throw new Error('not opened');
    }

    // copy instead of mutating the caller's object
    var frameOptions = {};
    for (var key in options) frameOptions[key] = options[key];

    // default: mask unless this is the server side of the connection
    if (typeof frameOptions.mask === 'undefined') frameOptions.mask = !this._isServer;

    this._sender.pong(data, frameOptions);
  };
  /**
   * Resume the client stream by resuming the underlying socket.
   *
   * @returns {Mixed} whatever the socket's `resume()` returns
   * @throws {Error} 'not opened' unless the connection is OPEN
   * @api public
   */
  WebSocket.prototype.resume = function resume() {
    if (this.readyState === WebSocket.OPEN) {
      return this._socket.resume();
    }

    throw new Error('not opened');
  };
  /**
   * Sends a piece of data
   *
   * Defaults (`fin`, `binary`, `mask`, `compress`) are applied to a private
   * copy of `options`; the caller's object is never modified. This matters
   * because `binary` is derived from the type of `data`: writing it back into
   * a reused options object would make a later text send go out as binary.
   *
   * While an earlier stream send is still in flight (`this._queue` exists) the
   * call is queued and replayed with its original arguments afterwards.
   *
   * @param {Object} data to be sent to the server; a readable stream is sent chunk by chunk
   * @param {Object} options Members - mask: boolean, binary: boolean, compress: boolean
   * @param {function} cb Optional callback which is executed after the send completes
   * @throws {Error} 'not opened' when the connection is not OPEN and no callback was given
   * @api public
   */
  WebSocket.prototype.send = function send(data, options, cb) {
    var self = this;

    if (typeof options === 'function') {
      cb = options;
      options = {};
    }

    if (this.readyState !== WebSocket.OPEN) {
      if (typeof cb === 'function') cb(new Error('not opened'));
      else throw new Error('not opened');
      return;
    }

    if (!data) data = '';

    if (this._queue) {
      this._queue.push(function() { self.send(data, options, cb); });
      return;
    }

    // copy instead of mutating the caller's object
    var frameOptions = {};
    for (var key in options) frameOptions[key] = options[key];

    frameOptions.fin = true;

    if (typeof frameOptions.binary === 'undefined') {
      frameOptions.binary = (data instanceof ArrayBuffer || data instanceof Buffer ||
        data instanceof Uint8Array ||
        data instanceof Uint16Array ||
        data instanceof Uint32Array ||
        data instanceof Int8Array ||
        data instanceof Int16Array ||
        data instanceof Int32Array ||
        data instanceof Float32Array ||
        data instanceof Float64Array);
    }

    // default: mask unless this is the server side of the connection
    if (typeof frameOptions.mask === 'undefined') frameOptions.mask = !this._isServer;
    if (typeof frameOptions.compress === 'undefined') frameOptions.compress = true;

    // compression is only possible when permessage-deflate is active on this socket
    if (!this.extensions[PerMessageDeflate.extensionName]) {
      frameOptions.compress = false;
    }

    // fall back to stream.Stream where stream.Readable is not available
    var readable = typeof stream.Readable === 'function'
      ? stream.Readable
      : stream.Stream;

    if (data instanceof readable) {
      // queue every other send until the whole stream has been transmitted
      startQueue(this);

      sendStream(this, data, frameOptions, function send(error) {
        process.nextTick(function tock() {
          executeQueueSends(self);
        });

        if (typeof cb === 'function') cb(error);
      });
    } else {
      this._sender.send(data, frameOptions, cb);
    }
  };
  /**
   * Streams data through calls to a user supplied function
   *
   * The callback is invoked on a later tick as `cb(null, send)`. Calling
   * `send(data, final)` transmits one fragment; unless `final` is true the
   * callback is scheduled again for the next fragment. Other sends are queued
   * until the final fragment has been sent. Errors raised while sending are
   * reported as `cb(error)`.
   *
   * Defaults are applied to a private copy of `options`; the caller's object
   * is never modified.
   *
   * @param {Object} options Members - mask: boolean, binary: boolean, compress: boolean
   * @param {function} cb 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'.
   * @throws {Error} 'callback must be provided' when cb is not a function
   * @api public
   */
  WebSocket.prototype.stream = function stream(options, cb) {
    if (typeof options === 'function') {
      cb = options;
      options = {};
    }

    var self = this;

    if (typeof cb !== 'function') throw new Error('callback must be provided');

    // cb is known to be a function from here on
    if (this.readyState !== WebSocket.OPEN) {
      cb(new Error('not opened'));
      return;
    }

    if (this._queue) {
      this._queue.push(function () { self.stream(options, cb); });
      return;
    }

    // copy instead of mutating the caller's object
    var frameOptions = {};
    for (var key in options) frameOptions[key] = options[key];

    // default: mask unless this is the server side of the connection
    if (typeof frameOptions.mask === 'undefined') frameOptions.mask = !this._isServer;
    if (typeof frameOptions.compress === 'undefined') frameOptions.compress = true;

    // compression is only possible when permessage-deflate is active on this socket
    if (!this.extensions[PerMessageDeflate.extensionName]) {
      frameOptions.compress = false;
    }

    startQueue(this);

    function send(data, final) {
      try {
        if (self.readyState !== WebSocket.OPEN) throw new Error('not opened');

        frameOptions.fin = final === true;
        self._sender.send(data, frameOptions);

        if (!final) process.nextTick(cb.bind(null, null, send));
        else executeQueueSends(self);
      } catch (e) {
        cb(e);
      }
    }

    process.nextTick(cb.bind(null, null, send));
  };
  /**
   * Immediately shuts down the connection
   *
   * With a socket attached the socket is ended and a timer is armed that
   * forces the cleanup after `closeTimeout`. Without a socket, cleanup only
   * happens while the instance is still CONNECTING.
   *
   * @api public
   */
  WebSocket.prototype.terminate = function terminate() {
    if (this.readyState === WebSocket.CLOSED) return;

    if (!this._socket) {
      if (this.readyState === WebSocket.CONNECTING) {
        cleanupWebsocketResources.call(this, true);
      }
      return;
    }

    this.readyState = WebSocket.CLOSING;

    // End the connection
    try {
      this._socket.end();
    } catch (e) {
      // Socket error during end() call, so just destroy it right now
      cleanupWebsocketResources.call(this, true);
      return;
    }

    // Make sure the connection is completely cleaned up within the close
    // timeout even if the clean close procedure fails for whatever reason.
    // Any timer left over from an earlier "terminate" call is cleared first;
    // otherwise terminate calls in quick succession would leak timers and hold
    // the program open for `closeTimeout` time.
    if (this._closeTimer) clearTimeout(this._closeTimer);
    this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout);
  };
  /**
   * Expose bufferedAmount
   *
   * Reads the socket's `bufferSize`; yields 0 when there is no socket or the
   * socket reports nothing.
   *
   * @api public
   */
  Object.defineProperty(WebSocket.prototype, 'bufferedAmount', {
    get: function get() {
      return this._socket ? (this._socket.bufferSize || 0) : 0;
    }
  });
  /**
   * Emulates the W3C Browser based WebSocket interface using function members.
   *
   * Defines `onopen`, `onerror`, `onclose` and `onmessage` accessors on the
   * prototype.
   *
   * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
   * @api public
   */
  ['open', 'error', 'close', 'message'].forEach(function (method) {
    Object.defineProperty(WebSocket.prototype, 'on' + method, {
      /**
       * Returns the current listener
       *
       * Only the first registered listener is looked at; when it is a wrapper
       * carrying `_listener`, the wrapped function is returned instead.
       *
       * @returns {Mixed} the set function or undefined
       * @api public
       */
      get: function get() {
        var first = this.listeners(method)[0];
        if (!first) return undefined;
        return first._listener || first;
      },

      /**
       * Start listening for events
       *
       * Replaces every listener currently registered for the event.
       *
       * @param {Function} listener the listener
       * @api public
       */
      set: function set(listener) {
        this.removeAllListeners(method);
        this.addEventListener(method, listener);
      }
    });
  });
  /**
   * Emulates the W3C Browser based WebSocket interface using addEventListener.
   *
   * For 'message', 'close', 'error' and 'open' the listener is wrapped so that
   * it receives a single event object and is called with the socket as `this`.
   * The wrapper keeps the original function in `_listener`. Any other event
   * name registers the listener unchanged. Non-function listeners are ignored.
   *
   * @see https://developer.mozilla.org/en/DOM/element.addEventListener
   * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
   * @param {String} method event name
   * @param {Function} listener the listener
   * @api public
   */
  WebSocket.prototype.addEventListener = function(method, listener) {
    var target = this;

    if (typeof listener !== 'function') return;

    function onMessage (data, flags) {
      // tolerate 'message' events emitted without a flags object
      listener.call(target, new MessageEvent(data, !!(flags && flags.binary), target));
    }

    function onClose (code, message) {
      listener.call(target, new CloseEvent(code, message, target));
    }

    function onError (event) {
      // 'error' is sometimes emitted with a plain string (e.g. handshake
      // failures). Properties cannot be set on a primitive in strict mode, so
      // wrap anything that is not an object in an Error first.
      var isObject = (typeof event === 'object' && event !== null) || typeof event === 'function';
      if (!isObject) {
        event = new Error(event === undefined || event === null ? '' : String(event));
      }

      event.type = 'error';
      event.target = target;
      listener.call(target, event);
    }

    function onOpen () {
      listener.call(target, new OpenEvent(target));
    }

    var wrappers = {
      message: onMessage,
      close: onClose,
      error: onError,
      open: onOpen
    };

    if (!Object.prototype.hasOwnProperty.call(wrappers, method)) {
      this.on(method, listener);
      return;
    }

    // store a reference so we can return the original function from the
    // on<event> getter
    var wrapper = wrappers[method];
    wrapper._listener = listener;
    this.on(method, wrapper);
  };
  // The constructor is the module's export; buildHostHeader is attached to it
  // as an additional export.
  module.exports = WebSocket;
  WebSocket.buildHostHeader = buildHostHeader;
  /**
   * W3C MessageEvent
   *
   * Plain event object handed to 'message' listeners registered through
   * addEventListener.
   *
   * @see http://www.w3.org/TR/html5/comms.html
   * @constructor
   * @param {Mixed} dataArg the message payload, exposed as `data`
   * @param {Boolean} isBinary exposed as the non-standard `binary` member
   * @param {Object} target the object the event is dispatched on
   * @api private
   */
  function MessageEvent(dataArg, isBinary, target) {
    var event = this;

    event.type = 'message';
    event.data = dataArg;
    event.target = target;
    event.binary = isBinary; // non-standard.
  }
  /**
   * W3C CloseEvent
   *
   * Plain event object handed to 'close' listeners registered through
   * addEventListener. `wasClean` is true when no code was given or the code
   * is 1000.
   *
   * @see http://www.w3.org/TR/html5/comms.html
   * @constructor
   * @param {Number} code close code
   * @param {String} reason close reason
   * @param {Object} target the object the event is dispatched on
   * @api private
   */
  function CloseEvent(code, reason, target) {
    var event = this;

    event.type = 'close';
    event.wasClean = code === undefined || code === 1000;
    event.code = code;
    event.reason = reason;
    event.target = target;
  }
  /**
   * W3C OpenEvent
   *
   * Plain event object handed to 'open' listeners registered through
   * addEventListener.
   *
   * @see http://www.w3.org/TR/html5/comms.html
   * @constructor
   * @param {Object} target the object the event is dispatched on
   * @api private
   */
  function OpenEvent(target) {
    var event = this;

    event.type = 'open';
    event.target = target;
  }
  /**
   * Builds the value of the Host header.
   *
   * The port is appended only when one is given and it differs from the
   * scheme's default (443 when secure, 80 otherwise). The port may be a number
   * or a numeric string. A missing port (undefined, null or '') leaves the
   * host name untouched instead of producing "host:undefined".
   *
   * @param {Boolean} isSecure whether the connection uses TLS
   * @param {String} hostname host name; a falsy value is returned unchanged
   * @param {Number|String} port port to connect to
   * @returns {String} the Host header value
   * @api public
   */
  function buildHostHeader(isSecure, hostname, port) {
    if (!hostname) return hostname;

    // nothing to append when no port was specified
    if (port === undefined || port === null || port === '') return hostname;

    var defaultPort = isSecure ? 443 : 80;
    if (Number(port) === defaultPort) return hostname;

    return hostname + ':' + port;
  }
  /**
   * Entirely private apis,
   * which may or may not be bound to a specific WebSocket instance.
   */
  /**
   * Initialises `this` as the server-side end of an already upgraded
   * connection and establishes it immediately.
   *
   * @param {Object} req the upgrade request, exposed as `upgradeReq`
   * @param {Object} socket the connection's socket
   * @param {Buffer} upgradeHead data that arrived together with the upgrade
   * @param {Object} options Members - protocolVersion, protocol, extensions
   * @api private
   */
  function initAsServerClient(req, socket, upgradeHead, options) {
    options = new Options({
      protocolVersion: protocolVersion,
      protocol: null,
      extensions: {}
    }).merge(options);

    var settings = options.value
      , isHixie = settings.protocolVersion === 'hixie-76';

    // expose state properties
    this.protocol = settings.protocol;
    this.protocolVersion = settings.protocolVersion;
    this.extensions = settings.extensions;
    this.supports.binary = !isHixie;
    this.upgradeReq = req;
    this.readyState = WebSocket.CONNECTING;
    this._isServer = true;

    // establish connection, with the hixie-76 receiver/sender pair when that
    // protocol version was negotiated
    establishConnection.call(
      this,
      isHixie ? ReceiverHixie : Receiver,
      isHixie ? SenderHixie : Sender,
      socket,
      upgradeHead
    );
  }
  /**
   * Initialises `this` as an outgoing client: builds the HTTP(S) upgrade
   * request for `address`, sends it and wires up the handlers that validate
   * the server's handshake response before the connection is established.
   *
   * @param {String} address URL to connect to (ws:, wss:, https: or ws+unix:)
   * @param {Array} protocols subprotocols to offer
   * @param {Object} options connection options; see the defaults below for the recognised members
   * @throws {Error} 'unsupported protocol version' unless protocolVersion is 8 or 13
   * @throws {Error} 'invalid url' when the address has no host and is not a unix socket URL
   * @api private
   */
  function initAsClient(address, protocols, options) {
    options = new Options({
      origin: null,
      protocolVersion: protocolVersion,
      host: null,
      headers: null,
      protocol: protocols.join(','),
      agent: null,

      // ssl-related options
      pfx: null,
      key: null,
      passphrase: null,
      cert: null,
      ca: null,
      ciphers: null,
      rejectUnauthorized: null,
      perMessageDeflate: true,
      localAddress: null
    }).merge(options);

    if (options.value.protocolVersion !== 8 && options.value.protocolVersion !== 13) {
      throw new Error('unsupported protocol version');
    }

    // verify URL and establish http class
    var serverUrl = url.parse(address);
    var isUnixSocket = serverUrl.protocol === 'ws+unix:';
    if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url');
    var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:';
    var httpObj = isSecure ? https : http;
    var port = serverUrl.port || (isSecure ? 443 : 80);
    var auth = serverUrl.auth;

    // prepare extensions
    var extensionsOffer = {};
    var perMessageDeflate;
    if (options.value.perMessageDeflate) {
      // `true` means "enabled with default settings", so hand over an empty
      // settings object; anything else is the caller's settings object
      perMessageDeflate = new PerMessageDeflate(
        options.value.perMessageDeflate !== true ? options.value.perMessageDeflate : {},
        false
      );
      extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer();
    }

    // expose state properties
    this._isServer = false;
    this.url = address;
    this.protocolVersion = options.value.protocolVersion;
    this.supports.binary = (this.protocolVersion !== 'hixie-76');

    // begin handshake
    // the key is a random 16 byte nonce; the server must answer with the
    // base64 SHA-1 digest of the key followed by the fixed GUID below
    var key = crypto.randomBytes(16).toString('base64');
    var shasum = crypto.createHash('sha1');
    shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
    var expectedServerKey = shasum.digest('base64');

    var agent = options.value.agent;

    var headerHost = buildHostHeader(isSecure, serverUrl.hostname, port);

    var requestOptions = {
      port: port,
      host: serverUrl.hostname,
      headers: {
        'Connection': 'Upgrade',
        'Upgrade': 'websocket',
        'Host': headerHost,
        'Sec-WebSocket-Version': options.value.protocolVersion,
        'Sec-WebSocket-Key': key
      }
    };

    // If we have basic auth.
    if (auth) {
      requestOptions.headers.Authorization = 'Basic ' + new Buffer(auth).toString('base64');
    }

    if (options.value.protocol) {
      requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol;
    }

    if (options.value.host) {
      requestOptions.headers.Host = options.value.host;
    }

    // caller supplied headers override anything set so far
    if (options.value.headers) {
      for (var header in options.value.headers) {
        if (options.value.headers.hasOwnProperty(header)) {
          requestOptions.headers[header] = options.value.headers[header];
        }
      }
    }

    if (Object.keys(extensionsOffer).length) {
      requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer);
    }

    if (options.isDefinedAndNonNull('pfx')
      || options.isDefinedAndNonNull('key')
      || options.isDefinedAndNonNull('passphrase')
      || options.isDefinedAndNonNull('cert')
      || options.isDefinedAndNonNull('ca')
      || options.isDefinedAndNonNull('ciphers')
      || options.isDefinedAndNonNull('rejectUnauthorized')) {

      if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx;
      if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key;
      if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase;
      if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert;
      if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca;
      if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers;
      if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized;

      if (!agent) {
        // global agent ignores client side certificates
        agent = new httpObj.Agent(requestOptions);
      }
    }

    requestOptions.path = serverUrl.path || '/';

    if (agent) {
      requestOptions.agent = agent;
    }

    if (isUnixSocket) {
      requestOptions.socketPath = serverUrl.pathname;
    }

    if (options.value.localAddress) {
      requestOptions.localAddress = options.value.localAddress;
    }

    // the origin header is named differently before protocol version 13
    if (options.value.origin) {
      if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin;
      else requestOptions.headers.Origin = options.value.origin;
    }

    var self = this;
    var req = httpObj.request(requestOptions);

    req.on('error', function onerror(error) {
      self.emit('error', error);
      cleanupWebsocketResources.call(self, error);
    });

    // a regular HTTP response means the server did not upgrade the connection
    req.once('response', function response(res) {
      var error;

      // without an 'unexpected-response' listener this is reported as an error
      if (!self.emit('unexpected-response', req, res)) {
        error = new Error('unexpected server response (' + res.statusCode + ')');
        req.abort();
        self.emit('error', error);
      }

      cleanupWebsocketResources.call(self, error);
    });

    req.once('upgrade', function upgrade(res, socket, upgradeHead) {
      if (self.readyState === WebSocket.CLOSED) {
        // client closed before server accepted connection
        self.emit('close');
        self.removeAllListeners();
        socket.end();
        return;
      }

      // NOTE: the handshake failures below are emitted as plain strings;
      // existing listeners may depend on that, so the payloads are kept as-is.
      var serverKey = res.headers['sec-websocket-accept'];
      if (typeof serverKey === 'undefined' || serverKey !== expectedServerKey) {
        self.emit('error', 'invalid server key');
        self.removeAllListeners();
        socket.end();
        return;
      }

      // the server must pick one of the offered subprotocols, or none when
      // none were offered
      var serverProt = res.headers['sec-websocket-protocol'];
      var protList = (options.value.protocol || "").split(/, */);
      var protError = null;

      if (!options.value.protocol && serverProt) {
        protError = 'server sent a subprotocol even though none requested';
      } else if (options.value.protocol && !serverProt) {
        protError = 'server sent no subprotocol even though requested';
      } else if (serverProt && protList.indexOf(serverProt) === -1) {
        protError = 'server responded with an invalid protocol';
      }

      if (protError) {
        self.emit('error', protError);
        self.removeAllListeners();
        socket.end();
        return;
      } else if (serverProt) {
        self.protocol = serverProt;
      }

      var serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']);
      if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) {
        try {
          perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]);
        } catch (err) {
          self.emit('error', 'invalid extension parameter');
          self.removeAllListeners();
          socket.end();
          return;
        }
        self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
      }

      establishConnection.call(self, Receiver, Sender, socket, upgradeHead);

      // perform cleanup on http resources
      req.removeAllListeners();
      req = null;
      agent = null;
    });

    req.end();
    this.readyState = WebSocket.CONNECTING;
  }
  /**
   * Attaches `this` to an upgraded socket: creates the receiver and sender,
   * forwards socket data to the receiver, translates receiver callbacks into
   * events on the instance, then marks it OPEN and emits 'open'.
   *
   * @param {Function} ReceiverClass constructor used for the receiver
   * @param {Function} SenderClass constructor used for the sender
   * @param {Object} socket the upgraded socket
   * @param {Buffer} upgradeHead data that arrived together with the upgrade
   * @api private
   */
  function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) {
    var self = this
      , ultron = this._ultron = new Ultron(socket)
      , headFlushed = false;

    socket.setTimeout(0);
    socket.setNoDelay(true);

    this._receiver = new ReceiverClass(this.extensions);
    this._socket = socket;

    // socket cleanup handlers
    ['end', 'close', 'error'].forEach(function (event) {
      ultron.on(event, cleanupWebsocketResources.bind(self));
    });

    // pushes a chunk straight to the receiver
    function feedReceiver(chunk) {
      self.bytesReceived += chunk.length;
      self._receiver.add(chunk);
    }

    // runs once, either for the first data chunk or on the next tick:
    // ensures that the upgradeHead is added to the receiver ahead of any other
    // data, then lets feedReceiver take over as the data handler
    function flushUpgradeHead(chunk) {
      if (headFlushed || self.readyState === WebSocket.CLOSED) return;
      headFlushed = true;

      socket.removeListener('data', flushUpgradeHead);
      ultron.on('data', feedReceiver);

      if (upgradeHead && upgradeHead.length > 0) {
        feedReceiver(upgradeHead);
        upgradeHead = null;
      }

      if (chunk) feedReceiver(chunk);
    }

    ultron.on('data', flushUpgradeHead);

    // if data was passed along with the http upgrade, this schedules a push of
    // that on to the receiver. It has to wait for the next tick, since the
    // caller hasn't had a chance to set event handlers on this client object
    // yet.
    process.nextTick(flushUpgradeHead);

    // receiver event handlers
    var receiver = self._receiver;

    receiver.ontext = function ontext(data, flags) {
      self.emit('message', data, flags || {});
    };

    receiver.onbinary = function onbinary(data, flags) {
      flags = flags || {};
      flags.binary = true;
      self.emit('message', data, flags);
    };

    // every ping is answered with a pong before the 'ping' event is emitted
    receiver.onping = function onping(data, flags) {
      flags = flags || {};

      var pongOptions = {
        mask: !self._isServer,
        binary: flags.binary === true
      };

      self.pong(data, pongOptions, true);
      self.emit('ping', data, flags);
    };

    receiver.onpong = function onpong(data, flags) {
      self.emit('pong', data, flags || {});
    };

    receiver.onclose = function onclose(code, data, flags) {
      self._closeReceived = true;
      self.close(code, data);
    };

    receiver.onerror = function onerror(reason, errorCode) {
      // close the connection when the receiver reports a HyBi error code
      var closeCode = typeof errorCode !== 'undefined' ? errorCode : 1002;

      self.close(closeCode, '');
      self.emit('error', reason, errorCode);
    };

    // finalize the client
    var sender = this._sender = new SenderClass(socket, this.extensions);

    sender.on('error', function onerror(error) {
      self.close(1002, '');
      self.emit('error', error);
    });

    this.readyState = WebSocket.OPEN;
    this.emit('open');
  }
  /**
   * Makes sure the instance has a send queue, keeping an existing one.
   *
   * @param {Object} instance the object that carries `_queue`
   * @api private
   */
  function startQueue(instance) {
    if (!instance._queue) {
      instance._queue = [];
    }
  }
  /**
   * Removes the instance's send queue and runs the entries it held, in order.
   * Does nothing when there is no queue. Entries appended to the removed queue
   * while it is being run are not executed.
   *
   * @param {Object} instance the object that carries `_queue`
   * @api private
   */
  function executeQueueSends(instance) {
    var pending = instance._queue;
    if (pending === undefined) return;

    // drop the queue first so that the replayed sends are not queued again
    delete instance._queue;

    var total = pending.length;
    var index = 0;
    while (index < total) {
      pending[index]();
      index += 1;
    }
  }
  /**
   * Sends everything a readable stream produces as one fragmented message:
   * each 'data' chunk goes out as a non-final fragment and 'end' sends the
   * final, empty fragment.
   *
   * The operation completes exactly once. If the socket is found not to be
   * OPEN, the failure is reported a single time and the stream's listeners
   * are detached, instead of reporting again for every remaining chunk and
   * once more on 'end'.
   *
   * @param {Object} instance the socket; reads `readyState` and `_sender`
   * @param {Object} stream the readable stream to send
   * @param {Object} options send options; `fin` is overwritten per fragment
   * @param {function} cb Optional 'function (error)'; called with null on success. Without it a failure drops the instance's queue and is emitted as 'error'.
   * @api private
   */
  function sendStream(instance, stream, options, cb) {
    var finished = false;

    // completes the operation once and stops listening to the stream
    function finish(error) {
      if (finished) return;
      finished = true;

      stream.removeListener('data', incoming);
      stream.removeListener('end', end);

      if (typeof cb === 'function') {
        cb(error);
      } else if (error) {
        delete instance._queue;
        instance.emit('error', error);
      }
    }

    function incoming(data) {
      if (instance.readyState !== WebSocket.OPEN) {
        finish(new Error('not opened'));
        return;
      }

      options.fin = false;
      instance._sender.send(data, options);
    }

    function end() {
      if (instance.readyState !== WebSocket.OPEN) {
        finish(new Error('not opened'));
        return;
      }

      options.fin = true;
      instance._sender.send(null, options);
      finish(null);
    }

    stream.on('data', incoming);
    stream.on('end', end);
  }
  /**
   * Moves the instance to CLOSED and releases everything attached to it:
   * close timer, socket, sender, receiver, extensions, listeners and queue.
   * Emits 'close' unless the instance was still CONNECTING. Calling it on an
   * instance that is already CLOSED does nothing.
   *
   * @param {Mixed} error truthy when the connection ended abnormally; the socket is then destroyed instead of ended
   * @api private
   */
  function cleanupWebsocketResources(error) {
    if (this.readyState === WebSocket.CLOSED) return;

    var wasConnecting = this.readyState === WebSocket.CONNECTING;

    this.readyState = WebSocket.CLOSED;
    clearTimeout(this._closeTimer);
    this._closeTimer = null;

    if (!wasConnecting) {
      // If the connection was closed abnormally (with an error), or if
      // the close control frame was not received then the close code
      // must default to 1006.
      if (error || !this._closeReceived) {
        this._closeCode = 1006;
      }

      this.emit('close', this._closeCode || 1000, this._closeMessage || '');
    }

    var socket = this._socket;
    if (socket) {
      if (this._ultron) this._ultron.destroy();

      // from now on a socket error just destroys the socket
      socket.on('error', function onerror() {
        try { this.destroy(); }
        catch (e) {}
      });

      try {
        if (error) socket.destroy();
        else socket.end();
      } catch (e) { /* Ignore termination errors */ }

      this._socket = null;
      this._ultron = null;
    }

    if (this._sender) {
      this._sender.removeAllListeners();
      this._sender = null;
    }

    if (this._receiver) {
      this._receiver.cleanup();
      this._receiver = null;
    }

    var deflate = this.extensions[PerMessageDeflate.extensionName];
    if (deflate) deflate.cleanup();
    this.extensions = null;

    this.removeAllListeners();
    this.on('error', function onerror() {}); // catch all errors after this

    delete this._queue;
  }