애매한 잡학사전

AWS S3 파일 업로드에 Dependency Injection 적용 본문

DEV/AWS

AWS S3 파일 업로드에 Dependency Injection 적용

거대한 개발자 2022. 10. 20. 14:22
반응형

 

아래 링크의 로그 데이터 처리하기에서 S3에 로그 파일을 업로드하는 배치 프로그램을 개발했었는데 비슷한 로그 파일 및

통계 프로그램들이 늘어서 비슷한 배치 프로그램도 계속 추가되어야 하는 상황이 발생하였습니다.

그래서 고민하다 DI 적용으로 코어 로직은 그대로 두고 옵션 형태로 조건만 변경해서 동작하는 방식으로 리팩터링을 진행하여 정리하고자 합니다.

 

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch

1. Flow - Batch 프로그램으로 별도의 시스템을 사용하지 않고 바로 S3로 데이터를 저장하는 프로세스 2. 환경 세팅 - 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습

dev-gabriel.tistory.com


순서는 다음과 같습니다.

1. option 인터페이스 생성

2. option 인터페이스 구현

3. aws s3 파일 업로드 클래스 리팩터링

4. 로그 처리 클래스 리팩터링

5. main 리팩터링


Option Interface 생성

전체 프로세스는 크게 2가지로 나눌 수 있습니다.

첫째. Database log data 조회

둘째. Aws S3 File upload

 

여기서 각각의 항목 중에서 변경되는 항목들을 분리합니다. 

첫 번째는 DB에서 조회하는 데이터들이 달라지기 때문에 쿼리가 변경될 것이고, 그에 따른 조건들이 생성되어야 하고, 두 번째는 S3에 업로드할 파일 경로와 json 형태로 변경 후 파티션 되어 있는 컬럼 데이터를 삭제하는 부분, 그리고 마지막은 Athena 최적화 시 분할 파일의 크기를 설정하는 부분이 될 것 같습니다.

 

다시 한번 정리하면 

1. DB 조회 쿼리

2. prepare statement 파라미터 생성

3. S3 File 업로드 경로

4. 파티셔닝 컬럼 삭제

5. 파일 용량 분할

이렇게 정리가 되는데 이를 기반으로 인터페이스를 생성합니다.

 

public interface StatisticsOptionInter {
    /**
     * 데이터베이스 조회 쿼리 가져오기
     * @return 쿼리
     */
    String getSearchQuery();

    /**
     * prepare statement parameter 생성
     * @return prepare statement parameter 값
     */
    Map<String, String> getMakeParam(String paramVal);

    /**
     * AWS Athena 에서 사용할 S3 file 저장 경로
     * @param partitionDate 파티션 처리할 날짜
     * @return 파일 path
     */
    String getS3FilePathForAthena(String partitionDate);

    /**
     * 파티셔닝 처리된 컬럼 삭제 후 json 형태로 변환 처리
     * @param contents 저장할 내용
     * @return json 형태 저장할 내용
     */
    String getStatisticsContents(List<Map<String, Object>> contents);
    
    /**
     * S3 저장 파일 분할 수
     * @return 파일 분할 수
     */
    int getSplitFileCount();
}

Option 인터페이스 구현

Option 인터페이스를 구현하는 클래스를 생성하고 각 메서드들을 Override 합니다.

getMakeParam 메서드에서 호출하는 MakeQueryParameter.getOneColumnParameter 메서드는 쿼리 파라미터의 개수가 달라서 별도의 클래스 파일로 구현하였고, getStatisticsContents 메소드에서 호출하는 ListUtils.getRemovingPartitioningStatisticsContents 메소드는 파티셔닝 되는 컬럼이 같은 경우가 있어서 별도의 클래스로 생성하였습니다. 

이 두 클래스는 참고용으로 제일 하단 부록에 기록해 두겠습니다. 

 

public class LogUseStatistics implements StatisticsOptionInter {
    @Override
    public int getSplitFileCount() {
        return 3;
    }

    @Override
    public String getSearchQuery() {
        return "로그 조회 쿼리";
    }

    @Override
    public Map<String, String> getMakeParam(String paramVal) {
        return MakeQueryParameter.getOneColumnParameter(paramVal, 2);
    }

