Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
Tags
- 이클립스
- 배열
- 리눅스
- Java
- 스프링
- 자바
- 자바기초
- 알고리즘
- 스프링 기초
- 메소드
- 기초코딩
- 프로그래밍
- 제이쉘
- Elk
- eclips
- spring
- 자바프로그래밍
- JAVA기초
- 데이터베이스
- 초보코딩
- JShell
- github
- 프로그래밍기초
- 컴퓨터과학개론
- 자바 스프링
- 프로그래밍언어
- 클래스
- Git
- 코딩초보
- 초보코딩탈출
Archives
- Today
- Total
키보드워리어
[ELK] JSON 데이터를 파이썬으로 파싱 후 적재해보자. 본문
728x90
안녕하세요 블로그 방문자 여러분, 반갑습니다.
오늘은 데이터 파싱해보는 코드를 짜보고 적재하는 것까지 올려보려고 합니다.
현 상황)
9975건에 dummy.json 파일이 있다.
문제는 9975건이 모두 개행되어있지 않아 있다는 것.
python으로 개행시키고 적재해보자.
import json
import sys
#파일 확인(객체 줄 바꿔서 저장)
# 1. 파일을 전체적으로 문자열로 읽기
with open('/dummy_data.json', 'r', encoding='utf-8') as f: #utf-8로 읽기
data = f.read()
# 2. JSON 객체를 구분하는 문자열을 찾아서 줄 바꿈 문자로 교체
data = data.replace('},{', '},\n{')
# 3. 결과 문자열을 다시 파일에 쓰기
with open('/testdata/dummy_data2.json', 'w', encoding='utf-8') as f:
f.write(data)
# 4. 각 줄 출력
with open('/testdata/dummy_data2.json', 'r', encoding='utf-8') as f:
for line in f:
print(line)
#[객체 개수 확인]
with open('/testdata/dummy_data2.json', 'r', encoding='utf-8') as f:
#문자열을 dict타입으로 저장
json_string = f.read()
data = json.loads(json_string, strict=False)
#json.loadS: JSON 형식의 문자열을 받아 해당 문자열을 파이썬 자료구조로 변환. loadS는 "Load String"의 약자
# strict=True (기본값) 인 경우, 파서는 엄격한 JSON 규칙을 따르며, JSON 문자열 내에서 허용되지 않는 특수 문자 (예: '\x00'부터 '\x1f' 까지의 제어 문자)에 대한 오류를 발생.
if isinstance(data, list): # data가 리스트인 경우
#data가 list 타입이면(배열형식), json.loads(data) 실행 => json배열([])을 파이썬의 리스트(list)로 변환([{}, {}, ...], 각 요소 딕셔너리)
object_count = len(data)
else: # data는 dict 타입
# data 내에 리스트를 찾고 그 리스트의 객체 개수 계산
object_count = sum(len(value) for value in data.values() if isinstance(value, list))
#data.values(): data가 딕셔너리인 경우, data.values()는 딕셔너리의 모든 값들을 dict_values 타입으로 반환
#isinstance(value, list): dict_values가 list타입이면 리스트 요소 개수(len(values))가 sum()에 하나씩 전달
print(f"The JSON file contains {object_count} objects inside arrays.")
그러고 나서 개행시킨 json 파일을 elasticsearch에 적재해 보자.
def connect_es():
host = '********************
ca_certs = '/home/******/elasticsearch-8.10.3/config/certs/http_ca.crt'
userName = '*******'
passWord = '******'
verify_certs=false # #verify_certs: SSL 인증서를 검증할지 여부를 지정합니다. 여기서는 False로 설정되어 있어, 인증서 검증이 비활성화되어 있습니다.
#이러한 설정은 개발 또는 테스트 환경에서만 사용되어야 하며, 실제 운영 환경에서는 보안 위험으로 인해 사용되어서는 안 됩니다.
client = Elasticsearch(request_timeout=600,hosts=host, ca_certs=ca_certs, basic_auth=(userName, passWord))
return client
es 커넥트 함수
# 연결 테스트----------------------------------------
response = connect_es.ping()
#ping(): Elasticsearch 클라이언트를 사용하여 Elasticsearch 서버의 핑을 검사
if response:
print('Connection successful')
else:
sys.exit("Connection failed")
#------------------------------------------------
#[파일 열기]
with open('/dummy_data2.json', 'r', encoding='utf-8') as f:
json_string = f.read()
row_data = json.loads(json_string, strict=False)
#[인덱스 생성]
#connect_es.indices.create(index='dummy_data_json', ignore=400) #DeprecationWarning 뜨지만 생성됨
#[적재]
#1. 그대로 적재해보기 => 실패
#es.index(index='dummy_data_json', body=row_data)
#[실행 기록]
#413에러 코드(요청이 너무 커서 서버가 처리할 수 없다.) -> yml 파일 http.max_content_length 200mb로 변경(기본 100mb)
#-> 타임아웃 에러, 연결객체에 옵션 추가 -> gc overhead 경고 메시지 반복해서 뜨다가 팅김
#2. 데이터 나눠서 적재
from elasticsearch import helpers
#데이터 chunk로 나누기 (객체 100개씩 나눔)
def chunk_data(row_data, chunk_size): #row_data는 list
for i in range(0, len(row_data), chunk_size):
yield row_data[i:i + chunk_size]
#yield: row_data의 일부분(청크)이 슬라이싱되어 yield를 통해 제너레이터 객체(이터레이터 사용 가능)로 반환
#-> 다시 호출될 때 마지막 상태부터 실행을 계속함
def index_data(index_name, data):
actions = [
{
"_index": index_name, #_index키: 인덱스 이름
#"_source": doc,
"_source": {"dataList": doc}, #_source키: 데이터
}
for doc in data #chunk(list)의 dict 객체 개수 만큼 위의 dict객체 생성
]
helpers.bulk(es, actions) #actions 리스트 적재
#helpers.bulk(): Elasticsearch 클라이언트의 helpers 모듈에 있는 함수.
#여러 개의 문서를 한 번에 Elasticsearch에 삽입
# 데이터 청크로 나누고 색인
chunk_size = 100
for chunk in chunk_data(row_data["dataList"], chunk_size):
index_data("dummy_data_json", chunk)
#개수 확인 -----------------------------------------------------
#res = connect_es.count(index='dummy_data_json')
#print(f"Indexed {res['count']} documents")
#Indexed 9000 documents
결괏값 키바나에서 확인
[시각화 결과]
Top 15 values datalist title keword_ 9975건 생성
1-> elastic search upload 기능을 활용해서 올리려 하였으나 파싱 불가하였음.
2-> 사이즈를 수정하거나(8 버전 이후 1GB까지 가능) 파싱 된 데이터로 시도했으나 안되었음
3->이후 filebeat로 실행해보려 했으나 config에서 막혀서 동기들이 사용한 파이썬 방법으로 재시도.
3번의 시도 끝에 성공 결과를 공유드립니다.
이상 부족한 컨텐츠 읽어주셔서 감사드립니다.
내용 피드백이 있는 경우 댓글 부탁드립니다!
다음 포스팅 때 뵙겠습니다! 감사합니다 🤗
728x90
'ELK' 카테고리의 다른 글
[ELK] Collection Time 작업 (3) | 2023.11.23 |
---|---|
[ELK] ML 학습해보기 (2) | 2023.10.29 |
[ELK]logstash란? (0) | 2023.10.20 |
[ELK] 키바나 시각화 (2) | 2023.10.09 |
[ELK] kafka, filebeats 어떻게 설치할까? (0) | 2023.10.01 |