DEV/AWS

AWS Athena 사용해서 게시판 만들기 with Java, jQuery

거대한 개발자 2022. 6. 30. 11:26
반응형

개요

현재 시스템이 운영 DB와 통게, 로그 데이터가 한 곳에 있어 사용자가 많을 때는 통계나 로그 데이터를 활용할 수 없는 상황이었습니다. 그래서 AWS Athena를 적용 시켜 로그, 통계 데이터를 이관 처리를 하고 Web에서 활용할 수 있는 방법을 구현 하였고, 만족할만한 결과가 나왔습니다. 혹시 환경 구성이 궁금하시면 하단의 링크를 통해서 확인할 수 있습니다. 

그래서 이 블로그에 작성할 내용은 Java 에서 AWS Athena로 Query를 실행시켜 간단한 게시판을 만들어 보려고 합니다.


1. DataSearchInAthena 클래스

    1) AWS 접근 권한

        - AWS Athena accessKeyId 와 secretAccessKey 를 발급 받아서 AWS 접근할 수 있는 권한을 생성합니다.

/**
 * AWS Athena 접근 권한
 * @return AwsBasicCredentials
 */
private AwsBasicCredentials getAwsBasicCredentials() {
    final String awsAccessKeyId = "accessKeyId";
    final String awsSecretAccessKey = "secretAccessKey";
    return AwsBasicCredentials.create(awsAccessKeyId, awsSecretAccessKey);
}

    2) AWS Athena client

        - 위의 자격 증명으로 AWS Athena client 를 생성할 수 있는 메소드를 생성합니다.

/**
 * AWS Athena client 생성
 * @return AthenaClient
 */
private AthenaClient getAthenaClient(){
    return AthenaClient.builder()
        .credentialsProvider(StaticCredentialsProvider.create(getAwsBasicCredentials()))
        .region(Region.AP_NORTHEAST_2)
        .build();
}

    3) 쿼리 실행 후 실행ID 생성

        - 쿼리 실행을 요청하고 실행에 대한 id 를 반환하는 메소드를 생성합니다.

/**
 * 쿼리 실행 후 실행 id 반환
 * @param client athena client
 * @param athenaQuery 실행 쿼리
 * @return 실행 id
 */
private String getQueryExecutionId(AthenaClient client, String athenaQuery){
    StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
        .queryString(athenaQuery)
        .queryExecutionContext(QueryExecutionContext.builder().database(ATHENA_DATABASE).build())
        .resultConfiguration(ResultConfiguration.builder().outputLocation(ATHENA_OUTPUT_BUCKET).build())
        .build();
    return client.startQueryExecution(startQueryExecutionRequest).queryExecutionId();
}

    4) 쿼리 실행 상태 체크

        - AWS Athena 쿼리 실행 후 상태를 제어할 수 있는 메소드를 생성합니다.

/**
 * AWS Athena 쿼리 실행 완료까지 대기하거나 취소 메소드
 * @param athenaClient AthenaClient
 * @param queryExecutionId String
 * @throws InterruptedException interrupted exception
 */
private void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) 
    throws InterruptedException {
    GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
        .queryExecutionId(queryExecutionId)
        .build();

    GetQueryExecutionResponse getQueryExecutionResponse;

    boolean isQueryStillRunning = true;
    
    // Wait for an Amazon Athena query to complete, fail or to be cancelled
    while (isQueryStillRunning) {
        getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
        String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();

        if (queryState.equals(QueryExecutionState.FAILED.toString())) {
            throw new RuntimeException("The Amazon Athena query failed to run with error message: "
                + getQueryExecutionResponse.queryExecution().status().stateChangeReason());

        } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
            throw new RuntimeException("The Amazon Athena query was cancelled.");

        } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
            isQueryStillRunning = false;

        } else {
            // Sleep an amount of time before retrying again
            long SLEEP_AMOUNT_IN_MS = 1000;
            Thread.sleep(SLEEP_AMOUNT_IN_MS);
        }
    }
}

    5) 쿼리 실행 후 결과

        - 쿼리 실행 후 결과 값을 반환할 메소드를 작성합니다.

/**
 * Query 결과 가져오기
 * @param athenaClient Athena Client
 * @param queryExecutionId Query Id
 * @return GetQueryResultsIterable
 * @throws AthenaException athena exception
 */
