Receiver.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. /*!
  2. * ws: a node.js websocket client
  3. * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  4. * MIT Licensed
  5. */
  6. var util = require('util')
  7. , Validation = require('./Validation').Validation
  8. , ErrorCodes = require('./ErrorCodes')
  9. , BufferPool = require('./BufferPool')
  10. , bufferUtil = require('./BufferUtil').BufferUtil
  11. , PerMessageDeflate = require('./PerMessageDeflate');
  /**
   * HyBi Receiver implementation.
   *
   * Must be invoked with `new`. Sets up two buffer pools (one used while a
   * fragmented operation is in progress, one for everything else), the parser
   * state, no-op event callbacks, and arms the parser to read the first
   * two header bytes.
   *
   * @param {Object} [extensions] lookup of negotiated extensions, keyed by
   *   extension name; defaults to an empty object
   * @api public
   */
  function Receiver (extensions) {
    if (!(this instanceof Receiver)) {
      throw new TypeError("Classes can't be function-called");
    }

    // Each pool gets its own private "previously used" counter. The second
    // callback returns a running average of `db.used` (rounded up), seeded by
    // the first observed value.
    // NOTE(review): presumably BufferPool treats the callbacks as its grow and
    // shrink strategies - confirm against BufferPool.
    function createPool() {
      var previousUsed = -1;
      var sizeWithRequest = function(db, length) {
        return db.used + length;
      };
      var averagedUsage = function(db) {
        if (previousUsed >= 0) {
          previousUsed = Math.ceil((previousUsed + db.used) / 2);
        } else {
          previousUsed = db.used;
        }
        return previousUsed;
      };
      return new BufferPool(1024, sizeWithRequest, averagedUsage);
    }

    // memory pool for fragmented messages
    this.fragmentedBufferPool = createPool();
    // memory pool for unfragmented messages
    this.unfragmentedBufferPool = createPool();

    this.extensions = extensions || {};
    this.state = {
      activeFragmentedOperation: null,
      lastFragment: false,
      masked: false,
      opcode: 0,
      fragmentedOperation: false
    };

    // parser bookkeeping; must be in place before expectHeader() runs below
    this.overflow = [];
    this.headerBuffer = new Buffer(10);
    this.expectOffset = 0;
    this.expectBuffer = null;
    this.expectHandler = null;
    this.currentMessage = [];
    this.messageHandlers = [];
    this.expectHeader(2, this.processPacket);

    this.dead = false;
    this.processing = false;

    // event callbacks, meant to be overwritten by the owner
    var noop = ['onerror', 'ontext', 'onbinary', 'onclose', 'onping', 'onpong'];
    for (var i = 0; i < noop.length; ++i) {
      this[noop[i]] = function() {};
    }
  }
  62. module.exports = Receiver;
  /**
   * Add new data to the parser.
   *
   * Copies as much of `data` as the pending read needs into the expected
   * buffer, stores any surplus in the overflow list, and fires the pending
   * handler every time its buffer is completely filled.
   *
   * Data added after cleanup() is ignored: cleanup() releases the overflow
   * list, so touching it would throw.
   *
   * @param {Buffer} data chunk of incoming bytes
   * @api public
   */
  Receiver.prototype.add = function(data) {
    if (this.dead) return;
    var dataLength = data.length;
    if (dataLength == 0) return;
    if (this.expectBuffer == null) {
      // no read is pending; keep the bytes for a later expectHeader/expectData
      this.overflow.push(data);
      return;
    }
    var toRead = Math.min(dataLength, this.expectBuffer.length - this.expectOffset);
    fastCopy(toRead, data, this.expectBuffer, this.expectOffset);
    this.expectOffset += toRead;
    if (toRead < dataLength) {
      this.overflow.push(data.slice(toRead));
    }
    // A handler usually arms the next read, which may already be satisfied
    // from the overflow list - keep going until a read is left incomplete.
    while (this.expectBuffer && this.expectOffset == this.expectBuffer.length) {
      var bufferForHandler = this.expectBuffer;
      this.expectBuffer = null;
      this.expectOffset = 0;
      this.expectHandler.call(this, bufferForHandler);
    }
  };
  /**
   * Releases all resources used by the receiver.
   *
   * Flags the receiver as dead and drops its references to buffers, pools,
   * parser state and event callbacks.
   *
   * @api public
   */
  Receiver.prototype.cleanup = function() {
    var released = [
      'overflow',
      'headerBuffer',
      'expectBuffer',
      'expectHandler',
      'unfragmentedBufferPool',
      'fragmentedBufferPool',
      'state',
      'currentMessage',
      'onerror',
      'ontext',
      'onbinary',
      'onclose',
      'onping',
      'onpong'
    ];
    this.dead = true;
    for (var i = 0, count = released.length; i < count; ++i) {
      this[released[i]] = null;
    }
  };
  /**
   * Waits for a certain amount of header bytes to be available, then fires a callback.
   *
   * The bytes are collected into a slice of the shared header buffer. Bytes
   * already waiting in the overflow list are consumed immediately, oldest
   * chunk first; the handler itself is fired by add() once the slice is full.
   *
   * @param {Number} length number of bytes to wait for; 0 fires `handler`
   *   right away with `null`
   * @param {Function} handler invoked with the receiver as `this`
   * @api private
   */
  Receiver.prototype.expectHeader = function(length, handler) {
    if (length == 0) {
      // same calling convention as add(): the receiver is `this`
      handler.call(this, null);
      return;
    }
    this.expectBuffer = this.headerBuffer.slice(this.expectOffset, this.expectOffset + length);
    this.expectHandler = handler;
    var toRead = length;
    while (toRead > 0 && this.overflow.length > 0) {
      // FIFO: take the oldest chunk so queued bytes keep their arrival order
      var fromOverflow = this.overflow.shift();
      var read = Math.min(fromOverflow.length, toRead);
      fastCopy(read, fromOverflow, this.expectBuffer, this.expectOffset);
      // put the unread tail back at the front of the queue
      if (read < fromOverflow.length) this.overflow.unshift(fromOverflow.slice(read));
      this.expectOffset += read;
      toRead -= read;
    }
  };
  /**
   * Waits for a certain amount of data bytes to be available, then fires a callback.
   *
   * The target buffer is taken from the fragmented or unfragmented pool,
   * depending on the current parser state. Bytes already waiting in the
   * overflow list are consumed immediately, oldest chunk first; the handler
   * itself is fired by add() once the buffer is full.
   *
   * @param {Number} length number of bytes to wait for; 0 fires `handler`
   *   right away with `null`
   * @param {Function} handler invoked with the receiver as `this`
   * @api private
   */
  Receiver.prototype.expectData = function(length, handler) {
    if (length == 0) {
      // same calling convention as add(): the receiver is `this`
      handler.call(this, null);
      return;
    }
    this.expectBuffer = this.allocateFromPool(length, this.state.fragmentedOperation);
    this.expectHandler = handler;
    var toRead = length;
    while (toRead > 0 && this.overflow.length > 0) {
      // FIFO: take the oldest chunk so queued bytes keep their arrival order
      var fromOverflow = this.overflow.shift();
      var read = Math.min(fromOverflow.length, toRead);
      fastCopy(read, fromOverflow, this.expectBuffer, this.expectOffset);
      // put the unread tail back at the front of the queue
      if (read < fromOverflow.length) this.overflow.unshift(fromOverflow.slice(read));
      this.expectOffset += read;
      toRead -= read;
    }
  };
  /**
   * Allocates memory from the buffer pool.
   *
   * @param {Number} length number of bytes requested
   * @param {Boolean} isFragmented truthy to use the fragmented-message pool,
   *   falsy to use the unfragmented one
   * @return whatever the selected pool's `get(length)` returns
   * @api private
   */
  Receiver.prototype.allocateFromPool = function(length, isFragmented) {
    var pool;
    if (isFragmented) {
      pool = this.fragmentedBufferPool;
    } else {
      pool = this.unfragmentedBufferPool;
    }
    return pool.get(length);
  };
  /**
   * Start processing a new packet.
   *
   * Validates the first two header bytes (reserved bits, opcode sequencing,
   * compression bit), records the frame flags on the parser state and hands
   * over to the `start` function of the matching opcode handler. Every
   * violation is reported through error() with close code 1002.
   *
   * @param {Buffer} data the two leading header bytes of the frame
   * @api private
   */
  Receiver.prototype.processPacket = function (data) {
    var firstByte = data[0];
    var deflateNegotiated = this.extensions[PerMessageDeflate.extensionName];

    // With permessage-deflate negotiated the first reserved bit (0x40) is
    // allowed; otherwise all three reserved bits must be clear.
    var forbiddenBits = deflateNegotiated ? 0x30 : 0x70;
    if ((firstByte & forbiddenBits) != 0) {
      var reservedMessage = deflateNegotiated ?
        'reserved fields (2, 3) must be empty' :
        'reserved fields must be empty';
      this.error(reservedMessage, 1002);
      return;
    }

    var state = this.state;
    state.lastFragment = (firstByte & 0x80) == 0x80;
    state.masked = (data[1] & 0x80) == 0x80;
    var compressed = (firstByte & 0x40) == 0x40;
    var opcode = firstByte & 0xf;

    if (opcode === 0) {
      if (compressed) {
        this.error('continuation frame cannot have the Per-message Compressed bits', 1002);
        return;
      }
      // continuation frame: it inherits the opcode of the operation in progress
      state.fragmentedOperation = true;
      state.opcode = state.activeFragmentedOperation;
      if (state.opcode != 1 && state.opcode != 2) {
        this.error('continuation frame cannot follow current opcode', 1002);
        return;
      }
    }
    else {
      if (opcode < 3 && state.activeFragmentedOperation != null) {
        this.error('data frames after the initial data frame must have opcode 0', 1002);
        return;
      }
      if (opcode >= 8 && compressed) {
        this.error('control frames cannot have the Per-message Compressed bits', 1002);
        return;
      }
      state.compressed = compressed;
      state.opcode = opcode;
      // a frame without FIN opens a fragmented operation
      state.fragmentedOperation = state.lastFragment === false;
      if (state.fragmentedOperation) {
        state.activeFragmentedOperation = opcode;
      }
    }

    var handler = opcodes[state.opcode];
    if (typeof handler == 'undefined') {
      this.error('no handler for opcode ' + state.opcode, 1002);
      return;
    }
    handler.start.call(this, data);
  };
  /**
   * End processing a packet.
   *
   * Resets the buffer pool that served the finished frame, clears the pending
   * read, closes the active fragmented operation if this frame completed it,
   * and arms the parser for the next two header bytes.
   *
   * Does nothing once the receiver is dead: the opcode handlers call this
   * right after flush(), and a consumer callback run by flush() may already
   * have called cleanup(), which releases `state` and both pools.
   *
   * @api private
   */
  Receiver.prototype.endPacket = function() {
    if (this.dead) return;
    if (!this.state.fragmentedOperation) this.unfragmentedBufferPool.reset(true);
    else if (this.state.lastFragment) this.fragmentedBufferPool.reset(true);
    this.expectOffset = 0;
    this.expectBuffer = null;
    this.expectHandler = null;
    if (this.state.lastFragment && this.state.opcode === this.state.activeFragmentedOperation) {
      // end current fragmented operation
      this.state.activeFragmentedOperation = null;
    }
    this.state.lastFragment = false;
    // while a fragmented operation is still open, keep its opcode as current
    this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0;
    this.state.masked = false;
    this.expectHeader(2, this.processPacket);
  };
  /**
   * Reset the parser state.
   *
   * Restores the initial frame state, resets both buffer pools and discards
   * the pending read, buffered overflow data, partially assembled message and
   * queued message handlers. No-op once the receiver is dead.
   *
   * @api private
   */
  Receiver.prototype.reset = function() {
    if (this.dead) return;

    var pristineState = {
      activeFragmentedOperation: null,
      lastFragment: false,
      masked: false,
      opcode: 0,
      fragmentedOperation: false
    };
    this.state = pristineState;

    this.fragmentedBufferPool.reset(true);
    this.unfragmentedBufferPool.reset(true);

    // nothing is being waited for any more
    this.expectHandler = null;
    this.expectBuffer = null;
    this.expectOffset = 0;

    // drop everything that was queued
    this.messageHandlers = [];
    this.currentMessage = [];
    this.overflow = [];
  };
  /**
   * Unmask received data.
   *
   * @param {Buffer} mask masking key, or null/undefined when the data is unmasked
   * @param {Buffer} buf payload, or null/undefined when there is none
   * @param {Boolean} binary truthy to get `buf` back as is; falsy to get the
   *   payload as a utf8 string ('' when there is no payload)
   * @api private
   */
  Receiver.prototype.unmask = function (mask, buf, binary) {
    var hasPayload = buf != null;
    if (hasPayload && mask != null) {
      bufferUtil.unmask(buf, mask);
    }
    if (binary) {
      return buf;
    }
    if (!hasPayload) {
      return '';
    }
    return buf.toString('utf8');
  };
  /**
   * Concatenates a list of buffers.
   *
   * @param {Array} buffers buffers to join, in order
   * @return {Buffer} newly allocated buffer sized to the summed lengths and
   *   handed to bufferUtil.merge together with `buffers`
   * @api private
   */
  Receiver.prototype.concatBuffers = function(buffers) {
    var totalLength = 0;
    var count = buffers.length;
    var index = 0;
    while (index < count) {
      totalLength += buffers[index].length;
      index += 1;
    }
    var target = new Buffer(totalLength);
    bufferUtil.merge(target, buffers);
    return target;
  };
  /**
   * Handles an error
   *
   * Resets the parser and reports the problem through `onerror`. Does nothing
   * once the receiver is dead, because cleanup() has released the `onerror`
   * callback by then and calling it would throw.
   *
   * @param {String} reason human readable description of the failure
   * @param {Number} protocolErrorCode close status code to report
   * @return {Receiver} the receiver itself
   * @api private
   */
  Receiver.prototype.error = function (reason, protocolErrorCode) {
    if (this.dead) return this;
    this.reset();
    this.onerror(reason, protocolErrorCode);
    return this;
  };
  /**
   * Execute message handler buffers
   *
   * Runs the queued message handlers one at a time: the next handler is only
   * started once the running one has invoked its completion callback. Does
   * nothing while a handler is in flight or after the receiver has died.
   *
   * @api private
   */
  Receiver.prototype.flush = function() {
    var receiver = this;
    if (receiver.processing || receiver.dead) {
      return;
    }

    var nextHandler = receiver.messageHandlers.shift();
    if (!nextHandler) {
      return;
    }

    receiver.processing = true;
    nextHandler(function onHandlerDone() {
      receiver.processing = false;
      receiver.flush();
    });
  };
  /**
   * Apply extensions to message
   *
   * Uncompressed frames are passed straight to the callback. Compressed frames
   * are handed to the negotiated permessage-deflate extension first; if the
   * receiver died in the meantime the result is dropped without calling back.
   *
   * @param {Buffer} messageBuffer frame payload
   * @param {Boolean} fin whether this is the final fragment of the message
   * @param {Boolean} compressed whether the payload must be decompressed
   * @param {Function} callback called with (err) or (null, buffer)
   * @api private
   */
  Receiver.prototype.applyExtensions = function(messageBuffer, fin, compressed, callback) {
    if (!compressed) {
      callback(null, messageBuffer);
      return;
    }

    var receiver = this;
    var deflate = this.extensions[PerMessageDeflate.extensionName];
    deflate.decompress(messageBuffer, fin, function onDecompressed(err, buffer) {
      if (receiver.dead) return;
      if (err) {
        // the underlying error is replaced by a fixed message
        callback(new Error('invalid compressed data'));
      } else {
        callback(null, buffer);
      }
    });
  };
  330. /**
  331. * Buffer utilities
  332. */
  /**
   * Reads a big-endian unsigned 16-bit integer from `this`.
   *
   * Must be invoked with `.call(buffer, start)`.
   *
   * @param {Number} start index of the most significant byte
   * @return {Number} the decoded value
   */
  function readUInt16BE(start) {
    var highByte = this[start] << 8;
    var lowByte = this[start + 1];
    return highByte + lowByte;
  }
  /**
   * Reads a big-endian unsigned 32-bit integer from `this`.
   *
   * Must be invoked with `.call(buffer, start)`.
   *
   * @param {Number} start index of the most significant byte
   * @return {Number} the decoded value, in the range 0..4294967295
   */
  function readUInt32BE(start) {
    // The top byte is scaled by multiplication: `<< 24` operates on signed
    // 32-bit integers and would turn any value >= 2^31 into a negative number.
    return (this[start] * 0x1000000) +
      ((this[start+1]<<16) +
      (this[start+2]<<8) +
      this[start+3]);
  }
  /**
   * Copies the first `length` bytes of `srcBuffer` into `dstBuffer`, starting
   * at `dstOffset` in the destination.
   *
   * Lengths 1 through 16 are copied byte by byte, highest index first; every
   * other length is delegated to `srcBuffer.copy`.
   *
   * @param {Number} length number of bytes to copy
   * @param {Buffer} srcBuffer source, read from index 0
   * @param {Buffer} dstBuffer destination
   * @param {Number} dstOffset index in the destination of the first byte
   */
  function fastCopy(length, srcBuffer, dstBuffer, dstOffset) {
    var isSmallCount = typeof length === 'number' &&
      length >= 1 &&
      length <= 16 &&
      length % 1 === 0;

    if (!isSmallCount) {
      srcBuffer.copy(dstBuffer, dstOffset, 0, length);
      return;
    }

    for (var index = length - 1; index >= 1; --index) {
      dstBuffer[dstOffset + index] = srcBuffer[index];
    }
    dstBuffer[dstOffset] = srcBuffer[0];
  }
  /**
   * Creates a shallow copy of an object's own enumerable properties.
   *
   * The ownership check goes through Object.prototype so that objects without
   * a prototype, or with a property named `hasOwnProperty`, are copied too.
   *
   * @param {Object} obj object to copy; null/undefined yield an empty object
   * @return {Object} new plain object holding the copied properties
   */
  function clone(obj) {
    var cloned = {};
    var hasOwn = Object.prototype.hasOwnProperty;
    for (var k in obj) {
      if (hasOwn.call(obj, k)) {
        cloned[k] = obj[k];
      }
    }
    return cloned;
  }
  /**
   * Opcode handlers
   *
   * Each handler exposes three steps, all invoked with the receiver as `this`:
   *   start(data)        - decode the payload length from the 2 header bytes
   *   getData(length)    - read the optional mask and the payload
   *   finish(mask, data) - unmask, queue the delivery and end the packet
   *
   * The steps shared by several opcodes live in the private helpers below.
   */

  /**
   * Decodes the payload length of a data frame (7 bit, 16 bit or 64 bit form)
   * and continues with `handler.getData`. Lengths that do not fit in 32 bits
   * are rejected with close code 1008.
   */
  function startDataFrame(receiver, handler, data) {
    var firstLength = data[1] & 0x7f;
    if (firstLength < 126) {
      handler.getData.call(receiver, firstLength);
    }
    else if (firstLength == 126) {
      receiver.expectHeader(2, function(extended) {
        handler.getData.call(receiver, readUInt16BE.call(extended, 0));
      });
    }
    else {
      // firstLength == 127: 64 bit length, of which only the low 32 bits may be set
      receiver.expectHeader(8, function(extended) {
        if (readUInt32BE.call(extended, 0) != 0) {
          receiver.error('packets with length spanning more than 32 bit is currently not supported', 1008);
          return;
        }
        handler.getData.call(receiver, readUInt32BE.call(extended, 4));
      });
    }
  }

  /**
   * Validates a control frame header (must not be fragmented, payload of at
   * most 125 bytes) and continues with `handler.getData`.
   */
  function startControlFrame(receiver, handler, data, frameName) {
    if (receiver.state.lastFragment == false) {
      receiver.error('fragmented ' + frameName + ' is not supported', 1002);
      return;
    }
    var firstLength = data[1] & 0x7f;
    if (firstLength < 126) {
      handler.getData.call(receiver, firstLength);
    }
    else {
      receiver.error('control frames cannot have more than 125 bytes of data', 1002);
    }
  }

  /**
   * Reads the 4 byte mask (only when the frame is masked) followed by `length`
   * payload bytes, then calls `handler.finish` with (mask or null, payload).
   */
  function readMaskAndPayload(receiver, handler, length) {
    if (receiver.state.masked) {
      receiver.expectHeader(4, function(mask) {
        receiver.expectData(length, function(payload) {
          handler.finish.call(receiver, mask, payload);
        });
      });
    }
    else {
      receiver.expectData(length, function(payload) {
        handler.finish.call(receiver, null, payload);
      });
    }
  }

  /**
   * Unmasks a data frame, queues its (possibly decompressed) payload as part
   * of the current message and ends the packet. On the final fragment the
   * assembled message is handed to `deliver(messageBuffer, state)`; when
   * `deliver` returns false the queue is not advanced.
   */
  function finishDataFrame(receiver, mask, data, deliver) {
    var packet = receiver.unmask(mask, data, true) || new Buffer(0);
    // snapshot: the live state is overwritten by endPacket() below
    var state = clone(receiver.state);
    receiver.messageHandlers.push(function(callback) {
      receiver.applyExtensions(packet, state.lastFragment, state.compressed, function(err, buffer) {
        if (err) return receiver.error(err.message, 1007);
        if (buffer != null) receiver.currentMessage.push(buffer);
        if (state.lastFragment) {
          var messageBuffer = receiver.concatBuffers(receiver.currentMessage);
          receiver.currentMessage = [];
          if (deliver(messageBuffer, state) === false) return;
        }
        callback();
      });
    });
    receiver.flush();
    receiver.endPacket();
  }

  /**
   * Unmasks a ping/pong frame, queues the call to the receiver callback named
   * `eventName` and ends the packet.
   */
  function finishPingPong(receiver, mask, data, eventName) {
    var payload = receiver.unmask(mask, data, true);
    // snapshot: the live state is overwritten by endPacket() below
    var state = clone(receiver.state);
    receiver.messageHandlers.push(function(callback) {
      receiver[eventName](payload, {masked: state.masked, binary: true});
      callback();
    });
    receiver.flush();
    receiver.endPacket();
  }

  var opcodes = {
    // text
    '1': {
      start: function(data) {
        startDataFrame(this, opcodes['1'], data);
      },
      getData: function(length) {
        readMaskAndPayload(this, opcodes['1'], length);
      },
      finish: function(mask, data) {
        var self = this;
        finishDataFrame(self, mask, data, function(messageBuffer, state) {
          if (!Validation.isValidUTF8(messageBuffer)) {
            self.error('invalid utf8 sequence', 1007);
            return false;
          }
          self.ontext(messageBuffer.toString('utf8'), {masked: state.masked, buffer: messageBuffer});
          return true;
        });
      }
    },
    // binary
    '2': {
      start: function(data) {
        startDataFrame(this, opcodes['2'], data);
      },
      getData: function(length) {
        readMaskAndPayload(this, opcodes['2'], length);
      },
      finish: function(mask, data) {
        var self = this;
        finishDataFrame(self, mask, data, function(messageBuffer, state) {
          self.onbinary(messageBuffer, {masked: state.masked, buffer: messageBuffer});
          return true;
        });
      }
    },
    // close
    '8': {
      start: function(data) {
        startControlFrame(this, opcodes['8'], data, 'close');
      },
      getData: function(length) {
        readMaskAndPayload(this, opcodes['8'], length);
      },
      finish: function(mask, data) {
        var self = this;
        data = self.unmask(mask, data, true);
        var state = clone(this.state);
        // This handler takes no completion callback and endPacket() is not
        // called: nothing further is read or delivered after a close frame.
        this.messageHandlers.push(function() {
          if (data && data.length == 1) {
            self.error('close packets with data must be at least two bytes long', 1002);
            return;
          }
          // an empty close frame is reported with code 1000
          var code = data && data.length > 1 ? readUInt16BE.call(data, 0) : 1000;
          if (!ErrorCodes.isValidErrorCode(code)) {
            self.error('invalid error code', 1002);
            return;
          }
          var message = '';
          if (data && data.length > 2) {
            var messageBuffer = data.slice(2);
            if (!Validation.isValidUTF8(messageBuffer)) {
              self.error('invalid utf8 sequence', 1007);
              return;
            }
            message = messageBuffer.toString('utf8');
          }
          self.onclose(code, message, {masked: state.masked});
          self.reset();
        });
        this.flush();
      }
    },
    // ping
    '9': {
      start: function(data) {
        startControlFrame(this, opcodes['9'], data, 'ping');
      },
      getData: function(length) {
        readMaskAndPayload(this, opcodes['9'], length);
      },
      finish: function(mask, data) {
        finishPingPong(this, mask, data, 'onping');
      }
    },
    // pong
    '10': {
      start: function(data) {
        startControlFrame(this, opcodes['10'], data, 'pong');
      },
      getData: function(length) {
        readMaskAndPayload(this, opcodes['10'], length);
      },
      finish: function(mask, data) {
        finishPingPong(this, mask, data, 'onpong');
      }
    }
  };