DEV/AWS
AWS Athena, Glue, S3 활용으로 로그 데이터 처리하기 with Batch, AWS Lambda, AWS SQS
거대한 개발자
2022. 6. 28. 15:33
반응형
1. Flow
- Batch, AWS SQS, AWS Lambda 사용
2. 환경 구성
- 내용이 너무 많아 별도로 정리 하였습니다. 각 링크를 참고하시면 되겠습니다.
2-1. Batch
- 일반 Java application (maven) 프로젝트 생성
2-2. AWS SQS
2-3. AWS Lambda
2-4. AWS S3
2-5. AWS Glue
2-6. AWS Athena
3. 소스 개발
3-1. Java Application 개발
public void run(int start, int end){
DatabaseManager dm = new DatabaseManager();
DatabaseQuery query = new DatabaseQuery();
Map<String, String> param = new HashMap<>();
param.put("1", StringUtils.getYesterday("", 1)); // 검색 일자 (어제 날짜)
// DB 조회
List<Map<String, Object>> resultList = dm.select(query.getDataList(), param);
AwsSqs awsSqs = new AwsSqs();
// 한 줄씩 json 형태로 변경 후 AWS SQS에 전송 처리
for(Map<String, Object> tmp : resultList){
try(SqsClient client = awsSqs.getSqsClient()){
if(awsSqs.queueUrl.isEmpty()){
awsSqs.createQueue(client, "queueName"); // 큐 생성
}
// param map json string 형태로 변환 큐에 전송
awsSqs.sendMessage(client, new ObjectMapper().writeValueAsString(tmp));
} catch (SqsException sqsException){
logger.error("AWS Simple Queue Service Exception : "
+ sqsException.getMessage());
} catch (JsonProcessingException jsonProcessingException){
logger.error("jsonProcessingException : "
+ jsonProcessingException.getMessage());
}
}
logger.info("date = " + searchDay + ", total list size = " + resultList.size() + " 종료");
}
- DatabaseManager select method
/**
* DB 조회 내용 값 세팅
* @param resultList 반환 list
* @param rs result set
* @param metaData result meta data
* @throws SQLException sql exception
*/
private void getResultList(List<Map<String, Object>> resultList, ResultSet rs
, ResultSetMetaData metaData) throws SQLException {
Map<String, Object> row;
int columnCount = metaData.getColumnCount();
while(rs.next()){
row = new HashMap<>();
for(int i=1; i <= columnCount; i++){
String catalogName = metaData.getColumnName(i);
Object rsVal = rs.getObject(i);
row.put(catalogName, rsVal);
}
resultList.add(row);
}
}
/**
* select query 실행 with Param
* @param query select query
* @param paramVal parameter
* @return 조회 내용
*/
public List<Map<String, Object>> select(String query, Map<String, String> paramVal){
List<Map<String, Object>> resultList = new ArrayList<>();
Connection conn = getConnection();
PreparedStatement stmt = null;
ResultSet rs = null;
try{
stmt = conn.prepareStatement(query);
for(int cnt = 1; cnt <= paramVal.size(); cnt++){
stmt.setString(cnt, paramVal.get(String.valueOf(cnt)));
}
rs = stmt.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
getResultList(resultList, rs, metaData);
} catch (SQLException sqlException) {
logger.error("select sqlException : " + sqlException.getMessage());
} catch (Exception ex){
logger.error("select Exception : " + ex.getMessage());
} finally {
close(rs, stmt, conn);
}
return resultList;
}
3-2. AWS Lambda 개발
- 아래 링크의 3.2 AWS Lambda 항목 참고
4. 정리
- WEB에서 유저가 기능 클릭할 때마다 DB에는 백업으로 저장되고, 다음날 0시에 배치 프로그램이 DB를 조회해서 한번에 Json 형태로 변경 후 SQS에 전달하려고 하였는데 데이터 용량이 큰 경우에는 지원하지 않아서 라인 1개씩 대기열(AWS SQS)에 전달해서 서버 자원과는 상관없는 구조로 나머지는 AWS 내부에서 AWS Lambda로 처리되어 S3에 저장되는 로직을 구현하였고, AWS S3의 데이터를 AWS Glue 크롤러가 조회해서 AWS Athena 테이블로 저장되는 방식으로 구현하였습니다.