Working with Controllers and Repositories#

We’ve been working our way up the stack, starting with the database models, and now we are ready to use the repository in an actual route. Let’s see how we can use this in a controller.

Tip

The full code for this tutorial can be found below in the Full Code section.

First, we create a simple function that returns an instance of AuthorRepository. This function will be used to inject a repository instance into our controller routes. Note that we are only passing in the database session in this example with no other parameters.

app.py#
async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build the default ``AuthorRepository`` bound to the injected session."""
    authors_repo = AuthorRepository(session=db_session)
    return authors_repo

Because we’ll be using the SQLAlchemy plugin in Litestar, the session is automatically configured as a dependency.

By default, the repository doesn’t add any query options to its base statement, but you can override that statement by passing in your own:

app.py#
1# we can optionally override the default `select` used for the repository to pass in
2# specific SQL options such as join details
async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
    """Build an ``AuthorRepository`` whose base statement also loads each author's books."""
    # Replace the repository's default statement with one that carries a
    # ``selectinload`` option for the ``books`` relationship.
    authors_with_books = select(AuthorModel).options(selectinload(AuthorModel.books))
    return AuthorRepository(session=db_session, statement=authors_with_books)

In this instance, we enhance the repository function by adding a selectinload option. This option configures the specified relationship to be loaded using the SELECT ... IN loading pattern, which fetches the related rows in one additional query instead of one query per parent row.

Next, we define the AuthorController. This controller exposes five routes for interacting with the Author model:

AuthorController (click to toggle)
app.py#
 1    @get(path="/authors")
 2    async def list_authors(
 3        self,
 4        authors_repo: AuthorRepository,
 5        limit_offset: filters.LimitOffset,
 6    ) -> OffsetPagination[Author]:
 7        """List authors."""
 8        results, total = await authors_repo.list_and_count(limit_offset)
 9        type_adapter = TypeAdapter(list[Author])
