manager.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. /**
  2. * Module dependencies.
  3. */
  4. var eio = require('engine.io-client');
  5. var Socket = require('./socket');
  6. var Emitter = require('component-emitter');
  7. var parser = require('socket.io-parser');
  8. var on = require('./on');
  9. var bind = require('component-bind');
  10. var debug = require('debug')('socket.io-client:manager');
  11. var indexOf = require('indexof');
  12. var Backoff = require('backo2');
  13. /**
  14. * IE6+ hasOwnProperty
  15. */
  16. var has = Object.prototype.hasOwnProperty;
  17. /**
  18. * Module exports
  19. */
  20. module.exports = Manager;
  21. /**
  22. * `Manager` constructor.
  23. *
  24. * @param {String} engine instance or engine uri/opts
  25. * @param {Object} options
  26. * @api public
  27. */
  28. function Manager(uri, opts){
  29. if (!(this instanceof Manager)) return new Manager(uri, opts);
  30. if (uri && ('object' == typeof uri)) {
  31. opts = uri;
  32. uri = undefined;
  33. }
  34. opts = opts || {};
  35. opts.path = opts.path || '/socket.io';
  36. this.nsps = {};
  37. this.subs = [];
  38. this.opts = opts;
  39. this.reconnection(opts.reconnection !== false);
  40. this.reconnectionAttempts(opts.reconnectionAttempts || Infinity);
  41. this.reconnectionDelay(opts.reconnectionDelay || 1000);
  42. this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000);
  43. this.randomizationFactor(opts.randomizationFactor || 0.5);
  44. this.backoff = new Backoff({
  45. min: this.reconnectionDelay(),
  46. max: this.reconnectionDelayMax(),
  47. jitter: this.randomizationFactor()
  48. });
  49. this.timeout(null == opts.timeout ? 20000 : opts.timeout);
  50. this.readyState = 'closed';
  51. this.uri = uri;
  52. this.connecting = [];
  53. this.lastPing = null;
  54. this.encoding = false;
  55. this.packetBuffer = [];
  56. this.encoder = new parser.Encoder();
  57. this.decoder = new parser.Decoder();
  58. this.autoConnect = opts.autoConnect !== false;
  59. if (this.autoConnect) this.open();
  60. }
  61. /**
  62. * Propagate given event to sockets and emit on `this`
  63. *
  64. * @api private
  65. */
  66. Manager.prototype.emitAll = function() {
  67. this.emit.apply(this, arguments);
  68. for (var nsp in this.nsps) {
  69. if (has.call(this.nsps, nsp)) {
  70. this.nsps[nsp].emit.apply(this.nsps[nsp], arguments);
  71. }
  72. }
  73. };
  74. /**
  75. * Update `socket.id` of all sockets
  76. *
  77. * @api private
  78. */
  79. Manager.prototype.updateSocketIds = function(){
  80. for (var nsp in this.nsps) {
  81. if (has.call(this.nsps, nsp)) {
  82. this.nsps[nsp].id = this.engine.id;
  83. }
  84. }
  85. };
  86. /**
  87. * Mix in `Emitter`.
  88. */
  89. Emitter(Manager.prototype);
  90. /**
  91. * Sets the `reconnection` config.
  92. *
  93. * @param {Boolean} true/false if it should automatically reconnect
  94. * @return {Manager} self or value
  95. * @api public
  96. */
  97. Manager.prototype.reconnection = function(v){
  98. if (!arguments.length) return this._reconnection;
  99. this._reconnection = !!v;
  100. return this;
  101. };
  102. /**
  103. * Sets the reconnection attempts config.
  104. *
  105. * @param {Number} max reconnection attempts before giving up
  106. * @return {Manager} self or value
  107. * @api public
  108. */
  109. Manager.prototype.reconnectionAttempts = function(v){
  110. if (!arguments.length) return this._reconnectionAttempts;
  111. this._reconnectionAttempts = v;
  112. return this;
  113. };
  114. /**
  115. * Sets the delay between reconnections.
  116. *
  117. * @param {Number} delay
  118. * @return {Manager} self or value
  119. * @api public
  120. */
  121. Manager.prototype.reconnectionDelay = function(v){
  122. if (!arguments.length) return this._reconnectionDelay;
  123. this._reconnectionDelay = v;
  124. this.backoff && this.backoff.setMin(v);
  125. return this;
  126. };
  127. Manager.prototype.randomizationFactor = function(v){
  128. if (!arguments.length) return this._randomizationFactor;
  129. this._randomizationFactor = v;
  130. this.backoff && this.backoff.setJitter(v);
  131. return this;
  132. };
  133. /**
  134. * Sets the maximum delay between reconnections.
  135. *
  136. * @param {Number} delay
  137. * @return {Manager} self or value
  138. * @api public
  139. */
  140. Manager.prototype.reconnectionDelayMax = function(v){
  141. if (!arguments.length) return this._reconnectionDelayMax;
  142. this._reconnectionDelayMax = v;
  143. this.backoff && this.backoff.setMax(v);
  144. return this;
  145. };
  146. /**
  147. * Sets the connection timeout. `false` to disable
  148. *
  149. * @return {Manager} self or value
  150. * @api public
  151. */
  152. Manager.prototype.timeout = function(v){
  153. if (!arguments.length) return this._timeout;
  154. this._timeout = v;
  155. return this;
  156. };
  157. /**
  158. * Starts trying to reconnect if reconnection is enabled and we have not
  159. * started reconnecting yet
  160. *
  161. * @api private
  162. */
  163. Manager.prototype.maybeReconnectOnOpen = function() {
  164. // Only try to reconnect if it's the first time we're connecting
  165. if (!this.reconnecting && this._reconnection && this.backoff.attempts === 0) {
  166. // keeps reconnection from firing twice for the same reconnection loop
  167. this.reconnect();
  168. }
  169. };
  170. /**
  171. * Sets the current transport `socket`.
  172. *
  173. * @param {Function} optional, callback
  174. * @return {Manager} self
  175. * @api public
  176. */
  177. Manager.prototype.open =
  178. Manager.prototype.connect = function(fn){
  179. debug('readyState %s', this.readyState);
  180. if (~this.readyState.indexOf('open')) return this;
  181. debug('opening %s', this.uri);
  182. this.engine = eio(this.uri, this.opts);
  183. var socket = this.engine;
  184. var self = this;
  185. this.readyState = 'opening';
  186. this.skipReconnect = false;
  187. // emit `open`
  188. var openSub = on(socket, 'open', function() {
  189. self.onopen();
  190. fn && fn();
  191. });
  192. // emit `connect_error`
  193. var errorSub = on(socket, 'error', function(data){
  194. debug('connect_error');
  195. self.cleanup();
  196. self.readyState = 'closed';
  197. self.emitAll('connect_error', data);
  198. if (fn) {
  199. var err = new Error('Connection error');
  200. err.data = data;
  201. fn(err);
  202. } else {
  203. // Only do this if there is no fn to handle the error
  204. self.maybeReconnectOnOpen();
  205. }
  206. });
  207. // emit `connect_timeout`
  208. if (false !== this._timeout) {
  209. var timeout = this._timeout;
  210. debug('connect attempt will timeout after %d', timeout);
  211. // set timer
  212. var timer = setTimeout(function(){
  213. debug('connect attempt timed out after %d', timeout);
  214. openSub.destroy();
  215. socket.close();
  216. socket.emit('error', 'timeout');
  217. self.emitAll('connect_timeout', timeout);
  218. }, timeout);
  219. this.subs.push({
  220. destroy: function(){
  221. clearTimeout(timer);
  222. }
  223. });
  224. }
  225. this.subs.push(openSub);
  226. this.subs.push(errorSub);
  227. return this;
  228. };
  229. /**
  230. * Called upon transport open.
  231. *
  232. * @api private
  233. */
  234. Manager.prototype.onopen = function(){
  235. debug('open');
  236. // clear old subs
  237. this.cleanup();
  238. // mark as open
  239. this.readyState = 'open';
  240. this.emit('open');
  241. // add new subs
  242. var socket = this.engine;
  243. this.subs.push(on(socket, 'data', bind(this, 'ondata')));
  244. this.subs.push(on(socket, 'ping', bind(this, 'onping')));
  245. this.subs.push(on(socket, 'pong', bind(this, 'onpong')));
  246. this.subs.push(on(socket, 'error', bind(this, 'onerror')));
  247. this.subs.push(on(socket, 'close', bind(this, 'onclose')));
  248. this.subs.push(on(this.decoder, 'decoded', bind(this, 'ondecoded')));
  249. };
  250. /**
  251. * Called upon a ping.
  252. *
  253. * @api private
  254. */
  255. Manager.prototype.onping = function(){
  256. this.lastPing = new Date;
  257. this.emitAll('ping');
  258. };
  259. /**
  260. * Called upon a packet.
  261. *
  262. * @api private
  263. */
  264. Manager.prototype.onpong = function(){
  265. this.emitAll('pong', new Date - this.lastPing);
  266. };
  267. /**
  268. * Called with data.
  269. *
  270. * @api private
  271. */
  272. Manager.prototype.ondata = function(data){
  273. this.decoder.add(data);
  274. };
  275. /**
  276. * Called when parser fully decodes a packet.
  277. *
  278. * @api private
  279. */
  280. Manager.prototype.ondecoded = function(packet) {
  281. this.emit('packet', packet);
  282. };
  283. /**
  284. * Called upon socket error.
  285. *
  286. * @api private
  287. */
  288. Manager.prototype.onerror = function(err){
  289. debug('error', err);
  290. this.emitAll('error', err);
  291. };
  292. /**
  293. * Creates a new socket for the given `nsp`.
  294. *
  295. * @return {Socket}
  296. * @api public
  297. */
  298. Manager.prototype.socket = function(nsp){
  299. var socket = this.nsps[nsp];
  300. if (!socket) {
  301. socket = new Socket(this, nsp);
  302. this.nsps[nsp] = socket;
  303. var self = this;
  304. socket.on('connecting', onConnecting);
  305. socket.on('connect', function(){
  306. socket.id = self.engine.id;
  307. });
  308. if (this.autoConnect) {
  309. // manually call here since connecting evnet is fired before listening
  310. onConnecting();
  311. }
  312. }
  313. function onConnecting() {
  314. if (!~indexOf(self.connecting, socket)) {
  315. self.connecting.push(socket);
  316. }
  317. }
  318. return socket;
  319. };
  320. /**
  321. * Called upon a socket close.
  322. *
  323. * @param {Socket} socket
  324. */
  325. Manager.prototype.destroy = function(socket){
  326. var index = indexOf(this.connecting, socket);
  327. if (~index) this.connecting.splice(index, 1);
  328. if (this.connecting.length) return;
  329. this.close();
  330. };
  331. /**
  332. * Writes a packet.
  333. *
  334. * @param {Object} packet
  335. * @api private
  336. */
  337. Manager.prototype.packet = function(packet){
  338. debug('writing packet %j', packet);
  339. var self = this;
  340. if (!self.encoding) {
  341. // encode, then write to engine with result
  342. self.encoding = true;
  343. this.encoder.encode(packet, function(encodedPackets) {
  344. for (var i = 0; i < encodedPackets.length; i++) {
  345. self.engine.write(encodedPackets[i], packet.options);
  346. }
  347. self.encoding = false;
  348. self.processPacketQueue();
  349. });
  350. } else { // add packet to the queue
  351. self.packetBuffer.push(packet);
  352. }
  353. };
  354. /**
  355. * If packet buffer is non-empty, begins encoding the
  356. * next packet in line.
  357. *
  358. * @api private
  359. */
  360. Manager.prototype.processPacketQueue = function() {
  361. if (this.packetBuffer.length > 0 && !this.encoding) {
  362. var pack = this.packetBuffer.shift();
  363. this.packet(pack);
  364. }
  365. };
  366. /**
  367. * Clean up transport subscriptions and packet buffer.
  368. *
  369. * @api private
  370. */
  371. Manager.prototype.cleanup = function(){
  372. debug('cleanup');
  373. var sub;
  374. while (sub = this.subs.shift()) sub.destroy();
  375. this.packetBuffer = [];
  376. this.encoding = false;
  377. this.lastPing = null;
  378. this.decoder.destroy();
  379. };
  380. /**
  381. * Close the current socket.
  382. *
  383. * @api private
  384. */
  385. Manager.prototype.close =
  386. Manager.prototype.disconnect = function(){
  387. debug('disconnect');
  388. this.skipReconnect = true;
  389. this.reconnecting = false;
  390. if ('opening' == this.readyState) {
  391. // `onclose` will not fire because
  392. // an open event never happened
  393. this.cleanup();
  394. }
  395. this.backoff.reset();
  396. this.readyState = 'closed';
  397. if (this.engine) this.engine.close();
  398. };
  399. /**
  400. * Called upon engine close.
  401. *
  402. * @api private
  403. */
  404. Manager.prototype.onclose = function(reason){
  405. debug('onclose');
  406. this.cleanup();
  407. this.backoff.reset();
  408. this.readyState = 'closed';
  409. this.emit('close', reason);
  410. if (this._reconnection && !this.skipReconnect) {
  411. this.reconnect();
  412. }
  413. };
  414. /**
  415. * Attempt a reconnection.
  416. *
  417. * @api private
  418. */
  419. Manager.prototype.reconnect = function(){
  420. if (this.reconnecting || this.skipReconnect) return this;
  421. var self = this;
  422. if (this.backoff.attempts >= this._reconnectionAttempts) {
  423. debug('reconnect failed');
  424. this.backoff.reset();
  425. this.emitAll('reconnect_failed');
  426. this.reconnecting = false;
  427. } else {
  428. var delay = this.backoff.duration();
  429. debug('will wait %dms before reconnect attempt', delay);
  430. this.reconnecting = true;
  431. var timer = setTimeout(function(){
  432. if (self.skipReconnect) return;
  433. debug('attempting reconnect');
  434. self.emitAll('reconnect_attempt', self.backoff.attempts);
  435. self.emitAll('reconnecting', self.backoff.attempts);
  436. // check again for the case socket closed in above events
  437. if (self.skipReconnect) return;
  438. self.open(function(err){
  439. if (err) {
  440. debug('reconnect attempt error');
  441. self.reconnecting = false;
  442. self.reconnect();
  443. self.emitAll('reconnect_error', err.data);
  444. } else {
  445. debug('reconnect success');
  446. self.onreconnect();
  447. }
  448. });
  449. }, delay);
  450. this.subs.push({
  451. destroy: function(){
  452. clearTimeout(timer);
  453. }
  454. });
  455. }
  456. };
  457. /**
  458. * Called upon successful reconnect.
  459. *
  460. * @api private
  461. */
  462. Manager.prototype.onreconnect = function(){
  463. var attempt = this.backoff.attempts;
  464. this.reconnecting = false;
  465. this.backoff.reset();
  466. this.updateSocketIds();
  467. this.emitAll('reconnect', attempt);
  468. };