Skip to content

Stream real-time updates

Use this guide to subscribe to and process live HVAC events from the Quilt streaming API. For background on how the stream works, see The streaming protocol.


Subscribe to real-time updates for all entities

To open a stream and receive updates for every entity in your system:

from quilt_hp.models.space import Space
from quilt_hp.models.indoor_unit import IndoorUnit

snapshot = await client.get_snapshot()

def on_space(space: Space) -> None:
    merged = snapshot.apply_space(space)
    print(f"{merged.name}: {merged.state.ambient_temperature_c}°C")

def on_idu(idu: IndoorUnit) -> None:
    print(f"IDU {idu.id}: fan={idu.controls.fan_speed}")

async with client.stream(snapshot.stream_topics()) as stream:
    stream.on_space_update(on_space)
    stream.on_indoor_unit_update(lambda idu: print(snapshot.apply_indoor_unit(idu).id))
    stream.on_outdoor_unit_update(snapshot.apply_outdoor_unit)
    stream.on_controller_update(snapshot.apply_controller)
    stream.on_qsm_update(snapshot.apply_qsm)
    stream.on_remote_sensor_update(snapshot.apply_remote_sensor)
    stream.on_controller_remote_sensor_update(snapshot.apply_controller_remote_sensor)
    stream.on_software_update_info(lambda info: print(f"Update info: {info.id}"))
    stream.on_error(lambda e: print(f"Fatal error: {e}"))
    await asyncio.sleep(3600)  # run for 1 hour

snapshot.stream_topics() returns the full list of hds/<type>/<id> topic strings for every entity in your snapshot.


Merge stream updates into a snapshot

Stream events carry only the fields that changed. Each event is a sparse diff. Always merge into the snapshot to preserve unchanged fields:

snapshot = await client.get_snapshot()

def on_space(space: Space) -> None:
    # Without apply_space, hvac_mode and setpoints would appear as UNSPECIFIED/0
    merged = snapshot.apply_space(space)
    print(f"{merged.name}: mode={merged.controls.hvac_mode}, temp={merged.state.ambient_temperature_c}°C")

stream = client.stream(snapshot.stream_topics())
stream.on_space_update(on_space)
await stream.run_forever()

For indoor units:

def on_idu(idu: IndoorUnit) -> None:
    merged = snapshot.apply_indoor_unit(idu)
    print(f"{merged.id}: online={merged.is_online}")

For background on why sparse diffs require merging, see Snapshot and stream data model.


Callback registration methods

NotifierStream accepts both synchronous and async callbacks. Register whichever entity types you care about:

Method Callback argument Typical use
on_space_update() Space Merge room diffs with snapshot.apply_space()
on_indoor_unit_update() IndoorUnit Merge IDU diffs with snapshot.apply_indoor_unit()
on_outdoor_unit_update() OutdoorUnit Merge ODU diffs with snapshot.apply_outdoor_unit()
on_controller_update() Controller Merge Dial diffs with snapshot.apply_controller()
on_qsm_update() QuiltSmartModule Merge QSM diffs with snapshot.apply_qsm()
on_remote_sensor_update() RemoteSensor Merge standalone sensor diffs with snapshot.apply_remote_sensor()
on_controller_remote_sensor_update() ControllerRemoteSensor Merge Dial sensor diffs with snapshot.apply_controller_remote_sensor()
on_software_update_info() SoftwareUpdateInfo Observe firmware/software update records
on_error() Exception Handle fatal stream failure after reconnects are exhausted

Lifecycle methods

Use these methods to control the stream explicitly:

Method / property What it does
await stream.start() Starts the listener in the background
await stream.run_forever() Runs inline until cancelled or a fatal error stops it
await stream.stop() Cancels the background task and closes the stream
await stream.subscribe(topics) Adds topic subscriptions after startup
await stream.unsubscribe(topics) Removes topic subscriptions
stream.error Last fatal exception, or None while healthy

Run the stream as a background task

To run the stream while doing other work concurrently:

stream = client.stream(snapshot.stream_topics())
stream.on_space_update(on_space)
await stream.start()
try:
    result = await do_something_else()
    await asyncio.sleep(3600)
finally:
    await stream.stop()
    if stream.error is not None:
        print(f"Stream stopped with error: {stream.error}")

Use this pattern in integrations (Home Assistant, automation daemons) where the stream is just one part of a larger async application.


Run the stream as a blocking call

To block the current coroutine until the stream ends or encounters a fatal error:

stream = client.stream(snapshot.stream_topics())
stream.on_space_update(on_space)
stream.on_error(lambda e: print(f"Fatal: {e}"))
await stream.run_forever()  # blocks until cancelled or budget exhausted

Use this pattern in standalone scripts and CLI tools.


Handle dynamic subscription changes

To subscribe to additional topics while the stream is running:

async with client.stream(snapshot.stream_topics()) as stream:
    stream.on_space_update(on_space)
    await asyncio.sleep(5)

    # Add a new topic for a newly discovered entity
    await stream.subscribe(["hds/space/new-room-uuid"])

    # Remove a topic you no longer need
    await stream.unsubscribe(["hds/space/old-room-uuid"])

    await asyncio.sleep(3600)

Handle stream errors and reconnect

The stream reconnects automatically with exponential back-off (1 s, 2 s, 4 s, … up to a 60 s cap). Use on_error() or the error property to observe only fatal failures after the reconnect budget is exhausted. Configure the reconnect budget like this:

# Unlimited reconnects (default: -1)
stream = client.stream(snapshot.stream_topics(), max_reconnects=-1)

# Exactly 5 reconnect attempts before giving up
stream = client.stream(snapshot.stream_topics(), max_reconnects=5)

# Adjust the initial back-off delay
stream = client.stream(
    snapshot.stream_topics(),
    max_reconnects=-1,
    reconnect_delay_s=2.0,  # start at 2s, doubles to 60s cap
)

To observe fatal stream failures:

stream.on_error(lambda e: print(f"Fatal error (budget exhausted): {e}"))

await stream.run_forever()
if stream.error is not None:
    print(f"Last fatal error: {stream.error}")

on_error() is called only when the reconnect budget is exhausted. Until then, disconnects and transient errors trigger automatic reconnection without surfacing a fatal error to your callback.

For the full reconnect state machine, see The streaming protocol.