Nexus API Reference

ThreadQueueTransport
in package
implements WorkerTransport

Final: Yes
Tags
psalm-api

Thread-safe transport backed by Swoole\Thread\Queue.

send(): pushes the Envelope onto the target worker's Queue. Thread\Queue serializes the object internally via php_serialize, so no explicit serializer is needed.
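To make the serialization point concrete, the sketch below models the queue hop with PHP's own serialize()/unserialize(). DemoEnvelope is a hypothetical stand-in (the real Envelope class is not shown on this page), and the assumption is that the internal php_serialize step behaves like a plain serialize() round trip.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in; not the real Envelope class. */
final class DemoEnvelope
{
    public function __construct(
        public int $from,
        public string $type,
        public array $payload,
    ) {}
}

$sent = new DemoEnvelope(1, 'job.created', ['id' => 42]);

// Assumed model of the queue hop: serialize on push, unserialize on pop.
$wire = serialize($sent);
$received = unserialize($wire, ['allowed_classes' => [DemoEnvelope::class]]);

// The receiver holds an equal copy, never the sender's instance.
$isEqualCopy = ($received == $sent) && ($received !== $sent);
```

If this model holds, the receiving worker never sees later mutations of the sender's object, and values that serialize() rejects (closures, for example) would not be sendable inside an envelope.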

listen(): registers a handler and spawns an adaptive-poll coroutine that reads from this worker's own Queue. A blocking Thread\Queue::pop() would stall the entire OS thread, including every coroutine scheduled on it, so the loop calls pop(0) (non-blocking) and backs off with Coroutine::sleep() to stay coroutine-friendly.

Adaptive poll backoff (Swoole minimum sleep is 1 ms):

  • Messages present → continue immediately (no sleep, tight drain loop)
  • 1–99 empty polls → 1 ms sleep (recently active, check again soon)
  • 100–999 empty polls → 5 ms sleep
  • 1000+ empty polls → 10 ms sleep (idle steady state)
  • Message arrives → reset counter, return to tight drain loop
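The schedule above can be sketched as a pure function plus a small simulation. The names backoffMs() and simulateSleeps() are hypothetical, and the sketch assumes the counter is incremented before the delay is chosen; the actual implementation may order these steps differently.

```php
<?php
declare(strict_types=1);

/** Sleep in milliseconds for a given number of consecutive empty polls. */
function backoffMs(int $emptyPolls): int
{
    if ($emptyPolls <= 0) {
        return 0;   // a message was just read: keep draining, no sleep
    }
    if ($emptyPolls < 100) {
        return 1;   // recently active
    }
    if ($emptyPolls < 1000) {
        return 5;
    }
    return 10;      // idle steady state
}

/**
 * Replays a scripted sequence of pop() results (null = queue was empty)
 * and returns the sleeps, in ms, that the loop would have requested.
 */
function simulateSleeps(array $popResults): array
{
    $emptyPolls = 0;
    $sleeps = [];
    foreach ($popResults as $message) {
        if ($message !== null) {
            $emptyPolls = 0;    // message arrived: reset, back to tight drain
            continue;
        }
        $emptyPolls++;
        $sleeps[] = backoffMs($emptyPolls);
    }
    return $sleeps;
}

$sleeps = simulateSleeps(['a', null, null, 'b', null]);   // [1, 1, 1]
```

In the real transport, the scripted values would presumably come from pop(0) and each delay would be passed to Coroutine::sleep() in seconds (for example 0.001 for 1 ms).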

Table of Contents

Interfaces

WorkerTransport

Methods

__construct()  : mixed
close()  : void
Close the transport and release resources.
isStopped()  : bool
Whether stop() has been called on this transport.
listen()  : void
Register a listener for incoming envelopes.
send()  : void
Send an envelope to the target worker.
stop()  : void
Signal the transport to stop. The receive loop exits cooperatively on its next backoff wakeup (within ~10ms in the worst case).

Methods

__construct()

public __construct(array<int, Queue> $queues, int $workerId) : mixed
Parameters
$queues : array<int, Queue>

All workers' queues, keyed by worker ID

$workerId : int
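As an illustration of how the two constructor arguments relate, the following sketch uses a hypothetical in-memory stand-in: DemoTransport is not the real class, SplQueue replaces Swoole\Thread\Queue, and plain strings replace Envelope objects. It only shows the routing idea: every worker receives the same map of queues, sends into the target's entry, and reads from its own.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the routing part of the transport. */
final class DemoTransport
{
    /** @param array<int, SplQueue> $queues all workers' queues, keyed by worker ID */
    public function __construct(private array $queues, private int $workerId) {}

    public function send(int $targetWorker, string $message): void
    {
        // Assumption: how the real transport treats an unknown ID is not documented here.
        if (!isset($this->queues[$targetWorker])) {
            throw new InvalidArgumentException("Unknown worker {$targetWorker}");
        }
        $this->queues[$targetWorker]->enqueue($message);
    }

    /** Non-blocking read from this worker's own queue; null when it is empty. */
    public function poll(): ?string
    {
        $own = $this->queues[$this->workerId];
        return $own->isEmpty() ? null : $own->dequeue();
    }
}

// One queue per worker; both transports share the same queue objects.
$queues = [0 => new SplQueue(), 1 => new SplQueue()];
$worker0 = new DemoTransport($queues, 0);
$worker1 = new DemoTransport($queues, 1);

$worker0->send(1, 'hello from 0');

$forWorker1 = $worker1->poll();   // the message lands in worker 1's queue
$forWorker0 = $worker0->poll();   // worker 0's own queue stays empty
```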

isStopped()

Whether stop() has been called on this transport.

public isStopped() : bool
Return values
bool

listen()

Register a listener for incoming envelopes.

public listen(callable $onEnvelope) : void
Parameters
$onEnvelope : callable

send()

Send an envelope to the target worker.

public send(int $targetWorker, Envelope $envelope) : void
Parameters
$targetWorker : int
$envelope : Envelope

stop()

Signal the transport to stop. The receive loop exits cooperatively on its next backoff wakeup (within ~10ms in the worst case).

public stop() : void

Idempotent — calling stop() on an already-stopped transport is a no-op.

Required for graceful shutdown: without stop(), the receive loop keeps polling indefinitely and the worker cannot exit cleanly.
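The cooperative exit can be sketched with a stand-in loop. StoppableLoop is hypothetical, the pop and sleep operations are injected as callables so the example runs without Swoole, and a fixed 10 ms idle delay is assumed for brevity. The flag is re-checked after every wakeup, which is why the exit latency is bounded by the longest sleep.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for the stop-related part of the transport. */
final class StoppableLoop
{
    private bool $stopped = false;

    public function stop(): void
    {
        $this->stopped = true;   // setting it again changes nothing: idempotent
    }

    public function isStopped(): bool
    {
        return $this->stopped;
    }

    /** Runs until stop() is called; returns the number of polls performed. */
    public function run(callable $pop, callable $sleep, callable $onEnvelope): int
    {
        $polls = 0;
        while (!$this->stopped) {
            $polls++;
            $message = $pop();           // null means the queue was empty
            if ($message !== null) {
                $onEnvelope($message);
                continue;
            }
            $sleep(0.010);               // the flag is re-checked on wakeup
        }
        return $polls;
    }
}

$loop = new StoppableLoop();
$inbox = new SplQueue();
$inbox->enqueue('job-1');
$received = [];
$sleepCalls = 0;

$polls = $loop->run(
    fn () => $inbox->isEmpty() ? null : $inbox->dequeue(),
    function (float $seconds) use ($loop, &$sleepCalls): void {
        // Stand-in for Coroutine::sleep(): request shutdown during the third idle sleep.
        if (++$sleepCalls === 3) {
            $loop->stop();
            $loop->stop();   // second call is a no-op
        }
    },
    function ($message) use (&$received): void {
        $received[] = $message;
    }
);
```

Here the loop delivers the one queued message, idles three times, and returns on the wakeup that follows the stop() call.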


        