ToyProject

[GAMST] Streaming API 구현기 (feat. SSE)

Omoknooni 2024. 5. 4. 23:22

이번 프로젝트에서 API 부분은 Django와 drf(djangorestframework)를 사용해서 구현하기로 했다.

우선 사용자로부터 비디오파일을 업로드받아 이를 Bucket에 저장하고, 저장됨과 동시에 Model 처리 서버로 하여금 처리작업을 시작하도록 파이프라인을 구성했다.

 

그 뒤로 Model 처리 결과 중 RISK 탐지내역을 클라이언트에 프레임 별로 처리되는 즉시 전송을 해주어야한다. 아무래도 주제가 CCTV 관제의 보조 도구이므로 실시간성이 무엇보다도 중요하게 다가왔다.

이를 위해서는 RISK 탐지 내역의 Streaming API의 구현이 필요했다.

 

Streaming의 구현에서 주로 언급되는 기술로는 폴링과 웹 소켓이 있다. 이외에도 SSE(Server-Sent Event)기법이 있다.

이 기술들 중에서 우리의 프로젝트에 도입하기 가장 적합한 기술을 선택해야했다.

 

 

Streaming 기법 비교

폴링(Polling)

Polling은 클라이언트가 일정한 간격으로 서버에게 요청을 보내어 업데이트를 확인하는 방식이다. 클라이언트는 정기적으로 서버에게 데이터를 요청하고, 서버는 데이터를 반환한다. 이는 전통적인 HTTP 통신방식으로 HTML5가 등장하기 이전에 사용되던 통신방식이라고 한다. 주로 Ajax나 Fetch API를 통해서 브라우저 단에서 서버로 Request를 보내는 방식이다.

실시간 업데이트를 위해 반복적으로 요청을 보내므로 효율성이 낮고 실시간성도 떨어진다. 또한 지속적으로 반복되는 HTTP Request/Response로 인해 서버 쪽에 부담이 가는 부분이 존재한다.

 

 

웹소켓(WebSocket)

다음으로 웹소켓의 경우 서버-클라이언트와 양방향 통신이 가능한 기술로, 주로 채팅서비스 등에서 활용된다. 

Websocket은 양방향 통신을 가능하게 하는 TCP 기반의 통신 프로토콜이다. 클라이언트와 서버 간에 지속적인 연결을 유지하고, 양방향으로 데이터를 전송할 수 있습니다. 웹소켓은 주로 클라이언트-서버 양방향 소통이 가능한 채팅서비스에서 주로 활용된다. 이를 통해 실시간 데이터 전송이 가능하며, Polling과 달리 연결을 유지하므로 효율성과 성능이 높다.

 

다만, 웹소켓은 websocket 프로토콜을 위한 웹소켓 서버의 구축이 필요하다는 점이 있다. 

 

SSE(Server-Sent Event)

Server-sent-event는 서버에서 클라이언트로 데이터를 지속적으로 푸시하는 방식이다. 클라이언트는 서버에게 한 번 연결을 요청하고, 서버는 연결을 유지한 채로 필요할 때마다 데이터를 보내게 된다. IE를 제외한 대부분의 모던 브라우저에서 지원하는 방식이다.

SSE는 Websocket과 비슷하지만, SSE는 HTML5 표준의 정식 기술로 HTTP 프로토콜을 그대로 사용할 수 있다는 특징이 존재한다. 

Bidirectional한 Websocket과는 달리 단방향 통신이므로 클라이언트에서 서버로의 메시지 전송이 불가능하다는 점이 있다.

 

서버가 클라이언트 쪽으로 SSE를 전송하기 위해서는 Content-Type을 'text/event-stream'으로 설정한 뒤, 전송하는 메시지에 따라 format에 맞게 지정해주기만 하면 된다.

 

SSE 메시지는 크게 2가지 요소로 이루어져 있다.

event: event1
data: event data 123

event: event2
data: {"message": "Hello world", "price": 123456}

 

Event Type

전송되는 이벤트의 타입을 지정하는 부분으로, 필수 요소는 아니다.

이 field를 통해 여러개의 Event를 구분하여 데이터를 전송할 수 있다. AWS SNS에서의 Topic 개념으로 바라볼 수 있다.

 

Event Data

이벤트들의 데이터(메시지)가 담기는 부분으로 필수 요소다. Event Stream에서의 데이터들은 한쌍의 개행(\n\n)으로 구분지어 전송해주어야한다. 이렇게 구분되지 않은 데이터는 하나의 메시지로 간주되어 처리된다.

 

 

Using server-sent events - Web APIs | MDN

Developing a web application that uses server-sent events is straightforward. You'll need a bit of code on the server to stream events to the front-end, but the client side code works almost identically to websockets in part of handling incoming events. Th

developer.mozilla.org

 

최종 선택

폴링의 경우 실시간성이 많이 떨어지고, 서버에 부담이 되는 부분이 있어서 바로 제외를 했다.

이 프로젝트의 Streaming은 서버로부터 탐지된 RISK 데이터를 가져와서 출력해주기만 하면 되는 부분이므로 초기 세팅이 많이 필요한 웹소켓보다 타 API와 마찬가지의 HTTP Request만 형식에 맞춰서 생성해주면 되었기에 SSE를 채택하게 되었다. 그리고 SSE의 도입은 프론트엔드 개발 담당도 구현하기에 편해진다는 점도 있었다. 

 

구현의 간단함

 

SSE Streaming API 구현

