DEV/AWS
AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch
거대한 개발자
2022. 6. 28. 17:00
반응형
1. Flow
- Batch 프로그램으로 별도의 시스템을 사용하지 않고 바로 S3로 데이터를 저장하는 프로세스
2. 환경 세팅
- 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다.
2-1. Batch
- 일반 Java application (maven) 프로젝트 생성
2-2. AWS S3
2-3. AWS Glue
2-4. AWS Athena
3. 개발 소스
3-1. Batch 개발
- Batch 클래스
package sample.run;
import sample.processing.LogProcessing;
public class Batch {
public static void main(String... args){
LogProcessing logProcessing = new LogProcessing();
logProcessing.run();
}
}
- LogProcessing 클래스
package sample.processing;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import dbpia.aws.AwsS3FileUpload;
import dbpia.db.DatabaseManager;
import dbpia.util.StringUtils;
public class LogProcessing {
/**
* 데이터 베이스 조회
* @param query 로그 데이터 조회 쿼리
* @param migDay 로그 데이터 조회 날짜
* @return 로그 데이터 목록
*/
private List<Map<String, Object>> getDataList(String query, String migDay){
DatabaseManager dm = new DatabaseManager();
Map<String, String> param = new HashMap<>();
param.put("1", migDay); // 검색 일자
// DB 조회
return dm.select(query, param);
}
/**
* AWS S3에 저장할 파일 분할 처리
* @param paramList S3에 저장할 로그 데이터 목록
* @param uploadSection S3에 저장할 로그 데이터 구분
*/
private void awsS3PutObjectLogData(List<Map<String, Object>> paramList, String uploadSection){
int listSize = paramList.size();
// 리스트 사이즈가 listMaxSize 값 이상일 경우 파일 분할 저장
if(listSize < 300000){
new AwsS3FileUpload().run(paramList, uploadSection);
} else {
// 가용 프로세스 개수
int runtimeProcessCnt = Runtime.getRuntime().availableProcessors();
// 목록을 가용 프로세스 개수 만큼 나눠서 처리
int sizeOneOfList = listSize / runtimeProcessCnt;
// 가용 프로세스 개수 만큼 리스트 분할 처리
for(int count = 0; count < runtimeProcessCnt; count++){
// 리스트 시작
int fromIndex = count == 0 ? count:(count * sizeOneOfList);
// 리스트 마지막
int toIndex = count == (runtimeProcessCnt-1) ? listSize:((count+1) * sizeOneOfList);
List<Map<String, Object>> subList = paramList.subList(fromIndex, toIndex);
new AwsS3FileUploadSample().run(subList, uploadSection); // 파일 업로드 시작
}
}
}
/**
* log 처리 시작
*/
public void run(){
// 처리 구분 값 세팅
String section = "처리 구분";
// 하루 전 날짜 세팅
String yesterday = StringUtils.getYesterday("", 1);
// DB 조회
String query = "db 데이터 조회 쿼리";
List<Map<String, Object>> resultList = getDataList(query, yesterday);;
// AWS S3 파일 업로드 프로세스
awsS3PutObjectLogData(resultList, section);
}
}
- AwsS3FileUploadSample 클래스
package sample.processing;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dbpia.util.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
public class AwsS3FileUploadSample {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final AwsBasicCredentials credentials;
public AwsS3FileUploadSample(){
String awsS3AccessKeyId = StringUtils.getProperty("AWS.s3.accessKeyId");
String awsS3SecretAccessKey = StringUtils.getProperty("AWS.s3.secretAccessKey");
this.credentials = AwsBasicCredentials.create(awsS3AccessKeyId, awsS3SecretAccessKey);
}
/**
* AWS S3 client 생성
* @return AWS S3 Client
*/
private S3Client getS3Client() {
return S3Client.builder()
.region(Region.AP_NORTHEAST_2)
.credentialsProvider(StaticCredentialsProvider.create(this.credentials))
.httpClient(UrlConnectionHttpClient.builder().build())
.build();
}
/**
* AWS S3 버킷 생성
* @param filePath Aws S3 File path
* @return Aws S3 Buket
*/
private PutObjectRequest getPutObjectRequest(String filePath) {
String buketNm = "S3 버킷 이름"; // S3 버킷 이름
return PutObjectRequest.builder()
.bucket(buketNm)
.key(filePath)
.build();
}
/**
* 파일 업로드 시작
* @param listMap 업로드할 파일 목록
* @param s3UploadSection 파일 업로드 구분
*/
public void run(List<Map<String, Object>> listMap, String s3UploadSection) {
try(S3Client s3Client = getS3Client()){
PutObjectRequest putObjectRequest;
RequestBody requestBody;
if(s3UploadSection.equals("로그 구분1")){
putObjectRequest = getPutObjectRequest("파일 경로");
requestBody = RequestBody.fromString("저장할 JSON 형태의 데이터 listMap 데이터를 변환");
} else { // 로그 구분 2
putObjectRequest = getPutObjectRequest("파일 경로");
requestBody = RequestBody.fromString("저장할 JSON 형태의 데이터 listMap 데이터를 변환");
}
s3Client.putObject(putObjectRequest, requestBody); // AWS S3 저장
} catch (Exception ex){
logger.error("Aws S3 File Upload Exception: " + ex.getMessage());
}
}
}
4. 정리
- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 다음날 0시에 배치 프로그램이 DB를 조회해서 최소한의 파일 분할로 파티션 처리를 한 후 S3에 저장되는 로직으로 구현 하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.