mirror of https://github.com/maziggy/bambuddy.git synced 2026-05-09 08:25:54 +02:00

[GH-ISSUE #1089] Feature: Camera stream fan-out — support multiple concurrent viewers per printer #775

Closed
opened 2026-05-07 00:13:42 +02:00 by BreizhHardware · 2 comments

Originally created by @swheettaos on GitHub (Apr 22, 2026).
Original GitHub issue: https://github.com/maziggy/bambuddy/issues/1089

Originally assigned to: @maziggy on GitHub.

Problem

Currently each request to /api/v1/printers/{id}/camera/stream opens an
independent connection to the physical printer:

  • RTSP models (X1, P2S, H2): spawns a new ffmpeg process per viewer
  • Chamber image models (A1, P1P, P1S): opens a new TLS socket to port 6000 per viewer

Most Bambu Lab printers only support one simultaneous camera connection.
The second viewer gets a failed/empty stream while the first is connected.

Relevant code: in backend/app/api/routes/camera.py, _active_streams is keyed
by stream_id (unique per viewer), not printer_id. There is no shared
broadcast mechanism.

Proposed Solution

Add an in-process fan-out broadcaster per printer: BamBuddy opens exactly
one upstream connection, buffers each frame, and pushes copies to N subscriber
queues. New viewers tap the existing connection; no new printer connection is made.

1. Broadcaster class (backend/app/services/camera_fanout.py)

import asyncio

class MjpegBroadcaster:
    """Hold one upstream MJPEG connection, fan frames out to N subscribers."""

    def __init__(self):
        self._subscribers: list[asyncio.Queue] = []
        self._lock = asyncio.Lock()

    async def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue(maxsize=4)
        async with self._lock:
            self._subscribers.append(q)
        return q

    async def unsubscribe(self, q: asyncio.Queue) -> None:
        async with self._lock:
            self._subscribers.remove(q)

    async def broadcast(self, frame: bytes) -> None:
        async with self._lock:
            for q in self._subscribers:
                try:
                    q.put_nowait(frame)
                except asyncio.QueueFull:
                    # Slow viewer: drop its oldest frame and enqueue the newest
                    # instead of blocking (or evicting) the subscriber.
                    try:
                        q.get_nowait()
                        q.put_nowait(frame)
                    except (asyncio.QueueEmpty, asyncio.QueueFull):
                        pass

    def subscriber_count(self) -> int:
        return len(self._subscribers)


# Global registry: printer_id → MjpegBroadcaster
_broadcasters: dict[int, MjpegBroadcaster] = {}
_broadcaster_tasks: dict[int, asyncio.Task] = {}
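The per-subscriber bounded queues are the heart of the fan-out. A minimal standalone sketch of the drop-oldest behaviour for a slow viewer (plain asyncio, queue sizes and frame names are illustrative, not from the codebase):

```python
import asyncio

async def fan_out_demo():
    fast = asyncio.Queue(maxsize=4)   # healthy viewer
    slow = asyncio.Queue(maxsize=1)   # simulated slow viewer
    for i in range(6):
        frame = f"frame-{i}".encode()
        for q in (fast, slow):
            try:
                q.put_nowait(frame)
            except asyncio.QueueFull:
                # Drop the oldest buffered frame so the viewer
                # always sees recent video, never a growing backlog.
                q.get_nowait()
                q.put_nowait(frame)
    return fast.qsize(), slow.get_nowait()

fast_depth, newest_slow_frame = asyncio.run(fan_out_demo())
print(fast_depth, newest_slow_frame)  # → 4 b'frame-5'
```

The slow viewer ends up holding only the newest frame, which is the right trade-off for live video: latency over completeness.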

2. Single upstream pump per printer

async def _pump_rtsp(printer_id: int, broadcaster: MjpegBroadcaster,
                     ip: str, access_code: str, model: str | None):
    async for frame in generate_rtsp_mjpeg_stream(ip, access_code, model):
        await broadcaster.broadcast(frame)

async def _pump_chamber(printer_id: int, broadcaster: MjpegBroadcaster,
                        ip: str, access_code: str, model: str | None):
    async for frame in generate_chamber_mjpeg_stream(ip, access_code):
        await broadcaster.broadcast(frame)
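One detail the pumps above leave open is cleanup when the upstream source ends or fails. A sketch of a hypothetical wrapper (run_pump, fake_source, and collect are illustrative names) that clears the task registry on exit so the next viewer restarts the pump instead of finding a dead task:

```python
import asyncio

async def run_pump(printer_id, source, broadcast, tasks):
    # Hypothetical wrapper around the real pump coroutines: whatever
    # happens upstream, clear the registry entry on the way out.
    try:
        async for frame in source:
            await broadcast(frame)
    finally:
        tasks.pop(printer_id, None)

async def demo():
    received = []

    async def fake_source():     # stands in for generate_rtsp_mjpeg_stream
        for i in range(3):
            yield f"frame-{i}".encode()

    async def collect(frame):    # stands in for broadcaster.broadcast
        received.append(frame)

    tasks = {}
    task = asyncio.create_task(run_pump(1, fake_source(), collect, tasks))
    tasks[1] = task
    await task
    return len(received), 1 in tasks

frames_pumped, still_registered = asyncio.run(demo())
print(frames_pumped, still_registered)  # → 3 False
```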

3. Updated stream endpoint

@router.get("/{printer_id}/camera/stream")
async def stream_camera(
    printer_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    _token=Depends(RequireCameraStreamTokenIfAuthEnabled),
):
    printer = await get_printer_or_404(printer_id, db)

    broadcaster = _broadcasters.setdefault(printer_id, MjpegBroadcaster())

    # Start upstream pump only if not already running
    if printer_id not in _broadcaster_tasks or _broadcaster_tasks[printer_id].done():
        pump = _pump_rtsp if supports_rtsp(printer.model) else _pump_chamber
        _broadcaster_tasks[printer_id] = asyncio.create_task(
            pump(printer_id, broadcaster, printer.ip_address, printer.access_code, printer.model)
        )

    q = await broadcaster.subscribe()

    async def generate():
        try:
            while True:
                if await request.is_disconnected():
                    break
                frame = await asyncio.wait_for(q.get(), timeout=30.0)
                yield (
                    b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
                    + frame + b"\r\n"
                )
        finally:
            await broadcaster.unsubscribe(q)

    return StreamingResponse(
        generate(),
        media_type="multipart/x-mixed-replace; boundary=frame",
    )
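On the wire, each part of the response looks exactly like the bytes yielded above. A small client-side sketch (split_mjpeg is a hypothetical helper, not part of BamBuddy) that splits such a multipart body back into JPEG payloads:

```python
def split_mjpeg(body: bytes, boundary: bytes = b"--frame") -> list[bytes]:
    """Split a multipart/x-mixed-replace body (boundary 'frame') into frames."""
    frames = []
    for part in body.split(boundary):
        header_end = part.find(b"\r\n\r\n")   # end of the part headers
        if header_end != -1:
            payload = part[header_end + 4:].rstrip(b"\r\n")
            if payload:
                frames.append(payload)
    return frames

body = (b"--frame\r\nContent-Type: image/jpeg\r\n\r\nJPEG1\r\n"
        b"--frame\r\nContent-Type: image/jpeg\r\n\r\nJPEG2\r\n")
print(split_mjpeg(body))  # → [b'JPEG1', b'JPEG2']
```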

Expected Behaviour After Fix

  Viewers | Before                           | After
  --------+----------------------------------+------------------------------------------
  1       | stream works                     | stream works
  2+      | 2nd viewer fails / printer drops | all viewers receive same stream
  0       | n/a                              | upstream connection closes automatically

Printer models affected

All models — most visibly A1, P1P, P1S (chamber image protocol) and P2S
(RTSP, single session). X1C may tolerate two connections on some firmware
versions but is still affected.

Implementation notes

  • QueueFull on slow viewers drops frames rather than blocking fast ones — correct behaviour for live video
  • The pump task should auto-cancel when subscriber_count() == 0 to avoid holding an idle connection to the printer
  • _last_frames[printer_id] for snapshot capture can be populated by the broadcaster pump instead of per-viewer generators, simplifying existing code
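The auto-cancel note could be implemented as a small watchdog task. A sketch under the assumption that subscriber_count is exposed as a zero-arg callable (idle_watchdog and the grace period are illustrative):

```python
import asyncio

async def idle_watchdog(subscriber_count, pump_task, grace: float = 10.0):
    # Cancel the upstream pump once nobody has watched for `grace` seconds,
    # so BamBuddy releases the printer's single camera connection.
    while not pump_task.done():
        await asyncio.sleep(grace)
        if subscriber_count() == 0:
            pump_task.cancel()
            break

async def demo():
    viewers = [object()]              # one simulated viewer

    async def pump():                 # stands in for the real pump task
        while True:
            await asyncio.sleep(0.01)

    task = asyncio.create_task(pump())
    wd = asyncio.create_task(idle_watchdog(lambda: len(viewers), task, grace=0.02))
    await asyncio.sleep(0.05)         # viewer watches for a while
    viewers.clear()                   # last viewer disconnects
    await wd
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled()

pump_cancelled = asyncio.run(demo())
print(pump_cancelled)  # → True
```

The grace period avoids tearing down and immediately re-establishing the printer connection when a viewer refreshes the page.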
BreizhHardware 2026-05-07 00:13:42 +02:00

@swheet-etr commented on GitHub (Apr 22, 2026):

I got around this by creating a separate pod in my cluster that uses nginx with a go2rtc sidecar and intercepts the API call (I have a separate public URL just for viewing). It works, but I'd rather not have to do that. Native support would be better.

The nginx also ensures only certain APIs are accessible and limits API calls to GET. I want a secure, locked-down anonymous "viewer". It works for me; the only problem I had was the multi-user view stream on a P1S.

THANK YOU for all the hard work on this!


@maziggy commented on GitHub (Apr 25, 2026):

Fixed in branch dev; available with the next release or daily build. Please let me know if it works for you.


If you find BamBuddy useful, please consider giving it a ⭐ on GitHub (https://github.com/maziggy/bambuddy) — it helps others discover the project!
