Source code for polaris.sep10.token

from pytz import utc
from jwt import decode
from jwt.exceptions import InvalidTokenError
from datetime import datetime
from urllib.parse import urlparse
from typing import Union, Dict, Optional

from stellar_sdk import Keypair
from stellar_sdk.exceptions import Ed25519PublicKeyInvalidError

from polaris import settings


[docs]class SEP10Token: """ An object representing the authenticated session of the client. This object will be passed to every integration function that is called within the a request containing the JWT in the `Authorization` header. """ _REQUIRED_FIELDS = {"iss", "sub", "iat", "exp"} def __init__(self, jwt: Union[str, Dict]): if isinstance(jwt, str): try: jwt = decode(jwt, settings.SERVER_JWT_KEY, algorithms=["HS256"]) except InvalidTokenError: raise ValueError("unable to decode jwt") elif not isinstance(jwt, Dict): raise ValueError( "invalid type for 'jwt' parameter: must be a string or dictionary" ) if not self._REQUIRED_FIELDS.issubset(set(jwt.keys())): raise ValueError( f"jwt is missing one of the required fields: {', '.join(self._REQUIRED_FIELDS)}" ) try: Keypair.from_public_key(jwt["sub"]) except Ed25519PublicKeyInvalidError: raise ValueError(f"invalid Stellar public key: {jwt['sub']}") try: iat = datetime.fromtimestamp(jwt["iat"], tz=utc) except (OSError, ValueError, OverflowError): raise ValueError("invalid iat value") try: exp = datetime.fromtimestamp(jwt["exp"], tz=utc) except (OSError, ValueError, OverflowError): raise ValueError("invalid exp value") now = datetime.now(tz=utc) if now < iat or now > exp: raise ValueError("jwt is no longer valid") client_domain = jwt.get("client_domain") if ( client_domain and urlparse(f"https://{client_domain}").netloc != client_domain ): raise ValueError("'client_domain' must be a hostname") self._payload = jwt @property def account(self) -> str: """ The G-address specified in the payload's ``sub`` value """ return self._payload["sub"] @property def issuer(self) -> str: """ The principal that issued a token, RFC7519, Section 4.1.1 — a Uniform Resource Identifier (URI) for the issuer (https://example.com or https://example.com/G...) """ return self._payload["iss"] @property def issued_at(self) -> datetime: """ The time at which the JWT was issued RFC7519, Section 4.1.6 - represented as a UTC datetime object """ return datetime.fromtimestamp(self._payload["iat"], tz=utc) @property def expires_at(self) -> datetime: """ The expiration time on or after which the JWT will not accepted for processing, RFC7519, Section 4.1.4 — represented as a UTC datetime object """ return datetime.fromtimestamp(self._payload["exp"], tz=utc) @property def client_domain(self) -> Optional[str]: """ A nonstandard JWT claim containing the client's home domain, included if the challenge transaction contained a ``client_domain`` ManageData operation """ return self._payload.get("client_domain") @property def payload(self) -> dict: """ The decoded contents of the JWT string """ return self._payload