TCP

composer require php-standard-library/tcp

The TCP component provides a non-blocking API for TCP client and server connections, built on top of PSL's IO and Network abstractions.

Usage

use Psl\Async;
use Psl\TCP;

$listener = TCP\listen('127.0.0.1');

Async\concurrently([
    'server' => static function () use ($listener): void {
        $connection = $listener->accept();
        $request = $connection->readAll();
        $connection->writeAll("echo: {$request}");
        $connection->close();
        $listener->close();
    },
    'client' => static function () use ($listener): void {
        $address = $listener->getLocalAddress();
        $client = TCP\connect($address->host, $address->port ?? 0);
        $client->writeAll('hello');
        $client->shutdown();
        $response = $client->readAll();
        $client->close();
    },
]);

Design

ConnectorInterface

All TCP connectors implement ConnectorInterface, making them interchangeable and composable. The built-in connectors are:

SocketPool

SocketPool reuses idle TCP connections. Checked-in connections stay alive for a configurable idle timeout before being closed.

use Psl\Async;
use Psl\TCP;

$listener = TCP\listen('127.0.0.1');
$address = $listener->getLocalAddress();

Async\concurrently([
    'server' => static function () use ($listener): void {
        $connection = $listener->accept();
        // Keep connection open until client is done
        $connection->readAll();
        $connection->close();
        $listener->close();
    },
    'client' => static function () use ($address): void {
        $pool = new TCP\SocketPool();
        $stream = $pool->checkout($address->host, $address->port ?? 0);
        // ... use stream ...
        $pool->checkin($stream);

        // Later -- reuses the same connection
        $stream = $pool->checkout($address->host, $address->port ?? 0);
        $pool->clear($stream);
        $pool->close();
    },
]);

Configuration

TCP\listen() and TCP\connect() accept configuration objects that control socket behavior:

All configuration objects are immutable and provide with* builder methods for fluent configuration:

use Psl\IO;
use Psl\TCP;

// Configuration objects are immutable with fluent with* builder methods
$config = TCP\ListenConfiguration::default()
    ->withNoDelay(true)
    ->withBacklog(2048)
    ->withIdleConnections(128);

$listener = TCP\listen('127.0.0.1', 0, $config);

$address = $listener->getLocalAddress();
IO\write_line('Listening on %s:%d', $address->host, $address->port ?? 0);

$listener->close();

// ConnectConfiguration works the same way
$connectConfig = TCP\ConnectConfiguration::default()->withNoDelay(true);

IO\write_line('Connect config noDelay: %s', $connectConfig->noDelay ? 'true' : 'false');
use Psl\TCP;

// Default backlog of 512
$listener = TCP\listen('127.0.0.1', 8080);

// High-throughput server with larger backlog
$listener = TCP\listen('127.0.0.1', 8080, new TCP\ListenConfiguration(backlog: 4096));

Low-Level Socket

Socket gives you fine-grained control over socket creation. Create a socket, bind to an address, then pass a configuration to listen() or connect():

use Psl\Async;
use Psl\TCP\Socket;

$socket = Socket::createV4();
$socket->bind('127.0.0.1', 0);

$listener = $socket->listen(new \Psl\TCP\ListenConfiguration(reuseAddress: true, reusePort: true, noDelay: true));

Async\concurrently([
    'server' => static function () use ($listener): void {
        $connection = $listener->accept();
        $data = $connection->readAll();
        $connection->writeAll($data);
        $connection->close();
        $listener->close();
    },
    'client' => static function () use ($listener): void {
        $address = $listener->getLocalAddress();
        $client = \Psl\TCP\connect($address->host, $address->port ?? 0);
        $client->writeAll('test');
        $client->shutdown();
        $response = $client->readAll();
        $client->close();
    },
]);

Examples

Echo Server

use Psl\Async;
use Psl\TCP;

$listener = TCP\listen('127.0.0.1');

Async\concurrently([
    'server' => static function () use ($listener): void {
        echo "Listening on {$listener->getLocalAddress()->toString()}\n";

        // Accept one connection then shut down
        $connection = $listener->accept();
        Async\run(static function () use ($connection): void {
            $data = $connection->readAll();
            $connection->writeAll($data);
            $connection->close();
        })->await();

        $listener->close();
    },
    'client' => static function () use ($listener): void {
        $address = $listener->getLocalAddress();
        $client = TCP\connect($address->host, $address->port ?? 0);
        $client->writeAll('hello from client');
        $client->shutdown();
        $response = $client->readAll();
        echo "Got: {$response}\n";
        $client->close();
    },
]);

Client with Cancellation

use Psl\Async;
use Psl\DateTime\Duration;
use Psl\TCP;

$listener = TCP\listen('127.0.0.1');