SSE API를 구현하기에 앞서서 현재 백엔드 환경인 django에서 사용하는 WSGI 서버가 아닌 ASGI 서버가 필요하다. WSGI는 기본적으로 짧은 생명주기를 가진다. 따라서 웹소켓이나 Streaming과 같은 긴 요청을 처리하기 위해서 ASGI의 도입이 필수적이게 된다.

 

여기서는 daphne를 사용하기로 한다. daphne는 django에서 웹소켓과 같은 비동기 프로토콜을 지원하는 Channel을 지원하기위해 개발된 ASGI 서버다. 

pip install daphne

 

daphne를 설치 후, settings.py의 INSTALLED_APP에 추가해주고 ASGI_APPLICATION 정의를 해준다.

INSTALLED_APPS = [
    'daphne',	# 가장 앞에 추가를 해준다
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    # CORS
    'corsheaders',

    # drf
    'drf_yasg',
    'rest_framework',
    'rest_framework_simplejwt',

    # app
    'video',
    'user',
    'camera',
]

ASGI_APPLICATION = 'config.asgi.application'

 

 

세팅 후, Streaming 해줄 API를 생성한다. 여기에서 StreamingHttpResponse가 사용되었다. 이는 대부분 django의 응답 객체로 사용되는 HttpResponse와는 다르게 Response body를 여러개로 쪼개어 클라이언트에 응답해준다.

SSE를 Streaming해줄 것이므로 content_type으로 'text/event-stream'을 지정해준다.

class StreamRiskList(APIView):
    def get(self, request, pk):
        try:
            response = StreamingHttpResponse(self.generate_object(), content_type='text/event-stream')
            response['Cache-Control'] = 'no-cache'
            response['Connection'] = 'keep-alive'
            return response
        except Exception as e:
            return Response({'Error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

 

 

다음으로 이 StreamingHttpResponse를 통해 Stream해줄 response content의 iterator를 지정해주어야한다. 

우리는 DB에서 추가된 row를 가져와야하므로, ORM을 통해 DB 테이블로부터 fetch해온 object를 바탕으로 event_stream data를 생성해주도록 iterator를 구성했다.

이 구조를 조금 더 단계적으로 바라보기위해 이 iterator의 구조를 get_queryset → get_objects → generate_object로 쪼개어 구현했다.

    # 마지막에 추가된 row 반환
    @sync_to_async
    def get_queryset(self, last_object_id):
        if not last_object_id:
            queryset = RiskySection.objects.filter(
                video=self.kwargs['pk']
            ).order_by('id').values().first()
        else:
            queryset = RiskySection.objects.filter(
                video=self.kwargs['pk'],
                id=last_object_id,
                ).order_by('id').values().last()
        return queryset
        
    async def get_objects(self, last_object_id):
        await asyncio.sleep(1)
        result = await self.get_queryset(last_object_id)
        if result:
            return result
        else:
            return {}

    # Stream할 데이터를 계속 생성
    async def generate_object(self):
        last_object_id = None
        while True:
            object = await self.get_objects(last_object_id)
            if object:
                yield f"data: {object}\n\n"
                last_object_id = object['id'] + 1

 

ORM을 통해 테이블의 값을 가져오는 get_queryset에는 sync_to_async라는 데코레이터가 사용되었는데, 이는 asgrief 패키지의 함수로 비동기 코드와 동기 코드의 상호 운용을 위해 사용된다.

django의 ORM 작업은 기본적으로 동기적으로 작동한다. ASGI에서의 StreamingHttpResponse는 비동기로 작동하도록 구현한 함수가 필요한데 이를 위해서 sync 함수에 데코레이터로 달아주는 sync_to_async를 통해 동기/비동기 문제를 해결할 수 있다.

 

SSE Renderer

SSE Streaming이 정상적으로 클라이언트 측에 랜더링되기 위해, Renderer 설정이 추가로 필요하다.

BaseRenderer를 상속받은 Custom Renderer를 정의하고 SSE Streaming에 사용될 media_type, render 메소드를 구현해준다.

# config/sse_render.py
from rest_framework.renderers import BaseRenderer

class ServerSentEventRenderer(BaseRenderer):
    media_type = 'text/event-stream'
    format = 'sse'
    charset = None
    content_type = 'text/event-stream'

    def render(self, data, media_type=None, renderer_context=None):
        return "data: {}\n\n".format(data)

 

 

이렇게 생성한 SSE Renderer를 앞서 생성해준 Streaming API에 renderer_class로 설정해준다.

class StreamRiskList(APIView):
    renderer_classes = [ServerSentEventRenderer]
    
    def get(self, request, pk):
    ......

 

 

Stream 테스트

이제 이 API를 url로 연결시킨 후, 호출해 SSE Stream이 받아와지는 것을 확인할 수 있다.

 

현재 구현한 API와 iterator에 따르면, 특정 video_id 값의 RISK 탐지 내역이 생성된 순서대로 stream된다. Model 처리 과정과 연동하면 SSE Stream API를 먼저 열어둔 뒤, 특정 비디오를 처리하는 과정 내에서 RISK가 탐지되어 DB에 insert되는 즉시 Stream API를 통해 클라이언트쪽으로 데이터가 Stream되는 것이다.

 

기존의 단순 Request-Response 구조보다는 어려웠던 실시간 Stream API를 구현해보았다. 괜히 django를 썼나하는 생각도 들긴했지만, 어떻게든 방법은 있다는 것을 깨달았다. 이 로직을 FastAPI와 같은 ASGI 친화적인 프레임워크를 사용했으면 어땠을까라는 생각도 살짝 해보게 되었다.