Authenticate and manage tokens¶
Use this guide for common authentication patterns in the quilt_hp library. For background on why the token resolution works this way, see Authentication and token lifecycle.
Perform a first-time OTP login¶
To authenticate with a fresh account (no cached tokens):
- Import
QuiltClientand instantiate it with your email:
from quilt_hp import QuiltClient
async with QuiltClient("you@example.com") as client:
await client.login(otp_callback=lambda email: input(f"OTP for {email}: "))
-
Run the script. Quilt sends a one-time password to your email address.
-
Type the OTP at the prompt. Authentication succeeds and a Cognito session is established.
The otp_callback can be async:
async def prompt_otp(email: str) -> str:
return input(f"Enter OTP for {email}: ")
await client.login(otp_callback=prompt_otp)
Persist tokens across processes with FileStore¶
To avoid the OTP prompt on every run:
- Import
FileStoreand pass it astoken_store:
from quilt_hp import QuiltClient
from quilt_hp.cli.store import FileStore
store = FileStore() # saves to ~/.config/quilt-hp/tokens.json
async with QuiltClient("you@example.com", token_store=store) as client:
await client.login(otp_callback=lambda email: input(f"OTP for {email}: "))
# Tokens are now saved; next run will not prompt
-
On subsequent runs, call
login()with the same store. If the cached access token is still valid it is reused immediately. If it is expired but the refresh token is valid, the library performs a silent refresh. No OTP is required. -
To clear cached tokens:
Implement a custom token store (Redis example)¶
To integrate with a custom backend:
- Implement the
TokenStoreprotocol withasync loadandasync savemethods:
from quilt_hp.tokens import TokenStore, CachedTokens
import json
class RedisTokenStore:
def __init__(self, redis_client) -> None:
self._redis = redis_client
async def load(self, email: str) -> CachedTokens | None:
raw = await self._redis.get(f"quilt_token:{email}")
if raw is None:
return None
data = json.loads(raw)
return CachedTokens(
id_token=data["id_token"],
refresh_token=data["refresh_token"],
expires_at=data["expires_at"],
)
async def save(self, email: str, tokens: CachedTokens) -> None:
data = {
"id_token": tokens.id_token,
"refresh_token": tokens.refresh_token,
"expires_at": tokens.expires_at,
}
await self._redis.set(
f"quilt_token:{email}",
json.dumps(data),
ex=86400,
)
- Pass the store to
QuiltClient:
store = RedisTokenStore(redis_client)
async with QuiltClient("you@example.com", token_store=store) as client:
await client.login(otp_callback=...)
For a synchronous store (e.g., wrapping a blocking database call), implement load and save as regular (non-async) methods. The library wraps synchronous stores with asyncio.to_thread automatically.
For the complete TokenStore and LegacyTokenStore protocol definitions, see Token management reference.
Handle token refresh failures in a daemon¶
To prevent a daemon from hanging on an OTP prompt it cannot display:
- Implement
TokenRefreshPolicyto returnRefreshFailureAction.RAISEon failure:
from quilt_hp.tokens import TokenRefreshPolicy, TokenRefreshContext, RefreshFailureAction
import logging
logger = logging.getLogger(__name__)
class DaemonRefreshPolicy:
def on_refresh_failure(
self, context: TokenRefreshContext, error: Exception
) -> RefreshFailureAction:
logger.error(
"Token refresh failed (source=%s attempt=%d): %s",
context.source, context.attempt, error,
)
return RefreshFailureAction.RAISE
- Pass it to
QuiltClient:
async with QuiltClient(
"you@example.com",
token_store=store,
token_refresh_policy=DaemonRefreshPolicy(),
) as client:
await client.login()
...
With RAISE, a refresh failure propagates as QuiltAuthError and exits the async with block. A process supervisor (systemd, Docker restart policy) then restarts the daemon, which retries the OTP flow from scratch if the refresh token itself has expired.
Use token refresh lifecycle hooks for logging¶
To observe token refresh events (for logging or metrics):
- Implement
TokenRefreshHooks:
from quilt_hp.tokens import TokenRefreshHooks, TokenRefreshContext, CachedTokens
import logging
logger = logging.getLogger(__name__)
class LoggingRefreshHooks:
async def on_refresh_start(self, context: TokenRefreshContext) -> None:
logger.info(
"Token refresh starting: reason=%s source=%s",
context.reason, context.source,
)
async def on_refresh_success(self, context: TokenRefreshContext, tokens: CachedTokens) -> None:
logger.info("Token refreshed. Expires at %s", tokens.expires_at)
async def on_refresh_failure(self, context: TokenRefreshContext, error: Exception) -> None:
logger.error("Token refresh failed: %s", error)
- Pass it to
QuiltClient:
client = QuiltClient(
"you@example.com",
token_store=store,
token_refresh_hooks=LoggingRefreshHooks(),
)
Hooks are called in addition to the refresh policy, not instead of it. If a hook raises, the exception propagates, so keep hooks lightweight and avoid raising from them.
For the complete hook and policy protocol definitions, see Token management reference.