Async\concurrently([
    'server' => static function () use ($listener): void {
        $connection = $listener->accept();
        $_ = $connection->readAll();
        $connection->writeAll("HTTP/1.0 200 OK\r\n\r\nHello");
        $connection->close();
        $listener->close();
    },
    'client' => static function () use ($listener): void {
        $address = $listener->getLocalAddress();
        $client = TCP\connect(
            $address->host,
            $address->port ?? 0,
            cancellation: new Async\TimeoutCancellationToken(Duration::seconds(5)),
        );
        $client->writeAll("GET / HTTP/1.0\r\nHost: localhost\r\n\r\n");
        $client->shutdown();
        $_ = $client->readAll();
        $client->close();
    },
]);

Retry with Backoff

use Psl\Async;
use Psl\DateTime\Duration;
use Psl\TCP;

// Create a retry connector with exponential backoff
$connector = new TCP\RetryConnector(
    new TCP\Connector(new TCP\ConnectConfiguration(noDelay: true)),
    maxAttempts: 5,
    backoff: Duration::milliseconds(500),
);

// Demonstrate by connecting to a local server
$listener = TCP\listen('127.0.0.1');

Async\concurrently([
    'server' => static function () use ($listener): void {
        $connection = $listener->accept();
        $connection->writeAll('connected');
        $connection->close();
        $listener->close();
    },
    'client' => static function () use ($connector, $listener): void {
        $address = $listener->getLocalAddress();
        $stream = $connector->connect($address->host, $address->port ?? 0);
        $data = $stream->readAll();
        $stream->close();
    },
]);

Cancellable Accept

ListenerInterface::accept() accepts a CancellationTokenInterface, allowing you to cancel waiting for connections, for example during a graceful shutdown:

use Psl\Async;
use Psl\IO;
use Psl\TCP;

$listener = TCP\listen('127.0.0.1', 0);
$token = new Async\SignalCancellationToken();

// Simulate shutdown after 50ms
Async\Scheduler::delay(Psl\DateTime\Duration::milliseconds(50), static fn(string $_) => $token->cancel());

while (true) {
    try {
        $conn = $listener->accept($token);
    } catch (Async\Exception\CancelledException) {
        IO\write_line('Graceful shutdown');
        break;
    }
}

$listener->close();

Restricted Listener

RestrictedListener wraps any ListenerInterface and restricts connections to a set of allowed IP\Address and CIDR\Block entries. Rejected connections are closed silently.

use Psl\Async;
use Psl\CIDR;
use Psl\IO;
use Psl\IP;
use Psl\TCP;

// Only allow connections from localhost and the 10.0.0.0/8 private range
$inner = TCP\listen('127.0.0.1', 0);
$listener = new TCP\RestrictedListener($inner, [
    IP\Address::parse('127.0.0.1'),
    new CIDR\Block('10.0.0.0/8'),
]);

$address = $listener->getLocalAddress();
IO\write_line('Restricted listener on %s:%d', $address->host, $address->port ?? 0);

$token = new Async\SignalCancellationToken();
Async\Scheduler::delay(Psl\DateTime\Duration::milliseconds(50), static fn() => (
    // Simulate shutdown after 50ms
    $token->cancel()
));

while (true) {
    try {
        $stream = $listener->accept($token);

        IO\write_line('Accepted from %s', $stream->getPeerAddress()->host);

        $stream->close();
    } catch (Async\Exception\CancelledException) {
        break;
    }
}

$listener->close();

Composite Listener

Network\CompositeListener accepts connections from multiple listeners concurrently through a single accept() call. Each inner listener runs its own accept loop in a separate fiber, and connections are funneled through a shared channel. Closing the composite listener closes all inner listeners.

use Psl\Async;
use Psl\IO;
use Psl\Network;
use Psl\TCP;

// Listen on two different ports simultaneously
$listener1 = TCP\listen('127.0.0.1', 0);
$listener2 = TCP\listen('127.0.0.1', 0);

$composite = new Network\CompositeListener([$listener1, $listener2]);

$addr1 = $listener1->getLocalAddress();
$addr2 = $listener2->getLocalAddress();
IO\write_line('Listening on %s:%d and %s:%d', $addr1->host, $addr1->port ?? 0, $addr2->host, $addr2->port ?? 0);

// Simulate shutdown after 50ms
$token = new Async\SignalCancellationToken();
Async\Scheduler::delay(Psl\DateTime\Duration::milliseconds(50), static fn(string $_) => $token->cancel());

while (true) {
    try {
        $stream = $composite->accept($token);

        IO\write_line('Connection from %s', $stream->getPeerAddress()->host);

        $stream->close();
    } catch (Async\Exception\CancelledException) {
        break;
    }
}

// Closes all inner listeners
$composite->close();

See src/Psl/TCP/ for the full API.