반응형
Notice
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- intellij
- AWS Athena
- Log
- #jQuery
- AWS Glue
- Git
- 자바8
- function
- 인텔리J
- aws S3
- Java
- ibsheet
- jQuery
- 환경구성
- 자바
- AWS SQS
- 카이호스트만
- naver smartEditor
- 환경 구성
- athena
- AWS
- 로그 데이터
- Study
- s3
- db
- 자바스크립트
- 아이비시트
- JavaScript
- aws lambda
- java8
Archives
- Today
- Total
애매한 잡학사전
AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch 본문
반응형
1. Flow
- Batch 프로그램으로 별도의 시스템을 사용하지 않고 바로 S3로 데이터를 저장하는 프로세스
2. 환경 세팅
- 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다.
2-1. Batch
- 일반 Java application (maven) 프로젝트 생성
2-2. AWS S3
2-3. AWS Glue
2-4. AWS Athena
3. 개발 소스
3-1. Batch 개발
- Batch 클래스
package sample.run;
import sample.processing.LogProcessing;
public class Batch {
public static void main(String... args){
LogProcessing logProcessing = new LogProcessing();
logProcessing.run();
}
}
- LogProcessing 클래스
package sample.processing;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import dbpia.aws.AwsS3FileUpload;
import dbpia.db.DatabaseManager;
import dbpia.util.StringUtils;
public class LogProcessing {
/**
* 데이터 베이스 조회
* @param query 로그 데이터 조회 쿼리
* @param migDay 로그 데이터 조회 날짜
* @return 로그 데이터 목록
*/
private List<Map<String, Object>> getDataList(String query, String migDay){
DatabaseManager dm = new DatabaseManager();
Map<String, String> param = new HashMap<>();
param.put("1", migDay); // 검색 일자
// DB 조회
return dm.select(query, param);
}
/**
* AWS S3에 저장할 파일 분할 처리
* @param paramList S3에 저장할 로그 데이터 목록
* @param uploadSection S3에 저장할 로그 데이터 구분
*/
private void awsS3PutObjectLogData(List<Map<String, Object>> paramList, String uploadSection){
int listSize = paramList.size();
// 리스트 사이즈가 listMaxSize 값 이상일 경우 파일 분할 저장
if(listSize < 300000){
new AwsS3FileUpload().run(paramList, uploadSection);
} else {
// 가용 프로세스 개수
int runtimeProcessCnt = Runtime.getRuntime().availableProcessors();
// 목록을 가용 프로세스 개수 만큼 나눠서 처리
int sizeOneOfList = listSize / runtimeProcessCnt;
// 가용 프로세스 개수 만큼 리스트 분할 처리
for(int count = 0; count < runtimeProcessCnt; count++){
// 리스트 시작
int fromIndex = count == 0 ? count:(count * sizeOneOfList);
// 리스트 마지막
int toIndex = count == (runtimeProcessCnt-1) ? listSize:((count+1) * sizeOneOfList);
List<Map<String, Object>> subList = paramList.subList(fromIndex, toIndex);
new AwsS3FileUploadSample().run(subList, uploadSection); // 파일 업로드 시작
}
}
}
/**
* log 처리 시작
*/
public void run(){
// 처리 구분 값 세팅
String section = "처리 구분";
// 하루 전 날짜 세팅
String yesterday = StringUtils.getYesterday("", 1);
// DB 조회
String query = "db 데이터 조회 쿼리";
List<Map<String, Object>> resultList = getDataList(query, yesterday);;
// AWS S3 파일 업로드 프로세스
awsS3PutObjectLogData(resultList, section);
}
}
- AwsS3FileUploadSample 클래스
package sample.processing;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dbpia.util.StringUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
public class AwsS3FileUploadSample {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final AwsBasicCredentials credentials;
public AwsS3FileUploadSample(){
String awsS3AccessKeyId = StringUtils.getProperty("AWS.s3.accessKeyId");
String awsS3SecretAccessKey = StringUtils.getProperty("AWS.s3.secretAccessKey");
this.credentials = AwsBasicCredentials.create(awsS3AccessKeyId, awsS3SecretAccessKey);
}
/**
* AWS S3 client 생성
* @return AWS S3 Client
*/
private S3Client getS3Client() {
return S3Client.builder()
.region(Region.AP_NORTHEAST_2)
.credentialsProvider(StaticCredentialsProvider.create(this.credentials))
.httpClient(UrlConnectionHttpClient.builder().build())
.build();
}
/**
* AWS S3 버킷 생성
* @param filePath Aws S3 File path
* @return Aws S3 Buket
*/
private PutObjectRequest getPutObjectRequest(String filePath) {
String buketNm = "S3 버킷 이름"; // S3 버킷 이름
return PutObjectRequest.builder()
.bucket(buketNm)
.key(filePath)
.build();
}
/**
* 파일 업로드 시작
* @param listMap 업로드할 파일 목록
* @param s3UploadSection 파일 업로드 구분
*/
public void run(List<Map<String, Object>> listMap, String s3UploadSection) {
try(S3Client s3Client = getS3Client()){
PutObjectRequest putObjectRequest;
RequestBody requestBody;
if(s3UploadSection.equals("로그 구분1")){
putObjectRequest = getPutObjectRequest("파일 경로");
requestBody = RequestBody.fromString("저장할 JSON 형태의 데이터 listMap 데이터를 변환");
} else { // 로그 구분 2
putObjectRequest = getPutObjectRequest("파일 경로");
requestBody = RequestBody.fromString("저장할 JSON 형태의 데이터 listMap 데이터를 변환");
}
s3Client.putObject(putObjectRequest, requestBody); // AWS S3 저장
} catch (Exception ex){
logger.error("Aws S3 File Upload Exception: " + ex.getMessage());
}
}
}
4. 정리
- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 다음날 0시에 배치 프로그램이 DB를 조회해서 최소한의 파일 분할로 파티션 처리를 한 후 S3에 저장되는 로직으로 구현 하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.
'DEV > AWS' 카테고리의 다른 글
AWS Athena 사용해서 게시판 만들기 with Java, jQuery (0) | 2022.06.30 |
---|---|
AWS를 이용한 대용량 Log 데이터 처리하기 시행착오 정리 (0) | 2022.06.29 |
AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch, AWS Lambda, AWS SQS (0) | 2022.06.28 |
AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with AWS Lambda, AWS SQS (0) | 2022.06.28 |
로그 데이터 처리를 위한 AWS SQS 환경 구성 (0) | 2022.06.28 |
Comments