/**
* TCP Networking APIs
*
* The TCP Networking APIs provide an asynchronous network API for creating
* stream-based TCP servers and clients.
*
* @see {@link https://nodejs.org/dist/latest-v18.x/docs/api/net.html}
*
* @module Net
*/
import dns from 'dns';
import assert from 'assert';
import { EventEmitter } from 'events';
const binding = process.binding('net');
function parseOptionsArgs(args) {
// Use options overloading.
if (typeof args[0] === 'object') {
return [args[0]?.port, args[0]?.host];
}
return args;
}
function toUint8Array(data, encoding) {
if (!(data instanceof Uint8Array)) {
return new TextEncoder(encoding).encode(data);
}
return data;
}
function makeDeferredPromise() {
// Extract the resolve method from the promise.
const promiseExt = {};
const promise = new Promise((resolve, reject) => {
promiseExt.resolve = resolve;
promiseExt.reject = reject;
});
return { promise, promiseExt };
}
const TIMEOUT_MAX = Math.pow(2, 31) - 1;
// Error type referring to socket connection timeout.
export class TimeoutError extends Error {
constructor(message) {
super();
this.name = 'TimeoutError';
this.message = message;
}
}
// Utility function that wraps a promise with a timeout.
function timeout(promise, time = 0) {
// When the time is 0ms it means that we don't want to
// have a timeout for the provided promise.
if (time === 0) return promise;
const timer = {};
return Promise.race([
promise,
new Promise(
(_, reject) => (timer.id = setTimeout(reject, time, new TimeoutError()))
),
]).finally(() => clearTimeout(timer.id));
}
/**
* Utility function that wraps a `repeatable` callback with a timeout.
*
* The following implementation does not create a new timer every time an
* I/O activity happens on the socket. The logic is a bit more complex
* but it is more performant on high I/O scenarios.
*
* http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
* @ignore
*/
function callbackTimeout(callback, time = 0, onTimeout) {
// The reason of the event-emitter is to allow the "outside" world
// to signal us for changes in the timeout value.
const timer = { time, lastActivity: Date.now() };
const timerSignal = new EventEmitter();
const timeout = () => {
// Calculate when the timeout would happen.
const after = timer.lastActivity - Date.now() + timer.time;
if (timer.time === 0) return;
// Timeout occurred, take action.
if (after < 0) {
onTimeout();
return (timer.id = setTimeout(timeout, timer.time));
}
// There was some recent activity, simply restart the timer.
timer.id = setTimeout(timeout, after);
};
timerSignal.on('timeoutUpdate', (timeMs) => {
if (timer.id) clearTimeout(timer.id);
timer.time = timeMs;
timeout();
});
if (time > 0) timer.id = setTimeout(timeout, time);
return [
(...args) => {
callback(...args);
timer.lastActivity = Date.now();
},
timerSignal,
];
}
/**
* Initiates a connection to a given remote host.
*
* Additional signatures:
* - createConnection(port: number | string, host?: string)
*
* @param {Object} options - Configuration options for the connection.
* @param {string} options.host - The hostname or IP address of the remote server to connect to.
* @param {(string|number)} options.port - The port number on the remote host to connect to.
* @returns {Socket} An instance of the `Socket` class
*/
export function createConnection(...args) {
const socket = new Socket();
socket.connect(...args);
return socket;
}
/**
* Aliases to `net.createConnection()`.
*
* Additional signatures:
* - connect(port: number | string, host?: string)
*/
export const connect = createConnection;
/**
* Creates a new TCP server.
*
* @param {Function} [onConnection] - A function that is called whenever a connection is made to the server.
* @returns {Server} An instance of the `Server` class.
*/
export function createServer(onConnection) {
// Instantiate a new TCP server.
const server = new Server();
if (onConnection) {
assert.isFunction(onConnection);
server.on('connection', onConnection);
}
return server;
}
/**
* Information about the connected TCP socket.
*
* @typedef socketInfo
* @property {SocketHost} host - Information about the local endpoint of the socket.
* @property {SocketRemote} remote - Information about the remote endpoint of the socket.
*/
/**
* Information about the host TCP socket.
*
* @typedef SocketHost
* @property {number} port - The port number on the local machine.
* @property {string} family - The IP family of the local address (`IPv4` or `IPv6`).
* @property {string} address - The local IP address.
*/
/**
* Information about the remote TCP socket.
*
* @typedef SocketRemote
* @property {number} port - The port number on the remote machine.
* @property {string} address - The remote IP address.
*/
const kSetSocketIdUnchecked = Symbol('kSetSocketIdUnchecked');
const kAsyncGenerator = Symbol('kAsyncGenerator');
/**
* A Socket object is a JS wrapper around a low-level TCP socket.
*
* @fires connect - Emitted when a socket connection is successfully established.
* @fires data - Emitted when data is received.
* @fires end - Emitted when the other end of the socket sends a FIN packet.
* @fires error - Emitted when an error occurs.
* @fires close - Emitted once the socket is fully closed.
* @fires timeout - Emitted if the socket times out from (read) inactivity.
*/
export class Socket extends EventEmitter {
#id;
#host;
#connecting;
#encoding;
#writable;
#pushQueue;
#pullQueue;
#timeoutHandle;
/**
* Creates a new Socket instance.
*
* @returns {Socket}
*/
constructor() {
super();
this.#pushQueue = [];
this.#pullQueue = [];
this.#connecting = false;
this.#timeoutHandle = undefined;
this.bytesRead = 0;
this.bytesWritten = 0;
this.remotePort = undefined;
this.remoteAddress = undefined;
this.timeout = 0;
}
/**
* Initiates a connection on a given remote host.
*
* Additional signatures:
* - connect(port: number | string, host?: string)
*
* @param {Object} options - Configuration options for the connection.
* @param {string} options.host - The hostname or IP address of the remote server to connect to.
* @param {(string|number)} options.port - The port number on the remote host to connect to.
* @returns {Promise<socketInfo>} Information about the connected TCP socket.
*/
async connect(...args) {
// Parse arguments.
const [port, hostUnchecked] = parseOptionsArgs(args);
const hostname = hostUnchecked || '0.0.0.0';
if (this.#connecting) {
throw new Error('Socket is trying to connect.');
}
if (Number.isNaN(Number.parseInt(port))) {
throw new TypeError(`The "port" option must be castable to number.`);
}
if (hostname && typeof hostname !== 'string') {
throw new TypeError(`The "host" option must be of type string.`);
}
if (this.#id) {
throw new Error(
`Socket is already connected to <${this.remoteAddress}:${this.remotePort}>.`
);
}
this.#connecting = true;
// Use DNS lookup to resolve the hostname.
const addresses = await dns.lookup(hostname);
// Prefer IPv4 address.
const remoteHost = addresses.some((addr) => addr.family === 'IPv4')
? addresses.filter((addr) => addr.family === 'IPv4')[0].address
: addresses[0].address;
const { id, host, remote } = await binding.connect(
remoteHost,
Number.parseInt(port)
);
this.#id = id;
this.#connecting = false;
this.#writable = true;
this.#host = host;
this.remoteAddress = remote.address;
this.remotePort = remote.port;
const [onAvailableSocketData, signal] = callbackTimeout(
this.#onAvailableSocketData.bind(this),
this.timeout,
() => this.emit('timeout')
);
this.#timeoutHandle = signal;
this.emit('connect', { host, remote });
binding.readStart(this.#id, onAvailableSocketData);
return { host, remote };
}
/**
* Sets the encoding for the current socket.
*
* @param {String} [encoding] - The character encoding to use.
*/
setEncoding(encoding = 'utf-8') {
// Check the parameter type.
if (typeof encoding !== 'string') {
throw new TypeError('The "encoding" argument must be of type string.');
}
this.#encoding = encoding;
}
/**
* Sets the socket to timeout after timeout milliseconds of (read) inactivity on the socket.
*
* @param {Number} timeout - The duration after which the socket should timeout due to inactivity.
*/
setTimeout(timeout = 0) {
// Coalesce to number or NaN.
timeout *= 1;
// Check timeout's boundaries.
if (!(timeout >= 0 && timeout <= TIMEOUT_MAX)) {
timeout = 0;
}
if (this.#id) {
// Timeout value changed after the socket began waiting.
this.#timeoutHandle.emit('timeoutUpdate', timeout);
}
this.timeout = timeout;
}
/**
* Returns a promise which is fulfilled when the TCP stream can return a chunk.
*
* @returns {Promise<(Uint8Array|string)>} The chunk read from the socket.
*/
read() {
// Check if the socket is connected to a host.
if (!this.#id) return null;
// HACK: The following is used to handle uncaught errors thrown
// from the event-emitter when no one is subscribed to the `error` event.
if (this.listenerCount('error') === 0) this.on('error', () => {});
// No available value to read yet.
if (this.#pushQueue.length === 0) {
const { promise, promiseExt } = makeDeferredPromise();
this.#pullQueue.push(promiseExt);
return timeout(promise, this.timeout);
}
const value = this.#pushQueue.shift();
const action = value instanceof Error ? Promise.reject : Promise.resolve;
return action.call(Promise, value);
}
/**
* Writes contents to a TCP socket stream.
*
* @param {String|Uint8Array} data - The data to be written to the socket.
* @param {String} [encoding] - The character encoding to use.
* @returns {Promise<Number>} The number of bytes written.
*/
async write(data, encoding = 'utf-8') {
// Check the data argument type.
if (!(data instanceof Uint8Array) && typeof data !== 'string') {
throw new TypeError(
`The "data" argument must be of type string or Uint8Array.`
);
}
if (!this.#id) {
throw new Error(`Socket is not connected to a remote host.`);
}
if (!this.#writable) {
throw new Error(`The socket stream is not writable.`);
}
// Default to UTF-8 encoding.
encoding = encoding || this.#encoding || 'utf-8';
const bytes = toUint8Array(data, encoding);
const bytesWritten = await binding.write(this.#id, bytes);
this.bytesWritten += bytesWritten;
return bytesWritten;
}
/**
* Half-closes the TCP stream.
*
* @param {String|Uint8Array} data - The (last) data to be written to the socket.
* @param {String} [encoding] - The character encoding to use.
* @returns {Promise<void>}
*/
async end(data, encoding = 'utf-8') {
// Check socket connection.
if (!this.#id) return;
// If data is given, write to stream.
if (data) {
await this.write(data, encoding);
}
this.#writable = false;
await binding.shutdown(this.#id);
}
/**
* Closes both sides of the TCP sockets.
*/
async destroy() {
// Check if the socket is indeed connected.
if (!this.#id) return;
this.#timeoutHandle?.emit('timeoutUpdate', 0);
await binding.close(this.#id);
// Ignore pending reads.
for (const promise of this.#pullQueue) {
promise.resolve(null);
}
this.#reset();
this.emit('close');
}
/**
* Returns the bound address, the address family name and port of the socket.
*
* @returns {SocketHost} - Information about the local endpoint of the socket.
*/
address() {
return this.#host;
}
/**
* Resets socket's internal state (not to be called manually).
* @ignore
*/
#reset() {
this.#id = undefined;
this.#pushQueue = [];
this.#pullQueue = [];
this.#connecting = false;
this.#timeoutHandle = undefined;
this.bytesRead = 0;
this.bytesWritten = 0;
this.remotePort = undefined;
this.remoteAddress = undefined;
this.timeout = 0;
}
#asyncDispatch(value) {
if (this.#pullQueue.length === 0) {
this.#pushQueue.push(value);
return;
}
const promise = this.#pullQueue.shift();
const action = value instanceof Error ? promise.reject : promise.resolve;
action(value);
}
#onAvailableSocketData(err, arrayBufferView) {
// Check for errors during socket read.
if (err) {
this.#asyncDispatch(err);
this.emit('error', err);
return;
}
// Check if the remote host closed the connection.
if (arrayBufferView.byteLength === 0) {
this.#asyncDispatch(null);
this.emit('end');
this.destroy();
return;
}
this.bytesRead += arrayBufferView.byteLength;
// Transform ArrayBuffer into a Uint8Array we can use.
const data = new Uint8Array(arrayBufferView);
const data_transform = this.#encoding
? new TextDecoder(this.#encoding).decode(new Uint8Array(data))
: data;
// Use the EE mode instead of the async-iterator.
if (this.listenerCount('data') > 0) {
this.emit('data', data_transform);
return;
}
this.#asyncDispatch(data_transform);
}
/**
* Hard-sets the ID of the socket (ONLY for internal use).
*
* @param {Number} id - The resource ID existing in the event-loop.
* @ignore
*/
[kSetSocketIdUnchecked](id) {
this.#id = id;
this.#writable = true;
const [onAvailableSocketData, signal] = callbackTimeout(
this.#onAvailableSocketData.bind(this),
this.timeout,
() => this.emit('timeout')
);
this.#timeoutHandle = signal;
binding.readStart(this.#id, onAvailableSocketData);
}
async *[kAsyncGenerator](signal) {
// Close socket on stream pipeline errors.
if (signal) signal.on('uncaughtStreamException', () => this.destroy());
let data;
while ((data = await this.read())) {
if (!data) break;
yield data;
}
}
/**
* The socket should be async iterable.
* @ignore
*/
[Symbol.asyncIterator](signal) {
const iterator = { return: () => this.end() };
return Object.assign(this[kAsyncGenerator](signal), iterator);
}
}
/**
* A Server object is a wrapper around a TCP listener.
*
* @fires listening - Emitted when the server has been bound.
* @fires connection - Emitted when a new connection is made.
* @fires close - Emitted when the server stops accepting new connections.
* @fires error - Emitted when an error occurs.
*/
export class Server extends EventEmitter {
#id;
#host;
#pushQueue;
#pullQueue;
/**
* Creates a new Server instance.
*
* @returns {Server} An instance of the TCP `Server` class.
*/
constructor() {
super();
this.#pushQueue = [];
this.#pullQueue = [];
}
/**
* Starts listening for incoming connections.
*
* @param {(string|number)} port - The port number or string on which the server should listen.
* @param {string} host - The hostname or IP address on which the server will listen.
* @returns {Promise<SocketHost>} The host information where the server is listening.
*/
async listen(...args) {
// Parse arguments.
const [port, hostUnchecked] = parseOptionsArgs(args);
const hostname = hostUnchecked || '127.0.0.1';
if (Number.isNaN(Number.parseInt(port))) {
throw new TypeError(`The "port" option must be castable to number.`);
}
if (hostname && typeof hostname !== 'string') {
throw new TypeError(`The "host" option must be of type string.`);
}
if (this.#id) {
throw new Error(`Server is already listening for connections.`);
}
// Use DNS lookup to resolve the local listening interface.
const addresses = await dns.lookup(hostname);
// Prefer IPv4 address.
const host = addresses.some((addr) => addr.family === 'IPv4')
? addresses.filter((addr) => addr.family === 'IPv4')[0].address
: addresses[0].address;
// Bind server to address, and start listening for connections.
const socketInfo = binding.listen(
host,
port,
this.#onAvailableConnection.bind(this)
);
this.#id = socketInfo.id;
this.#host = socketInfo.host;
this.emit('listening', this.#host);
return this.#host;
}
/**
* Waits for a TCP client to connect and accepts the connection.
*
* @returns {Promise<Socket>} - A `Socket` object representing the connected client.
*/
accept() {
// Check if the server is listening.
if (!this.#id) {
throw new Error(`Server is not bound to a port.`);
}
// HACK: The following is used to handle uncaught errors thrown
// from the event-emitter when no one is subscribed to the `error` event.
if (this.listenerCount('error') === 0) this.on('error', () => {});
// No available connection yet.
if (this.#pushQueue.length === 0) {
const { promise, promiseExt } = makeDeferredPromise();
this.#pullQueue.push(promiseExt);
return promise;
}
const socket = this.#pushQueue.shift();
const action = socket instanceof Error ? Promise.reject : Promise.resolve;
return action.call(Promise, socket);
}
/**
* Stops the server from accepting new connections.
*/
async close() {
// Check if the server is already closed.
if (!this.#id) {
throw new Error('Server is already closed.');
}
await binding.close(this.#id);
this.emit('close');
}
/**
* Returns the bound address, the address family name and port of the socket.
*
* @returns {SocketHost} The host information where the server is listening.
*/
address() {
return this.#host;
}
#asyncDispatch(socket) {
if (this.#pullQueue.length === 0) {
this.#pushQueue.push(socket);
return;
}
const promise = this.#pullQueue.shift();
const action = socket instanceof Error ? promise.reject : promise.resolve;
action(socket);
}
#onAvailableConnection(err, sockInfo) {
// Check for socket connection errors.
if (err) {
this.#asyncDispatch(err);
this.emit('error', err);
return;
}
// Create a new socket instance.
const socket = new Socket();
const { id, remoteAddress, remotePort } = sockInfo;
socket[kSetSocketIdUnchecked](id);
socket.remoteAddress = remoteAddress;
socket.remotePort = remotePort;
// Check if a connection handler is specified.
if (this.listenerCount('connection') > 0) {
this.emit('connection', socket);
return;
}
this.#asyncDispatch(socket);
}
async *[kAsyncGenerator]() {
let socket;
while ((socket = await this.accept())) {
yield socket;
}
}
/**
* The server should be async iterable.
* @ignore
*/
[Symbol.asyncIterator]() {
const iterator = { return: () => this.close() };
return Object.assign(this[kAsyncGenerator](), iterator);
}
}
export default {
TimeoutError,
Socket,
connect,
createConnection,
Server,
createServer,
};