애매한 잡학사전

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with REDIS 본문

DEV/AWS

AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with REDIS

거대한 개발자 2022. 6. 27. 17:46
반응형

1. Flow

 - Redis, Batch 프로그램 사용

처리 프로세스


2. 환경 세팅

- 내용이 너무 거대해지는 것 같아서 별도로 정리하였습니다. 각 링크를 참고하시면 되겠습니다.

    2-1. Redis

 

개발(로컬) PC에 REDIS 테스트 환경 세팅

- 2022년 6월 27일 Redis for windows 는 공식적으로 지원하지 않기 때문에 개발pc에서 테스트를 하고 싶을 경우 Microsoft Store에서 Ubuntu 22.04 LTS 를 설치해서 사용할 수 있습니다. - 공식 홈페이지 Redis R..

dev-gabriel.tistory.com

    2-2. AWS S3

 

로그 데이터 처리를 위한 AWS S3 환경 구성

- 버킷 생성 : 버킷 이름 작성 후 나머지 옵션들은 그냥 기본으로 놓고 스크롤 제일 하단으로 내린 후 버킷 만들기 버튼 선택

dev-gabriel.tistory.com

    2-3. AWS Athena

 

로그 데이터 처리를 위한 AWS Athena 환경 구성

1. 데이터 원본 생성 - 콘솔을 접속해서 Athena 페이지로 이동 - 왼쪽 메뉴에서 데이터 원본 클릭 - 오른쪽에 데이터 원본 생성 버튼 클릭 ( 생성하려고 하는 데이터 원본이 AWS Glue 데이터 카탈로그

dev-gabriel.tistory.com

    2-4. AWS Glue 

 

로그 데이터 처리를 위한 AWS Glue 환경 구성

1. AWS Glue Crawler 추가 - 클롤러 추가 버튼 클릭 합니다. - 크롤러 이름을 입력 후 다음을 버튼 클릭합니다. - 크롤러 소스 타입 : 이미 테이블을 생성했기 때문에 Existing catalog tables 선택 후 다음 버..

dev-gabriel.tistory.com


3. 소스 개발

    3-1. WEB 개발

    - 유저가 기능 클릭 시 WAS에서 Redis로 log 데이터 실시간 저장 기능

/**
 * 레디스 key expire 값 세팅
 *
 * @param hour   초기화할 시간
 * @param minute 초기화할 분
 * @param second 초기화할 초
 * @return 남은 시간 : millisecond
 */
public int getRedisResetTime(int hour, int minute, int second) {
	LocalDateTime nowDate = LocalDateTime.now();        // 현재 시간
	LocalDateTime tomorrowDate = nowDate.plusDays(1);   // 내일 시간
	LocalDateTime resetDateTime = LocalDateTime.of(tomorrowDate.getYear(), tomorrowDate.getMonth().getValue(), tomorrowDate.getDayOfMonth(), hour, minute, second);  // 리셋 시간 ( 다음날 아침 8시 )
	Duration duration = Duration.between(nowDate, resetDateTime);   // 두 시간 차이 계산
	return (int) duration.getSeconds();
}

/**
* Map 형태를 json string 형태로 변경 
*/
private String convertMapToJsonString(Map<String, String> params) throws JsonProcessingException {
    return new ObjectMapper().writeValueAsString(params);
}

public void sendMessageRedis(Map<String, String> params){
    try {
        // 로그 자료 redis 저장 처리, key expire 시간 ( 새벽 1시 )
        RedisConnectionFactory redisConnectionFactory = new RedisConnectionFactory(12, getRedisResetTime(1, 0, 0));
        // 현재 날짜
        String currentDate = EgovDateUtil.getToday();
        // redis key 생성
        String statKey = redisConnectionFactory.createRedisKey(new String[]{"myKeyName", currentDate});
        // param map json string 형태로 변환
        String contents = convertMapToJsonString(params);
        // key 에 해당하는 value 유무 체크
        String chk = redisConnectionFactory.nonClusterGet(statKey);
        if (chk != null) {
            contents = System.getProperty("line.separator") + contents; // 값이 있을 경우 줄바꿈 추가
        }
    
        // redis 저장
        redisConnectionFactory.nonClusterAppend(statKey, contents);
    } catch(RedisConnectionException redisConnectionException){
        logger.error("Exception on append data to REDIS in statistics " + redisConnectionException.getMessage());
    }
}

 

    3-2. Batch 프로그램 개발

    - Redis 에서 데이터 조회 후 S3에 저장

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dbpia.util.RedisConnectionFactory;
import dbpia.util.StringUtils;
import io.lettuce.core.RedisConnectionException;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class AwsS3FileUpload {    

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static S3Client s3Client;
    private final String folderName;

    public AwsS3FileUpload(String folderName){
        this.folderName = folderName;
        String accessKeyId = StringUtils.getProperty("AWS.athena.s3.bucket.accessKeyId");
        String secretAccessKey = StringUtils.getProperty("AWS.athena.s3.bucket.secretAccessKey");

        AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);

        s3Client = S3Client.builder()
            .region(Region.AP_NORTHEAST_2)
            .credentialsProvider(StaticCredentialsProvider.create(credentials))
            .build();
    }

    /**
     * 로그 데이터 조회
     * @param searchDay 로그 데이터 생성 날짜
     * @return 로그 데이터 JSON
     */
    private String getLogData(String searchDay){
        String retValue = "";

        try{
            RedisConnectionFactory factory = new RedisConnectionFactory(12);
            String statKey = factory.createRedisKey(new String[]{folderName, searchDay});
            retValue = factory.nonClusterGet(statKey);

        } catch(RedisConnectionException redisConnectionException){
            logger.error("Failed to connection to Redis server : " + redisConnectionException.getMessage());
        } catch(Exception ex){
            logger.error("AwsS3FileUpload.getLogData Exception : " + ex);
        }

        return retValue;
    }

    /**
     * log 파일 저장 위치
     * @param searchDay 검색 일자
     * @return 파일 경로
     */
    private String getFilePath(String searchDay){
        return "log/impressions/" + folderName + "/yyyymmdd=" + searchDay + "/log_impressions_" + searchDay + ".log";
    }

    /**
     * log 자료 Athena upload
     */
    public void startToFileUpload(){
        String bucketName = StringUtils.getProperty("AWS.athena.s3.bucket.name");
        String searchDay = StringUtils.getYesterday("", 1); // yyyymmdd 형태 어제 날짜 조회

        // Redis 데이터 조회
        String statJson = getLogData(searchDay);

        if(statJson != null && !statJson.isEmpty()){
            // aws s3 upload 처리
            PutObjectRequest objReq = PutObjectRequest.builder()
                .bucket(bucketName)
                .key(getFilePath(searchDay))
                .build();

            s3Client.putObject(objReq, RequestBody.fromString(statJson));
            logger.info("File upload Success to AWS S3 : " + statJson);
        } else {
            logger.error("AwsS3FileUpload.startToFileUpload : no " + folderName + " data");
        }
    }
}
public static void main(String... args){
    // Redis log data AWS S3 저장
    AwsS3FileUpload awsS3FileUpload = new AwsS3FileUpload(args[0]);
    awsS3FileUpload.startToFileUpload();
}

    3-3. AWS Glue

- 위의 2-4. AWS Glue 에서 설정한 구성대로 크롤러 실행 시 AWS S3에서 데이터를 조회해서 2-3.AWS Athena 에서 구성한 테이블로 데이터가 저장 됩니다.


4. 정리

- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 실시간으로 Redis 서버에 저장 되는 구조로 만들고 

배치 프로그램으로 다음날 새벽에 전일 데이터를 조회해서 S3로 저장하는 로직을 구현하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.

 

 

 

 

 

 

 

 

 

 

 

Comments