애매한 잡학사전

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch 본문

DEV/AWS

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch

거대한 개발자 2022. 6. 28. 17:00
반응형

1. Flow

    - Batch 프로그램으로 별도의 시스템을 사용하지 않고 바로 S3로 데이터를 저장하는 프로세스

처리 프로세스


2. 환경 세팅

- 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다.

    2-1. Batch

    - 일반 Java application (maven) 프로젝트 생성

    2-2. AWS S3

 

로그 데이터 처리를 위한 AWS S3 환경 구성

- 버킷 생성 : 버킷 이름 작성 후 나머지 옵션들은 그냥 기본으로 놓고 스크롤 제일 하단으로 내린 후 버킷 만들기 버튼 선택

dev-gabriel.tistory.com

    2-3. AWS Glue

 

로그 데이터 처리를 위한 AWS Glue 환경 구성

1. AWS Glue Crawler 추가 - 클롤러 추가 버튼 클릭 합니다. - 크롤러 이름을 입력 후 다음을 버튼 클릭합니다. - 크롤러 소스 타입 : 이미 테이블을 생성했기 때문에 Existing catalog tables 선택 후 다음 버..

dev-gabriel.tistory.com

    2-4. AWS Athena

 

로그 데이터 처리를 위한 AWS Athena 환경 구성

1. 데이터 원본 생성 - 콘솔을 접속해서 Athena 페이지로 이동 - 왼쪽 메뉴에서 데이터 원본 클릭 - 오른쪽에 데이터 원본 생성 버튼 클릭 ( 생성하려고 하는 데이터 원본이 AWS Glue 데이터 카탈로그

dev-gabriel.tistory.com


3. 개발 소스

    3-1. Batch 개발

        - Batch 클래스

package sample.run;

import sample.processing.LogProcessing;

public class Batch {
    public static void main(String... args){
        LogProcessing logProcessing = new LogProcessing();
        logProcessing.run();
    }
}

        - LogProcessing 클래스

package sample.processing;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import dbpia.aws.AwsS3FileUpload;
import dbpia.db.DatabaseManager;
import dbpia.util.StringUtils;

public class LogProcessing {
    /**
     * 데이터 베이스 조회
     * @param query 로그 데이터 조회 쿼리
     * @param migDay 로그 데이터 조회 날짜
     * @return 로그 데이터 목록
     */
    private List<Map<String, Object>> getDataList(String query, String migDay){
        DatabaseManager dm = new DatabaseManager();
        Map<String, String> param = new HashMap<>();
        param.put("1", migDay);    // 검색 일자

        // DB 조회
        return dm.select(query, param);
    }

    /**
     * AWS S3에 저장할 파일 분할 처리
     * @param paramList S3에 저장할 로그 데이터 목록
     * @param uploadSection S3에 저장할 로그 데이터 구분
     */
    private void awsS3PutObjectLogData(List<Map<String, Object>> paramList, String uploadSection){
        int listSize = paramList.size();

        // 리스트 사이즈가 listMaxSize 값 이상일 경우 파일 분할 저장
        if(listSize < 300000){
            new AwsS3FileUpload().run(paramList, uploadSection);
        } else {
            // 가용 프로세스 개수
            int runtimeProcessCnt = Runtime.getRuntime().availableProcessors();    
            // 목록을 가용 프로세스 개수 만큼 나눠서 처리
            int sizeOneOfList = listSize / runtimeProcessCnt;    

            // 가용 프로세스 개수 만큼 리스트 분할 처리
            for(int count = 0; count < runtimeProcessCnt; count++){
                // 리스트 시작
                int fromIndex = count == 0 ? count:(count * sizeOneOfList);
                // 리스트 마지막
                int toIndex = count == (runtimeProcessCnt-1) ? listSize:((count+1) * sizeOneOfList);

                List<Map<String, Object>> subList = paramList.subList(fromIndex, toIndex);

                new AwsS3FileUploadSample().run(subList, uploadSection);    // 파일 업로드 시작
            }
        }
    }

    /**
     * log 처리 시작
     */
    public void run(){
        // 처리 구분 값 세팅
        String section = "처리 구분";

        // 하루 전 날짜 세팅
        String yesterday = StringUtils.getYesterday("", 1);

        // DB 조회
        String query = "db 데이터 조회 쿼리";
        List<Map<String, Object>> resultList = getDataList(query, yesterday);;

        // AWS S3 파일 업로드 프로세스
        awsS3PutObjectLogData(resultList, section);
    }
}

        - AwsS3FileUploadSample 클래스

package sample.processing;

import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dbpia.util.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class AwsS3FileUploadSample {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final AwsBasicCredentials credentials;

    public AwsS3FileUploadSample(){
        String awsS3AccessKeyId = StringUtils.getProperty("AWS.s3.accessKeyId");
        String awsS3SecretAccessKey = StringUtils.getProperty("AWS.s3.secretAccessKey");
        this.credentials = AwsBasicCredentials.create(awsS3AccessKeyId, awsS3SecretAccessKey);
    }

    /**
     * AWS S3 client 생성
     * @return AWS S3 Client
     */
    private S3Client getS3Client() {
        return S3Client.builder()
            .region(Region.AP_NORTHEAST_2)
            .credentialsProvider(StaticCredentialsProvider.create(this.credentials))
            .httpClient(UrlConnectionHttpClient.builder().build())
            .build();
    }

    /**
     * AWS S3 버킷 생성
     * @param filePath Aws S3 File path
     * @return Aws S3 Buket
     */
    private PutObjectRequest getPutObjectRequest(String filePath) {
        String buketNm = "S3 버킷 이름"; // S3 버킷 이름
        return PutObjectRequest.builder()
            .bucket(buketNm)
            .key(filePath)
            .build();
    }

    /**
     * 파일 업로드 시작
     * @param listMap 업로드할 파일 목록
     * @param s3UploadSection 파일 업로드 구분
     */
    public void run(List<Map<String, Object>> listMap, String s3UploadSection) {
        try(S3Client s3Client = getS3Client()){
            PutObjectRequest putObjectRequest;
            RequestBody requestBody;

            if(s3UploadSection.equals("로그 구분1")){
                putObjectRequest = getPutObjectRequest("파일 경로");
                requestBody = RequestBody.fromString("저장할 JSON 형태의 데이터 listMap 데이터를 변환");
            } else {    // 로그 구분 2
                putObjectRequest = getPutObjectRequest("파일 경로");
                requestBody = RequestBody.fromString("저장할 JSON 형태의 데이터 listMap 데이터를 변환");
            }

            s3Client.putObject(putObjectRequest, requestBody);    // AWS S3 저장
        } catch (Exception ex){
            logger.error("Aws S3 File Upload Exception: " + ex.getMessage());
        }
    }
}

4. 정리

- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 다음날 0시에 배치 프로그램이 DB를 조회해서 최소한의 파일 분할로 파티션 처리를 한 후 S3에 저장되는 로직으로 구현 하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.

 

Comments