Token management reference¶
This page is a complete reference for the token system: data types, store protocols, refresh hooks and policies, and custom token persistence.
Overview¶
Authentication uses AWS Cognito. After a successful login, the library holds two tokens:
- IdToken: A short-lived JWT (typically about 1 hour) sent as the
authorizationheader on every gRPC call. - RefreshToken: A long-lived token (weeks to months) used to renew the IdToken without user interaction.
The library never stores tokens globally. They live on the QuiltClient instance (self._token) and optionally in a TokenStore for persistence across processes. The FileStore in quilt_hp.cli.store is the CLI's implementation, but you can provide any store that implements the protocol.
CachedTokens¶
@dataclass(slots=True)
class CachedTokens:
id_token: str # Cognito IdToken (JWT)
refresh_token: str # Cognito RefreshToken
expires_at: float # Unix timestamp when id_token expires
@property
def is_expired(self) -> bool:
"""True if the IdToken should be treated as expired.
Applies a 5-minute safety buffer: returns True when
time.time() > expires_at - 300.
"""
CachedTokens is the data class that holds the token pair. expires_at is set from Cognito's ExpiresIn field (number of seconds until expiry) added to the current time at login or refresh time.
The 5-minute buffer in is_expired means the library proactively refreshes before actual expiry, preventing races where a token is valid at the start of an operation but expires before the RPC completes.
TokenStore (async)¶
class TokenStore(Protocol):
async def load(self, email: str) -> CachedTokens | None:
"""Return cached tokens for email, or None if absent / invalid."""
...
async def save(self, email: str, tokens: CachedTokens) -> None:
"""Persist tokens for email."""
...
The preferred protocol for new implementations. Both load and save are async coroutines. The store is keyed by email address, so a single store instance can manage tokens for multiple accounts.
LegacyTokenStore (sync)¶
class LegacyTokenStore(Protocol):
def load(self, email: str) -> CachedTokens | None: ...
def save(self, email: str, tokens: CachedTokens) -> None: ...
A compatibility protocol for synchronous stores. The library wraps synchronous calls with asyncio.to_thread() so they do not block the event loop.
TokenStoreLike¶
Type alias accepted by QuiltClient(token_store=...) and authenticate().
FileStore¶
FileStore in quilt_hp.cli.store is the CLI's implementation of TokenStore. It stores tokens as JSON in ~/.config/quilt-hp/tokens.json with chmod 0o600 (readable only by the owner).
from quilt_hp.cli.store import FileStore
store = FileStore()
tokens = await store.load("you@example.com") # CachedTokens | None
await store.save("you@example.com", tokens)
store.clear_tokens("you@example.com") # remove cached tokens
emails = store.list_emails() # all accounts with cached tokens
The JSON format is:
{
"you@example.com": {
"id_token": "eyJ...",
"refresh_token": "eyJ...",
"expires_at": 1720000000.0
}
}
Implementing a custom TokenStore¶
Implement TokenStore for any backend (database, Redis, AWS Secrets Manager, Home Assistant storage, system keychain, etc.):
from quilt_hp.tokens import TokenStore, CachedTokens
import json
class DatabaseTokenStore:
def __init__(self, db_session) -> None:
self._db = db_session
async def load(self, email: str) -> CachedTokens | None:
row = await self._db.fetch_one(
"SELECT id_token, refresh_token, expires_at FROM quilt_tokens WHERE email = :email",
{"email": email},
)
if row is None:
return None
return CachedTokens(
id_token=row["id_token"],
refresh_token=row["refresh_token"],
expires_at=row["expires_at"],
)
async def save(self, email: str, tokens: CachedTokens) -> None:
await self._db.execute(
"""
INSERT INTO quilt_tokens (email, id_token, refresh_token, expires_at)
VALUES (:email, :id_token, :refresh_token, :expires_at)
ON CONFLICT (email) DO UPDATE SET
id_token = EXCLUDED.id_token,
refresh_token = EXCLUDED.refresh_token,
expires_at = EXCLUDED.expires_at
""",
{
"email": email,
"id_token": tokens.id_token,
"refresh_token": tokens.refresh_token,
"expires_at": tokens.expires_at,
},
)
TokenRefreshHooks¶
class TokenRefreshHooks(Protocol):
async def on_refresh_start(self, context: TokenRefreshContext) -> None:
"""Called before attempting a token refresh."""
...
async def on_refresh_success(
self, context: TokenRefreshContext, tokens: CachedTokens
) -> None:
"""Called when refresh succeeds and new tokens are produced."""
...
async def on_refresh_failure(
self, context: TokenRefreshContext, error: Exception
) -> None:
"""Called when refresh fails."""
...
Hooks are called in authenticate() during the refresh path. They are useful for logging, metrics, and updating secondary caches. Hook failures are not caught, so if a hook raises, the exception propagates.
TokenRefreshPolicy¶
class TokenRefreshPolicy(Protocol):
def on_refresh_failure(
self, context: TokenRefreshContext, error: Exception
) -> RefreshFailureAction:
"""Return the action to take when token refresh fails."""
...
The policy is consulted after TokenRefreshHooks.on_refresh_failure. The return value determines what happens next:
RefreshFailureAction.FALLBACK_TO_OTP: Falls through to the OTP login flow. This is the default when no policy is configured, and it is not useful in daemon contexts where there is no OTP prompt.RefreshFailureAction.RAISE: Re-raises the refresh error immediately without attempting OTP.
TokenRefreshContext¶
@dataclass(slots=True, frozen=True)
class TokenRefreshContext:
reason: TokenRefreshReason
source: str
attempt: int = 1
Context object passed to refresh hooks and the refresh callback. Fields:
reason: Why the refresh is happening (see below).source: Module or location that triggered the refresh (for example,"authenticate","transport","streaming").attempt: Which attempt this is, starting at 1.
TokenRefreshReason¶
class TokenRefreshReason(StrEnum):
EXPIRED_CACHED_TOKEN = "expired_cached_token"
TRANSPORT_UNAUTHENTICATED = "transport_unauthenticated"
STREAM_UNAUTHENTICATED = "stream_unauthenticated"
EXPIRED_CACHED_TOKEN: The cached token expired, soauthenticate()is performing a proactive refresh.TRANSPORT_UNAUTHENTICATED: The gRPC transport interceptor receivedUNAUTHENTICATEDon a unary RPC.STREAM_UNAUTHENTICATED: TheNotifierStreamreceivedUNAUTHENTICATEDand is refreshing before reconnecting.
RefreshFailureAction¶
CurrentTokenProvider¶
class CurrentTokenProvider(Protocol):
def get_current_token(self) -> str:
"""Return the current authorization token."""
...
Protocol implemented by QuiltClient (get_current_token returns self._token). The transport interceptor calls get_current_token() on every outbound RPC. You can use this protocol to supply tokens from any source when using create_channel() directly.