Backend/FastAPI

[FastAPI] 이벤트 기반 아키텍처 구축하기 (w. fastapi-events)

mopil 2023. 12. 30. 01:47
반응형

FastAPI 프레임워크로 이벤트 기반 아키텍처를 구축하는 방법에 대해 공유한다.

 

# 이벤트 기반 아키텍처란?

새로운 댓글 작성 시, 게시물 작성자에게 알림을 보내야 하는 기능을 구현해야 한다고 가정해 보자.

 

이때 "댓글 작성 로직" 마지막에 "게시물 작성자에게 푸시 알림 전송 로직"이 포함될 것이다.

 

이러면 "댓글 작성"은 본연의 역할과는 다소 다른 도메인인, "푸시 알림 전송"이라는 로직을 포함하게 된다.

(즉, 알림 도메인에 의존성이 생긴다.)

 

이를 이벤트 기반 아키텍처로 변경하면 결합도를 낮출 수 있다.

 

 

댓글 작성 -> 댓글 작성 완료 이벤트 발행

댓글 작성 완료 이벤트 핸들러에서 푸시 알림 전송

 

이러한 이벤트 기반 아키텍처는 로컬 이벤트(앱 서버 내부 이벤트)와 외부 이벤트 구조(카프카, 메시지 큐 이용 등)로 나뉜다.

 

이벤트 기반 아키텍처를 고려할 때 주의할 점은, 이벤트의 범위가 도메인에 국한되어야 한다는 것이다.

 

(댓글 작성 완료 이벤트 O, 푸시 알림 이벤트 X)

 

 

여기서는 FastAPI 프레임워크로 로컬 이벤트 구조를 구축하는 방법에 대해 알아보자.

 

 

# fastapi-events

스프링은 자체적으로 이벤트 핸들러를 제공해 주지만, FastAPI는 없다.

 

따라서 써드파티 라이브러리를 사용해야 하는데, 꽤 괜찮은 라이브러리를 찾을 수 있었다.

https://github.com/melvinkcx/fastapi-events/tree/dev

 

GitHub - melvinkcx/fastapi-events: Asynchronous event dispatching/handling library for FastAPI and Starlette

Asynchronous event dispatching/handling library for FastAPI and Starlette - GitHub - melvinkcx/fastapi-events: Asynchronous event dispatching/handling library for FastAPI and Starlette

github.com

 

스타가 그렇게 많진 않지만, 이벤트로 검색했을 때 가장 좋아 보였다.

 

리드미에 사용법이 아주 친절하게 나와있어서 이를 참조하여도 좋다.

 

 

# 설정

우리는 로컬 이벤트 아키텍처를 만들 것이므로, pip으로 설치해 주자.

pip install fastapi-events

 

비슷한 이름의 fastapi-event도 있는데, 이는 아니니 주의 (s가 붙어야 한다.)

 

미들웨어 설정

from fastapi_events.handlers.local import local_handler
from fastapi_events.middleware import EventHandlerASGIMiddleware

app.add_middleware(EventHandlerASGIMiddleware, handlers=[local_handler])

 

이벤트를 핸들러를 미들웨어에 등록해줘야 한다.

 

여기서 app은 FastAPI()로 생성된 app을 의미한다.

 

 

# 이벤트 정의하기

위에서 들었던 예시처럼 "댓글 작성 완료" 이벤트를 만들어보자.

from enum import Enum
from typing import Optional

from pydantic import BaseModel
from fastapi_events.registry.payload_schema import registry as payload_schema


class CommentEvents(Enum):
    CREATE_COMMENT = "CREATE_COMMENT"


@payload_schema.register(event_name=CommentEvents.CREATE_COMMENT)
class CreateCommentPayload(BaseModel):
    feed_owner_user_id: int
    parent_comment_writer_user_id: Optional[int] = None
    comment_writer_user_id: int
    comment_id: int  # 지금 생성 완료된 댓글 id

 

CommentEvents는 이벤트의 이름(키)을 정의해 준다.

 

그리고 pydantic을 사용하여 해당 이벤트의 payload(전달할 값)을 정의해 준다.

 

그리고 @payload_schema.register()를 통해서 이벤트 이름과 매핑시켜 준다.

 

# 이벤트 핸들러 만들기

위에서 정의한 이벤트를 수신하여 로직을 처리할 핸들러를 정의한다.

from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event

from core.event.events import CommentEvents


@local_handler.register(event_name=CommentEvents.CREATE_COMMENT)
async def handle_create_comment_send_push_handler(
    event: Event,
):
    print("야-호! 이벤트 핸들링 성공-!")
    event_name, payload = event
    print(f"event_name: {event_name} handled")

 

 

이벤트 핸들러는 @local_handler.register()를 통해 등록한다. 

 

그리고 이는 앞서 미들웨어로 추가했던 부분을 통해서 핸들러로서 등록된다.

 

핸들러는 인자로 Event를 받으며, 이는 event_name과 payload를 가지고 있어서 구조분해할당으로 꺼내서 쓸 수 있다.

 

 

event_name에는 아까 설정한 이벤트 이름을 그대로 넣어도 되고, COMMENT* 이런 형식도 가능하다.

 

(이러면 COMMENT로 시작하는 모든 이벤트를 수신한다)

 

리드미에는 이렇게 설명하고 있다.

 

