반응형
Notice
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- java8
- Java
- function
- 로그 데이터
- 자바스크립트
- ibsheet
- Git
- athena
- aws S3
- 자바
- Study
- aws lambda
- 인텔리J
- Log
- s3
- naver smartEditor
- db
- 환경구성
- 환경 구성
- AWS Athena
- JavaScript
- 자바8
- AWS
- jQuery
- #jQuery
- 카이호스트만
- AWS Glue
- intellij
- 아이비시트
- AWS SQS
Archives
- Today
- Total
애매한 잡학사전
AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with REDIS 본문
반응형
1. Flow
- Redis, Batch 프로그램 사용
2. 환경 세팅
- 내용이 너무 거대해지는 것 같아서 별도로 정리하였습니다. 각 링크를 참고하시면 되겠습니다.
2-1. Redis
2-2. AWS S3
2-3. AWS Athena
2-4. AWS Glue
3. 소스 개발
3-1. WEB 개발
- 유저가 기능 클릭 시 WAS에서 Redis로 log 데이터 실시간 저장 기능
/**
* 레디스 key expire 값 세팅
*
* @param hour 초기화할 시간
* @param minute 초기화할 분
* @param second 초기화할 초
* @return 남은 시간 : millisecond
*/
public int getRedisResetTime(int hour, int minute, int second) {
LocalDateTime nowDate = LocalDateTime.now(); // 현재 시간
LocalDateTime tomorrowDate = nowDate.plusDays(1); // 내일 시간
LocalDateTime resetDateTime = LocalDateTime.of(tomorrowDate.getYear(), tomorrowDate.getMonth().getValue(), tomorrowDate.getDayOfMonth(), hour, minute, second); // 리셋 시간 ( 다음날 아침 8시 )
Duration duration = Duration.between(nowDate, resetDateTime); // 두 시간 차이 계산
return (int) duration.getSeconds();
}
/**
* Map 형태를 json string 형태로 변경
*/
private String convertMapToJsonString(Map<String, String> params) throws JsonProcessingException {
return new ObjectMapper().writeValueAsString(params);
}
public void sendMessageRedis(Map<String, String> params){
try {
// 로그 자료 redis 저장 처리, key expire 시간 ( 새벽 1시 )
RedisConnectionFactory redisConnectionFactory = new RedisConnectionFactory(12, getRedisResetTime(1, 0, 0));
// 현재 날짜
String currentDate = EgovDateUtil.getToday();
// redis key 생성
String statKey = redisConnectionFactory.createRedisKey(new String[]{"myKeyName", currentDate});
// param map json string 형태로 변환
String contents = convertMapToJsonString(params);
// key 에 해당하는 value 유무 체크
String chk = redisConnectionFactory.nonClusterGet(statKey);
if (chk != null) {
contents = System.getProperty("line.separator") + contents; // 값이 있을 경우 줄바꿈 추가
}
// redis 저장
redisConnectionFactory.nonClusterAppend(statKey, contents);
} catch(RedisConnectionException redisConnectionException){
logger.error("Exception on append data to REDIS in statistics " + redisConnectionException.getMessage());
}
}
3-2. Batch 프로그램 개발
- Redis 에서 데이터 조회 후 S3에 저장
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dbpia.util.RedisConnectionFactory;
import dbpia.util.StringUtils;
import io.lettuce.core.RedisConnectionException;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
public class AwsS3FileUpload {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static S3Client s3Client;
private final String folderName;
public AwsS3FileUpload(String folderName){
this.folderName = folderName;
String accessKeyId = StringUtils.getProperty("AWS.athena.s3.bucket.accessKeyId");
String secretAccessKey = StringUtils.getProperty("AWS.athena.s3.bucket.secretAccessKey");
AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
s3Client = S3Client.builder()
.region(Region.AP_NORTHEAST_2)
.credentialsProvider(StaticCredentialsProvider.create(credentials))
.build();
}
/**
* 로그 데이터 조회
* @param searchDay 로그 데이터 생성 날짜
* @return 로그 데이터 JSON
*/
private String getLogData(String searchDay){
String retValue = "";
try{
RedisConnectionFactory factory = new RedisConnectionFactory(12);
String statKey = factory.createRedisKey(new String[]{folderName, searchDay});
retValue = factory.nonClusterGet(statKey);
} catch(RedisConnectionException redisConnectionException){
logger.error("Failed to connection to Redis server : " + redisConnectionException.getMessage());
} catch(Exception ex){
logger.error("AwsS3FileUpload.getLogData Exception : " + ex);
}
return retValue;
}
/**
* log 파일 저장 위치
* @param searchDay 검색 일자
* @return 파일 경로
*/
private String getFilePath(String searchDay){
return "log/impressions/" + folderName + "/yyyymmdd=" + searchDay + "/log_impressions_" + searchDay + ".log";
}
/**
* log 자료 Athena upload
*/
public void startToFileUpload(){
String bucketName = StringUtils.getProperty("AWS.athena.s3.bucket.name");
String searchDay = StringUtils.getYesterday("", 1); // yyyymmdd 형태 어제 날짜 조회
// Redis 데이터 조회
String statJson = getLogData(searchDay);
if(statJson != null && !statJson.isEmpty()){
// aws s3 upload 처리
PutObjectRequest objReq = PutObjectRequest.builder()
.bucket(bucketName)
.key(getFilePath(searchDay))
.build();
s3Client.putObject(objReq, RequestBody.fromString(statJson));
logger.info("File upload Success to AWS S3 : " + statJson);
} else {
logger.error("AwsS3FileUpload.startToFileUpload : no " + folderName + " data");
}
}
}
public static void main(String... args){
// Redis log data AWS S3 저장
AwsS3FileUpload awsS3FileUpload = new AwsS3FileUpload(args[0]);
awsS3FileUpload.startToFileUpload();
}
3-3. AWS Glue
- 위의 2-4. AWS Glue 에서 설정한 구성대로 크롤러 실행 시 AWS S3에서 데이터를 조회해서 2-3.AWS Athena 에서 구성한 테이블로 데이터가 저장 됩니다.
4. 정리
- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 실시간으로 Redis 서버에 저장 되는 구조로 만들고
배치 프로그램으로 다음날 새벽에 전일 데이터를 조회해서 S3로 저장하는 로직을 구현하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.
'DEV > AWS' 카테고리의 다른 글
로그 데이터 처리를 위한 AWS SQS 환경 구성 (0) | 2022.06.28 |
---|---|
로그 데이터 처리를 위한 AWS Lambda 환경 구성 (0) | 2022.06.28 |
로그 데이터 처리를 위한 AWS Glue 환경 구성 (0) | 2022.06.27 |
로그 데이터 처리를 위한 AWS S3 환경 구성 (0) | 2022.06.27 |
로그 데이터 처리를 위한 AWS Athena 환경 구성 (0) | 2022.06.27 |
Comments