    @Override
    public String getS3FilePathForAthena(String partitionDate) {
        String strYyyy = StringUtils.getDateInDesiredFormat("yyyy", partitionDate);  // 년
        String strMm = StringUtils.getDateInDesiredFormat("M", partitionDate);       // 월
        String strDay = StringUtils.getDateInDesiredFormat("d", partitionDate);      // 일
        
        return "log/impressions/log-use-statistics/acss_yy=" + strYyyy + "/acss_mm=" + strMm 
            + "/acss_day=" + strDay + "/rgst_day=" 
            + StringUtils.getDateInDesiredFormat("yyyy-MM-dd", partitionDate)
            + "/log_impressions_" + System.currentTimeMillis() + ".json";
    }

    @Override
    public String getStatisticsContents(List<Map<String, Object>> contents) {
        return ListUtils.getRemovingPartitioningStatisticsContents(contents);
    }
}

AWS S3 파일 업로드 클래스 리팩터링

S3 파일 업로드 클래스도 인터페이스를 생성했지만 정리하면서 생각해보니 S3 파일 업로드 클래스는 인터페이스를 생성해서 사용할 필요가 없을 것 같은데 히스토리 관리 차원에서 생성하는 것으로 합니다.

public interface AwsS3FileUploadInter {
    int LIST_MAX_SIZE = 300000;
    void run(String filePath, String contents);
}

이렇게 기준이 되는 파일 사이즈와 파일 업로드 실행 메서드를 가지는 인터페이스를 생성합니다.

그리고 이 인터페이스를 구현하는 클래스를 생성합니다. 

AWS S3 연결 후 filePath에 contents 내용을 저장하는 클래스입니다.

public class AwsS3FileUpload implements AwsS3FileUploadInter{

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final AwsBasicCredentials credentials;

    public AwsS3FileUpload(){
        String awsS3AccessKeyId = StringUtils.getProperty("AWS.s3.accessKeyId");
        String awsS3SecretAccessKey = StringUtils.getProperty("AWS.s3.secretAccessKey");
        this.credentials = AwsBasicCredentials.create(awsS3AccessKeyId, awsS3SecretAccessKey);
    }

    /**
     * AWS S3 client 생성
     * @return AWS S3 Client
     */
    private S3Client getS3Client() {
        return S3Client.builder()
            .region(Region.AP_NORTHEAST_2)
            .credentialsProvider(StaticCredentialsProvider.create(this.credentials))
            .httpClient(UrlConnectionHttpClient.builder().build())
            .build();
    }

    /**
     * AWS S3 버킷 생성
     * @param filePath Aws S3 File path
     * @return Aws S3 Buket
     */
    private PutObjectRequest getPutObjectRequest(String filePath) {
        String buketNm = "버킷명";        // S3 버킷 이름
        return PutObjectRequest.builder()
            .bucket(buketNm)
            .key(filePath)
            .build();
    }

    /**
     * 파일 업로드 시작
     * @param filePath 파일 업로드 경로
     * @param contents 업로드할 내용
     */
    @Override
    public void run(String filePath, String contents){
        try(S3Client s3Client = getS3Client()){
            PutObjectRequest putObjectRequest = getPutObjectRequest(filePath);
            RequestBody requestBody = RequestBody.fromString(contents);
            s3Client.putObject(putObjectRequest, requestBody);    // AWS S3 저장
        } catch (Exception ex){
            logger.error("Aws S3 File Upload Exception: " + ex.getMessage());
        }
    }
}

로그 처리 클래스 리팩터링

Main 메서드에서 호출하기 위해 로그 처리 클래스도 아래와 같이 간단하게 인터페이스를 생성하였습니다.

public interface StatisticsInterface {
    /**
     * * 일배치 시작
     */
    void processingRun();

    /**
     * * 마이그레이션 시작
     * @param sDate 마이그레이션 시작일
     * @param eDate 마이그레이션 종료일
     */
    void processingMigRun(String sDate, String eDate);

    /**
     * * 실제 처리 작업
     * @param processingDate 처리일자
     */
    void realProcessing(String processingDate);
}

 

그리고 이 인터페이스를 구현합니다.

조회 Option 인터페이스와 S3 파일 업로드 인터페이스를 전역 변수로 하고 생성자에서 각각의 값을 할당합니다.

그리고 일 배 치는 D-1의 데이터가 취합되는 기능이고 마이그레이션은 입력한 시작일부터 종료일까지의 데이터를 취합하기 때문에 실제 처리되는 메서드는 하나로 처리할 수 있어서 각각의 메서드로 분리하였습니다. 

