Source code for sanic_jwt_extended.tokens

import datetime
import uuid
from calendar import timegm
from typing import Callable, Dict, Union

import jwt
from sanic import Sanic

from sanic_jwt_extended.exceptions import JWTDecodeError


def _encode_jwt(
    additional_token_data: dict,
    expires_delta: datetime.timedelta,
    secret: str,
    algorithm: str,
    json_encoder: Callable[..., str],
) -> str:
    uid = str(uuid.uuid4())
    now = datetime.datetime.utcnow()
    token_data = {"iat": now, "nbf": now, "jti": uid}
    # If expires_delta is False, the JWT should never expire
    # and the 'exp' claim is not set.
    if expires_delta:
        token_data["exp"] = now + expires_delta
    token_data.update(additional_token_data)
    encoded_token = jwt.encode(
        token_data, secret, algorithm, json_encoder=json_encoder
    ).decode("utf-8")
    return encoded_token


[docs]async def encode_access_token( identity: str, secret: str, algorithm: str, expires_delta: datetime.timedelta, fresh: Union[datetime.timedelta, bool], user_claims: dict, role: str, identity_claim_key: str, user_claims_key: str, json_encoder: Callable[..., str] = None, ) -> str: """ Creates a new encoded (utf-8) access token. :param identity: Identifier for who this token is for (ex, username). This data must be json serializable :param secret: Secret key to encode the JWT with :param algorithm: Which algorithm to encode this JWT with :param expires_delta: How far in the future this token should expire (set to False to disable expiration) :type expires_delta: datetime.timedelta or False :param fresh: If this should be a 'fresh' token or not. If a datetime.timedelta is given this will indicate how long this token will remain fresh. :param user_claims: Custom claims to include in this token. This data must be json serializable :param role: A role field for RBAC :param identity_claim_key: Which key should be used to store the identity :param user_claims_key: Which key should be used to store the user claims :param json_encoder: json encoder :return: Encoded access token """ if isinstance(fresh, datetime.timedelta): now = datetime.datetime.utcnow() fresh = timegm((now + fresh).utctimetuple()) token_data = {identity_claim_key: identity, "fresh": fresh, "type": "access"} # Don't add extra data to the token if user_claims is empty. if user_claims: token_data[user_claims_key] = user_claims if role: token_data["role"] = role return _encode_jwt( token_data, expires_delta, secret, algorithm, json_encoder=json_encoder )
[docs]async def encode_refresh_token( identity, secret, algorithm, expires_delta, user_claims, identity_claim_key, user_claims_key, json_encoder=None, ): """ Creates a new encoded (utf-8) refresh token. :param identity: Some identifier used to identify the owner of this token :param secret: Secret key to encode the JWT with :param algorithm: Which algorithm to use for the toek :param expires_delta: How far in the future this token should expire (set to False to disable expiration) :type expires_delta: datetime.timedelta or False :param user_claims: Custom claims to include in this token. This data must be json serializable :param identity_claim_key: Which key should be used to store the identity :param user_claims_key: Which key should be used to store the user claims :param json_encoder: json encoder :return: Encoded refresh token """ token_data = {identity_claim_key: identity, "type": "refresh"} # Don't add extra data to the token if user_claims is empty. if user_claims: token_data[user_claims_key] = user_claims return _encode_jwt( token_data, expires_delta, secret, algorithm, json_encoder=json_encoder )
[docs]async def decode_jwt( encoded_token: str, secret: str, algorithm: str, identity_claim_key: str, user_claims_key: str, ) -> Dict: """ Decodes an encoded JWT :param encoded_token: The encoded JWT string to decode :param secret: Secret key used to encode the JWT :param algorithm: Algorithm used to encode the JWT :param identity_claim_key: expected key that contains the identity :param user_claims_key: expected key that contains the user claims :return: Dictionary containing contents of the JWT """ # This call verifies the ext, iat, and nbf claims data: dict = jwt.decode(encoded_token, secret, algorithms=[algorithm]) # Make sure that any custom claims we expect in the token are present if "jti" not in data: raise JWTDecodeError("Missing claim: jti") if identity_claim_key not in data: raise JWTDecodeError("Missing claim: {}".format(identity_claim_key)) if "type" not in data or data["type"] not in ("refresh", "access"): raise JWTDecodeError("Missing or invalid claim: type") if data["type"] == "access": if "fresh" not in data: raise JWTDecodeError("Missing claim: fresh") if user_claims_key not in data: data[user_claims_key] = {} return data
[docs]class Token: """ Token object that contains decoded token data and passed with kwargs to endpoint function """ data: dict app: Sanic def __init__(self, app: Sanic, token: dict): self.app = app self.data = token @property def raw_jwt(self) -> dict: """ :return: full jwt data in dictionary form """ return self.data @property def jwt_identity(self) -> Union[str, None]: """ :return: jwt identity claim data (or this can be None if data does not exist) """ return self.data.get(self.app.config.JWT_IDENTITY_CLAIM, None) @property def jwt_user_claims(self) -> Dict: """ :return: user claim data """ return self.data.get(self.app.config.JWT_USER_CLAIMS, {}) @property def jti(self) -> str: """ :return: jti data """ return self.data.get("jti", None)