private GetQueryResultsIterable getQueryResults(AthenaClient athenaClient, String queryExecutionId)
    throws AthenaException {

    return athenaClient.getQueryResultsPaginator(GetQueryResultsRequest.builder()
        .queryExecutionId(queryExecutionId).build());
}

    6) 쿼리 실행 결과를 List Map 형태로 변경

        - 쿼리 실행 결과는 GetQueryResultsIterable 형태로 반환 되는데 이를 List map 형태로 변경할 메소드를 생성합니다.

/**
 * 쿼리 조회 후 결과 list map 처리
 * @param client Athena client
 * @param queryExecutionId query id
 * @return 결과 list map
 * @throws AthenaException athena exception
 */
private List<Map<String, String>> getListMapProcess(AthenaClient client, String queryExecutionId) 
    throws AthenaException {

    List<ColumnInfo> columnInfoList = null;    // 컬럼 정보 리스트
    List<Map<String, String>> list = new ArrayList<>();    // 데이터 값 저장 리스트
    String[] columns = null;    // 컬럼 정보 리스트에서 컬럼명만 가져오기 위한 변수
    // 쿼리 처리 후 결과
    GetQueryResultsIterable getQueryResultsIterable = getQueryResults(client, queryExecutionId);
    // 쿼리 결과 list map 처리
    for (GetQueryResultsResponse result : getQueryResultsIterable) {
        // 컬럼명 생성 ( 1번만 처리 )
        if(columnInfoList == null){
            columnInfoList = result.resultSet().resultSetMetadata().columnInfo();
            columns = new String[Objects.requireNonNull(columnInfoList).size()];
            int cnt = 0;
            for(ColumnInfo col : columnInfoList){
                columns[cnt] = col.name();
                cnt++;
            }
        }
        // 데이터 정보 저장
        List<Row> row = result.resultSet().rows();    // 데이터 정보 리스트
        Map<String, String> map;
        for (Row myRow : row) {
            int valueCnt = 0;
            map = new HashMap<>();
            for (Datum data : myRow.data()) {
                map.put(columns[valueCnt], data.varCharValue());
                valueCnt++;
            }
            list.add(map);
        }
    }
    list.remove(0); // 컬럼명 삭제 처리
    return list;
}

    7) 데이터 목록 조회

        - 위의 메소드 들을 기반으로 AWS Athena 데이터 조회를 위한 메소드를 생성합니다.

/**
 * 목록 조회
 * @param query 목록 조회 쿼리
 * @return 조회 목록
 */
public List<Map<String, String>> getListMap(String query){
    List<Map<String, String>> resultListMap = null;

    try (AthenaClient client = getAthenaClient()) {
        // 쿼리 실행 후 실행id 값 세팅
        String queryExecutionId = getQueryExecutionId(client, query);
        // athena 쿼리 실행 완료까지 대기
        waitForQueryToComplete(client, queryExecutionId);
        // 쿼리 실행 후 결과
        resultListMap = getListMapProcess(client, queryExecutionId);
    } catch(Exception ex){
        ex.printStackTrace();
    }
    return resultListMap;
}

2. 쿼리문 작성 및 Athena 호출

- 조회 쿼리 작성 후 DataSearchInAthena 객체 생성 후 조회 메소드 호출 합니다.

- s3BucketUrl 변수는 Athena 실행 결과를 저장할 S3 경로를 지정합니다.

/**
 * list map 을 json 형태로 변경
 * @param params list
 * @return json string
 */
private String convertListMapToJson(List<Map<String, String>> params) {
    String jsonStr = "";
    try {
        // JSON 형태로 변환
        ObjectMapper objectMapper = new ObjectMapper();
        jsonStr = objectMapper.writeValueAsString(params);
    } catch (JsonProcessingException jpe){
        logger.error("convertListMapToJson JsonProcessingException ::: " + jpe.getMessage());
    }
    return jsonStr;
}

/**
 * AWS Athena DB 에서 목록 조회
 * @param params 검색 조건
 * @return json 형태의 데이터 목록
 */
public String getDataList(Map<String, String> params){
    String s3BucketUrl = "Athena output s3 파일 경로";
    // AWS Athena 연결
    DataSearchInAthena athena = new DataSearchInAthena("DB명", s3BucketUrl);
    String query =
        "select row_number() over(order by user_name, user_age) as rowNum\n"
            + " user_name as usrName\n"
            + " user_age as usrAge\n"
            + " user_sex as usrSex\n" 
            + " from athena_database_table\n";
    return convertListMapToJson(athena.getListMap(query));
}

