애매한 잡학사전

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch, AWS Lambda, AWS SQS 본문

DEV/AWS

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch, AWS Lambda, AWS SQS

거대한 개발자 2022. 6. 28. 15:33
반응형

1. Flow

    - Batch, AWS SQS, AWS Lambda 사용

처리 프로세스


2. 환경 구성

    - 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다.

    2-1. Batch

        - 일반 Java application (maven) 프로젝트 생성

    2-2. AWS SQS

 

로그 데이터 처리를 위한 AWS SQS 환경 구성

1. 대기열 생성 - AWS 콘솔 접속 후 AWS SQS 페이지로 이동 - 대기열 생성 버튼 클릭 - 대기열 세부 정보 입력 - 구성 정보 입력 (default 유지, 필요시 수정) - 액세스 정책 선택 (default 유지) - 대기열..

dev-gabriel.tistory.com

    2-3. AWS Lambda

 

로그 데이터 처리를 위한 AWS Lambda 환경 구성

1. 람다 함수 생성 - AWS 콘솔 접속, lambda 검색 후 서비스 선택 - 왼쪽 메뉴에서 함수 선택 후 오른쪽 목록에서 함수 생성 버튼 클릭 - 함수 생성 기본 정보 입력 - 런타임은 각자 상황에 맞는 언어를

dev-gabriel.tistory.com

    2-4. AWS S3

 

로그 데이터 처리를 위한 AWS S3 환경 구성

- 버킷 생성 : 버킷 이름 작성 후 나머지 옵션들은 그냥 기본으로 놓고 스크롤 제일 하단으로 내린 후 버킷 만들기 버튼 선택

dev-gabriel.tistory.com

    2-5. AWS Glue

 

로그 데이터 처리를 위한 AWS Glue 환경 구성

1. AWS Glue Crawler 추가 - 클롤러 추가 버튼 클릭 합니다. - 크롤러 이름을 입력 후 다음을 버튼 클릭합니다. - 크롤러 소스 타입 : 이미 테이블을 생성했기 때문에 Existing catalog tables 선택 후 다음 버..

dev-gabriel.tistory.com

    2-6. AWS Athena

 

로그 데이터 처리를 위한 AWS Athena 환경 구성

1. 데이터 원본 생성 - 콘솔을 접속해서 Athena 페이지로 이동 - 왼쪽 메뉴에서 데이터 원본 클릭 - 오른쪽에 데이터 원본 생성 버튼 클릭 ( 생성하려고 하는 데이터 원본이 AWS Glue 데이터 카탈로그

dev-gabriel.tistory.com


3. 소스 개발

    3-1. Java Application 개발

public void run(int start, int end){
    DatabaseManager dm = new DatabaseManager();
    DatabaseQuery query = new DatabaseQuery();
    Map<String, String> param = new HashMap<>();

    param.put("1", StringUtils.getYesterday("", 1)); // 검색 일자 (어제 날짜)
    
    // DB 조회
    List<Map<String, Object>> resultList = dm.select(query.getDataList(), param);

    AwsSqs awsSqs = new AwsSqs();

    // 한 줄씩 json 형태로 변경 후 AWS SQS에 전송 처리
    for(Map<String, Object> tmp : resultList){
        try(SqsClient client = awsSqs.getSqsClient()){
            if(awsSqs.queueUrl.isEmpty()){
                awsSqs.createQueue(client, "queueName"); // 큐 생성
            }
            // param map json string 형태로 변환 큐에 전송
            awsSqs.sendMessage(client, new ObjectMapper().writeValueAsString(tmp));   
        } catch (SqsException sqsException){
            logger.error("AWS Simple Queue Service Exception : " 
                + sqsException.getMessage());
        } catch (JsonProcessingException jsonProcessingException){
            logger.error("jsonProcessingException : " 
                + jsonProcessingException.getMessage());
        }
    }

    logger.info("date = " + searchDay + ", total list size = " + resultList.size() + " 종료");
}

    - DatabaseManager select method

/**
 * DB 조회 내용 값 세팅
 * @param resultList 반환 list
 * @param rs result set
 * @param metaData result meta data
 * @throws SQLException sql exception
 */
private void getResultList(List<Map<String, Object>> resultList, ResultSet rs
        , ResultSetMetaData metaData) throws SQLException {
    Map<String, Object> row;
    int columnCount = metaData.getColumnCount();

    while(rs.next()){
        row = new HashMap<>();
        for(int i=1; i <= columnCount; i++){

            String catalogName = metaData.getColumnName(i);
            Object rsVal = rs.getObject(i);

            row.put(catalogName, rsVal);
        }
        resultList.add(row);
    }
}

/**
 * select query 실행 with Param
 * @param query select query
 * @param paramVal parameter
 * @return 조회 내용
 */
public List<Map<String, Object>> select(String query, Map<String, String> paramVal){

    List<Map<String, Object>> resultList = new ArrayList<>();

    Connection conn = getConnection();
    PreparedStatement stmt = null;
    ResultSet rs = null;

    try{
        stmt = conn.prepareStatement(query);

        for(int cnt = 1; cnt <= paramVal.size(); cnt++){
            stmt.setString(cnt, paramVal.get(String.valueOf(cnt)));
        }

        rs = stmt.executeQuery();

        ResultSetMetaData metaData = rs.getMetaData();
        getResultList(resultList, rs, metaData);

    } catch (SQLException sqlException) {
        logger.error("select sqlException : " + sqlException.getMessage());
    } catch (Exception ex){
        logger.error("select Exception : " + ex.getMessage());
    } finally {
        close(rs, stmt, conn);
    }

    return resultList;
}

    3-2. AWS Lambda 개발

- 아래 링크의 3.2 AWS Lambda 항목 참고

 

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with AWS Lambda, AWS SQS

1. Flow - AWS SQS, AWS Lambda 사용 2. 환경 세팅 - 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다. 2-1. AWS SQS 로그 데이터 처리를 위한 AWS SQS 환경 구성 1. 대기열 생성 - A..

dev-gabriel.tistory.com


4. 정리

- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 다음날 0시에 배치 프로그램이 DB를 조회해서 한번에 Json 형태로 변경 후 SQS에 전달하려고 하였는데 데이터 용량이 큰 경우에는 지원하지 않아서 라인 1개씩 대기열(AWS SQS)에 전달해서 서버 자원과는 상관없는 구조로 나머지는 AWS 내부에서 AWS Lambda로 처리되어 S3에 저장되는 로직을 구현하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.

 

Comments