from pytz import utc
from jwt import decode
from jwt.exceptions import InvalidTokenError
from datetime import datetime
from urllib.parse import urlparse
from typing import Union, Dict, Optional

from stellar_sdk import Keypair
from stellar_sdk.exceptions import Ed25519PublicKeyInvalidError

from polaris import settings

[docs]class SEP10Token: """ An object representing the authenticated session of the client. This object will be passed to every integration function that is called within the a request containing the JWT in the `Authorization` header. """ _REQUIRED_FIELDS = {"iss", "sub", "iat", "exp"} def __init__(self, jwt: Union[str, Dict]): if isinstance(jwt, str): try: jwt = decode(jwt, settings.SERVER_JWT_KEY, algorithms=["HS256"]) except InvalidTokenError: raise ValueError("unable to decode jwt") elif not isinstance(jwt, Dict): raise ValueError( "invalid type for 'jwt' parameter: must be a string or dictionary" ) if not self._REQUIRED_FIELDS.issubset(set(jwt.keys())): raise ValueError( f"jwt is missing one of the required fields: {', '.join(self._REQUIRED_FIELDS)}" ) try: Keypair.from_public_key(jwt["sub"]) except Ed25519PublicKeyInvalidError: raise ValueError(f"invalid Stellar public key: {jwt['sub']}") try: iat = datetime.fromtimestamp(jwt["iat"], tz=utc) except (OSError, ValueError, OverflowError): raise ValueError("invalid iat value") try: exp = datetime.fromtimestamp(jwt["exp"], tz=utc) except (OSError, ValueError, OverflowError): raise ValueError("invalid exp value") now = if now < iat or now > exp: raise ValueError("jwt is no longer valid") client_domain = jwt.get("client_domain") if ( client_domain and urlparse(f"https://{client_domain}").netloc != client_domain ): raise ValueError("'client_domain' must be a hostname") self._payload = jwt @property def account(self) -> str: """ The G-address specified in the payload's ``sub`` value """ return self._payload["sub"] @property def issuer(self) -> str: """ The principal that issued a token, RFC7519, Section 4.1.1 — a Uniform Resource Identifier (URI) for the issuer ( or """ return self._payload["iss"] @property def issued_at(self) -> datetime: """ The time at which the JWT was issued RFC7519, Section 4.1.6 - represented as a UTC datetime object """ return datetime.fromtimestamp(self._payload["iat"], tz=utc) @property def expires_at(self) -> datetime: """ The expiration time on or after which the JWT will not accepted for processing, RFC7519, Section 4.1.4 — represented as a UTC datetime object """ return datetime.fromtimestamp(self._payload["exp"], tz=utc) @property def client_domain(self) -> Optional[str]: """ A nonstandard JWT claim containing the client's home domain, included if the challenge transaction contained a ``client_domain`` ManageData operation """ return self._payload.get("client_domain") @property def payload(self) -> dict: """ The decoded contents of the JWT string """ return self._payload