-
Notifications
You must be signed in to change notification settings - Fork 2
/
load_data_into_elasticsearch.py
65 lines (53 loc) · 1.99 KB
/
load_data_into_elasticsearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import csv
import json
from os.path import abspath, join, dirname, exists
import tqdm
import urllib3
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
#----------------------------------------------------------------------------------------------------
# GLOBAL VARIABLES
#----------------------------------------------------------------------------------------------------
# Newline-delimited JSON (NDJSON) source file: one JSON document per line
# (each line is parsed and indexed as-is by generate_actions/streaming_bulk).
FILE_PATH = "./liquibase-logs-1.json"
# Elastic Cloud credentials — placeholders scrubbed from the repo.
# NOTE(review): load these from the environment or a secrets store before use;
# never commit real credentials.
ELASTIC_PASSWORD = "*****"
CLOUD_ID = "*****"
# Number of documents sent per bulk request by streaming_bulk.
CHUNK_SIZE = 500
# Target index name; deleted and recreated on every run (see main()).
INDEX_NAME = "liquibase.logs.1"
#----------------------------------------------------------------------------------------------------
# FUNCTION DEFINITIONS
#----------------------------------------------------------------------------------------------------
def data_size(path=None):
    """Return the number of documents (lines) in the NDJSON source file.

    The source is newline-delimited JSON: every line is one document
    (see generate_actions), so the document count is exactly the line
    count. The previous ``- 1`` assumed a CSV-style header line that
    this format does not have, which made the tqdm total and the final
    "Indexed x/y" report off by one.

    Args:
        path: Optional path to the file; defaults to the module-level
            FILE_PATH when omitted (backward-compatible).

    Returns:
        int: Number of lines (documents) in the file.
    """
    if path is None:
        path = FILE_PATH
    with open(path) as f:
        # Generator expression: counts lines without building a throwaway list.
        return sum(1 for _ in f)
def generate_actions(path=None):
    """Yield one Elasticsearch document per line of the NDJSON source file.

    Each line of the file is parsed as a standalone JSON object and yielded
    unchanged, in file order — the shape streaming_bulk expects for plain
    document bodies. (The dead ``count = 0`` local from the original was
    never read and has been removed.)

    Args:
        path: Optional path to the file; defaults to the module-level
            FILE_PATH when omitted (backward-compatible).

    Yields:
        dict: The parsed JSON document for each line.
    """
    if path is None:
        path = FILE_PATH
    with open(path, "r") as f:
        for line in f:
            yield json.loads(line)
#----------------------------------------------------------------------------------------------------
# MAIN LOGIC
#----------------------------------------------------------------------------------------------------
def main():
    """Recreate INDEX_NAME in Elastic Cloud and bulk-index every document
    from FILE_PATH, showing progress and a final success count."""
    print("Loading dataset...")
    number_of_docs = data_size()

    # NOTE(review): CLOUD_ID / ELASTIC_PASSWORD are placeholder constants
    # above — load real credentials from the environment before running.
    es = Elasticsearch(
        cloud_id=CLOUD_ID,
        basic_auth=("elastic", ELASTIC_PASSWORD),
    )

    print("Creating an index...")
    # Drop any stale copy, then create a fresh index. The ignore lists keep
    # this idempotent (404: index absent on delete; 400: already exists).
    es.indices.delete(index=INDEX_NAME, ignore=[400, 404])
    es.indices.create(index=INDEX_NAME, ignore=400)

    print("Indexing documents...")
    successes = 0
    # Context manager guarantees the progress bar is closed (and its output
    # flushed) before the summary line below is printed; the per-item info
    # dict from streaming_bulk is unused, hence `_`.
    with tqdm.tqdm(unit="docs", total=number_of_docs) as progress:
        for ok, _ in streaming_bulk(
            client=es,
            index=INDEX_NAME,
            actions=generate_actions(),
            chunk_size=CHUNK_SIZE,
        ):
            progress.update(1)
            successes += ok
    print("Indexed %d/%d documents" % (successes, number_of_docs))


if __name__ == "__main__":
    main()