앞선 글에서 FastAPI 단에서 클라이언트로부터 파일을 업로드 받는 로직까지 작성했다.
이번에는 이 프로젝트의 핵심 단계인 Rekognition Workload를 구성해보도록한다.
구성하는 Rekognition Workload의 아키텍쳐는 아래 그림과 같다.
Workload는 여러개의 Lambda로 쪼개서 마이크로서비스 아키텍쳐에 가깝도록 설계해보았다.
IndexFace
클라이언트가 업로드한 파일이 S3 버킷에 업로드되면 S3 이벤트 알림(notification)이 발생하는 것을 트리거로 삼아서 Lambda를 구성했다.
작업ID에 대해 Collection을 생성(CreateCollection)해주고, 같이 업로드된 얼굴 이미지 파일과 Collection으로 IndexFace를 호출한다.
try:
rekog_response = rekog.create_collection(CollectionId=collection_id)
if rekog_response['StatusCode'] == 200:
logger.info('Collection created : '+rekog_response['CollectionArn'])
except Exception as e:
print(e)
# object의 메타데이터에 저장한 얼굴이름 가져오기
face_name = s3.head_object(Bucket=bucket, Key=key)['Metadata']['face_name']
# call rekognition to add face to collection
response = rekog.index_faces(
CollectionId=collection_id,
Image={
'S3Object': {
'Bucket': bucket,
'Name': key
}
},
ExternalImageId=face_name,
MaxFaces=1,
QualityFilter="AUTO",
DetectionAttributes=['ALL']
)
인덱싱할 얼굴의 이미지는 버킷에 업로드한 것을 가져오고, 업로드할때에 같이 입력받은 얼굴의 이름을 ExternalImageId로 넣어준다.
MaxFaces의 값은 1로 고정해줘서 사용자가 얼굴이 2개 이상이 담긴 이미지를 업로드해도 하나만 인덱싱을 하도록 설정했다.
IndexFaces API의 Docs
IndexFace API가 성공적으로 실행되고 결과를 받아오면 다음 작업으로 연결해줄 SNS로 Publish해준다.
Message에 job_id를 비롯한 내용들을 적어서 넘겨준다.
# IndexFaces API 실행 후 결과 Publish
sns_response = sns.publish(
TopicArn=SNS_TOPIC,
Message=json.dumps({
'bucket': bucket,
"job_id": collection_id,
'timestamp': timestamp,
'face_name': face_name,
'face_id': response['FaceRecords'][0]['Face']['FaceId']
})
)
StartFaceSearch
IndexFace lambda가 publish한 SNS를 통해서 트리거된 StartFaceSearch lambda 함수를 작성한다.
SNS로 트리거되었으므로 Message를 파싱해서 job_id 같은 내용을 가져온다.
버킷에 업로드된 비디오 파일을 가져와서 SNS로 넘겨받은 CollectionId와 함께 StartFaceSearch를 호출한다.
response = rekog.start_face_search(
Video={
'S3Object': {
'Bucket': FOCUSONYOU_BUCKET,
'Name': 'target-video/' + job_id+'/'+video_name
}
},
CollectionId=job_id,
NotificationChannel={
'SNSTopicArn': SNS_TOPIC, # publish할 SNS topic의 ARN
'RoleArn': ROLE_ARN # SNS topic에 publish할 수 있는 Role의 ARN
},
FaceMatchThreshold=90, # Face를 검출할 Threshold, 기본값은 80%
)
search_id = response['JobId']
logger.info("SearchId : " + search_id)
StartFaceSearch가 호출되면 Search작업에 대한 작업id가 생성되고 리턴된다. 이 id값을 다음 GetFaceSearch에서 사용한다.
호출한 StartFaceSearch 작업이 완료되면 설정해둔 NotificationChannel에 따라서 자동으로 SNS topic에 publish해준다.
StartFaceSearch가 해당 Topic에 publish하기 위해 작업을 수행할 수 있는 Role의 ARN도 설정 파라미터로 넣어준다.
NotificationChannel 설정에 대한 내용으로 아래와 같은 것들이 있다.
- SNS topic은 호출한 Rekognition endpoint와 동일한 Region이여야 한다.
- Role을 생성할때, 기본으로 생성되어있는 AmazonRekognitionServiceRole이라는 Policy를 사용하면 다른 다양한 SNS Topic에 Access 권한을 줄 수 있다.
(다만, 이 경우에는 SNS Topic의 이름을 AmazonRekognition이라는 접두사를 붙인 채로 생성해야한다.)
StartSearchFace의 API Docs
GetFaceSearch
Rekognition 워크로드의 마지막으로 StartFaceSearch 작업의 결과를 가져오기위한 GetFaceSearch를 실행하는 Lambda를 작성한다.
StartFaceSearch의 탐색작업이 끝나면 설정한 NotificationChannel을 통해 탐색 완료상태를 SNS로 Publish한다.
StartFaceSearch의 작업id를 GetFaceSearch의 파라미터로 넘긴 후 호출한다.
...
# GetFaceSearch API 실행
rekog_response = rekog.get_face_search(
JobId=sns_message['JobId'],
SortBy='INDEX'
)
...
SNS와 Lambda 사이에 SQS queue를 하나 더 두어서 동시다발적으로 GetFaceSearch 작업이 호출되어 쓰로틀링이 걸리지 않도록 설계했다. 또한 API 호출의 모니터링을 SQS queue하나만으로 간편하고 효율적으로 하기 위함도 있다.
이는 공식 Docs에서도 권장하는 방법이다.
이렇게 Rekognition API들을 호출하는 Lambda들을 구성해보았다.
다음으로 이 Lambda들을 연결하는 SNS와 SQS를 설정하는 파트를 알아보도록 한다.