10        return OffsetPagination[Author](
11            items=type_adapter.validate_python(results),
12            total=total,
13            limit=limit_offset.limit,
14            offset=limit_offset.offset,
15        )
16
17    @post(path="/authors")
18    async def create_author(
19        self,
20        authors_repo: AuthorRepository,
21        data: AuthorCreate,
22    ) -> Author:
23        """Create a new author."""
24        obj = await authors_repo.add(
25            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
26        )
27        await authors_repo.session.commit()
28        return Author.model_validate(obj)
29
30    # we override the authors_repo to use the version that joins the Books in
31    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
32    async def get_author(
33        self,
34        authors_repo: AuthorRepository,
35        author_id: Annotated[
36            UUID,
37            PathParameter(
38                title="Author ID",
39                description="The author to retrieve.",
40            ),
41        ],
42    ) -> Author:
43        """Get an existing author."""
44        obj = await authors_repo.get(author_id)
45        return Author.model_validate(obj)
46
47    @patch(
48        path="/authors/{author_id:uuid}",
49        dependencies={"authors_repo": Provide(provide_author_details_repo)},
50    )
51    async def update_author(
52        self,
53        authors_repo: AuthorRepository,
54        data: AuthorUpdate,
55        author_id: Annotated[
56            UUID,
57            PathParameter(
58                title="Author ID",
59                description="The author to update.",
60            ),
61        ],
62    ) -> Author:
63        """Update an author."""
64        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
65        raw_obj.update({"id": author_id})
66        obj = await authors_repo.update(AuthorModel(**raw_obj))
67        await authors_repo.session.commit()
68        return Author.from_orm(obj)
69
70    @delete(path="/authors/{author_id:uuid}")
71    async def delete_author(
72        self,
73        authors_repo: AuthorRepository,
74        author_id: Annotated[
75            UUID,
76            PathParameter(
77                title="Author ID",
78                description="The author to delete.",
79            ),
80        ],

In our list endpoint, we use the pagination filter to limit the amount of data returned, allowing us to retrieve large datasets in smaller, more manageable chunks.

In the above examples, we’ve used the asynchronous repository implementation. However, Litestar also supports synchronous database drivers with an identical implementation. Here’s a corresponding synchronous version of the previous example:

Synchronous Repository (click to toggle)
app.py#
  1from __future__ import annotations
  2
  3from datetime import date
  4from typing import TYPE_CHECKING, Annotated
  5from uuid import UUID
  6
  7from pydantic import BaseModel as _BaseModel
  8from pydantic import TypeAdapter
  9from sqlalchemy import ForeignKey, select
 10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
 11
 12from litestar import Litestar, get
 13from litestar.controller import Controller
 14from litestar.di import Provide
 15from litestar.handlers.http_handlers.decorators import delete, patch, post
 16from litestar.pagination import OffsetPagination
 17from litestar.params import PathParameter, QueryParameter
 18from litestar.plugins.sqlalchemy import (
 19    SQLAlchemyInitPlugin,
 20    SQLAlchemySyncConfig,
 21    base,
 22    repository,
 23)
 24from litestar.repository.filters import LimitOffset
 25
 26if TYPE_CHECKING:
 27    from sqlalchemy.orm import Session
 28
 29
class BaseModel(_BaseModel):
    """Extend Pydantic's BaseModel to enable ORM mode.

    Schemas derived from this class can be validated directly from objects
    that expose their data as attributes, such as the SQLAlchemy models in
    this module.
    """

    # `from_attributes` lets `model_validate` read values from object
    # attributes instead of requiring a mapping.
    model_config = {"from_attributes": True}
 34
 35
# The SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`).
class AuthorModel(base.UUIDBase):
    """ORM model stored in the `author` table."""

    # we can optionally provide the table name instead of auto-generating it
    __tablename__ = "author"  #  type: ignore[assignment]
    name: Mapped[str]
    dob: Mapped[date | None]  # nullable; presumably the date of birth
    # `lazy="noload"` leaves `books` unloaded unless a query asks for it;
    # `provide_author_details_repo` does so with a `selectinload` option.
    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
 44
 45
# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
# record was created, and `updated_at` is the last time the record was modified.
class BookModel(base.UUIDAuditBase):
    """ORM model stored in the `book` table."""

    __tablename__ = "book"  #  type: ignore[assignment]
    title: Mapped[str]
    # foreign key pointing at the `id` column of the `author` table
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # NOTE(review): `AuthorModel.books` names this attribute in `back_populates`, while this
    # side is `viewonly=True` and declares no `back_populates` of its own - confirm intended.
    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
 54
 55
 56# we will explicitly define the schema instead of using DTO objects for clarity.
 57
 58
# Schema used to represent an author in API responses.
class Author(BaseModel):
    id: UUID | None  # required field, but `None` is an accepted value
    name: str
    dob: date | None = None
 63
 64
# Request body accepted by `create_author`; only `name` is mandatory.
class AuthorCreate(BaseModel):
    name: str
    dob: date | None = None
 68
 69
# Request body accepted by `update_author`; every field is optional so a
# client can send only the values it wants to change.
class AuthorUpdate(BaseModel):
    name: str | None = None
    dob: date | None = None
 73
 74
class AuthorRepository(repository.SQLAlchemySyncRepository[AuthorModel]):
    """Author repository.

    Synchronous repository specialised for `AuthorModel`; the controller
    uses its `list_and_count`, `add`, `get`, `update` and `delete` methods.
    """

    # Tells the generic repository which ORM model it operates on.
    model_type = AuthorModel
 79
 80
 81async def provide_authors_repo(db_session: Session) -> AuthorRepository:
 82    """This provides the default Authors repository."""
 83    return AuthorRepository(session=db_session)
 84
 85
 86# we can optionally override the default `select` used for the repository to pass in
 87# specific SQL options such as join details
 88async def provide_author_details_repo(db_session: Session) -> AuthorRepository:
 89    """This provides a simple example demonstrating how to override the join options
 90    for the repository."""
 91    return AuthorRepository(
 92        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
 93        session=db_session,
 94    )
 95
 96
 97def provide_limit_offset_pagination(
 98    current_page: Annotated[int, QueryParameter(name="currentPage", ge=1, required=False)] = 1,
 99    page_size: Annotated[int, QueryParameter(name="pageSize", ge=1, required=False)] = 10,
100) -> LimitOffset:
101    """Add offset/limit pagination.
102
103    Return type consumed by `Repository.apply_limit_offset_pagination()`.
104
105    Parameters
106    ----------
107    current_page : int
108        LIMIT to apply to select.
109    page_size : int
110        OFFSET to apply to select.
111    """
112    return LimitOffset(page_size, page_size * (current_page - 1))
113
114
class AuthorController(Controller):
    """CRUD endpoints for authors, backed by the synchronous repository."""

    # Handlers receive the plain repository unless they override
    # ``authors_repo`` in their own ``dependencies``.
    dependencies = {"authors_repo": Provide(provide_authors_repo, sync_to_thread=False)}

    @get(path="/authors")
    def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: LimitOffset,
    ) -> OffsetPagination[Author]:
        """Return one page of authors along with the total count."""
        page, author_count = authors_repo.list_and_count(limit_offset)
        # Convert the ORM rows into the ``Author`` response schema.
        items = TypeAdapter(list[Author]).validate_python(page)
        return OffsetPagination[Author](
            items=items,
            total=author_count,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Persist a new author and return it as an ``Author`` schema."""
        # Only fields that were sent and are not ``None`` reach the ORM model.
        fields = data.model_dump(exclude_unset=True, exclude_none=True)
        created = authors_repo.add(AuthorModel(**fields))
        authors_repo.session.commit()
        return Author.model_validate(created)

    # This route swaps in the repository variant that also loads the books.
    @get(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
    )
    def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: Annotated[
            UUID,
            PathParameter(
                title="Author ID",
                description="The author to retrieve.",
            ),
        ],
    ) -> Author:
        """Look up a single author by its ID."""
        author = authors_repo.get(author_id)
        return Author.model_validate(author)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo, sync_to_thread=False)},
    )
    def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: Annotated[
            UUID,
            PathParameter(
                title="Author ID",
                description="The author to update.",
            ),
        ],
    ) -> Author:
        """Apply the submitted changes to an existing author."""
        changes = data.model_dump(exclude_unset=True, exclude_none=True)
        # The ID always comes from the URL path, never from the request body.
        changes["id"] = author_id
        updated = authors_repo.update(AuthorModel(**changes))
        authors_repo.session.commit()
        return Author.model_validate(updated)

    @delete(path="/authors/{author_id:uuid}")
    def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: Annotated[
            UUID,
            PathParameter(
                title="Author ID",
                description="The author to delete.",
            ),
        ],
    ) -> None:
        """Delete an author from the system."""
        # The value returned by ``delete`` is intentionally not used.
        authors_repo.delete(author_id)
        authors_repo.session.commit()
207
208
# Synchronous SQLAlchemy configuration pointing at a SQLite database in
# the relative file `test.sqlite`.
sqlalchemy_config = SQLAlchemySyncConfig(connection_string="sqlite:///test.sqlite")  # Create 'db_session' dependency.
# Plugin instance built from the configuration above.
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
211
212
def on_startup() -> None:
    """Create the tables registered on ``base.UUIDBase.metadata``."""
    engine = sqlalchemy_config.get_engine()
    with engine.begin() as connection:
        base.UUIDBase.metadata.create_all(connection)
217
218
# Wire everything together: the controller, the table-creation hook, the
# SQLAlchemy plugin and the application-wide pagination dependency.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # Reuse the module-level plugin instead of constructing a second instance.
    plugins=[sqlalchemy_plugin],
    # The pagination provider is a plain, non-blocking function, so it is
    # called directly rather than in a worker thread.
    dependencies={"limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False)},
)

The examples above enable a feature-complete CRUD service that includes pagination! In the next section, we’ll explore how to extend the built-in repository to add additional functionality to our application.

Full Code#

Full Code (click to toggle)
app.py#
  1from __future__ import annotations
  2
  3from datetime import date
  4from typing import TYPE_CHECKING, Annotated
  5from uuid import UUID
  6
  7from pydantic import BaseModel as _BaseModel
  8from pydantic import TypeAdapter
  9from sqlalchemy import ForeignKey, select
 10from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload
 11
 12from litestar import Litestar, get
 13from litestar.controller import Controller
 14from litestar.di import Provide
 15from litestar.handlers.http_handlers.decorators import delete, patch, post
 16from litestar.pagination import OffsetPagination
 17from litestar.params import PathParameter, QueryParameter
 18from litestar.plugins.sqlalchemy import (
 19    AsyncSessionConfig,
 20    SQLAlchemyAsyncConfig,
 21    SQLAlchemyInitPlugin,
 22    base,
 23    filters,
 24    repository,
 25)
 26
 27if TYPE_CHECKING:
 28    from sqlalchemy.ext.asyncio import AsyncSession
 29
 30
class BaseModel(_BaseModel):
    """Extend Pydantic's BaseModel to enable ORM mode.

    Schemas derived from this class can be validated directly from objects
    that expose their data as attributes, such as the SQLAlchemy models in
    this module.
    """

    # `from_attributes` lets `model_validate` read values from object
    # attributes instead of requiring a mapping.
    model_config = {"from_attributes": True}
 35
 36
# The SQLAlchemy base includes a declarative model for you to use in your models.
# The `UUIDBase` class includes a `UUID` based primary key (`id`).
class AuthorModel(base.UUIDBase):
    """ORM model stored in the `author` table."""

    # we can optionally provide the table name instead of auto-generating it
    __tablename__ = "author"  #  type: ignore[assignment]
    name: Mapped[str]
    dob: Mapped[date | None]  # nullable; presumably the date of birth
    # `lazy="noload"` leaves `books` unloaded unless a query asks for it;
    # `provide_author_details_repo` does so with a `selectinload` option.
    books: Mapped[list[BookModel]] = relationship(back_populates="author", lazy="noload")
 45
 46
# The `UUIDAuditBase` class includes the same `UUID` based primary key (`id`) and 2
# additional columns: `created_at` and `updated_at`. `created_at` is a timestamp of when the
# record was created, and `updated_at` is the last time the record was modified.
class BookModel(base.UUIDAuditBase):
    """ORM model stored in the `book` table."""

    __tablename__ = "book"  #  type: ignore[assignment]
    title: Mapped[str]
    # foreign key pointing at the `id` column of the `author` table
    author_id: Mapped[UUID] = mapped_column(ForeignKey("author.id"))
    # NOTE(review): `AuthorModel.books` names this attribute in `back_populates`, while this
    # side is `viewonly=True` and declares no `back_populates` of its own - confirm intended.
    author: Mapped[AuthorModel] = relationship(lazy="joined", innerjoin=True, viewonly=True)
 55
 56
 57# we will explicitly define the schema instead of using DTO objects for clarity.
 58
 59
# Schema used to represent an author in API responses.
class Author(BaseModel):
    id: UUID | None  # required field, but `None` is an accepted value
    name: str
    dob: date | None = None
 64
 65
# Request body accepted by `create_author`; only `name` is mandatory.
class AuthorCreate(BaseModel):
    name: str
    dob: date | None = None
 69
 70
# Request body accepted by `update_author`; every field is optional so a
# client can send only the values it wants to change.
class AuthorUpdate(BaseModel):
    name: str | None = None
    dob: date | None = None
 74
 75
class AuthorRepository(repository.SQLAlchemyAsyncRepository[AuthorModel]):
    """Author repository.

    Asynchronous repository specialised for `AuthorModel`; the controller
    awaits its `list_and_count`, `add`, `get`, `update` and `delete` methods.
    """

    # Tells the generic repository which ORM model it operates on.
    model_type = AuthorModel
 80
 81
 82async def provide_authors_repo(db_session: AsyncSession) -> AuthorRepository:
 83    """This provides the default Authors repository."""
 84    return AuthorRepository(session=db_session)
 85
 86
 87# we can optionally override the default `select` used for the repository to pass in
 88# specific SQL options such as join details
 89async def provide_author_details_repo(db_session: AsyncSession) -> AuthorRepository:
 90    """This provides a simple example demonstrating how to override the join options for the repository."""
 91    return AuthorRepository(
 92        statement=select(AuthorModel).options(selectinload(AuthorModel.books)),
 93        session=db_session,
 94    )
 95
 96
 97def provide_limit_offset_pagination(
 98    current_page: Annotated[int, QueryParameter(name="currentPage", ge=1, required=False)] = 1,
 99    page_size: Annotated[int, QueryParameter(name="pageSize", ge=1, required=False)] = 10,
100) -> filters.LimitOffset:
101    """Add offset/limit pagination.
102
103    Return type consumed by `Repository.apply_limit_offset_pagination()`.
104
105    Parameters
106    ----------
107    current_page : int
108        LIMIT to apply to select.
109    page_size : int
110        OFFSET to apply to select.
111    """
112    return filters.LimitOffset(page_size, page_size * (current_page - 1))
113
114
class AuthorController(Controller):
    """Author CRUD.

    Exposes list, create, retrieve, update and delete routes under
    ``/authors``. Each handler receives an ``AuthorRepository`` through the
    ``authors_repo`` dependency.
    """

    # Default repository; individual routes may override ``authors_repo``.
    dependencies = {"authors_repo": Provide(provide_authors_repo)}

    @get(path="/authors")
    async def list_authors(
        self,
        authors_repo: AuthorRepository,
        limit_offset: filters.LimitOffset,
    ) -> OffsetPagination[Author]:
        """List authors, one page at a time."""
        results, total = await authors_repo.list_and_count(limit_offset)
        # Convert the ORM rows into the ``Author`` response schema.
        type_adapter = TypeAdapter(list[Author])
        return OffsetPagination[Author](
            items=type_adapter.validate_python(results),
            total=total,
            limit=limit_offset.limit,
            offset=limit_offset.offset,
        )

    @post(path="/authors")
    async def create_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorCreate,
    ) -> Author:
        """Create a new author."""
        # Only fields that were sent and are not ``None`` reach the ORM model.
        obj = await authors_repo.add(
            AuthorModel(**data.model_dump(exclude_unset=True, exclude_none=True)),
        )
        await authors_repo.session.commit()
        return Author.model_validate(obj)

    # we override the authors_repo to use the version that joins the Books in
    @get(path="/authors/{author_id:uuid}", dependencies={"authors_repo": Provide(provide_author_details_repo)})
    async def get_author(
        self,
        authors_repo: AuthorRepository,
        author_id: Annotated[
            UUID,
            PathParameter(
                title="Author ID",
                description="The author to retrieve.",
            ),
        ],
    ) -> Author:
        """Get an existing author."""
        obj = await authors_repo.get(author_id)
        return Author.model_validate(obj)

    @patch(
        path="/authors/{author_id:uuid}",
        dependencies={"authors_repo": Provide(provide_author_details_repo)},
    )
    async def update_author(
        self,
        authors_repo: AuthorRepository,
        data: AuthorUpdate,
        author_id: Annotated[
            UUID,
            PathParameter(
                title="Author ID",
                description="The author to update.",
            ),
        ],
    ) -> Author:
        """Update an author.

        Only the fields the client sent (and that are not ``None``) are
        applied; the ID always comes from the URL path.
        """
        raw_obj = data.model_dump(exclude_unset=True, exclude_none=True)
        raw_obj["id"] = author_id
        obj = await authors_repo.update(AuthorModel(**raw_obj))
        await authors_repo.session.commit()
        # ``model_validate`` is the Pydantic v2 replacement for the deprecated
        # ``from_orm`` and matches the other handlers.
        return Author.model_validate(obj)

    @delete(path="/authors/{author_id:uuid}")
    async def delete_author(
        self,
        authors_repo: AuthorRepository,
        author_id: Annotated[
            UUID,
            PathParameter(
                title="Author ID",
                description="The author to delete.",
            ),
        ],
    ) -> None:
        """Delete an author from the system."""
        # The value returned by ``delete`` is intentionally discarded.
        _ = await authors_repo.delete(author_id)
        await authors_repo.session.commit()
204
205
# `expire_on_commit=False` keeps already-loaded attributes readable after
# `commit()`; the handlers validate objects right after committing.
session_config = AsyncSessionConfig(expire_on_commit=False)
# Asynchronous SQLAlchemy configuration pointing at a SQLite database in
# the relative file `test.sqlite`, accessed through the `aiosqlite` driver.
sqlalchemy_config = SQLAlchemyAsyncConfig(
    connection_string="sqlite+aiosqlite:///test.sqlite", session_config=session_config
)  # Create 'db_session' dependency.
# Plugin instance built from the configuration above.
sqlalchemy_plugin = SQLAlchemyInitPlugin(config=sqlalchemy_config)
211
212
async def on_startup() -> None:
    """Create the tables registered on ``base.UUIDBase.metadata``."""
    engine = sqlalchemy_config.get_engine()
    async with engine.begin() as connection:
        # ``create_all`` is a synchronous API, so it goes through ``run_sync``.
        await connection.run_sync(base.UUIDBase.metadata.create_all)
217
218
# Wire everything together: the controller, the table-creation hook, the
# SQLAlchemy plugin and the application-wide pagination dependency.
app = Litestar(
    route_handlers=[AuthorController],
    on_startup=[on_startup],
    # Reuse the module-level plugin instead of constructing a second instance.
    plugins=[sqlalchemy_plugin],
    # The pagination provider is a plain, non-blocking function, so it is
    # called directly rather than in a worker thread.
    dependencies={"limit_offset": Provide(provide_limit_offset_pagination, sync_to_thread=False)},
)
224)