net.js

  1. /**
  2. * TCP Networking APIs
  3. *
  4. * The TCP Networking APIs provide an asynchronous network API for creating
  5. * stream-based TCP servers and clients.
  6. *
  7. * @see {@link https://nodejs.org/dist/latest-v18.x/docs/api/net.html}
  8. *
  9. * @module Net
  10. */
  11. import dns from 'dns';
  12. import assert from 'assert';
  13. import { EventEmitter } from 'events';
  14. const binding = process.binding('net');
  15. function parseOptionsArgs(args) {
  16. // Use options overloading.
  17. if (typeof args[0] === 'object') {
  18. return [args[0]?.port, args[0]?.host];
  19. }
  20. return args;
  21. }
  22. function toUint8Array(data, encoding) {
  23. if (!(data instanceof Uint8Array)) {
  24. return new TextEncoder(encoding).encode(data);
  25. }
  26. return data;
  27. }
  28. function makeDeferredPromise() {
  29. // Extract the resolve method from the promise.
  30. const promiseExt = {};
  31. const promise = new Promise((resolve, reject) => {
  32. promiseExt.resolve = resolve;
  33. promiseExt.reject = reject;
  34. });
  35. return { promise, promiseExt };
  36. }
  37. const TIMEOUT_MAX = Math.pow(2, 31) - 1;
  38. // Error type referring to socket connection timeout.
  39. export class TimeoutError extends Error {
  40. constructor(message) {
  41. super();
  42. this.name = 'TimeoutError';
  43. this.message = message;
  44. }
  45. }
  46. // Utility function that wraps a promise with a timeout.
  47. function timeout(promise, time = 0) {
  48. // When the time is 0ms it means that we don't want to
  49. // have a timeout for the provided promise.
  50. if (time === 0) return promise;
  51. const timer = {};
  52. return Promise.race([
  53. promise,
  54. new Promise(
  55. (_, reject) => (timer.id = setTimeout(reject, time, new TimeoutError()))
  56. ),
  57. ]).finally(() => clearTimeout(timer.id));
  58. }
  59. /**
  60. * Utility function that wraps a `repeatable` callback with a timeout.
  61. *
  62. * The following implementation does not create a new timer every time an
  63. * I/O activity happens on the socket. The logic is a bit more complex
  64. * but it is more performant on high I/O scenarios.
  65. *
  66. * http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
  67. * @ignore
  68. */
  69. function callbackTimeout(callback, time = 0, onTimeout) {
  70. // The reason of the event-emitter is to allow the "outside" world
  71. // to signal us for changes in the timeout value.
  72. const timer = { time, lastActivity: Date.now() };
  73. const timerSignal = new EventEmitter();
  74. const timeout = () => {
  75. // Calculate when the timeout would happen.
  76. const after = timer.lastActivity - Date.now() + timer.time;
  77. if (timer.time === 0) return;
  78. // Timeout occurred, take action.
  79. if (after < 0) {
  80. onTimeout();
  81. return (timer.id = setTimeout(timeout, timer.time));
  82. }
  83. // There was some recent activity, simply restart the timer.
  84. timer.id = setTimeout(timeout, after);
  85. };
  86. timerSignal.on('timeoutUpdate', (timeMs) => {
  87. if (timer.id) clearTimeout(timer.id);
  88. timer.time = timeMs;
  89. timeout();
  90. });
  91. if (time > 0) timer.id = setTimeout(timeout, time);
  92. return [
  93. (...args) => {
  94. callback(...args);
  95. timer.lastActivity = Date.now();
  96. },
  97. timerSignal,
  98. ];
  99. }
  100. /**
  101. * Initiates a connection to a given remote host.
  102. *
  103. * Additional signatures:
  104. * - createConnection(port: number | string, host?: string)
  105. *
  106. * @param {Object} options - Configuration options for the connection.
  107. * @param {string} options.host - The hostname or IP address of the remote server to connect to.
  108. * @param {(string|number)} options.port - The port number on the remote host to connect to.
  109. * @returns {Socket} An instance of the `Socket` class
  110. */
  111. export function createConnection(...args) {
  112. const socket = new Socket();
  113. socket.connect(...args);
  114. return socket;
  115. }
  116. /**
  117. * Aliases to `net.createConnection()`.
  118. *
  119. * Additional signatures:
  120. * - connect(port: number | string, host?: string)
  121. */
  122. export const connect = createConnection;
  123. /**
  124. * Creates a new TCP server.
  125. *
  126. * @param {Function} [onConnection] - A function that is called whenever a connection is made to the server.
  127. * @returns {Server} An instance of the `Server` class.
  128. */
  129. export function createServer(onConnection) {
  130. // Instantiate a new TCP server.
  131. const server = new Server();
  132. if (onConnection) {
  133. assert.isFunction(onConnection);
  134. server.on('connection', onConnection);
  135. }
  136. return server;
  137. }
  138. /**
  139. * Information about the connected TCP socket.
  140. *
  141. * @typedef socketInfo
  142. * @property {SocketHost} host - Information about the local endpoint of the socket.
  143. * @property {SocketRemote} remote - Information about the remote endpoint of the socket.
  144. */
  145. /**
  146. * Information about the host TCP socket.
  147. *
  148. * @typedef SocketHost
  149. * @property {number} port - The port number on the local machine.
  150. * @property {string} family - The IP family of the local address (`IPv4` or `IPv6`).
  151. * @property {string} address - The local IP address.
  152. */
  153. /**
  154. * Information about the remote TCP socket.
  155. *
  156. * @typedef SocketRemote
  157. * @property {number} port - The port number on the remote machine.
  158. * @property {string} address - The remote IP address.
  159. */
  160. const kSetSocketIdUnchecked = Symbol('kSetSocketIdUnchecked');
  161. const kAsyncGenerator = Symbol('kAsyncGenerator');
  162. /**
  163. * A Socket object is a JS wrapper around a low-level TCP socket.
  164. *
  165. * @fires connect - Emitted when a socket connection is successfully established.
  166. * @fires data - Emitted when data is received.
  167. * @fires end - Emitted when the other end of the socket sends a FIN packet.
  168. * @fires error - Emitted when an error occurs.
  169. * @fires close - Emitted once the socket is fully closed.
  170. * @fires timeout - Emitted if the socket times out from (read) inactivity.
  171. */
  172. export class Socket extends EventEmitter {
  173. #id;
  174. #host;
  175. #connecting;
  176. #encoding;
  177. #writable;
  178. #pushQueue;
  179. #pullQueue;
  180. #timeoutHandle;
  181. /**
  182. * Creates a new Socket instance.
  183. *
  184. * @returns {Socket}
  185. */
  186. constructor() {
  187. super();
  188. this.#pushQueue = [];
  189. this.#pullQueue = [];
  190. this.#connecting = false;
  191. this.#timeoutHandle = undefined;
  192. this.bytesRead = 0;
  193. this.bytesWritten = 0;
  194. this.remotePort = undefined;
  195. this.remoteAddress = undefined;
  196. this.timeout = 0;
  197. }
  198. /**
  199. * Initiates a connection on a given remote host.
  200. *
  201. * Additional signatures:
  202. * - connect(port: number | string, host?: string)
  203. *
  204. * @param {Object} options - Configuration options for the connection.
  205. * @param {string} options.host - The hostname or IP address of the remote server to connect to.
  206. * @param {(string|number)} options.port - The port number on the remote host to connect to.
  207. * @returns {Promise<socketInfo>} Information about the connected TCP socket.
  208. */
  209. async connect(...args) {
  210. // Parse arguments.
  211. const [port, hostUnchecked] = parseOptionsArgs(args);
  212. const hostname = hostUnchecked || '0.0.0.0';
  213. if (this.#connecting) {
  214. throw new Error('Socket is trying to connect.');
  215. }
  216. if (Number.isNaN(Number.parseInt(port))) {
  217. throw new TypeError(`The "port" option must be castable to number.`);
  218. }
  219. if (hostname && typeof hostname !== 'string') {
  220. throw new TypeError(`The "host" option must be of type string.`);
  221. }
  222. if (this.#id) {
  223. throw new Error(
  224. `Socket is already connected to <${this.remoteAddress}:${this.remotePort}>.`
  225. );
  226. }
  227. this.#connecting = true;
  228. // Use DNS lookup to resolve the hostname.
  229. const addresses = await dns.lookup(hostname);
  230. // Prefer IPv4 address.
  231. const remoteHost = addresses.some((addr) => addr.family === 'IPv4')
  232. ? addresses.filter((addr) => addr.family === 'IPv4')[0].address
  233. : addresses[0].address;
  234. const { id, host, remote } = await binding.connect(
  235. remoteHost,
  236. Number.parseInt(port)
  237. );
  238. this.#id = id;
  239. this.#connecting = false;
  240. this.#writable = true;
  241. this.#host = host;
  242. this.remoteAddress = remote.address;
  243. this.remotePort = remote.port;
  244. const [onAvailableSocketData, signal] = callbackTimeout(
  245. this.#onAvailableSocketData.bind(this),
  246. this.timeout,
  247. () => this.emit('timeout')
  248. );
  249. this.#timeoutHandle = signal;
  250. this.emit('connect', { host, remote });
  251. binding.readStart(this.#id, onAvailableSocketData);
  252. return { host, remote };
  253. }
  254. /**
  255. * Sets the encoding for the current socket.
  256. *
  257. * @param {String} [encoding] - The character encoding to use.
  258. */
  259. setEncoding(encoding = 'utf-8') {
  260. // Check the parameter type.
  261. if (typeof encoding !== 'string') {
  262. throw new TypeError('The "encoding" argument must be of type string.');
  263. }
  264. this.#encoding = encoding;
  265. }
  266. /**
  267. * Sets the socket to timeout after timeout milliseconds of (read) inactivity on the socket.
  268. *
  269. * @param {Number} timeout - The duration after which the socket should timeout due to inactivity.
  270. */
  271. setTimeout(timeout = 0) {
  272. // Coalesce to number or NaN.
  273. timeout *= 1;
  274. // Check timeout's boundaries.
  275. if (!(timeout >= 0 && timeout <= TIMEOUT_MAX)) {
  276. timeout = 0;
  277. }
  278. if (this.#id) {
  279. // Timeout value changed after the socket began waiting.
  280. this.#timeoutHandle.emit('timeoutUpdate', timeout);
  281. }
  282. this.timeout = timeout;
  283. }
  284. /**
  285. * Returns a promise which is fulfilled when the TCP stream can return a chunk.
  286. *
  287. * @returns {Promise<(Uint8Array|string)>} The chunk read from the socket.
  288. */
  289. read() {
  290. // Check if the socket is connected to a host.
  291. if (!this.#id) return null;
  292. // HACK: The following is used to handle uncaught errors thrown
  293. // from the event-emitter when no one is subscribed to the `error` event.
  294. if (this.listenerCount('error') === 0) this.on('error', () => {});
  295. // No available value to read yet.
  296. if (this.#pushQueue.length === 0) {
  297. const { promise, promiseExt } = makeDeferredPromise();
  298. this.#pullQueue.push(promiseExt);
  299. return timeout(promise, this.timeout);
  300. }
  301. const value = this.#pushQueue.shift();
  302. const action = value instanceof Error ? Promise.reject : Promise.resolve;
  303. return action.call(Promise, value);
  304. }
  305. /**
  306. * Writes contents to a TCP socket stream.
  307. *
  308. * @param {String|Uint8Array} data - The data to be written to the socket.
  309. * @param {String} [encoding] - The character encoding to use.
  310. * @returns {Promise<Number>} The number of bytes written.
  311. */
  312. async write(data, encoding = 'utf-8') {
  313. // Check the data argument type.
  314. if (!(data instanceof Uint8Array) && typeof data !== 'string') {
  315. throw new TypeError(
  316. `The "data" argument must be of type string or Uint8Array.`
  317. );
  318. }
  319. if (!this.#id) {
  320. throw new Error(`Socket is not connected to a remote host.`);
  321. }
  322. if (!this.#writable) {
  323. throw new Error(`The socket stream is not writable.`);
  324. }
  325. // Default to UTF-8 encoding.
  326. encoding = encoding || this.#encoding || 'utf-8';
  327. const bytes = toUint8Array(data, encoding);
  328. const bytesWritten = await binding.write(this.#id, bytes);
  329. this.bytesWritten += bytesWritten;
  330. return bytesWritten;
  331. }
  332. /**
  333. * Half-closes the TCP stream.
  334. *
  335. * @param {String|Uint8Array} data - The (last) data to be written to the socket.
  336. * @param {String} [encoding] - The character encoding to use.
  337. * @returns {Promise<void>}
  338. */
  339. async end(data, encoding = 'utf-8') {
  340. // Check socket connection.
  341. if (!this.#id) return;
  342. // If data is given, write to stream.
  343. if (data) {
  344. await this.write(data, encoding);
  345. }
  346. this.#writable = false;
  347. await binding.shutdown(this.#id);
  348. }
  349. /**
  350. * Closes both sides of the TCP sockets.
  351. */
  352. async destroy() {
  353. // Check if the socket is indeed connected.
  354. if (!this.#id) return;
  355. this.#timeoutHandle?.emit('timeoutUpdate', 0);
  356. await binding.close(this.#id);
  357. // Ignore pending reads.
  358. for (const promise of this.#pullQueue) {
  359. promise.resolve(null);
  360. }
  361. this.#reset();
  362. this.emit('close');
  363. }
  364. /**
  365. * Returns the bound address, the address family name and port of the socket.
  366. *
  367. * @returns {SocketHost} - Information about the local endpoint of the socket.
  368. */
  369. address() {
  370. return this.#host;
  371. }
  372. /**
  373. * Resets socket's internal state (not to be called manually).
  374. * @ignore
  375. */
  376. #reset() {
  377. this.#id = undefined;
  378. this.#pushQueue = [];
  379. this.#pullQueue = [];
  380. this.#connecting = false;
  381. this.#timeoutHandle = undefined;
  382. this.bytesRead = 0;
  383. this.bytesWritten = 0;
  384. this.remotePort = undefined;
  385. this.remoteAddress = undefined;
  386. this.timeout = 0;
  387. }
  388. #asyncDispatch(value) {
  389. if (this.#pullQueue.length === 0) {
  390. this.#pushQueue.push(value);
  391. return;
  392. }
  393. const promise = this.#pullQueue.shift();
  394. const action = value instanceof Error ? promise.reject : promise.resolve;
  395. action(value);
  396. }
  397. #onAvailableSocketData(err, arrayBufferView) {
  398. // Check for errors during socket read.
  399. if (err) {
  400. this.#asyncDispatch(err);
  401. this.emit('error', err);
  402. return;
  403. }
  404. // Check if the remote host closed the connection.
  405. if (arrayBufferView.byteLength === 0) {
  406. this.#asyncDispatch(null);
  407. this.emit('end');
  408. this.destroy();
  409. return;
  410. }
  411. this.bytesRead += arrayBufferView.byteLength;
  412. // Transform ArrayBuffer into a Uint8Array we can use.
  413. const data = new Uint8Array(arrayBufferView);
  414. const data_transform = this.#encoding
  415. ? new TextDecoder(this.#encoding).decode(new Uint8Array(data))
  416. : data;
  417. // Use the EE mode instead of the async-iterator.
  418. if (this.listenerCount('data') > 0) {
  419. this.emit('data', data_transform);
  420. return;
  421. }
  422. this.#asyncDispatch(data_transform);
  423. }
  424. /**
  425. * Hard-sets the ID of the socket (ONLY for internal use).
  426. *
  427. * @param {Number} id - The resource ID existing in the event-loop.
  428. * @ignore
  429. */
  430. [kSetSocketIdUnchecked](id) {
  431. this.#id = id;
  432. this.#writable = true;
  433. const [onAvailableSocketData, signal] = callbackTimeout(
  434. this.#onAvailableSocketData.bind(this),
  435. this.timeout,
  436. () => this.emit('timeout')
  437. );
  438. this.#timeoutHandle = signal;
  439. binding.readStart(this.#id, onAvailableSocketData);
  440. }
  441. async *[kAsyncGenerator](signal) {
  442. // Close socket on stream pipeline errors.
  443. if (signal) signal.on('uncaughtStreamException', () => this.destroy());
  444. let data;
  445. while ((data = await this.read())) {
  446. if (!data) break;
  447. yield data;
  448. }
  449. }
  450. /**
  451. * The socket should be async iterable.
  452. * @ignore
  453. */
  454. [Symbol.asyncIterator](signal) {
  455. const iterator = { return: () => this.end() };
  456. return Object.assign(this[kAsyncGenerator](signal), iterator);
  457. }
  458. }
  459. /**
  460. * A Server object is a wrapper around a TCP listener.
  461. *
  462. * @fires listening - Emitted when the server has been bound.
  463. * @fires connection - Emitted when a new connection is made.
  464. * @fires close - Emitted when the server stops accepting new connections.
  465. * @fires error - Emitted when an error occurs.
  466. */
  467. export class Server extends EventEmitter {
  468. #id;
  469. #host;
  470. #pushQueue;
  471. #pullQueue;
  472. /**
  473. * Creates a new Server instance.
  474. *
  475. * @returns {Server} An instance of the TCP `Server` class.
  476. */
  477. constructor() {
  478. super();
  479. this.#pushQueue = [];
  480. this.#pullQueue = [];
  481. }
  482. /**
  483. * Starts listening for incoming connections.
  484. *
  485. * @param {(string|number)} port - The port number or string on which the server should listen.
  486. * @param {string} host - The hostname or IP address on which the server will listen.
  487. * @returns {Promise<SocketHost>} The host information where the server is listening.
  488. */
  489. async listen(...args) {
  490. // Parse arguments.
  491. const [port, hostUnchecked] = parseOptionsArgs(args);
  492. const hostname = hostUnchecked || '127.0.0.1';
  493. if (Number.isNaN(Number.parseInt(port))) {
  494. throw new TypeError(`The "port" option must be castable to number.`);
  495. }
  496. if (hostname && typeof hostname !== 'string') {
  497. throw new TypeError(`The "host" option must be of type string.`);
  498. }
  499. if (this.#id) {
  500. throw new Error(`Server is already listening for connections.`);
  501. }
  502. // Use DNS lookup to resolve the local listening interface.
  503. const addresses = await dns.lookup(hostname);
  504. // Prefer IPv4 address.
  505. const host = addresses.some((addr) => addr.family === 'IPv4')
  506. ? addresses.filter((addr) => addr.family === 'IPv4')[0].address
  507. : addresses[0].address;
  508. // Bind server to address, and start listening for connections.
  509. const socketInfo = binding.listen(
  510. host,
  511. port,
  512. this.#onAvailableConnection.bind(this)
  513. );
  514. this.#id = socketInfo.id;
  515. this.#host = socketInfo.host;
  516. this.emit('listening', this.#host);
  517. return this.#host;
  518. }
  519. /**
  520. * Waits for a TCP client to connect and accepts the connection.
  521. *
  522. * @returns {Promise<Socket>} - A `Socket` object representing the connected client.
  523. */
  524. accept() {
  525. // Check if the server is listening.
  526. if (!this.#id) {
  527. throw new Error(`Server is not bound to a port.`);
  528. }
  529. // HACK: The following is used to handle uncaught errors thrown
  530. // from the event-emitter when no one is subscribed to the `error` event.
  531. if (this.listenerCount('error') === 0) this.on('error', () => {});
  532. // No available connection yet.
  533. if (this.#pushQueue.length === 0) {
  534. const { promise, promiseExt } = makeDeferredPromise();
  535. this.#pullQueue.push(promiseExt);
  536. return promise;
  537. }
  538. const socket = this.#pushQueue.shift();
  539. const action = socket instanceof Error ? Promise.reject : Promise.resolve;
  540. return action.call(Promise, socket);
  541. }
  542. /**
  543. * Stops the server from accepting new connections.
  544. */
  545. async close() {
  546. // Check if the server is already closed.
  547. if (!this.#id) {
  548. throw new Error('Server is already closed.');
  549. }
  550. await binding.close(this.#id);
  551. this.emit('close');
  552. }
  553. /**
  554. * Returns the bound address, the address family name and port of the socket.
  555. *
  556. * @returns {SocketHost} The host information where the server is listening.
  557. */
  558. address() {
  559. return this.#host;
  560. }
  561. #asyncDispatch(socket) {
  562. if (this.#pullQueue.length === 0) {
  563. this.#pushQueue.push(socket);
  564. return;
  565. }
  566. const promise = this.#pullQueue.shift();
  567. const action = socket instanceof Error ? promise.reject : promise.resolve;
  568. action(socket);
  569. }
  570. #onAvailableConnection(err, sockInfo) {
  571. // Check for socket connection errors.
  572. if (err) {
  573. this.#asyncDispatch(err);
  574. this.emit('error', err);
  575. return;
  576. }
  577. // Create a new socket instance.
  578. const socket = new Socket();
  579. const { id, remoteAddress, remotePort } = sockInfo;
  580. socket[kSetSocketIdUnchecked](id);
  581. socket.remoteAddress = remoteAddress;
  582. socket.remotePort = remotePort;
  583. // Check if a connection handler is specified.
  584. if (this.listenerCount('connection') > 0) {
  585. this.emit('connection', socket);
  586. return;
  587. }
  588. this.#asyncDispatch(socket);
  589. }
  590. async *[kAsyncGenerator]() {
  591. let socket;
  592. while ((socket = await this.accept())) {
  593. yield socket;
  594. }
  595. }
  596. /**
  597. * The server should be async iterable.
  598. * @ignore
  599. */
  600. [Symbol.asyncIterator]() {
  601. const iterator = { return: () => this.close() };
  602. return Object.assign(this[kAsyncGenerator](), iterator);
  603. }
  604. }
  605. export default {
  606. TimeoutError,
  607. Socket,
  608. connect,
  609. createConnection,
  610. Server,
  611. createServer,
  612. };