Intro
In my previous blogpost, I compared services and command handlers.
In today’s blogpost, I want to show you how to use commands and command handlers in connection with CommandBus.
CommandBus is the last missing bit in this whole puzzle.
We will start with the most straightforward implementation, then I will show you how to use libraries available in the Python ecosystem.
Finally, I want to show you an end-to-end example starting from the HTTP endpoint and ending at the command handled
in the command handler with the usage of the command bus.
Before we jump straight to the code, I will explain what Command, CommandHandler, and CommandBus are.
What is a Command?
Command is a design pattern that encapsulates a request as an immutable object.
Can be executed immediately or at a later time or by different components within a system.
The Command pattern decouples the sender of a request from the receiver that performs the action.
It provides a way to encapsulate the details of an operation, such as the method to be called, the parameters required for the method, and the object that will execute the method.
This encapsulation allows the sender to be decoupled from the specifics of how the request is handled, making the system more flexible and extensible.
The most important! A command in software engineering is an object that represents an intention, instruction or action to be performed.
It typically includes all the necessary information for executing the intention.
By encapsulating requests as objects, commands can be easily manipulated, queued, or undone, providing a more flexible and modular design.
The relationship with CommandHandler is one-to-one, one command can have only one handler.
It is commonly employed in GUI applications, multi-level undo mechanisms, transactional systems, and event-driven architectures where decoupling components is essential.
You can read more about the Command as a design pattern in this book
What is a CommandHandler?
CommandHandler is a component or class responsible for executing a command.
The CommandHandler acts as an intermediate layer between the client that creates and sends the command objects and the receiver that performs the actual action.
It receives the command object, extracts the necessary information, and then delegates the execution to the appropriate components of our system.
What is a CommandBus?
CommandBus is a communication mechanism used to decouple the sender of a command from its handler or executor.
It is a part of the Command Pattern and is commonly used in applications that follow the CQRS (Command Query Responsibility Segregation) pattern or other similar architectural styles.
If you want me to write a separate article about CQRS with Python examples, please let me know 😉.
The CommandBus acts as an intermediary that receives commands from clients and routes them to their corresponding CommandHandlers.
Instead of the client knowing which specific CommandHandler should be used to execute a particular command, it simply sends the command to the CommandBus, and the bus takes care of finding the appropriate handler and executing the command.
Implementation
Firstly let me define an interface for our CommandBus.
It will allow us to hide the implementation details, we will change only the implementation depending on our needs.
The code and test tests can be found on my GitHub here
1
2
3
4
|
# blog/examples/src/command_bus_examples/command.py
class Command:
pass
|
1
2
3
4
5
6
7
8
9
10
11
12
|
# blog/examples/src/command_bus_examples/generic_handler.py
from typing import Generic, TypeVar
from src.command_bus_examples.command import Command
TCommand = TypeVar("TCommand", bound=Command)
class Handler(Generic[TCommand]):
def __call__(self, command: TCommand) -> None:
raise NotImplementedError
|
1
2
3
4
5
6
7
8
|
# blog/examples/src/command_bus_examples/custom_types.py
from typing import Any, Callable, Union
from src.command_bus_examples.command import Command
from src.command_bus_examples.generic_handler import Handler
HandlerType = Union[Callable[[Any], None], Handler[Command]]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# blog/examples/src/command_bus_examples/command_bus.py
from abc import ABC, abstractmethod
from typing import Type
from src.command_bus_examples.command import Command
from src.command_bus_examples.custom_types import HandlerType
class CommandBus(ABC):
@abstractmethod
def register_handler(self, command: Type[Command], handler: HandlerType) -> None:
pass
@abstractmethod
def execute(self, command: Command) -> None:
pass
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# blog/examples/src/command_bus_examples/errors.py
from __future__ import annotations
from src.command_bus_examples.command import Command
class CommandAlreadyRegistered(Exception):
@classmethod
def for_command(cls, command_type: str) -> CommandAlreadyRegistered:
return cls(f"`{command_type}` has been already registered!")
class HandlerNotFound(Exception):
@classmethod
def for_command(cls, command: Command) -> HandlerNotFound:
return cls(f"No handler has been found for {command}!")
|
The simplest implementation
The simplest approach is to use a dictionary that will keep commands and corresponding command handlers.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# blog/examples/src/command_bus_examples/simple_command_bus.py
from typing import Dict, Type
from src.command_bus_examples.command import Command
from src.command_bus_examples.command_bus import CommandBus
from src.command_bus_examples.custom_types import HandlerType
from src.command_bus_examples.errors import CommandAlreadyRegistered, HandlerNotFound
class SimpleCommandBus(CommandBus):
def __init__(self) -> None:
self._handlers: Dict[Type[Command], HandlerType] = {}
def register_handler(self, command: Type[Command], handler: HandlerType) -> None:
if command in self._handlers:
raise CommandAlreadyRegistered.for_command(command.__name__)
self._handlers[command] = handler
def execute(self, command: Command) -> None:
try:
handler = self._handlers[type(command)]
handler(command)
except KeyError:
raise HandlerNotFound.for_command(command)
|
Implementation with the usage of existing libraries
There are a lot of libraries that give you the functionality of a CommandBus.
One of them is apos. I used this one in my examples.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# blog/examples/src/command_bus_examples/apos_command_bus.py
from typing import Type
from apos import Apos, MissingHandler, OverwritingHandler
from src.command_bus_examples.command import Command
from src.command_bus_examples.command_bus import CommandBus
from src.command_bus_examples.custom_types import HandlerType
from src.command_bus_examples.errors import CommandAlreadyRegistered, HandlerNotFound
class AposCommandBus(CommandBus):
def __init__(self) -> None:
self._apos = Apos()
def register_handler(self, command: Type, handler: HandlerType) -> None:
try:
self._apos.subscribe_command(command, handler)
except OverwritingHandler as error:
raise CommandAlreadyRegistered.for_command(command.__name__) from error
def execute(self, command: Command) -> None:
try:
self._apos.publish_command(command)
except MissingHandler as error:
raise HandlerNotFound.for_command(command) from error
|
Implementation with the usage of DI container
Another interesting approach may be an implementation with the usage of a DI container.
In my examples, I used the kink library.
It is a dependency injection container made for Python.
If you want to learn more about dependency injection I encourage you to read my blog post about it.
You can find it here.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# blog/examples/src/command_bus_examples/kink_command_bus.py
from typing import Type
from kink import Container
from kink.errors import ServiceError
from src.command_bus_examples.command import Command
from src.command_bus_examples.command_bus import CommandBus
from src.command_bus_examples.custom_types import HandlerType
from src.command_bus_examples.errors import CommandAlreadyRegistered, HandlerNotFound
from src.command_bus_examples.generic_handler import Handler
class KinkCommandBus(CommandBus):
def __init__(self, container: Container) -> None:
self._container = container
def register_handler(self, command: Type[Command], handler: HandlerType) -> None:
if not issubclass(handler.__class__, Handler):
raise ValueError("Handler must be a subclass of Handler[TCommand]")
if command in self._container:
raise CommandAlreadyRegistered.for_command(command.__name__)
self._container[command] = handler
def execute(self, command: Command) -> None:
try:
handler = self._container[type(command)]
handler(command)
except ServiceError as error:
raise HandlerNotFound.for_command(command) from error
|
As you can see I’m using the DI container as a registry for my commands and their handlers.
Implementation with the usage of generics
Finally, we reached the last one 😉
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# blog/examples/src/command_bus_examples/command_bus_using_generics.py
from typing import Dict, Type
from src.command_bus_examples.command import Command
from src.command_bus_examples.command_bus import CommandBus
from src.command_bus_examples.custom_types import HandlerType
from src.command_bus_examples.errors import CommandAlreadyRegistered, HandlerNotFound
from src.command_bus_examples.generic_handler import Handler
class CommandBusUsingGenerics(CommandBus):
def __init__(self) -> None:
self._handlers: Dict[Type[Command], HandlerType] = {}
def register_handler(self, command: Type[Command], handler: HandlerType) -> None:
if not issubclass(handler.__class__, Handler):
raise ValueError("Handler must be a subclass of Handler[TCommand]")
if command in self._handlers:
raise CommandAlreadyRegistered.for_command(command.__name__)
self._handlers[command] = handler
def execute(self, command: Command) -> None:
try:
handler = self._handlers[type(command)]
handler(command)
except KeyError:
raise HandlerNotFound.for_command(command)
|
Tests pass for each implementation 🎉 let us move to the final example.
Simple FastAPI application with the usage of CommandBus
In this section, I want to show you how to connect everything that I mentioned in this blog post.
I created a simple FastAPI application that has only two endpoints.
One for registering the user, and the second one for getting the user information.
The goal is to show you how all of these components work together.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# blog/examples/src/command_bus_examples/fast_api_example/bootstrap.py
from src.command_bus_examples.command_bus import CommandBus
from src.command_bus_examples.command_bus_using_generics import CommandBusUsingGenerics
from src.command_bus_examples.fast_api_example.command_handlers import RegisterUserHandler
from src.command_bus_examples.fast_api_example.commands import RegisterUser
from src.command_bus_examples.fast_api_example.repository import InMemoryUserRepository, UserRepository
command_bus = CommandBusUsingGenerics()
user_repository = InMemoryUserRepository()
command_bus.register_handler(RegisterUser, RegisterUserHandler(user_repository))
def get_command_bus() -> CommandBus:
return command_bus
def get_user_repository() -> UserRepository:
return user_repository
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# blog/examples/src/command_bus_examples/fast_api_example/app.py
from typing import Annotated
import uvicorn
from fastapi import Depends, FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from src.command_bus_examples.command_bus import CommandBus
from src.command_bus_examples.fast_api_example.bootstrap import get_command_bus, get_user_repository
from src.command_bus_examples.fast_api_example.commands import RegisterUser
from src.entity_id import EntityId
from src.command_bus_examples.fast_api_example.errors import UserAlreadyExist, UserNotFound, ValidationError
from src.command_bus_examples.fast_api_example.repository import UserRepository
app = FastAPI()
class RegisterUserRequest(BaseModel):
first_name: str
last_name: str
email: str
@app.post("/users", status_code=201)
async def register_user(
request: RegisterUserRequest, command_bus: Annotated[CommandBus, Depends(get_command_bus)]
) -> JSONResponse:
user_id = EntityId.new_one()
command_bus.execute(RegisterUser(user_id, request.first_name, request.last_name, request.email))
return JSONResponse(content={}, headers={"Location": f"/users/{user_id}"})
@app.get("/users/{user_id}")
async def get_user(
user_id: str, user_repository: Annotated[UserRepository, Depends(get_user_repository)]
) -> JSONResponse:
user = user_repository.get(EntityId.of(user_id))
return JSONResponse(content={"id": str(user.id), "full_name": user.full_name, "email": user.email})
@app.exception_handler(UserNotFound)
async def user_not_found_exception_handler(request: Request, exc: UserNotFound) -> JSONResponse:
return JSONResponse(
status_code=404,
content={"message": f"Oops! {exc}"},
)
@app.exception_handler(UserAlreadyExist)
async def user_already_exist_exception_handler(request: Request, exc: UserAlreadyExist) -> JSONResponse:
return JSONResponse(
status_code=409,
content={"message": f"Oops! {exc}"},
)
@app.exception_handler(ValidationError)
async def validation_error_exception_handler(request: Request, exc: ValidationError) -> JSONResponse:
return JSONResponse(
status_code=400,
content={"message": f"Oops! {exc}"},
)
if __name__ == "__main__":
uvicorn.run("app:app", host="127.0.0.1", port=8000, reload=True)
|
our command handler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# blog/examples/src/command_bus_examples/fast_api_example/command_handlers.py
from email_validator import EmailNotValidError, validate_email
from src.command_bus_examples.fast_api_example.commands import RegisterUser
from src.command_bus_examples.fast_api_example.errors import ValidationError
from src.command_bus_examples.fast_api_example.repository import UserRepository
from src.command_bus_examples.fast_api_example.user import User
from src.command_bus_examples.generic_handler import Handler
class RegisterUserHandler(Handler[RegisterUser]):
def __init__(self, user_repository: UserRepository) -> None:
self._user_repository = user_repository
def __call__(self, command: RegisterUser) -> None:
try:
validated_email = validate_email(command.email, check_deliverability=False)
self._user_repository.save(User(command.id, command.first_name, command.last_name, validated_email.email))
except EmailNotValidError as error:
raise ValidationError(f"Provided email value `{command.email}` is not correct") from error
|
If you want to check all the source files,
the code is also available on my GitHub https://github.com/szymon6927/szymonmiks.pl/tree/master/blog/examples/src/command_bus_examples/fast_api_example
Outro
You reached this section - thank you!
Just as a small recap:
- command - an intention to be performed, we may also say that it is “something that changes the state of our system”
- command handler - the component responsible for executing a command
- command bus - communication mechanism used to decouple the sender of a command from its handler
My goal in writing this blog post was to show you the general idea behind CommandBus as a pattern.
Don’t be strict with the implementation I showed here.
The implementation details may differ, do not treat it as the one and only correct solution 😉.
If you have any interesting approaches to the implementation of CommandBus - I would love to see them.
That’s all, let me know what you think, and leave a comment if you have any questions 😉
Happy coding!