Skip to content

API Reference

AuthService

pico_auth.service.AuthService

Core auth operations.

Source code in pico_auth/service.py
@component
class AuthService:
    """Core auth operations."""

    def __init__(
        self,
        users: UserRepository,
        tokens: RefreshTokenRepository,
        groups: GroupRepository,
        service_tokens: ServiceTokenRepository,
        passwords: PasswordService,
        jwt_provider: JWTProvider,
        settings: AuthSettings,
    ):
        self._users = users
        self._tokens = tokens
        self._groups = groups
        self._service_tokens = service_tokens
        self._passwords = passwords
        self._jwt = jwt_provider
        self._settings = settings
        self._registration_enabled = settings.registration_enabled

    async def register(
        self,
        email: str,
        password: str,
        display_name: str,
        role: str = "viewer",
        _admin: bool = False,
    ) -> User:
        if not _admin and not self._registration_enabled:
            raise RegistrationDisabledError()
        existing = await self._users.find_by_email(email)
        if existing:
            raise UserExistsError(email)

        user = User(
            id=uuid4().hex[:12],
            email=email,
            display_name=display_name,
            password_hash=self._passwords.hash(password),
            role=role,
            org_id="default",
            status="active",
            created_at=_now_iso(),
        )
        await self._users.save(user)
        return user

    async def login(self, email: str, password: str) -> dict:
        user = await self._users.find_by_email(email)
        if not user or not self._passwords.verify(password, user.password_hash):
            raise InvalidCredentialsError()
        if user.status == "suspended":
            raise UserSuspendedError()

        await self._users.update_last_login(user.id, _now_iso())

        group_ids = await self._groups.get_group_ids_for_user(user.id)
        access_token = self._jwt.create_access_token(
            user.id,
            user.email,
            user.role,
            user.org_id,
            groups=group_ids,
        )
        raw_refresh = self._jwt.create_refresh_token()
        refresh = _build_refresh_token(
            user.id, raw_refresh, self._settings.refresh_token_expire_days
        )
        await self._tokens.save(refresh)

        return {
            "access_token": access_token,
            "refresh_token": raw_refresh,
            "token_type": "Bearer",
            "expires_in": self._settings.access_token_expire_minutes * 60,
        }

    async def refresh(self, raw_refresh_token: str) -> dict:
        stored = await self._tokens.find_by_hash(_hash_token(raw_refresh_token))
        if not stored:
            raise TokenInvalidError()

        if datetime.fromisoformat(stored.expires_at) < datetime.now(UTC):
            await self._tokens.delete_by_hash(stored.token_hash)
            raise TokenExpiredError()

        user = await self._users.find_by_id(stored.user_id)
        if not user:
            raise TokenInvalidError()

        # Rotate: delete old, create new
        await self._tokens.delete_by_hash(stored.token_hash)
        new_raw = self._jwt.create_refresh_token()
        new_refresh = _build_refresh_token(
            user.id, new_raw, self._settings.refresh_token_expire_days
        )
        await self._tokens.save(new_refresh)

        group_ids = await self._groups.get_group_ids_for_user(user.id)
        access_token = self._jwt.create_access_token(
            user.id,
            user.email,
            user.role,
            user.org_id,
            groups=group_ids,
        )
        return {
            "access_token": access_token,
            "refresh_token": new_raw,
            "token_type": "Bearer",
            "expires_in": self._settings.access_token_expire_minutes * 60,
        }

    async def get_profile(self, user_id: str) -> User:
        user = await self._users.find_by_id(user_id)
        if not user:
            raise UserNotFoundError(user_id)
        return user

    async def change_password(
        self,
        user_id: str,
        old_password: str,
        new_password: str,
    ) -> None:
        user = await self._users.find_by_id(user_id)
        if not user:
            raise UserNotFoundError(user_id)
        if not self._passwords.verify(old_password, user.password_hash):
            raise InvalidCredentialsError()
        await self._users.update_password(user_id, self._passwords.hash(new_password))
        await self._tokens.delete_by_user(user_id)

    async def list_users(self) -> list[User]:
        return await self._users.list_all()

    async def update_role(self, user_id: str, role: str) -> User:
        if role not in VALID_ROLES:
            raise AuthError(f"Invalid role: {role}")
        user = await self._users.update_role(user_id, role)
        if not user:
            raise UserNotFoundError(user_id)
        return user

    async def create_service_token(
        self,
        name: str,
        role: str = "operator",
        org_id: str = "default",
        description: str = "",
    ) -> dict:
        existing = await self._service_tokens.find_by_name(name)
        if existing:
            return {"id": existing.id, "name": existing.name, "already_exists": True}

        import secrets

        raw_token = f"pico_svc_{secrets.token_hex(32)}"
        token = ServiceToken(
            id=uuid4().hex[:12],
            name=name,
            token_hash=_hash_token(raw_token),
            role=role,
            org_id=org_id,
            description=description,
            created_at=_now_iso(),
        )
        await self._service_tokens.save(token)
        return {"id": token.id, "name": name, "token": raw_token, "already_exists": False}

    async def validate_service_token(self, raw_token: str) -> ServiceToken:
        token = await self._service_tokens.find_by_hash(_hash_token(raw_token))
        if not token:
            raise TokenInvalidError()
        return token

    async def revoke_service_token(self, name: str) -> bool:
        return await self._service_tokens.revoke(name, _now_iso())

    async def list_service_tokens(self) -> list[ServiceToken]:
        return await self._service_tokens.list_active()

    async def ensure_admin(self, email: str, password: str) -> None:
        """Create the initial admin if it does not exist."""
        existing = await self._users.find_by_email(email)
        if existing:
            return
        await self.register(email, password, "Admin", role="superadmin", _admin=True)

    async def admin_create_user(
        self,
        email: str,
        password: str,
        display_name: str = "",
        role: str = "viewer",
    ) -> User:
        if role not in VALID_ROLES:
            raise AuthError(f"Invalid role: {role}")
        return await self.register(email, password, display_name, role=role, _admin=True)

    async def admin_reset_password(self, user_id: str, new_password: str) -> None:
        user = await self._users.find_by_id(user_id)
        if not user:
            raise UserNotFoundError(user_id)
        await self._users.update_password(user_id, self._passwords.hash(new_password))
        await self._tokens.delete_by_user(user_id)

    def set_registration_enabled(self, enabled: bool) -> None:
        self._registration_enabled = enabled

    def is_registration_enabled(self) -> bool:
        return self._registration_enabled