파일 업로드 인터페이스에서 설정한 LIST_MAX_SIZE 보다 LIST 사이즈가 클 경우 Option에서 설정한 개수만큼 파일을 분할해서 S3에 업로드하는 방식으로 메서드를 구현하였습니다. 

public class StatisticsProcessing implements StatisticsInterface {
    private final StatisticsOptionInter processingQuery;    // 조회 쿼리
    private final AwsS3FileUploadInter fileUploadInter;    // S3 파일 업로드 처리

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public StatisticsProcessing(StatisticsOptionInter processingQuery, 
        AwsS3FileUploadInter fileUploadInter){
        this.processingQuery = Objects.requireNonNull(processingQuery);
        this.fileUploadInter = Objects.requireNonNull(fileUploadInter);
    }

    /**
     * 일배치 시작 *
     */
    @Override
    public void processingRun(){
        realProcessing(StringUtils.getYesterday("", 1));    // 하루 전 날짜 조회 후 처리 시작
    }

    /**
     * 마이그레이션 시작 *
     * @param sDate 마이그레이션 시작일
     * @param eDate 마이그레이션 종료일
     */
    @Override
    public void processingMigRun(String sDate, String eDate){
        int diffDate = StringUtils.getDaysDiff(sDate, eDate);    // 시작 / 종료 날짜 차이 조회
        for(int i=0; i<=diffDate; i++){    // 날짜 차이 만큼 반복문 실행
            // 시작일 부터 종료일까지 처리
            realProcessing(StringUtils.getTrargetDay(sDate, "yyyyMMdd", i));
        }
    }

    /**
     * 실제 처리 작업 진행 *
     * @param processingDate 처리일자
     */
    @Override
    public void realProcessing(String processingDate){
        DatabaseManageInter databaseManageInter = new DatabaseManager();
        List<Map<String, Object>> resultList =
            databaseManageInter.select(processingQuery.getSearchQuery(), 
                processingQuery.getMakeParam(processingDate));

        if(!resultList.isEmpty()){

            logger.info(processingDate + " = " + resultList.size());

            if(resultList.size() < fileUploadInter.LIST_MAX_SIZE){
                // list max size 보다 리스트가 작을 경우 파일 1개 생성 후 업로드 처리
                fileUploadInter.run(processingQuery.getS3FilePathForAthena(processingDate),
                    processingQuery.getStatisticsContents(resultList));

            } else {
                // list max size 보다 크면 AwsS3FileUpload.runtimeProcessCnt 개수만큼 나눠서 업로드 처리
                int splitFileCount = processingQuery.getSplitFileCount();
                int listSize = resultList.size();
                // 목록을 가용 프로세스 개수 만큼 나눠서 처리
                int sizeOneOfList = listSize / splitFileCount;

                // 가용 프로세스 개수 만큼 리스트 분할 처리
                for(int count = 0; count < splitFileCount; count++){
                    // 리스트 시작
                    int fromIndex = ListUtils.getFromIndex(count, sizeOneOfList);
                    // 리스트 마지막
                    int toIndex = ListUtils.getToIndex(count, splitFileCount, listSize);
                    // 파일 업로드 시작
                    fileUploadInter.run(processingQuery.getS3FilePathForAthena(processingDate)
                        , processingQuery.getStatisticsContents(resultList.subList(fromIndex, toIndex)));
                }
            }
        } else {
            logger.info(processingDate + " = 데이터 없음");
        }
    }
}

호출하는 main 메서드 리팩터링

아래의 소스코드와 같이 메인 메서드를 리팩터링 하면 각 Log에 따른 Option 인터페이스를 구현한 Option 클래스만 변경되고 StatisticsProcessing 에서는 무슨일을 하는지 상관없어지기 때문에 계속해서 사용할 수 있는 구조입니다.