추가로 async를 붙이면 비동기적으로 사용가능하고, 그냥 def만 붙여도 사용가능하다.

 

# 이벤트 발행하기

여기까지 하면 이제 모든 준비가 완료되었다.

 

이제 FastAPI 앱 어디서든 dispatch() 함수를 통해 이벤트를 발행할 수 있다.

 

그러면 같은 키를 리스닝하는 핸들러가 이를 수신할 것이다.

 

@router.post("/push")
async def send_push_notification():
    payload = CreateCommentPayload(
        comment_id=1,
        feed_owner_user_id=1,
        comment_writer_user_id=1,
    )
    dispatch(CommentEvents.CREATE_COMMENT, payload.model_dump())

 

dispatch 함수는 첫 번째 인자로 이벤트 이름(키)을, 두 번째 인자로 딕셔너리 형태의 payload를 받는다.

 

앞서 정의한 payload는 pydantic으로 정의되었으므로, model_dump()를 통해 딕셔너리로 변환하여 보내야 한다.

 

해당 API를 호출하면,

 

이벤트가 잘 핸들링되는 것을 확인할 수 있다.

 

 

# 댓글 작성 후 푸시 알림 구조에 이벤트 적용하기

처음 구조

@router.post(
    "",
    dependencies=[Depends(transactional)],
    response_model=CommentResponse,
)
@auth_required
async def create_comment(
    request: Request, feed_id: int, request_body: CommentCreateRequest
):
    feed = Feed.get_or_raise(feed_id)

    login_user = User.get_or_raise(get_login_user_id(request))
    comment = Comment.create(
        user=login_user,
        feed=feed_id,
        content=request_body.content,
    )
    
    # 피드 주인에게 새 댓글 알림 로직
    # 댓글 작성 로직에, 푸시 알림 로직이 포함되어버림
    feed_owner = User.get_or_raise(feed.user)
    push_request = PushRequest(
        device_type=feed_owner.device_type,
        device_token=feed_owner.push_token,
        title="피드 댓글 알림",
        content="새로운 댓글이 달렸어요",
    )
    push_client.send_push(push_request)

    return CommentResponse.of(
        comment=comment,
        is_writer=True,
    )

 

댓글 작성 API에 푸시 알림 발송이라는 로직이 포함되어 있다.

 

얼핏 보면 이 구조도 나쁘지 않아 보인다.

 

하지만, 만약 댓글 작성 시 추가적으로 다른 도메인의 행위가 필요하다면?

 

댓글 작성 로직에 다른 도메인 로직이 포함되게 되고, 결국 여럿 도메인과 결합도가 높아질 것이다.

 

그리고 이러한 추가적인 요건이 생길 때마다, 댓글 작성 로직과는 직접적으로 관련이 없지만, 댓글 작성 로직을 수정해야 하는 일이 발생한다.

 

 

이벤트 기반 구조로 변경

@router.post(
    "",
    dependencies=[Depends(transactional)],
    response_model=CommentResponse,
)
@auth_required
async def create_comment(
    request: Request, feed_id: int, request_body: CommentCreateRequest
):
    feed = Feed.get_or_raise(feed_id)

    login_user = User.get_or_raise(get_login_user_id(request))
    comment = Comment.create(
        user=login_user,
        feed=feed_id,
        content=request_body.content,
    )

    # 댓글 작성 완료 이벤트 발행 
    # 푸시 알림과 관련된 로직은 없음
    payload = CreateCommentPayload(
        comment_id=comment.id,
        feed_owner_user_id=feed.user,
        comment_writer_user_id=login_user.id,
    )
    dispatch(CommentEvents.CREATE_COMMENT, payload.model_dump())

    return CommentResponse.of(
        comment=comment,
        is_writer=True,
    )

 

 

이렇게 나누면 댓글 작성 API는 "댓글 작성"이라는 본인의 업무만 수행하고, 완료 이벤트를 발행한다.

 

그 후 이 이벤트를 다양한 핸들러가 컨슘 하여 푸시 알림을 보내든, 뭘 하든 해당 API는 관심사 밖의 영역이다.

 

추후에 댓글 작성 완료 후 다른 도메인의 행위가 필요하더라도, 이 이벤트를 핸들링하는 핸들러만 추가로 만들면 된다.

 

즉, 댓글 작성 로직을 수정하거나 여기에 다른 도메인 로직을 추가할 필요가 없다.

 

이렇게 하면 응집도를 높이고, 결합도를 낮출 수 있다.

 

 

다만, 이런 경우 이벤트 핸들러가 언제 이벤트를 컨슘 하는지 추적하기 어렵고, 보는 바와 같이 코드량이 조금 늘어나는 단점도 있으니 이 점도 상기하면 좋을 것 같다.

 

 

# 마무리

FastAPI 프레임워크에서 이벤트 기반 구조를 구현하는 방법에 대해 알아보았다. 

 

프레임워크 차원에서 공식적으로 지원해 주면 더 좋을 것 같다는 생각을 했다.

(아무래도 경량 프레임워크 컨셉이라 이러한 부분을 개발자가 직접 찾아서 구현해야 한다는 것이 조금 규모가 있는 프로젝트에선 번거로울 수 있다고 생각했다.)

 

다시금 스프링이 꽤 많은 기능을 제공해 줌을 느낄 수 있었다.

반응형