ensure_admin(email, password) async

Create the initial admin if it does not exist.

Source code in pico_auth/service.py
async def ensure_admin(self, email: str, password: str) -> None:
    """Create the initial admin if it does not exist."""
    existing = await self._users.find_by_email(email)
    if existing:
        return
    await self.register(email, password, "Admin", role="superadmin", _admin=True)

JWTProvider

pico_auth.jwt_provider.JWTProvider

Creates and validates JWT tokens with auto-generated RSA or ML-DSA keys.

Source code in pico_auth/jwt_provider.py
@component
class JWTProvider:
    """Creates and validates JWT tokens with auto-generated RSA or ML-DSA keys."""

    def __init__(self, settings: AuthSettings):
        self._settings = settings
        self._algorithm = settings.algorithm
        self._kid = "pico-auth-1"

        if settings.is_pqc:
            self._secret_key, self._public_key_bytes = self._load_or_generate_pqc_keys()
            self._private_key = None
            self._public_key = None
        else:
            self._private_key, self._public_key = self._load_or_generate_rsa_keys()
            self._secret_key = None
            self._public_key_bytes = None

    def _load_or_generate_rsa_keys(self) -> tuple[str, str]:
        data_dir = self._settings.data_path
        data_dir.mkdir(parents=True, exist_ok=True)
        priv_path = self._settings.private_key_path
        pub_path = self._settings.public_key_path

        if priv_path.exists() and pub_path.exists():
            logger.info("Loaded RSA keys from %s", data_dir)
            return priv_path.read_text(), pub_path.read_text()

        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        private_pem = key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        ).decode()
        public_pem = (
            key.public_key()
            .public_bytes(
                serialization.Encoding.PEM,
                serialization.PublicFormat.SubjectPublicKeyInfo,
            )
            .decode()
        )

        priv_path.write_text(private_pem)
        priv_path.chmod(0o600)
        pub_path.write_text(public_pem)
        logger.info("Generated RSA key pair at %s", data_dir)
        return private_pem, public_pem

    def _load_or_generate_pqc_keys(self) -> tuple[bytes, bytes]:
        data_dir = self._settings.data_path
        data_dir.mkdir(parents=True, exist_ok=True)
        secret_path = self._settings.pqc_key_path
        pub_path = self._settings.pqc_pub_path

        if secret_path.exists() and pub_path.exists():
            logger.info("Loaded %s keys from %s", self._algorithm, data_dir)
            return secret_path.read_bytes(), pub_path.read_bytes()

        oqs = _import_oqs()
        signer = oqs.Signature(self._algorithm)
        public_key = signer.generate_keypair()
        secret_key = signer.export_secret_key()

        secret_path.write_bytes(secret_key)
        secret_path.chmod(0o600)
        pub_path.write_bytes(public_key)
        logger.info("Generated %s key pair at %s", self._algorithm, data_dir)
        return secret_key, public_key

    def _sign_pqc(self, signing_input: bytes) -> bytes:
        oqs = _import_oqs()
        signer = oqs.Signature(self._algorithm, self._secret_key)
        return signer.sign(signing_input)

    def create_access_token(
        self,
        user_id: str,
        email: str,
        role: str,
        org_id: str,
        groups: list[str] | None = None,
    ) -> str:
        now = datetime.now(UTC)
        exp = now + timedelta(minutes=self._settings.access_token_expire_minutes)
        claims = {
            "sub": user_id,
            "email": email,
            "role": role,
            "org_id": org_id,
            "groups": groups or [],
            "iss": self._settings.issuer,
            "aud": self._settings.audience,
            "iat": int(now.timestamp()),
            "exp": int(exp.timestamp()),
            "jti": uuid4().hex[:12],
        }

        if self._settings.is_pqc:
            return self._encode_pqc_token(claims)

        return jwt.encode(
            claims,
            self._private_key,
            algorithm=self._algorithm,
            headers={"kid": self._kid},
        )

    def _encode_pqc_token(self, claims: dict) -> str:
        header = {"alg": self._algorithm, "typ": "JWT", "kid": self._kid}
        header_b64 = _b64url_encode(json.dumps(header, separators=(",", ":")).encode())
        payload_b64 = _b64url_encode(json.dumps(claims, separators=(",", ":")).encode())
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
        signature = self._sign_pqc(signing_input)
        sig_b64 = _b64url_encode(signature)
        return f"{header_b64}.{payload_b64}.{sig_b64}"

    def create_refresh_token(self) -> str:
        return uuid4().hex

    def decode_access_token(self, token: str) -> dict:
        if self._settings.is_pqc:
            return self._decode_pqc_token(token)

        return jwt.decode(
            token,
            self._public_key,
            algorithms=[self._algorithm],
            audience=self._settings.audience,
            issuer=self._settings.issuer,
        )

    def _decode_pqc_token(self, token: str) -> dict:
        parts = token.split(".")
        if len(parts) != 3:
            raise ValueError("Malformed JWT: expected 3 parts")

        header_b64, payload_b64, sig_b64 = parts
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
        signature = _b64url_decode(sig_b64)

        oqs = _import_oqs()
        verifier = oqs.Signature(self._algorithm)
        if not verifier.verify(signing_input, signature, self._public_key_bytes):
            raise ValueError("Invalid signature")

        claims = json.loads(_b64url_decode(payload_b64))
        if claims.get("exp") and time.time() > claims["exp"]:
            raise ValueError("Token has expired")
        if claims.get("iss") != self._settings.issuer:
            raise ValueError("Invalid issuer")

        token_aud = claims.get("aud")
        if isinstance(token_aud, list):
            if self._settings.audience not in token_aud:
                raise ValueError("Invalid audience")
        elif token_aud != self._settings.audience:
            raise ValueError("Invalid audience")

        return claims

    def jwks(self) -> dict:
        if self._settings.is_pqc:
            return self._jwks_pqc()
        return self._jwks_rsa()

    def _jwks_rsa(self) -> dict:
        pub_key = load_pem_public_key(self._public_key.encode())
        assert isinstance(pub_key, RSAPublicKey)
        numbers = pub_key.public_numbers()

        n_len = (numbers.n.bit_length() + 7) // 8
        e_len = max((numbers.e.bit_length() + 7) // 8, 1)

        return {
            "keys": [
                {
                    "kty": "RSA",
                    "kid": self._kid,
                    "use": "sig",
                    "alg": self._algorithm,
                    "n": _b64url_encode(numbers.n.to_bytes(n_len, "big")),
                    "e": _b64url_encode(numbers.e.to_bytes(e_len, "big")),
                }
            ],
        }

    def _jwks_pqc(self) -> dict:
        return {
            "keys": [
                {
                    "kty": "AKP",
                    "kid": self._kid,
                    "use": "sig",
                    "alg": self._algorithm,
                    "pub": _b64url_encode(self._public_key_bytes),
                }
            ],
        }

    def openid_configuration(self) -> dict:
        iss = self._settings.issuer
        return {
            "issuer": iss,
            "token_endpoint": f"{iss}/api/v1/auth/login",
            "jwks_uri": f"{iss}/api/v1/auth/jwks",
            "response_types_supported": ["token"],
            "subject_types_supported": ["public"],
            "id_token_signing_alg_values_supported": [self._algorithm],
        }

PasswordService

pico_auth.passwords.PasswordService

Bcrypt password hashing and verification.

Source code in pico_auth/passwords.py
@component
class PasswordService:
    """Bcrypt password hashing and verification."""

    def hash(self, password: str) -> str:
        return bcrypt.hashpw(
            password.encode("utf-8")[:72],
            bcrypt.gensalt(),
        ).decode("utf-8")

    def verify(self, password: str, password_hash: str) -> bool:
        try:
            return bcrypt.checkpw(
                password.encode("utf-8")[:72],
                password_hash.encode("utf-8"),
            )
        except (ValueError, TypeError):
            return False

UserRepository

pico_auth.repository.UserRepository

Data access for User entities.

Source code in pico_auth/repository.py
@component
class UserRepository:
    """Data access for User entities."""

    def __init__(self, sm: SessionManager):
        self._sm = sm

    async def find_by_email(self, email: str) -> User | None:
        async with self._sm.transaction() as session:
            result = await session.execute(
                select(User).where(User.email == email),
            )
            return result.scalar_one_or_none()

    async def find_by_id(self, user_id: str) -> User | None:
        async with self._sm.transaction() as session:
            return await session.get(User, user_id)

    async def save(self, user: User) -> None:
        async with self._sm.transaction() as session:
            merged = await session.merge(user)
            await session.flush()
            user.id = merged.id

    async def list_all(self) -> list[User]:
        async with self._sm.transaction(read_only=True) as session:
            result = await session.execute(select(User))
            return list(result.scalars().all())

    async def update_role(self, user_id: str, role: str) -> User | None:
        async with self._sm.transaction() as session:
            user = await session.get(User, user_id)
            if user:
                user.role = role
                await session.flush()
            return user

    async def update_password(self, user_id: str, password_hash: str) -> None:
        async with self._sm.transaction() as session:
            user = await session.get(User, user_id)
            if user:
                user.password_hash = password_hash
                await session.flush()

    async def update_last_login(self, user_id: str, timestamp: str) -> None:
        async with self._sm.transaction() as session:
            user = await session.get(User, user_id)
            if user:
                user.last_login_at = timestamp
                await session.flush()

RefreshTokenRepository

pico_auth.repository.RefreshTokenRepository

Data access for RefreshToken entities.

Source code in pico_auth/repository.py
@component
class RefreshTokenRepository:
    """Data access for RefreshToken entities."""

    def __init__(self, sm: SessionManager):
        self._sm = sm

    async def find_by_hash(self, token_hash: str) -> RefreshToken | None:
        async with self._sm.transaction() as session:
            result = await session.execute(
                select(RefreshToken).where(RefreshToken.token_hash == token_hash),
            )
            return result.scalar_one_or_none()

    async def save(self, token: RefreshToken) -> None:
        async with self._sm.transaction() as session:
            await session.merge(token)
            await session.flush()

    async def delete_by_user(self, user_id: str) -> None:
        async with self._sm.transaction() as session:
            await session.execute(
                delete(RefreshToken).where(RefreshToken.user_id == user_id),
            )

    async def delete_by_hash(self, token_hash: str) -> None:
        async with self._sm.transaction() as session:
            await session.execute(
                delete(RefreshToken).where(RefreshToken.token_hash == token_hash),
            )

GroupService

pico_auth.service.GroupService

Group management operations.

Source code in pico_auth/service.py
@component
class GroupService:
    """Group management operations."""

    def __init__(self, groups: GroupRepository, users: UserRepository):
        self._groups = groups
        self._users = users

    async def create_group(self, name: str, org_id: str, description: str = "") -> Group:
        existing = await self._groups.find_by_name_and_org(name, org_id)
        if existing:
            raise GroupExistsError(name)
        group = Group(
            id=uuid4().hex[:12],
            name=name,
            description=description,
            org_id=org_id,
            created_at=_now_iso(),
            updated_at=_now_iso(),
        )
        await self._groups.save(group)
        return group

    async def get_group(self, group_id: str) -> Group:
        group = await self._groups.find_by_id(group_id)
        if not group:
            raise GroupNotFoundError(group_id)
        return group

    async def list_groups(self, org_id: str) -> list[Group]:
        return await self._groups.list_by_org(org_id)

    async def update_group(
        self, group_id: str, name: str | None = None, description: str | None = None
    ) -> Group:
        group = await self._groups.find_by_id(group_id)
        if not group:
            raise GroupNotFoundError(group_id)
        if name is not None:
            group.name = name
        if description is not None:
            group.description = description
        group.updated_at = _now_iso()
        await self._groups.update(group)
        return group

    async def delete_group(self, group_id: str) -> None:
        group = await self._groups.find_by_id(group_id)
        if not group:
            raise GroupNotFoundError(group_id)
        await self._groups.delete(group_id)

    async def add_member(self, group_id: str, user_id: str) -> None:
        group = await self._groups.find_by_id(group_id)
        if not group:
            raise GroupNotFoundError(group_id)
        user = await self._users.find_by_id(user_id)
        if not user:
            raise UserNotFoundError(user_id)
        existing = await self._groups.find_member(group_id, user_id)
        if existing:
            raise MemberAlreadyInGroupError(user_id, group_id)
        member = GroupMember(
            group_id=group_id,
            user_id=user_id,
            joined_at=_now_iso(),
        )
        await self._groups.add_member(member)

    async def remove_member(self, group_id: str, user_id: str) -> None:
        existing = await self._groups.find_member(group_id, user_id)
        if not existing:
            raise MemberNotInGroupError(user_id, group_id)
        await self._groups.remove_member(group_id, user_id)

    async def get_members(self, group_id: str) -> list[GroupMember]:
        group = await self._groups.find_by_id(group_id)
        if not group:
            raise GroupNotFoundError(group_id)
        return await self._groups.list_members(group_id)

    async def get_group_ids_for_user(self, user_id: str) -> list[str]:
        return await self._groups.get_group_ids_for_user(user_id)

GroupRepository

pico_auth.repository.GroupRepository

Data access for Group and GroupMember entities.

Source code in pico_auth/repository.py
@component
class GroupRepository:
    """Data access for Group and GroupMember entities."""

    def __init__(self, sm: SessionManager):
        self._sm = sm

    async def find_by_id(self, group_id: str) -> Group | None:
        async with self._sm.transaction() as session:
            return await session.get(Group, group_id)

    async def find_by_name_and_org(self, name: str, org_id: str) -> Group | None:
        async with self._sm.transaction() as session:
            result = await session.execute(
                select(Group).where(Group.name == name, Group.org_id == org_id),
            )
            return result.scalar_one_or_none()

    async def save(self, group: Group) -> None:
        async with self._sm.transaction() as session:
            merged = await session.merge(group)
            await session.flush()
            group.id = merged.id

    async def list_by_org(self, org_id: str) -> list[Group]:
        async with self._sm.transaction(read_only=True) as session:
            result = await session.execute(
                select(Group).where(Group.org_id == org_id),
            )
            return list(result.scalars().all())

    async def update(self, group: Group) -> None:
        async with self._sm.transaction() as session:
            await session.merge(group)
            await session.flush()

    async def delete(self, group_id: str) -> None:
        async with self._sm.transaction() as session:
            await session.execute(
                delete(GroupMember).where(GroupMember.group_id == group_id),
            )
            await session.execute(
                delete(Group).where(Group.id == group_id),
            )

    async def add_member(self, member: GroupMember) -> None:
        async with self._sm.transaction() as session:
            await session.merge(member)
            await session.flush()

    async def remove_member(self, group_id: str, user_id: str) -> None:
        async with self._sm.transaction() as session:
            await session.execute(
                delete(GroupMember).where(
                    GroupMember.group_id == group_id,
                    GroupMember.user_id == user_id,
                ),
            )

    async def find_member(self, group_id: str, user_id: str) -> GroupMember | None:
        async with self._sm.transaction() as session:
            return await session.get(GroupMember, (group_id, user_id))

    async def list_members(self, group_id: str) -> list[GroupMember]:
        async with self._sm.transaction(read_only=True) as session:
            result = await session.execute(
                select(GroupMember).where(GroupMember.group_id == group_id),
            )
            return list(result.scalars().all())

    async def get_group_ids_for_user(self, user_id: str) -> list[str]:
        async with self._sm.transaction(read_only=True) as session:
            result = await session.execute(
                select(GroupMember.group_id).where(GroupMember.user_id == user_id),
            )
            return list(result.scalars().all())

AuthSettings

pico_auth.config.AuthSettings dataclass

Auth server settings from application.yaml / env vars.

Source code in pico_auth/config.py
@configured(target="self", prefix="auth", mapping="tree")
@dataclass
class AuthSettings:
    """Auth server settings from application.yaml / env vars."""

    data_dir: str = "~/.pico-auth"
    access_token_expire_minutes: int = 15
    refresh_token_expire_days: int = 7
    issuer: str = "http://localhost:8100"
    audience: str = "pico-bot"
    algorithm: str = "RS256"
    auto_create_admin: bool = True
    admin_email: str = "admin@pico.local"
    admin_password: str = "admin"
    registration_enabled: bool = True
    email_credentials_token: str = ""

    @property
    def is_pqc(self) -> bool:
        return self.algorithm in _PQC_ALGORITHMS

    @property
    def data_path(self) -> Path:
        return Path(self.data_dir).expanduser()

    @property
    def private_key_path(self) -> Path:
        return self.data_path / "private.pem"

    @property
    def public_key_path(self) -> Path:
        return self.data_path / "public.pem"

    @property
    def pqc_key_path(self) -> Path:
        return self.data_path / "pqc_secret.bin"

    @property
    def pqc_pub_path(self) -> Path:
        return self.data_path / "pqc_public.bin"

Models

pico_auth.models.User

Bases: AppBase

Source code in pico_auth/models.py
class User(AppBase):
    __tablename__ = "users"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    display_name: Mapped[str] = mapped_column(String(255), default="")
    password_hash: Mapped[str] = mapped_column(String(255))
    role: Mapped[str] = mapped_column(String(50), default="viewer")
    org_id: Mapped[str] = mapped_column(String(100), default="default")
    status: Mapped[str] = mapped_column(String(50), default="active")
    created_at: Mapped[str] = mapped_column(String(50))
    last_login_at: Mapped[str | None] = mapped_column(String(50), nullable=True)

pico_auth.models.RefreshToken

Bases: AppBase

Source code in pico_auth/models.py
class RefreshToken(AppBase):
    __tablename__ = "refresh_tokens"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    user_id: Mapped[str] = mapped_column(String(36), index=True)
    token_hash: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    expires_at: Mapped[str] = mapped_column(String(50))
    created_at: Mapped[str] = mapped_column(String(50))

pico_auth.models.Group

Bases: AppBase

Source code in pico_auth/models.py
class Group(AppBase):
    __tablename__ = "groups"

    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    name: Mapped[str] = mapped_column(String(255), index=True)
    description: Mapped[str] = mapped_column(String(500), default="")
    org_id: Mapped[str] = mapped_column(String(100), index=True)
    created_at: Mapped[str] = mapped_column(String(50))
    updated_at: Mapped[str] = mapped_column(String(50))

pico_auth.models.GroupMember

Bases: AppBase

Source code in pico_auth/models.py
class GroupMember(AppBase):
    __tablename__ = "group_members"

    group_id: Mapped[str] = mapped_column(String(36), primary_key=True)
    user_id: Mapped[str] = mapped_column(String(36), primary_key=True, index=True)
    joined_at: Mapped[str] = mapped_column(String(50))

Errors

pico_auth.errors.AuthError

Bases: Exception

Source code in pico_auth/errors.py
4
5
6
7
class AuthError(Exception):
    def __init__(self, message: str):
        self.message = message
        super().__init__(message)

pico_auth.errors.UserExistsError

Bases: AuthError

Source code in pico_auth/errors.py
class UserExistsError(AuthError):
    def __init__(self, email: str):
        super().__init__(f"User already exists: {email}")

pico_auth.errors.InvalidCredentialsError

Bases: AuthError

Source code in pico_auth/errors.py
class InvalidCredentialsError(AuthError):
    def __init__(self):
        super().__init__("Invalid email or password")

pico_auth.errors.TokenExpiredError

Bases: AuthError

Source code in pico_auth/errors.py
class TokenExpiredError(AuthError):
    def __init__(self):
        super().__init__("Token has expired")

pico_auth.errors.TokenInvalidError

Bases: AuthError

Source code in pico_auth/errors.py
class TokenInvalidError(AuthError):
    def __init__(self):
        super().__init__("Invalid token")

pico_auth.errors.UserNotFoundError

Bases: AuthError

Source code in pico_auth/errors.py
class UserNotFoundError(AuthError):
    def __init__(self, user_id: str):
        super().__init__(f"User not found: {user_id}")

pico_auth.errors.InsufficientPermissionsError

Bases: AuthError

Source code in pico_auth/errors.py
class InsufficientPermissionsError(AuthError):
    def __init__(self):
        super().__init__("Insufficient permissions")

pico_auth.errors.UserSuspendedError

Bases: AuthError

Source code in pico_auth/errors.py
class UserSuspendedError(AuthError):
    def __init__(self):
        super().__init__("User account is suspended")

pico_auth.errors.GroupNotFoundError

Bases: AuthError

Source code in pico_auth/errors.py
class GroupNotFoundError(AuthError):
    def __init__(self, group_id: str):
        super().__init__(f"Group not found: {group_id}")

pico_auth.errors.GroupExistsError

Bases: AuthError

Source code in pico_auth/errors.py
class GroupExistsError(AuthError):
    def __init__(self, name: str):
        super().__init__(f"Group already exists: {name}")

pico_auth.errors.MemberAlreadyInGroupError

Bases: AuthError

Source code in pico_auth/errors.py
class MemberAlreadyInGroupError(AuthError):
    def __init__(self, user_id: str, group_id: str):
        super().__init__(f"User {user_id} is already in group {group_id}")

pico_auth.errors.MemberNotInGroupError

Bases: AuthError

Source code in pico_auth/errors.py
class MemberNotInGroupError(AuthError):
    def __init__(self, user_id: str, group_id: str):
        super().__init__(f"User {user_id} is not in group {group_id}")