Build an automation daemon¶
This guide shows how to build a long-running Python daemon that reconnects automatically, responds to HVAC events with custom logic, and shuts down cleanly.
Process lifecycle¶
A well-behaved daemon follows this lifecycle:
On shutdown (SIGINT or SIGTERM), the daemon:
- Sets a stop event.
- Waits for the stream to drain in-flight callbacks.
- Stops the stream (sends gRPC close).
- Exits the
QuiltClientcontext manager (closes the channel).
Full daemon template¶
#!/usr/bin/env python3
"""
quilt-daemon — minimal production daemon template.
Set QUILT_EMAIL in the environment before running.
"""
from __future__ import annotations
import asyncio
import logging
import os
import signal
from quilt_hp import QuiltClient
from quilt_hp.cli.store import FileStore
from quilt_hp.models.space import Space
from quilt_hp.models.indoor_unit import IndoorUnit
from quilt_hp.models.system import SystemSnapshot
LOG = logging.getLogger("quilt-daemon")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
EMAIL = os.environ["QUILT_EMAIL"]
_stop = asyncio.Event()
_snapshot: SystemSnapshot | None = None
async def on_space_update(space: Space) -> None:
global _snapshot
if _snapshot is not None:
space = _snapshot.apply_space(space)
temp = (
f"{space.state.ambient_temperature_c:.1f}°C"
if space.state.ambient_temperature_c is not None
else "unknown"
)
LOG.info(
"[space] %s — mode=%s temp=%s",
space.name,
space.controls.hvac_mode.value,
temp,
)
if (
space.state.ambient_temperature_c is not None
and space.state.ambient_temperature_c > 27.0
and space.controls.hvac_mode.value in ("auto", "cool")
):
LOG.warning("[space] %s is above 27°C — check cooling", space.name)
async def on_idu_update(idu: IndoorUnit) -> None:
global _snapshot
if _snapshot is not None:
idu = _snapshot.apply_indoor_unit(idu)
LOG.debug("[idu] %s — fan=%s online=%s", idu.id, idu.controls.fan_speed.value, idu.is_online)
async def run() -> None:
global _snapshot
store = FileStore()
async with QuiltClient(EMAIL, token_store=store, snapshot_ttl_s=300) as client:
LOG.info("Logging in as %s", EMAIL)
await client.login()
_snapshot = await client.get_snapshot()
LOG.info(
"Snapshot loaded: system=%s rooms=%d idus=%d",
client.system_name,
len(_snapshot.rooms),
len(_snapshot.indoor_units),
)
topics = _snapshot.stream_topics()
stream = client.stream(topics, max_reconnects=-1, reconnect_delay_s=2.0)
stream.on_space_update(on_space_update)
stream.on_indoor_unit_update(on_idu_update)
stream.on_error(lambda exc: LOG.error("Stream stopped: %s", exc))
async with stream:
LOG.info("Daemon running. Send SIGINT or SIGTERM to stop.")
await _stop.wait()
LOG.info("Daemon stopped cleanly.")
def main() -> None:
loop = asyncio.new_event_loop()
def _handle_signal(sig: signal.Signals) -> None:
LOG.info("Received %s; shutting down…", sig.name)
loop.call_soon_threadsafe(_stop.set)
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _handle_signal, sig)
try:
loop.run_until_complete(run())
except Exception:
LOG.exception("Daemon exited with unhandled error")
raise
finally:
loop.close()
if __name__ == "__main__":
main()
Install as a systemd service¶
Create /etc/systemd/system/quilt-daemon.service:
[Unit]
Description=Quilt HP HVAC daemon
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=quilt
Environment=QUILT_EMAIL=you@example.com
ExecStart=/usr/local/bin/quilt-daemon
Restart=on-failure
RestartSec=10s
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Add a periodic snapshot refresh¶
To re-fetch the full snapshot periodically as a consistency check after reconnects:
async def refresh_loop(client: QuiltClient, interval_s: float = 300.0) -> None:
while not _stop.is_set():
await asyncio.sleep(interval_s)
if _stop.is_set():
break
try:
client.invalidate_snapshot()
fresh = await client.get_snapshot()
global _snapshot
_snapshot = fresh
LOG.info("Periodic snapshot refresh: %d rooms", len(fresh.rooms))
except Exception:
LOG.warning("Periodic snapshot refresh failed; will retry", exc_info=True)
Start it alongside the stream:
async with stream:
refresh_task = asyncio.create_task(refresh_loop(client))
await _stop.wait()
refresh_task.cancel()
try:
await refresh_task
except asyncio.CancelledError:
pass
Handle token expiry in long-running daemons¶
To make sure the daemon crashes, so systemd can restart it, instead of hanging on an OTP prompt after a token refresh failure:
from quilt_hp.tokens import TokenRefreshPolicy, TokenRefreshContext, RefreshFailureAction
class DaemonRefreshPolicy:
def on_refresh_failure(
self, context: TokenRefreshContext, error: Exception
) -> RefreshFailureAction:
LOG.error(
"Token refresh failed (attempt=%d source=%s): %s",
context.attempt, context.source, error,
)
return RefreshFailureAction.RAISE
async with QuiltClient(
EMAIL,
token_store=store,
token_refresh_policy=DaemonRefreshPolicy(),
) as client:
...
With RAISE, a refresh failure propagates as QuiltAuthError, exits run(), and lets systemd restart the process after RestartSec.
For more on token refresh policies, see Authenticate and manage tokens.