[FocusOnYou] SNS와 SQS로 Lambda 연결하기
이전 글에서 Rekognition API들을 호출하는 Lambda들을 작성했었다.
이번에는 이 Lambda들을 연결해주는 SNS와 SQS 부분을 작성해보도록 한다. 이 두 서비스는 메시지 브로커로써 유사한 개념을 가진 서비스로 다소 햇갈릴만한 요소가 있으니 서비스별로 개념을 파악해가며 알아보도록 한다.
SNS
Simple Notification Service라는 이름을 가진 SNS는 Topic 기반의 메시지 브로커 서비스이다. Subscribe가 존재하는 알림 서비스이다.
하나의 주제(Topic)에 구독한 구독자(Subscribers)들에 일괄적으로 메시지를 Push하는 구조를 가진다.
이번 프로젝트에서 Rekognition API 호출을 여러번했고, 비동기적 처리를 위해 여러 Lambda 함수로 분리했기 때문에 이 함수들을 트리거하기 위해 SNS를 도입했다.
IndexFaces - StartFaceSearch
IndexFaces API를 실행 후 생성된 CollectionId를 통해 StartFaceSearch API를 호출하기 위해 Lambda 사이에 생성한 SNS Topic이다.
IndexFaces API 호출 이후, 다음과 같이 Topic에 publish한다.
sns_response = sns.publish(
TopicArn=SNS_TOPIC,
Message=json.dumps({
'bucket': bucket,
"job_id": collection_id,
'timestamp': timestamp,
'face_name': face_name,
'face_id': response['FaceRecords'][0]['Face']['FaceId']
})
해당 Topic의 구독을 StartFaceSearch 함수로 지정해, publish되면 트리거하도록 한다. 이는 Topic에서 구독을 생성해주거나 Lambda에서 해당 Topic을 트리거로 지정함으로써 설정할 수 있다.
StartFaceSearch - GetFaceSearch
StartFaceSearch의 NotificationChannel 옵션을 통해 Face search 작업의 결과를 SNS로 publish할 수 있다. 이 topic에서 바로 GetFaceSearch Lambda를 트리거하도록 설정할 수 있지만, Rekognition의 Get관련 API들을 호출할 때에는 SQS queue를 통해서 처리하라는 공식 Docs의 내용이 있었기에 SQS queue를 구독하도록 설정해주었다.
SQS
Simple Queue Service인 SQS는 Queue 기반의 관리형 메시지 브로커 서비스로, AWS에서 제공한 최초의 서비스라고 한다.
Producer가 큐에 데이터를 넣고, Consumer가 큐에 존재하는 데이터를 빼는 구조를 통해 Application 간의 연결을 느슨하게 해주는 역할을 한다.
이 프로젝트에서 SQS를 사용하게 된 것은 아래와 같은 이유였다.
- StartFaceSearch 작업의 결과를 SNS로 Publish해주는 구간에서 유실되는 메시지가 없도록 하기 위함
(SNS는 메시지를 보냄과 동시에 삭제하므로 Subscriber 측에서 문제가 발생하면 메시지가 유실될 우려가 있다.) - GetFaceSearch 작업이 쓰로틀링이 걸릴 정도로 과도하게 호출되는 것을 막기 위함
(Rekognition API의 공식 Docs에서도 Warning으로 제안해주는 부분이다.)
SQS에서 대기열 생성을 통해 '표준' 타입 queue를 생성해준다.
이후, SNS에서 이 queue에 메시지를 push하도록 하기 위해 액세스 정책을 수정해준다.
이렇게 액세스 정책을 추가해야 topic이 이 queue에 정상적으로 SendMessage를 호출할 수 있다.
원래 여기서 SQS queue를 FIFO queue로 생성해서 SNS topic과 연결하려고 했다. FIFO queue의 특징인 메시지의 순차적 전송, 정확히 1회 전송이 이 프로젝트에 도입하기에 적합하다고 생각했기 때문이다.
그런데 StartFaceSearch가 FIFO 타입 topic을 지원하지 않는 것인지 결과를 topic으로 전송하지 못하는 것을 발견했다. 비교를 위해서 일반 타입 topic을 생성하고 NotificationChannel을 이 topic으로 설정하고 실행했을 때에는 문제없이 잘 넘어가는 것을 볼 수 있었다. 분명히 topic의 정책도 둘다 동일하게 따로 수정하지도 않았고, 두 queue의 정책도 SNS 서비스에서 접근할 수 있도록 설정 마친 상태임에도 불구하고, fifo 타입 Topic의 로그가 아무것도 남지 않았다.
아무튼 표준 타입의 queue를 생성하고 Lambda 트리거를 설정해준다.
비디오를 업로드해 워크로드 프로세스를 진행시켜본다.
StartFaceSearch 작업이 완료되면 정상적으로 topic에 publish되고, queue로 메시지가 들어가 정상적으로 lambda를 트리거할 수 있었다.
이렇게 워크로드 프로세스는 FastAPI 클라이언트에서 파일을 업로드하면 Rekognition API들을 순차적으로 거쳐 Collection의 얼굴이 비디오에서 발견된 구간의 정보를 찾아주는 과정까지 도달했다.
이제 마지막으로 이렇게 추출된 얼굴 발견 구간 정보를 elastic transcoder로 넘겨서 transcoding 작업만 마치면 모든 워크로드 설계가 완성된다.
다음으로 elastic transcoder 작업에 대해 알아보도록 한다.