이전글에서 Lambda를 트리거할 수 있도록 SNS와 SQS로 연결해주었다.
이제 Rekognition API를 통해 추출된 비디오에서 특정 인물이 탐색된 구간 timestamp를 바탕으로 해당 구간끼리만 연결해주는 작업만 남았다.
Elastic Transcoder
S3 버킷에 저장된 미디어 파일을 변환(Transcoder)해주는 서비스이다. 클라우드 미디어 편집 서비스 정도로 이해하면 편하다. Transcoder는 글을 작성하는 현재 서울 리전을 지원하지 않는 관계로 us-east-1에서 작업을 진행했다.
Elastic Transcoder에서는 크게 4가지 개념요소가 존재한다.
- 작업 : Transcoding을 수행하는 것으로, 비동기 처리에서의 job정도로 볼 수 있다.
- 파이프라인 : Transcoding 작업을 관리하는 대기열(작업 큐), 작업을 생성할 때 파이프라인을 지정해주어야한다.
- 프리셋 : Transcoding 작업을 수행하는 중에 적용할 설정값의 템플릿 (비디오 해상도, 오디오 채널 수 등)
- 알림 : Transcode 작업의 진행과정을 나타내는 것, SNS와 연동해서 작업 진행 중 / 작업 완료 / Warning / Error 상태들에 대해 알림을 전송할 수 있다.
버킷에 저장된 비디오를 Elastic Transcoder로 처리하는 작업은 파이프라인에 Transcoding 작업을 생성함으로써 진행된다.
결과 비디오 저장 버킷 생성
Transcode 작업을 마치면 결과물이 저장될 버킷을 생성한다. 결과물을 확인할 수 있도록 퍼블릭하게 오픈한 버킷을 생성해주도록 한다.
퍼블릭 액세스와 버킷 정책을 수정해준다.
파이프라인 생성
Transcoder 작업을 걸어주려면 파이프라인 ID를 지정해주어야한다. 따라서, 작업에 사용될 파이프라인을 미리 생성해준다.
Elastic Transcoder에서 Create New Pipeline으로 들어가 Pipeline 생성 메뉴로 넘어온다.
파이프라인 생성에서 설정해주는 필수 항목은 크게 파이프라인 이름, 처리할 비디오가 있는 버킷(input bucket), Transcoder IAM Role, Transcoder 작업 결과물을 저장할 버킷이 있다.
IAM Role의 경우, Default role이 있으니 다른 서비스와 연계하는 일이 없는 이상 default role을 사용해도 무방하다.
그 하단의 Optional한 설정으로는 Notification과 Encryption이 있다.
위에서 설명했던 4가지 이벤트( 작업 진행 중 / 작업 완료 / Warning / Error )별로 각각 SNS Topic을 지정해 이벤트 내역을 수신받을 수 있다.
Transcoder 작업의 결과를 다른 Lambda로 수신받아 처리하기 위해 SNS Topic을 지정하고 Pipeline을 생성했다.
파이프라인을 생성하면 파이프라인 ID가 부여된다.
구간 편집(Clip Stitching)
이제 Rekognition API로 비디오에서 얼굴이 탐지된 구간의 timestamp가 담긴 배열을 Transcoder에서 사용할 수 있도록 format을 가공 후 Transcoder Job을 생성해준다. (create_job)
create_job의 사용법은 첨부된 Docs에 적혀있다.
create_job의 docs가 꽤 길고 복잡하다. 여기서 핵심이 되는 부분은 Inputs로, 여기에 얼굴이 탐지된 구간의 timestamp 내용이 들어가서 해당 구간만 Clipping되어 처리되는 것이다.
Inputs는 dictionary의 list로, dictionary는 각 구간의 내용이다. dictionary의 TimeSpan에서 구간의 시작위치(StartTime)와 구간의 길이(Duration)으로 input 비디오의 구간을 인식한다.
Inputs=[
{
'Key': 'string',
...
'TimeSpan': {
'StartTime': 'string',
'Duration': 'string'
},
...
},
],
Inputs의 format에 맞게 변형된 detected_timestamp를 넣어준다.
# call elastic transcoder to stitch the timestamp and make new video
transcoder_job = tscoder.create_job(
PipelineId=pipeline_id,
Inputs=detected_timestamp_str,
Output={
'Key': job_id + '.mp4',
}
)
transjob_id = transcoder_job['Job']['Id']
transjob_status = transcoder_job['Job']['Status']
logger.info(f'Transcoder job : {transjob_id}')
logger.info(f'Transcoder job status : {transjob_status}')
이렇게 구성 후, 비디오를 업로드해 job을 진행하면 Transcoder create job까지 이어지는 것을 확인할 수 있다.
Transcode 작업 결과 알림
이렇게 Transcode 작업 Create까지 연동된것을 확인했다. 마지막으로 Transcode 작업이 끝나고 결과 내역을 확인하도록 알림까지 전송해보도록 한다.
파이프라인을 만들때, 이벤트에 대한 알림 SNS 설정을 해준 것을 바탕으로 Lambda를 트리거해 Slack으로 알림을 쏘는 아키텍쳐를 추가해준다.
SNS 구독으로 트리거된 Lambda를 작성한다.
def lambda_handler(event, context):
message = json.loads(event['Records'][0]['Sns']['Message'])
logger.info(f'Message : {message}')
# parse message
transjob_status = message.get('state') # PROGRESSING|COMPLETED|WARNING|ERROR
transjob_id = message.get('jobId')
job_id = message.get('input').get('key').split('/')[1]
output_filename = message.get('outputs')[0].get('key')
transcoded_video_url = f"https://{FOCUSONYOU_RESULT_BUCKET}.s3.{BUCKET_REGION}.amazonaws.com/{output_filename}"
if transjob_status == 'COMPLETED':
...
slack_message = {
'channel': SLACK_CHANNEL,
'text': f"*[FocusOnYou]*\n\nJob {job_id} has been completed!\nVideo : {transcoded_video_url}"
}
req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", slack_message['channel'])
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
전송된 Message의 상태가 COMPLETED면, 설정된 채널로 메시지를 전송한다.
결과 비디오도 문제없이 접근 가능한 것도 볼 수 있다.