Update ingestion #592

Merged (4 commits) on Jun 27, 2022
12 changes: 8 additions & 4 deletions dataGeneration/README.md
@@ -8,7 +8,7 @@ The following code in this directory can be used to easily ingest data into an O

### Python

Python 3.7 or above is required
Python 3.8 or above is required

### pip

@@ -22,7 +22,11 @@ pip install -r requirements.txt

### Quick Start

In order to execute the script you must have a running OpenSearch cluster so you can supply an endpoint for the data to be ingested to. The current iteration of this data script creates data in a cosine pattern with anomalies injected with a random seed throughout. The dataset created will have two categorical fields to test a multi-entity AD (of type `keyword`) and two fields that can act as the two feature fields (cpuTime and jvmGcTime). These two fields are of type `double`.
In order to execute the script you must have a running OpenSearch cluster so that you can supply an endpoint for the data to be ingested into.

The current iteration of this data script creates data in a cosine pattern, with anomalies injected throughout using a random seed.

The dataset created will have two categorical fields (`host` and `process`, of type `keyword`) to test a multi-entity AD, and two fields that can act as the feature fields (`cpuTime` and `jvmGcTime`, of type `double`).
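
For reference, each generated document is shaped roughly as follows. This is a sketch: only the four field names above appear in this README, so the timestamp field name and the entity naming scheme are assumptions.

```python
# One generated document (illustrative values).
# "timestamp" is an assumed field name; host/process entity names are illustrative.
sample_doc = {
    "timestamp": "2022-06-27 00:00:00",  # points are --ingestion-frequency seconds apart
    "cpuTime": 52.3,       # double, follows the cosine pattern (with occasional anomalies)
    "jvmGcTime": 47.9,     # double, follows the cosine pattern (with occasional anomalies)
    "host": "host1",       # keyword, one of --number-of-host entities
    "process": "process3"  # keyword, one of --number-of-process entities
}
```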

### Example Request:

@@ -43,8 +47,8 @@ In order to execute the script you must have a running OpenSearch cluster so you
| --bulk-size | Number of documents per bulk request | 3000 | No
| --ingestion-frequency | How often each respective document is indexed (in seconds) | 600 | No
| --points | Total number of points in time ingested | 1008 | No
| --number-of-host | number of 'host' entities | 1000 | No
| --number-of-process | number of 'process' entities | 1000 | No
| --number-of-host | number of 'host' entities (host is one of the categorical fields that an entity is defined by) | 1000 | No
| --number-of-process | number of 'process' entities (process is one of the categorical fields that an entity is defined by) | 1000 | No
| --number-of-historical-days | number of days of historical data to ingest | 2 | No
| --username | username for authentication if security is true | admin | No
| --password | password for authentication if security is true | admin | No
160 changes: 83 additions & 77 deletions dataGeneration/generate-cosine-data-multi-entity.py
@@ -78,10 +78,10 @@
'''
Generate index INDEX_NAME
'''
def create_index(es, INDEX_NAME, shard_number):
def create_index(os, INDEX_NAME, shard_number):
# First, delete the index if it exists
print("Deleting index if it exists...")
es.indices.delete(index=INDEX_NAME, ignore=[400, 404])
os.indices.delete(index=INDEX_NAME, ignore=[400, 404])

# Next, create the index
print("Creating index \"{}\"...".format(INDEX_NAME))
@@ -112,9 +112,7 @@ def create_index(es, INDEX_NAME, shard_number):
}
}
}

es.indices.create(index=INDEX_NAME, body=request_body)

os.indices.create(index=INDEX_NAME, body=request_body)

'''
Posts a document(s) to the index
Expand Down Expand Up @@ -154,62 +152,66 @@ def post_log_stream(index_value, time_intervals, sample_per_interval, max_number
j = (int)(min_number / service_number)
index = j * service_number - 1

try:
while keep_loop and j < host_number:
host_str = host_prefix + str(j)
for l in range(service_number):
process_str = process_prefix + str(l)
index += 1
# index can be [min_number, max_number]
if index < min_number:
continue
if index > max_number:
keep_loop = False
break
nextTs = startTs
prb = Random()
prb.seed(random.randint(0, 100000000))
cosine_p = cosine_params[index]
data_index = 0
for i in range(0, time_intervals):
ts = nextTs.strftime(dtFormat)
for k in range(0, sample_per_interval):
data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
50, 5, prb)
bulk_payload.append(
{
index_name: index_value,
"_source":
{
timestamp_name: ts,
cpu_name: data[0],
mem_name: data[1],
host_name: host_str,
process_name: process_str
}
}
)
count += 1
data_index += 1
if count >= batch_size:
post_log(bulk_payload, thread_index)
bulk_payload = list() # reset list
totalCount += count
count = 0
# increment by ingestion_frequency (in seconds) after looping through each host multiple samples
nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
if totalCount - lastTotalCount > 1000000:
# report progress every 1 million inserts
print("totalCount {} thread_index {}".format(totalCount,
thread_index))
lastTotalCount = totalCount
j += 1

if len(bulk_payload) > 0:
post_log(bulk_payload, thread_index)
bulk_payload = list()
except Error as err:
print("error: {0}".format(err))
retries = 0
while keep_loop and retries < 10 and j < host_number:
try:
while keep_loop and j < host_number:
host_str = host_prefix + str(j)
for l in range(service_number):
process_str = process_prefix + str(l)
index += 1
# index can be [min_number, max_number]
if index < min_number:
continue
if index > max_number:
keep_loop = False
break
nextTs = startTs
prb = Random()
prb.seed(random.randint(0, 100000000))
cosine_p = cosine_params[index]
data_index = 0
for i in range(0, time_intervals):
ts = nextTs.strftime(dtFormat)
for k in range(0, sample_per_interval):
data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
50, 5, prb)
bulk_payload.append(
{
index_name: index_value,
"_source":
{
timestamp_name: ts,
cpu_name: data[0],
mem_name: data[1],
host_name: host_str,
process_name: process_str
}
}
)
count += 1
data_index += 1
if count >= batch_size:
post_log(bulk_payload, thread_index)
bulk_payload = list() # reset list
totalCount += count
count = 0
# increment by ingestion_frequency (in seconds) after looping through each host multiple samples
nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
if totalCount - lastTotalCount > 1000000:
# report progress every 1 million inserts
print("totalCount {} thread_index {}".format(totalCount,
thread_index))
lastTotalCount = totalCount
j += 1

if len(bulk_payload) > 0:
post_log(bulk_payload, thread_index)
bulk_payload = list()
except Error as err:
print("error: {0}".format(err))
retries += 1
client[thread_index] = create_client(SECURITY, URL)

def split(a, n):
k, m = divmod(len(a), n)
@@ -228,46 +230,51 @@ def create_cosine(total_entities, base_dimension, period, amplitude):
return cosine_param

'''
Main entry method for script
Create OpenSearch client
'''
def main():
global client
if SECURITY and URL.strip() == 'localhost':
for i in range(0, THREADS):
client.append(OpenSearch(
def create_client(security, URL):
if security and URL.strip() == 'localhost':
return OpenSearch(
hosts=[URL],
use_ssl=True,
verify_certs=False,
http_auth=(USERNAME, PASSWORD),
scheme="https",
connection_class=RequestsHttpConnection
))
elif SECURITY:
for i in range(0, THREADS):
client.append(OpenSearch(
)
elif security:
return OpenSearch(
hosts=[{'host': URL, 'port': 443}],
use_ssl=True,
verify_certs=False,
http_auth=(USERNAME, PASSWORD),
scheme="https",
port=443,
connection_class=RequestsHttpConnection
))
)
elif URL.strip() == 'localhost':
for i in range(0, THREADS):
client.append(OpenSearch(
return OpenSearch(
hosts=[{'host': URL, 'port': 9200}],
use_ssl=False,
verify_certs=False,
connection_class=RequestsHttpConnection
))
)
else:
es = OpenSearch(
return OpenSearch(
hosts=[{'host': URL, 'port': 80}],
use_ssl=False,
verify_certs=False,
connection_class=RequestsHttpConnection
)

'''
Main entry method for script
'''
def main():
global client
for i in range(0, THREADS):
client.append(create_client(SECURITY, URL))
Collaborator: I only saw that you used the refactored create_client code, not anything else I sent you. Does my fix solve the dropping-entity issue?

Member Author: Yes, I actually used the wrong file to ingest data, not the one with the additional retry logic in a loop. It worked for ingesting 1B documents (7 days of 1440 points a day). I will now try the right file, with the added retry logic, for 14 days of 1440 points a day.


create_index(client[0], INDEX_NAME, SHARD_NUMBER)

total_entities = HOST_NUMBER * PROCESS_NUMBER
@@ -291,4 +298,3 @@

if __name__ == "__main__":
main()

6 changes: 3 additions & 3 deletions dataGeneration/requirements.txt
@@ -1,5 +1,5 @@
numpy==1.21.2
opensearch_py==1.1.0
numpy==1.23.0
opensearch_py==2.0.0
retry==0.9.2
scipy==1.7.1
scipy==1.8.0
urllib3==1.26.9