public static void main(String... args){

    StatisticsInterface statisticsInterface;
    StatisticsOptionInter queryInter;
    AwsS3FileUploadInter awsS3FileUploadInter = new AwsS3FileUpload();
    
    // StatisticsProcessing 객체 생성 시 option class 객체와 AwsS3FileUpload 객체 생성 후 파라미터로 전달 처리
    switch (args[0]){
        case "LOG1":
            queryInter = new LogUseStatisticsOption();
            statisticsInterface = new StatisticsProcessing(queryInter, awsS3FileUploadInter);
            statisticsInterface.processingRun();
            break;
        case "LOG1_MIG":
            queryInter = new LogUseStatisticsOption();
            statisticsInterface = new StatisticsProcessing(queryInter, awsS3FileUploadInter);
            statisticsInterface.processingMIgRun(args[1], args[2]);
            break;
        case "LOG2":
            queryInter = new SecondLogUseStatisticsOption();
            statisticsInterface = new StatisticsProcessing(queryInter, awsS3FileUploadInter);
            statisticsInterface.processingRun();
            break;
        case "LOG2_MIG":
            queryInter = new SecondLogUseStatisticsOption();
            statisticsInterface = new StatisticsProcessing(queryInter, awsS3FileUploadInter);
            statisticsInterface.processingMigRun(args[1], args[2]);
            break;
    }       
}

이상으로 S3에 File Upload 시 DI 적용 리팩터링을 해보았습니다. 

최대한 알아보기 쉽게 정리한다고 했는데 혹시 궁금한 사항이 있으면 댓글 남겨주시면 정성껏 답변 드리도록 하겠습니다. 

더 나은 방법이 있으면 댓글 달아주시면 감사하겠습니다.

 

 

 


부록

MakeQueryParameter 클래스

public class MakeQueryParameter {
    /**
     * 파라미터 값 하나가 여러 조건으로 들어갈 경우의 파라미터 생성
     * @param paramValue 조건 값
     * @param paramCnt 중복 개수
     * @return 파라미터 Map
     */
    public static Map<String, String> getOneColumnParameter(String paramValue, int paramCnt){
        Map<String, String> retParam = new HashMap<>();
        int cnt = 0;
        while(cnt < paramCnt){
            retParam.put(String.valueOf(++cnt), paramValue);
        }
        return retParam;
    }

    /**
     * 파라미터 값 3개가 여러 조건으로 들어갈 경우 파라미터 생성
     * @param paramValue1 조건 값 1
     * @param paramValue2 조건 값 2
     * @param paramValue3 조건 값 3
     * @param paramCnt 중복 개수
     * @return 파라미터 Map
     */
    public static Map<String, String> getThreeColumnParameter(String paramValue1, String paramValue2
        , String paramValue3, int paramCnt){
        Map<String, String> retParam = new HashMap<>();
        int cnt = 0;
        while(cnt < paramCnt){
            retParam.put(String.valueOf(++cnt), paramValue1);	// 검색 연도
            retParam.put(String.valueOf(++cnt), paramValue2);	// 검색 월
            retParam.put(String.valueOf(++cnt), paramValue3);	// 검색 일
        }
        return retParam;
    }
}

ListUtils 클래스

public class ListUtils {
    private static final Logger logger = LoggerFactory.getLogger(ListUtils.class);

    /**
     * 리스트 시작 index 가져오기
     * @param count 현재 count
     * @param sizeOneOfList list size
     * @return 시작 index
     */
    public static int getFromIndex(int count, int sizeOneOfList){
        return count == 0 ? count:(count * sizeOneOfList);
    }

    /**
     * 리스트 마지막 index 가져오기
     * @param count 현재 count
     * @param baseCnt 기준 count
     * @param listSize 목록 size
     * @return 마지막 index
     */
    public static int getToIndex(int count, int baseCnt, int listSize){
        int sizeOneOfList = listSize / baseCnt;
        return count == (baseCnt-1) ? listSize:((count+1) * sizeOneOfList);
    }

    /**
     * 파티션 처리된 컬럼 내용 삭제 처리
     * 파티션 컬럼이 acss_yy, acss_mm, acss_day, rgst_day 일 경우 사용 ( 통계 관련 )
     * @param contents contents 리스트
     * @return 파티션 처리된 컬럼이 제거된 리스트
     */
    public static String getRemovingPartitioningStatisticsContents(List<Map<String, Object>> contents){
        StringBuilder jsonData = new StringBuilder();

        if(!contents.isEmpty()){
            JsonStringUtils jsonStringUtils = new JsonStringUtils();

            // convert map to json and return data
            for(Map<String, Object> map : contents){
                // Remove the partitioning data key in the map
                map.remove("acss_yy");
                map.remove("acss_mm");
                map.remove("acss_day");
                map.remove("rgst_day");
                jsonData.append(jsonStringUtils.convertMapToJson(map));
            }
        }
        return jsonData.toString().replaceAll("}\\{", "}" + System.getProperty("line.separator") +"\\{");
    }
}

 

Comments