DEV/AWS

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with AWS Lambda, AWS SQS

거대한 개발자 2022. 6. 28. 14:06
반응형

1. Flow

- AWS SQS, AWS Lambda 사용

처리 프로세스


2. 환경 세팅

- 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다.

    2-1. AWS SQS

 

로그 데이터 처리를 위한 AWS SQS 환경 구성

1. 대기열 생성 - AWS 콘솔 접속 후 AWS SQS 페이지로 이동 - 대기열 생성 버튼 클릭 - 대기열 세부 정보 입력 - 구성 정보 입력 (default 유지, 필요시 수정) - 액세스 정책 선택 (default 유지) - 대기열..

dev-gabriel.tistory.com

    2-2. AWS LAMBDA

 

로그 데이터 처리를 위한 AWS Lambda 환경 구성

1. 람다 함수 생성 - AWS 콘솔 접속, lambda 검색 후 서비스 선택 - 왼쪽 메뉴에서 함수 선택 후 오른쪽 목록에서 함수 생성 버튼 클릭 - 함수 생성 기본 정보 입력 - 런타임은 각자 상황에 맞는 언어를

dev-gabriel.tistory.com

    2-3. AWS S3

 

로그 데이터 처리를 위한 AWS S3 환경 구성

- 버킷 생성 : 버킷 이름 작성 후 나머지 옵션들은 그냥 기본으로 놓고 스크롤 제일 하단으로 내린 후 버킷 만들기 버튼 선택

dev-gabriel.tistory.com

    2-4. AWS Glue

 

로그 데이터 처리를 위한 AWS Glue 환경 구성

1. AWS Glue Crawler 추가 - 클롤러 추가 버튼 클릭 합니다. - 크롤러 이름을 입력 후 다음을 버튼 클릭합니다. - 크롤러 소스 타입 : 이미 테이블을 생성했기 때문에 Existing catalog tables 선택 후 다음 버..

dev-gabriel.tistory.com

    2-5. AWS Athena

 

로그 데이터 처리를 위한 AWS Athena 환경 구성

1. 데이터 원본 생성 - 콘솔을 접속해서 Athena 페이지로 이동 - 왼쪽 메뉴에서 데이터 원본 클릭 - 오른쪽에 데이터 원본 생성 버튼 클릭 ( 생성하려고 하는 데이터 원본이 AWS Glue 데이터 카탈로그

dev-gabriel.tistory.com


3. 소스 개발

    3-1. AWS SQS

        - Web Application 에서 AWS SQS 로 전송 소스 구현

import egovframework.com.cmm.AwsSqs;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SqsException;

/**
* Map 형태를 json string 형태로 변경 
*/
private String convertMapToJsonString(Map<String, String> params) throws JsonProcessingException {
    return new ObjectMapper().writeValueAsString(params);
}

/**
* AWS SQS message 전송 처리
*/
private void sendSqsMessage(Map<String, String> params){
    AwsSqs awsSqs = new AwsSqs();
	try(SqsClient client = awsSqs.getSqsClient()){
        if(!awsSqs.queueUrl.equals("queueUrl")){
            awsSqs.createQueue(client, "queueUrl"); // 큐 생성
        }
        // param map json string 형태로 변환 큐에 전송
        awsSqs.sendMessage(client, convertMapToJsonString(params));   
    } catch (SqsException sqsException){
        logger.error("AWS Simple Queue Service Exception : " + sqsException.getMessage());
    } catch (JsonProcessingException jsonException){
        logger.error("JsonProcessingException : " + jsonException.getMessage());
    }
}

    3-2. AWS Lambda

        - AWS Lambda 함수 로컬 환경 구성 참고

 

AWS Serverless 개발 환경 세팅 in IntelliJ

1. 개요 - AWS Lambda를 이용한 기능 구현이 필요해서 구글링 해봤는데, 조금 복잡한 것 같아서 재사용을 위해 정리 한다. - IntelliJ를 기반으로 한다. 2. 프로그램 설치 2-1. IntelliJ 플러그인 설치 - File >

dev-gabriel.tistory.com

        - AWS SQS 에 값이 넘어 왔을 경우 이를 처리할 소스 구현

package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

import example.s3.FileUpload;

public class Hello implements RequestHandler<SQSEvent, Void> {
    @Override
    public Void handleRequest(SQSEvent sqsEvent, Context context) {
        for (SQSEvent.SQSMessage msg : sqsEvent.getRecords()) {
            FileUpload upload = new FileUpload();
            upload.run(msg.getBody(), context);	// s3 업로드 처리
        }
        return null;
    }
}

        - S3 파일 업로드 처리

public void run(String contents, Context context) {
    LambdaLogger logger = context.getLogger();

    try{
        String filePath = "저장될 s3 파일 위치";
        contents = "저장될 JSON 형태의 내용";
		
        // AWS S3 저장
        s3Client.putObject(getPutObjectRequest(filePath), RequestBody.fromString(contents));
    } catch (JsonProcessingException jpe) {
        logger.log("JsonProcessingException : " + jpe.getMessage());
        logger.log("JsonProcessingException - CONTENT : " + contents);
    }
}

4. 정리

- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 실시간으로 대기열(AWS SQS)에 전달해서 서버 자원과는 상관없는 구조로 나머지는 AWS 내부에서 AWS Lambda로 처리되어 S3에 저장되는 로직을 구현하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.