Laravel Reverb AI Streaming: Token-by-Token WebSocket Delivery

Laravel Reverb AI streaming gives you a persistent, bidirectional WebSocket channel between the browser and your backend while the model is generating. For unidirectional token delivery, SSE is the simpler default. When you need mid-stream cancellation, presence-aware sessions, or a channel that survives the HTTP request lifecycle, SSE’s one-directional constraint becomes a hard ceiling. This article is part of the real-time AI UX module and covers the complete implementation: a queue worker opens the LLM stream, dispatches token batches via ShouldBroadcastNow, and the Echo client appends them without polling or additional HTTP requests. Channel authentication, disconnect detection, and production process management are all included.

Why Reverb for AI Streaming

The streaming transport selection guide covers the SSE vs WebSockets decision in full. The production SSE hardening guide covers reconnects, timeouts, and multi-tenant event streams for that path.

Reverb earns its added complexity in four situations: the client needs to cancel generation mid-stream and the server must distinguish a deliberate cancel from a connection drop; multiple users share a live AI response in the same session; you are building an SPA where the prompt session outlives the original HTTP request; or you want presence tracking and Laravel’s full broadcasting event system behind your AI delivery layer.

The table below maps both transports against common AI streaming requirements. Both options require Redis pub/sub for horizontal scaling. Infrastructure overhead is the primary tradeoff, not raw delivery capability.

| Capability | SSE | Reverb |
|---|---|---|
| Unidirectional token delivery | Yes | Yes |
| Bidirectional messaging | No | Yes |
| Mid-stream client cancellation | HTTP cancel endpoint | HTTP cancel endpoint |
| Presence tracking | No | Yes |
| SPA session persistence | Manual reconnect | Automatic via Echo |
| Laravel broadcasting API | No | Yes |
| Horizontal scaling | Redis pub/sub required | Redis pub/sub required |
| Additional infrastructure | None | Reverb server process |

Pipeline Architecture

The pipeline splits into two distinct flows.

HTTP/queue path (steps ①–④): The browser generates a UUID session ID, subscribes to the private channel, then POSTs the prompt alongside the session ID. The controller persists the session via Eloquent and dispatches a StreamAiResponse job to the Redis queue. The Queue Worker picks it up and opens a streaming request to the LLM API.

Broadcast path (steps ⑤–⑦): As token chunks arrive, the worker accumulates them into a buffer and dispatches AiTokenReceived events that implement ShouldBroadcastNow. Laravel’s broadcasting system sends each event directly to Reverb, bypassing the queue. Reverb pushes the payload down the client’s authenticated private WebSocket channel. The Echo listener appends content to the UI.

Channel authentication is a separate HTTP handshake. The browser requests subscription to private-ai-stream.{sessionId}, Laravel validates session ownership, and Reverb grants the subscription.

[Figure: pipeline architecture. Components: Browser (Laravel Echo, pusher-js); Reverb (WebSocket server, Pusher protocol); Laravel app (controller, AiSession); Redis queue (StreamAiResponse); queue worker (Anthropic stream loop); LLM API (claude-sonnet-4-6). Steps: ① POST /ai/stream, ② dispatch(), ③ consume, ④ stream request, ⑤ token chunks, ⑥ ShouldBroadcastNow, ⑦ WebSocket push. Private channel auth goes through /broadcasting/auth. Legend: HTTP/queue path, broadcast/WebSocket path, token stream (LLM).]


Installing Reverb

composer require laravel/reverb
php artisan reverb:install

The Artisan CLI installer handles everything: it publishes config/reverb.php, registers routes/channels.php in the application bootstrap, and appends the required environment variables. Update your .env:

BROADCAST_CONNECTION=reverb

REVERB_APP_ID=your-app-id
REVERB_APP_KEY=your-app-key
REVERB_APP_SECRET=your-app-secret
REVERB_HOST=127.0.0.1
REVERB_PORT=8080
REVERB_SCHEME=http

VITE_REVERB_APP_KEY="${REVERB_APP_KEY}"
VITE_REVERB_HOST="${REVERB_HOST}"
VITE_REVERB_PORT="${REVERB_PORT}"
VITE_REVERB_SCHEME="${REVERB_SCHEME}"

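Use BROADCAST_CONNECTION, not the older BROADCAST_DRIVER name. Laravel 11 renamed the variable, and the default config/broadcasting.php in current releases reads BROADCAST_CONNECTION. An application upgraded from an older skeleton may still reference BROADCAST_DRIVER in its own config file, so check which key yours reads and standardise on BROADCAST_CONNECTION.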

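REVERB_HOST and REVERB_PORT tell Laravel where to send broadcasts, and the VITE_ values derived from them tell the browser where to connect. 127.0.0.1 therefore only works when the browser and the server share a machine, as in local development. In production behind nginx or Caddy, which is the recommended setup, point the VITE_ values at the public hostname with the proxy's port and scheme (typically 443 and https). The address the Reverb process binds to is a separate setting: pass --host and --port to reverb:start, or set REVERB_SERVER_HOST and REVERB_SERVER_PORT. Bind to 127.0.0.1 behind a reverse proxy. Bind to 0.0.0.0 and open the configured port in your firewall only if you expose Reverb directly.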

Session Model and Controller

The client generates the UUID session ID before any server interaction. This ensures the private channel name is known before the Echo subscription opens, which eliminates any race condition between subscription setup and early token dispatch from the queue worker.

// app/Models/AiSession.php

namespace App\Models;

use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\Relations\BelongsTo;

class AiSession extends Model
{
    protected $keyType = 'string';
    public $incrementing = false;

    protected $fillable = ['id', 'user_id'];

    public function user(): BelongsTo
    {
        return $this->belongsTo(User::class);
    }
}

The migration keeps the schema minimal:

// database/migrations/xxxx_create_ai_sessions_table.php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
    public function up(): void
    {
        Schema::create('ai_sessions', function (Blueprint $table) {
            $table->uuid('id')->primary();
            $table->foreignId('user_id')->constrained()->cascadeOnDelete();
            $table->timestamps();
        });
    }

    public function down(): void
    {
        Schema::dropIfExists('ai_sessions');
    }
};

The controller validates the client-supplied UUID, persists the Eloquent session record, dispatches the job, and returns immediately:

// app/Http/Controllers/AiStreamController.php

namespace App\Http\Controllers;

use App\Jobs\StreamAiResponse;
use App\Models\AiSession;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;

class AiStreamController extends Controller
{
    public function store(Request $request): JsonResponse
    {
        $validated = $request->validate([
            'session_id' => ['required', 'uuid'],
            'prompt'     => ['required', 'string', 'max:4000'],
        ]);

        $session = AiSession::create([
            'id'      => $validated['session_id'],
            'user_id' => $request->user()->id,
        ]);

        StreamAiResponse::dispatch($session->id, $validated['prompt']);

        return response()->json(['session_id' => $session->id]);
    }
}

The Broadcast Event and Channel Authentication

Two decisions determine whether this holds up under load.

Use ShouldBroadcastNow, not ShouldBroadcast. ShouldBroadcast queues each broadcast event as a second job. At 40–80 tokens per second per active stream, that is 40–80 additional queue writes per second, and each token event waits behind whatever is ahead of it. ShouldBroadcastNow bypasses the queue entirely and writes directly to the Reverb WebSocket server. For token streaming this is the only viable choice.

Keep the payload minimal. Laravel serialises every public property of AiTokenReceived to JSON, and Reverb pushes the result to the subscribed client. No Eloquent models, no nested objects, no metadata the client does not consume.

// app/Events/AiTokenReceived.php

namespace App\Events;

use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;

class AiTokenReceived implements ShouldBroadcastNow
{
    use Dispatchable, InteractsWithSockets;

    public function __construct(
        public readonly string $sessionId,
        public readonly string $content,
        public readonly bool $done = false,
    ) {}

    public function broadcastOn(): array
    {
        return [
            new PrivateChannel('ai-stream.' . $this->sessionId),
        ];
    }

    public function broadcastAs(): string
    {
        return 'token.received';
    }
}

Channel authorization goes in routes/channels.php. The callback validates session ownership with a single Eloquent query:

// routes/channels.php

use App\Models\AiSession;
use Illuminate\Support\Facades\Broadcast;

Broadcast::channel('ai-stream.{sessionId}', function ($user, string $sessionId): bool {
    return AiSession::where('id', $sessionId)
        ->where('user_id', $user->id)
        ->exists();
});

[Edge Case Alert] Return a strict bool from private channel callbacks. Returning an array of user data is the presence-channel convention, where that data is shared with the other channel members. A private channel only needs a yes or no. exists() already returns bool, and the explicit return type hint on the closure enforces this through future refactors.

Streaming Inside the Queue Worker

The job holds the LLM streaming connection open for the full duration of generation and batches content into broadcast events on each iteration.

On token batching. Broadcasting on every individual token is not practical. Modern inference at 40–80 tokens per second means 40–80 ShouldBroadcastNow dispatches per second per stream. Batching by character count rather than token count decouples the flush rate from the model’s tokenizer. A 15-character threshold maps to roughly 4–5 tokens and delivers at 10–15 broadcast events per second. That is smooth to the human eye and a fraction of the raw overhead.

[Efficiency Gain] Character-based batching means switching LLM providers or models does not change the perceived UX. The flush rate stays consistent regardless of tokenizer differences between claude-sonnet-4-6 and gpt-4o.
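The flush rule is small enough to try in isolation. The sketch below restates it in plain JavaScript (the job itself stays PHP) so it can be run in Node or a browser console. batchByCharacters and the sample deltas are illustrative names, not part of any SDK.

```javascript
// Illustrative only: the same flush rule as the queue job.
// `chunks` stands in for the text deltas an LLM stream yields.
function* batchByCharacters(chunks, threshold = 15) {
    let buffer = '';
    for (const chunk of chunks) {
        buffer += chunk;
        // Count Unicode code points rather than UTF-16 units, so an emoji counts once.
        if (Array.from(buffer).length >= threshold) {
            yield buffer;
            buffer = '';
        }
    }
    // Flush whatever is left so the tail of the response is not lost.
    if (buffer !== '') {
        yield buffer;
    }
}

const deltas = ['Str', 'eam', 'ing ', 'tok', 'ens ', 'in ', 'small ', 'batches', '.'];
const batches = [...batchByCharacters(deltas)];
console.log(batches);
// → [ 'Streaming tokens ', 'in small batches', '.' ]
```

Nine deltas collapse into three broadcasts here. The batch boundaries depend only on accumulated character count, not on how the provider happened to split the text.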

This article uses the Anthropic PHP SDK (anthropics/anthropic-sdk-php) directly rather than the laravel/ai SDK. The Anthropic SDK’s streaming iterator exposes typed event objects with a known chunk structure (content_block_delta) that we access inside the loop. For driver-agnostic implementations, the pattern adapts directly once the SDK is configured. The Claude API integration guide covers full SDK setup and driver configuration.

// app/Jobs/StreamAiResponse.php

namespace App\Jobs;

use App\Events\AiTokenReceived;
use Anthropic\Laravel\Facades\Anthropic;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Cache;

class StreamAiResponse implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public int $timeout = 120;
    public int $tries = 1;

    public function __construct(
        public readonly string $sessionId,
        public readonly string $prompt,
    ) {}

    public function handle(): void
    {
        $buffer    = '';
        $threshold = 15; // characters

        try {
            $stream = Anthropic::messages()->createStreamed([
                'model'      => 'claude-sonnet-4-6',
                'max_tokens' => 2048,
                'messages'   => [
                    ['role' => 'user', 'content' => $this->prompt],
                ],
            ]);

            foreach ($stream as $event) {
                if (Cache::get("ai-stream:{$this->sessionId}:cancelled")) {
                    break;
                }

                if ($event->type === 'content_block_delta') {
                    $buffer .= $event->delta->text ?? '';

                    // mb_strlen counts characters; strlen would count bytes of multi-byte UTF-8 text.
                    if (mb_strlen($buffer) >= $threshold) {
                        AiTokenReceived::dispatch($this->sessionId, $buffer);
                        $buffer = '';
                    }
                }
            }

            // Flush any content remaining in the buffer before signalling completion.
            if ($buffer !== '') {
                AiTokenReceived::dispatch($this->sessionId, $buffer);
            }
        } catch (\Anthropic\Exceptions\RateLimitException $e) {
            AiTokenReceived::dispatch(
                $this->sessionId,
                '[Rate limit reached. Please try again shortly.]',
                done: true,
            );
            $this->fail($e);
            return;
        } catch (\Throwable $e) {
            AiTokenReceived::dispatch(
                $this->sessionId,
                '[Stream error. Generation did not complete.]',
                done: true,
            );
            report($e);
            $this->fail($e);
            return;
        }

        AiTokenReceived::dispatch($this->sessionId, '', done: true);
    }
}

$tries = 1 is deliberate. If the job retries after partial token delivery, the client receives duplicate content with no way to deduplicate at the broadcast level. Failures are surfaced to the client as a terminal done event with an error message and recorded in failed_jobs via $this->fail() for debugging. For rate-limit retry logic, dispatch a fresh job with an explicit delay rather than relying on the queue’s built-in retry mechanism.

[Production Pitfall] Set $timeout above your expected maximum generation time. At 2,048 max_tokens and 40 tokens per second, a stream can run for over 50 seconds. 120 seconds is a reasonable floor. A $timeout defined on the job takes precedence over the worker's --timeout flag, so the setting to check is retry_after on the queue connection in config/queue.php. It must exceed $timeout by a comfortable margin, for example $timeout + 30 seconds. If retry_after is shorter, the queue treats the job as abandoned and makes it available again while the first attempt is still streaming.
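The arithmetic behind that floor, as a quick sketch. worstCaseSeconds is a hypothetical helper, and the 20 tokens-per-second figure is an assumed slow case rather than a measured one.

```javascript
// Upper bound on generation time if the model emits every allowed token.
function worstCaseSeconds(maxTokens, tokensPerSecond) {
    return maxTokens / tokensPerSecond;
}

console.log(worstCaseSeconds(2048, 40)); // 51.2
console.log(worstCaseSeconds(2048, 20)); // 102.4
```

Even at half the expected throughput the stream finishes inside 120 seconds, which is why that value works as a floor rather than a target.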

Handling Client Disconnects

When the browser tab closes, the WebSocket connection drops and Reverb cleans up the channel subscription. The queue job continues running and continues spending inference budget on a dead channel.

A cancellation flag in the cache layer is the simplest reliable fix. The job checks it on every stream event, so the worst case is one extra batch dispatched in the window between the flag being written and the next check. The flag must live in a cache store that both the web process and the queue worker can reach, such as Redis:

// routes/api.php

use App\Models\AiSession;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Route;

Route::post('/ai/stream/{sessionId}/cancel', function (string $sessionId) {
    abort_if(
        AiSession::where('id', $sessionId)
            ->where('user_id', auth()->id())
            ->doesntExist(),
        403,
    );

    Cache::put("ai-stream:{$sessionId}:cancelled", true, now()->addMinutes(5));

    return response()->noContent();
})->middleware('auth:sanctum');

On the frontend, use fetch with keepalive: true in the beforeunload handler. The keepalive flag tells the browser to complete the request after the page unloads, with full fetch semantics including CSRF headers:

window.addEventListener('beforeunload', () => {
    fetch(`/api/ai/stream/${sessionId}/cancel`, {
        method:   'POST',
        keepalive: true,
        headers:  { 'X-CSRF-TOKEN': csrfToken },
    });
});

keepalive requests have a 64 KB total payload limit per the Fetch specification. For this cancel endpoint, which sends no body, that limit is not relevant. Do not carry keepalive into other requests with large payloads. Call the same endpoint from any explicit cancel button in the UI. UUID-based session IDs ensure the cancellation cache key is unique per user per stream, preventing collisions in workers processing concurrent jobs.
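To keep the cancel button and the unload handler on the same code path, the request can be wrapped in one small function. This is a sketch: cancelStream and its fetchImpl parameter are hypothetical names, and the element and variable names in the wiring comments are placeholders.

```javascript
// Sends the cancel request. `fetchImpl` is injectable only so the sketch can be
// exercised without a browser; in the page it defaults to the global fetch.
function cancelStream(sessionId, csrfToken, fetchImpl = globalThis.fetch) {
    return fetchImpl(`/api/ai/stream/${encodeURIComponent(sessionId)}/cancel`, {
        method: 'POST',
        keepalive: true, // lets the request outlive the page during unload
        headers: { 'X-CSRF-TOKEN': csrfToken },
    });
}

// Hypothetical wiring:
// cancelButton.addEventListener('click', () => cancelStream(sessionId, csrfToken));
// window.addEventListener('beforeunload', () => cancelStream(sessionId, csrfToken));
```

Both triggers hit the same endpoint with the same headers, so the server cannot tell them apart and does not need to.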

Frontend Integration with Echo

Install the dependencies:

npm install laravel-echo pusher-js

Reverb uses the Pusher WebSocket protocol. pusher-js is the underlying transport regardless of whether you connect to Pusher or Reverb.

// resources/js/echo.js

import Echo  from 'laravel-echo';
import Pusher from 'pusher-js';

window.Pusher = Pusher;

export const echo = new Echo({
    broadcaster:       'reverb',
    key:               import.meta.env.VITE_REVERB_APP_KEY,
    wsHost:            import.meta.env.VITE_REVERB_HOST,
    wsPort:            import.meta.env.VITE_REVERB_PORT  ?? 8080,
    wssPort:           import.meta.env.VITE_REVERB_PORT  ?? 443,
    forceTLS:          (import.meta.env.VITE_REVERB_SCHEME ?? 'https') === 'https',
    enabledTransports: ['ws', 'wss'],
});

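The client generates the session ID via crypto.randomUUID(), which is available in all modern browsers without a dependency (in secure contexts such as HTTPS or localhost). It subscribes to the private channel before sending the HTTP request, so the listener is registered before the queue worker can dispatch any tokens. One ordering caveat applies: the authorization callback in routes/channels.php looks up the AiSession row, and the controller only creates that row when the POST arrives. If /broadcasting/auth is handled first, the subscription is rejected with a 403, so confirm the ordering in your environment or create the session record before subscribing. The subscribe-then-POST sequence looks like this: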

const sessionId = crypto.randomUUID();

echo.private(`ai-stream.${sessionId}`)
    .listen('.token.received', ({ content, done }) => {
        // Append first: terminal error events carry a message together with done: true.
        outputEl.textContent += content;
        if (done) {
            echo.leave(`ai-stream.${sessionId}`);
        }
    });

await fetch('/ai/stream', {
    method:  'POST',
    headers: {
        'Content-Type': 'application/json',
        'X-CSRF-TOKEN':  csrfToken,
    },
    body: JSON.stringify({ session_id: sessionId, prompt }),
});

The .token.received event name carries a leading dot. That dot is Echo’s convention for custom event names defined by broadcastAs(). Without it, Echo expects a fully-qualified class name as the event identifier and will not match your event.

[Production Pitfall] The channel authorization request goes to /broadcasting/auth. In local development with a Vite dev server proxying requests on a different port, this endpoint returns 403 due to cookie scope or CSRF mismatches. Add an explicit proxy rule for /broadcasting/auth in your vite.config.js and verify cookie scope before treating it as a Reverb problem.
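A rejected authorization is easy to miss because the page simply never receives tokens. One way to surface it is to attach Echo's subscribed and error callbacks to the channel. This is a sketch: watchSubscription, onReady, and onRejected are hypothetical names.

```javascript
// `channel` is the object returned by echo.private(...).
function watchSubscription(channel, { onReady = () => {}, onRejected = console.error } = {}) {
    // Fires when Reverb confirms the subscription.
    channel.subscribed(() => onReady());
    // Fires when the /broadcasting/auth request is refused or fails.
    channel.error((err) => onRejected(err));
    return channel;
}

// Hypothetical usage:
// watchSubscription(echo.private(`ai-stream.${sessionId}`), {
//     onRejected: (err) => {
//         outputEl.textContent = 'Could not open the stream.';
//         console.error(err);
//     },
// });
```

With the error callback in place, a 403 from the auth endpoint shows up in the UI and the console instead of looking like a stalled stream.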

For a PHP-side approach to real-time AI responses without managing an Echo subscription directly, the Livewire Claude API implementation guide covers that pattern end to end.

Running Reverb in Production

Start Reverb as a long-lived process via the Artisan CLI:

php artisan reverb:start --host=127.0.0.1 --port=8080

Manage it with Supervisor alongside your Queue Worker processes:

[program:reverb]
process_name=%(program_name)s
command=php /var/www/html/artisan reverb:start --host=127.0.0.1 --port=8080
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/reverb.log
stopwaitsecs=10

Configure nginx to proxy WebSocket connections to Reverb. Set proxy_read_timeout above the job $timeout with margin. A job timeout of 120 seconds needs at least 150 seconds at the proxy to avoid the connection being closed at the boundary before the job completes its cleanup path:

location /app/ {
    proxy_pass         http://127.0.0.1:8080;
    proxy_http_version 1.1;
    proxy_set_header   Upgrade    $http_upgrade;
    proxy_set_header   Connection "upgrade";
    proxy_set_header   Host       $host;
    proxy_read_timeout 150s;
}

For horizontal scaling across multiple Reverb nodes, two requirements apply. First, enable scaling with REVERB_SCALING_ENABLED=true and point every node at a shared Redis instance, configured as the Reverb pub/sub backend under the scaling key in config/reverb.php. Without it, broadcasts from a worker on node A cannot reach clients connected to node B because the two Reverb instances have no shared channel state. Second, set a unique REVERB_SERVER_ID environment variable per node. Reverb uses this to prevent routing loops when a shared-bus message arrives at multiple nodes simultaneously.

[Architect’s Note] Queue Worker configuration matters as much as Reverb setup for AI streaming workloads. Workers handling streaming jobs need higher timeouts and dedicated queue names to prevent long-running stream jobs from blocking short-lived tasks queued behind them. The Horizon AI queue workload configuration guide covers supervisor group isolation and timeout setup in full.


For official reference, the Laravel Reverb documentation covers scaling configuration in detail. The Anthropic streaming messages reference documents the full event type surface, including error events and stream lifecycle signals relevant to production error handling.


Frequently Asked Questions

Why use ShouldBroadcastNow instead of ShouldBroadcast for token events?

ShouldBroadcast places each broadcast event back into the queue as a second job. At 40–80 tokens per second, every token waits behind whatever is ahead of it in the queue before reaching the client. ShouldBroadcastNow bypasses the queue entirely and writes directly to the Reverb WebSocket server, delivering tokens as fast as the LLM produces them. For token streaming, ShouldBroadcast is the wrong interface.

Does Reverb need to run as a separate process from the queue workers?

Yes. Reverb is a standalone WebSocket server started with php artisan reverb:start. It runs independently of your queue workers and your PHP-FPM or Octane application process. All three need to be running concurrently for the full pipeline to function. Supervisor can manage Reverb and the queue workers as separate programs; PHP-FPM usually runs under the system's own service manager.

What happens to the queue job if the client disconnects mid-stream?

The job continues running until the stream completes or the process timeout is hit, billing output tokens to a dead channel. The cancellation flag pattern in this article addresses this: a POST to the cancel endpoint writes a flag to the cache, and the job checks it on every stream event. The keepalive fetch in the beforeunload handler sends the cancel request when the tab closes, on a best-effort basis.

How do I scale Reverb horizontally across multiple servers?

Configure a shared Redis instance as the pub/sub backend in config/reverb.php under the scaling key and set a unique REVERB_SERVER_ID per node. With shared Redis, broadcasts from any queue worker on any server propagate to all Reverb nodes, which push to their locally connected clients. Without shared Redis, only clients connected to the same node as the broadcasting worker receive events.

Dewald Hugo

A software architect with 15+ years of experience in the PHP and Laravel ecosystem. Dewald created Origin Main to provide the engineering rigour required to integrate AI into professional, high-concurrency production systems. He writes for developers who care less about "getting it to work" and more about "getting it to last".
