키보드워리어

[ELK] JSON 데이터를 파이썬으로 파싱 후 적재해보자. 본문

ELK

[ELK] JSON 데이터를 파이썬으로 파싱 후 적재해보자.

꽉 쥔 주먹속에 안경닦이 2023. 10. 27. 13:50
728x90

안녕하세요 블로그 방문자 여러분, 반갑습니다.

 

오늘은 데이터 파싱해보는 코드를 짜보고 적재하는 것까지 올려보려고 합니다.

 

현 상황)

9975건에 dummy.json 파일이 있다.

문제는 9975건이 모두 개행되어있지 않아 있다는 것.

 

python으로 개행시키고 적재해보자.

 

import json
import sys

#파일 확인(객체 줄 바꿔서 저장)

# 1. 파일을 전체적으로 문자열로 읽기
with open('/dummy_data.json', 'r', encoding='utf-8') as f: #utf-8로 읽기
    data = f.read()


# 2. JSON 객체를 구분하는 문자열을 찾아서 줄 바꿈 문자로 교체
data = data.replace('},{', '},\n{')

# 3. 결과 문자열을 다시 파일에 쓰기
with open('/testdata/dummy_data2.json', 'w', encoding='utf-8') as f:
    f.write(data)

# 4. 각 줄 출력
with open('/testdata/dummy_data2.json', 'r', encoding='utf-8') as f:
    for line in f:
        print(line)

#[객체 개수 확인]
with open('/testdata/dummy_data2.json', 'r', encoding='utf-8') as f:
    #문자열을 dict타입으로 저장
    json_string = f.read()
    data = json.loads(json_string, strict=False) 
    #json.loadS: JSON 형식의 문자열을 받아 해당 문자열을 파이썬 자료구조로 변환. loadS는 "Load String"의 약자
    # strict=True (기본값) 인 경우, 파서는 엄격한 JSON 규칙을 따르며, JSON 문자열 내에서 허용되지 않는 특수 문자 (예: '\x00'부터 '\x1f' 까지의 제어 문자)에 대한 오류를 발생.

if isinstance(data, list): # data가 리스트인 경우
    #data가 list 타입이면(배열형식), json.loads(data) 실행 => json배열([])을 파이썬의 리스트(list)로 변환([{}, {}, ...], 각 요소 딕셔너리)
    object_count = len(data)
else: # data는 dict 타입
    # data 내에 리스트를 찾고 그 리스트의 객체 개수 계산
    object_count = sum(len(value) for value in data.values() if isinstance(value, list))
    #data.values(): data가 딕셔너리인 경우, data.values()는 딕셔너리의 모든 값들을 dict_values 타입으로 반환
    #isinstance(value, list): dict_values가 list타입이면 리스트 요소 개수(len(values))가 sum()에 하나씩 전달

print(f"The JSON file contains {object_count} objects inside arrays.")

그러고 나서 개행시킨 json 파일을 elasticsearch에 적재해 보자.

def connect_es():
    host = '********************
    ca_certs = '/home/******/elasticsearch-8.10.3/config/certs/http_ca.crt'
    userName = '*******'
    passWord = '******'
	verify_certs=false # #verify_certs: SSL 인증서를 검증할지 여부를 지정합니다. 여기서는 False로 설정되어 있어, 인증서 검증이 비활성화되어 있습니다. 
    #이러한 설정은 개발 또는 테스트 환경에서만 사용되어야 하며, 실제 운영 환경에서는 보안 위험으로 인해 사용되어서는 안 됩니다.
    client = Elasticsearch(request_timeout=600,hosts=host, ca_certs=ca_certs, basic_auth=(userName, passWord))
    return client

es 커넥트 함수

 

# 연결 테스트----------------------------------------
response = connect_es.ping()
#ping(): Elasticsearch 클라이언트를 사용하여 Elasticsearch 서버의 핑을 검사
if response:
    print('Connection successful')
else:
    sys.exit("Connection failed")   
    
#------------------------------------------------

#[파일 열기]
with open('/dummy_data2.json', 'r', encoding='utf-8') as f:
    json_string = f.read()
    row_data = json.loads(json_string, strict=False) 
    
#[인덱스 생성]
#connect_es.indices.create(index='dummy_data_json', ignore=400)  #DeprecationWarning 뜨지만 생성됨

#[적재]
#1. 그대로 적재해보기 => 실패
#es.index(index='dummy_data_json', body=row_data)
#[실행 기록]
#413에러 코드(요청이 너무 커서 서버가 처리할 수 없다.) -> yml 파일 http.max_content_length 200mb로 변경(기본 100mb)
#-> 타임아웃 에러, 연결객체에 옵션 추가 -> gc overhead 경고 메시지 반복해서 뜨다가 팅김

#2. 데이터 나눠서 적재
from elasticsearch import helpers

#데이터 chunk로 나누기 (객체 100개씩 나눔)
def chunk_data(row_data, chunk_size): #row_data는 list
    for i in range(0, len(row_data), chunk_size):
        yield row_data[i:i + chunk_size]
        #yield: row_data의 일부분(청크)이 슬라이싱되어 yield를 통해 제너레이터 객체(이터레이터 사용 가능)로 반환 
        #-> 다시 호출될 때 마지막 상태부터 실행을 계속함

def index_data(index_name, data):
    actions = [
        {
            "_index": index_name, #_index키: 인덱스 이름
            #"_source": doc,
            "_source": {"dataList": doc}, #_source키: 데이터
        }
        for doc in data #chunk(list)의 dict 객체 개수 만큼 위의 dict객체 생성
    ]
    helpers.bulk(es, actions) #actions 리스트 적재
    #helpers.bulk(): Elasticsearch 클라이언트의 helpers 모듈에 있는 함수.
    #여러 개의 문서를 한 번에 Elasticsearch에 삽입

# 데이터 청크로 나누고 색인
chunk_size = 100
for chunk in chunk_data(row_data["dataList"], chunk_size):
    index_data("dummy_data_json", chunk)

#개수 확인 -----------------------------------------------------
#res = connect_es.count(index='dummy_data_json')
#print(f"Indexed {res['count']} documents")
#Indexed 9000 documents

결괏값 키바나에서 확인

 

[시각화 결과]


Top 15 values datalist title keword_ 9975건 생성

1-> elastic search upload 기능을 활용해서 올리려 하였으나 파싱 불가하였음.
2-> 사이즈를 수정하거나(8 버전 이후 1GB까지 가능) 파싱 된 데이터로 시도했으나 안되었음
3->이후 filebeat로 실행해보려 했으나 config에서 막혀서 동기들이 사용한 파이썬 방법으로 재시도.

3번의 시도 끝에 성공 결과를 공유드립니다.

 

9975건의 json을 kibana로 시각화
9975건의 json을 kibana로 시각화

이상  부족한 컨텐츠 읽어주셔서 감사드립니다.

내용 피드백이 있는 경우 댓글 부탁드립니다!

다음 포스팅 때 뵙겠습니다! 감사합니다 🤗

 
728x90

'ELK' 카테고리의 다른 글

[ELK] Collection Time 작업  (3) 2023.11.23
[ELK] ML 학습해보기  (2) 2023.10.29
[ELK]logstash란?  (0) 2023.10.20
[ELK] 키바나 시각화  (2) 2023.10.09
[ELK] kafka, filebeats 어떻게 설치할까?  (0) 2023.10.01