- DB 명은 Athena Database 명을 입력합니다.

AWS Athena 데이터 베이스


3. Controller 클래스

- Controller 클래스 내부에 목록 조회 메소드를 생성합니다. 

- 목록 조회 시 View 화면에서 한글이 깨지는 현상이 발생할 경우 produces 옵션을 추가하면 정상적으로 출력 됩니다.

/**
 * 목록 조회
 * @param params 검색 조건
 * @return json 형태의 데이터 목록
 */
@ResponseBody
@RequestMapping(value="/search/srhList.do", 
                produces="application/x-www-form-urlencoded; charset=UTF-8")
public String searchDataList(@RequestParam Map<String, String> params){
    return searchService.getDataList(params);
}

4. JSP

- 간단하게 출력할 목록 화면을 작성하고 jquery ajax 로 목록 조회 controller의 메소드를 호출합니다.

<%@ page contentType="text/html;charset=UTF-8" pageEncoding="utf-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<html>
<head>
    <title>AWS Athena 목록 조회</title>
</head>
<body>
    <div>
        <form name="srhForm">
            <button type="button" name="btnSrh">검색</button>
        </form>
    </div>
    <table>
        <thead>
            <tr>
                <th>No</th>
                <th>이름</th>
                <th>나이</th>
                <th>성별</th>
            </tr>
        </thead>
        <tbody id="tbodyList">
            <tr>
                <td colspan="4">조회된 데이터가 없습니다.</td>
            </tr>
        </tbody>
    </table>

    <%-- Progress bar --%>
    <div id="progressBar" 
         style="position: fixed; top:0; left: 0; bottom: 0; right: 0;
         background: rgba(163, 163, 163, 0.6);display:none;z-index:2000">
        <div style="position: absolute;top: calc(50vh - 100px); left: calc(50vw - 200px);
        background-color: white;display: flex; justify-content: center;align-items: center;
        border-radius: 10px;width: 400px;height: 100px;">
            처리 중.......
        </div>
    </div>

    <script type="text/javascript" src="<c:url value="/js/jquery-1.12.4.min.js"/>"></script>
    <script type="text/javascript">
        <%-- 목록 조회 --%>
        var srhList = function(){
            var params = {};

            $.ajax({
                url: '/search/srhList.do',
                type: 'POST',
                dataType: 'json',
                data:params,
                async: true,
                beforeSend: function(){
                    $('#progressBar').show();
                },
                success: function(data){
                    var tbody = $('#tbodyList');
                    var contents = '';

                    $.each(data, function(idx, val){
                        contents += '<tr>';
                        contents += '<td>'+val.rowNum+'</td>';
                        contents += '<td>'+val.usrName+'</td>';
                        contents += '<td>'+val.usrAge+'</td>';
                        contents += '<td>'+val.usrSex+'</td>';
                        contents += '</tr>';
                    });

                    if(contents !== ''){
                        tbody.empty();
                        tbody.append(contents);
                    }
                },
                error: function(data){
                    console.log('error : ', data);
                },
                complete: function(){
                    $('#progressBar').hide();
                }
            });
        };

        <%-- 검색 버튼 클릭 시 목록 조회 처리 --%>
        $(document).on('click', 'button[name=btnSrh]', function(){
            srhList();
        });
    </script>
</body>
</html>

5. 정리

- 이상으로 AWS Athena 의 DB를 조회하여 목록을 출력하는 기능을 구현해 보았습니다. 

다음 블로그는 이 목록에서 페이징 처리를 추가하는 방법을 정리해 보겠습니다. 


※ 참고

 

로그 데이터 처리를 위한 AWS Athena 환경 구성

1. 데이터 원본 생성 - 콘솔을 접속해서 Athena 페이지로 이동 - 왼쪽 메뉴에서 데이터 원본 클릭 - 오른쪽에 데이터 원본 생성 버튼 클릭 ( 생성하려고 하는 데이터 원본이 AWS Glue 데이터 카탈로그

dev-gabriel.tistory.com

 

 

Amazon Athena란 무엇인가요? - Amazon Athena

이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.

docs.aws.amazon.com