Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COST-797: Add processing for start and end dates in manifest #2603

Merged
merged 10 commits into from
Jan 22, 2021
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ help:
@echo " @param schema - (optional) schema name. Default: 'acct10001'."
@echo " superuser create a Django super user"
@echo " unittest run unittests"
@echo " local-upload-data upload data to Ingress if it is up and running locally"
@echo ""
@echo "--- Commands using Docker Compose ---"
@echo " docker-up run docker-compose up --build -d"
Expand Down Expand Up @@ -539,6 +540,12 @@ backup-local-db-dir:
@cd - >/dev/null
$(DOCKER_COMPOSE) start db

local-upload-data:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

curl -vvvv -F "upload=@$(file);type=application/vnd.redhat.hccm.$(basename $(basename $(notdir $(file))))+tgz" \
-H "x-rh-identity: eyJpZGVudGl0eSI6IHsiYWNjb3VudF9udW1iZXIiOiAiMTIzNDUiLCAiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICI1NDMyMSJ9fX0=" \
-H "x-rh-request_id: testtesttest" \
localhost:8080/api/ingress/v1/upload


restore-local-db-dir:
@cd $(TOPDIR)
Expand Down
5 changes: 5 additions & 0 deletions koku/masu/external/kafka_msg_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ def summarize_manifest(report_meta):
provider_uuid = report_meta.get("provider_uuid")
schema_name = report_meta.get("schema_name")
provider_type = report_meta.get("provider_type")
start_date = report_meta.get("start")
end_date = report_meta.get("end")

with ReportManifestDBAccessor() as manifest_accesor:
if manifest_accesor.manifest_ready_for_summary(manifest_id):
Expand All @@ -504,6 +506,9 @@ def summarize_manifest(report_meta):
"provider_uuid": provider_uuid,
"manifest_id": manifest_id,
}
if start_date and end_date:
report_meta["start"] = start_date
report_meta["end"] = end_date
async_id = summarize_reports.delay([report_meta])
return async_id

Expand Down
13 changes: 9 additions & 4 deletions koku/masu/processor/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,15 @@ def summarize_reports(reports_to_summarize):
# required.
with ReportManifestDBAccessor() as manifest_accesor:
if manifest_accesor.manifest_ready_for_summary(report.get("manifest_id")):
start_date = DateAccessor().today() - datetime.timedelta(days=2)
start_date = start_date.strftime("%Y-%m-%d")
end_date = DateAccessor().today().strftime("%Y-%m-%d")
if report.get("start") and report.get("end"):
LOG.info("using start and end dates from the manifest")
start_date = parser.parse(report.get("start")).strftime("%Y-%m-%d")
end_date = parser.parse(report.get("end")).strftime("%Y-%m-%d")
else:
LOG.info("generating start and end dates for manifest")
start_date = DateAccessor().today() - datetime.timedelta(days=2)
start_date = start_date.strftime("%Y-%m-%d")
end_date = DateAccessor().today().strftime("%Y-%m-%d")
LOG.info("report to summarize: %s", str(report))
update_summary_tables.delay(
report.get("schema_name"),
Expand Down Expand Up @@ -314,7 +320,6 @@ def update_summary_tables(schema_name, provider, provider_uuid, start_date, end_
LOG.info(stmt)

updater = ReportSummaryUpdater(schema_name, provider_uuid, manifest_id)

start_date, end_date = updater.update_daily_tables(start_date, end_date)
updater.update_summary_tables(start_date, end_date)

Expand Down
Binary file added koku/masu/test/data/ocp/payload2.tar.gz
Binary file not shown.
81 changes: 81 additions & 0 deletions koku/masu/test/external/test_kafka_msg_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ def setUp(self):
super().setUp()
logging.disable(logging.NOTSET)
payload_file = open("./koku/masu/test/data/ocp/payload.tar.gz", "rb")
payload_file_dates = open("./koku/masu/test/data/ocp/payload2.tar.gz", "rb")
bad_payload_file = open("./koku/masu/test/data/ocp/bad_payload.tar.gz", "rb")
no_manifest_file = open("./koku/masu/test/data/ocp/no_manifest.tar.gz", "rb")

self.tarball_file = payload_file.read()
payload_file.close()

self.dates_tarball = payload_file_dates.read()
payload_file_dates.close()

self.bad_tarball_file = bad_payload_file.read()
bad_payload_file.close()

Expand Down Expand Up @@ -437,6 +441,56 @@ def manifest_ready_for_summary(self, manifest_id):
msg_handler.summarize_manifest(report_meta)
mock_summarize_reports.assert_not_called()

def test_summarize_manifest_dates(self):
    """Test report summarization when the manifest carries start/end dates."""
    report_meta = {
        "schema_name": "test_schema",
        "manifest_id": "1",
        "provider_uuid": uuid.uuid4(),
        "provider_type": "OCP",
        "compression": "UNCOMPRESSED",
        "file": "/path/to/file.csv",
        "start": str(datetime.today()),
        "end": str(datetime.today()),
    }
    # Only this subset of the metadata should be handed to the summary task.
    expected_meta = {
        key: report_meta.get(key)
        for key in ("schema_name", "provider_type", "provider_uuid", "manifest_id", "start", "end")
    }

    class FakeManifest:
        def __init__(self, num_processed_files=1, num_total_files=1):
            self.num_processed_files = num_processed_files
            self.num_total_files = num_total_files

        def get_manifest_by_id(self, manifest_id):
            return self

        def manifest_ready_for_summary(self, manifest_id):
            return self.num_processed_files == self.num_total_files

    def summarize_with(fake_manifest):
        # Run summarize_manifest with the DB accessor and celery task patched out,
        # returning the mocked task so the caller can inspect how it was invoked.
        with patch("masu.external.kafka_msg_handler.ReportManifestDBAccessor") as mock_accessor:
            mock_accessor.return_value.__enter__.return_value = fake_manifest
            with patch("masu.external.kafka_msg_handler.summarize_reports.delay") as mock_delay:
                msg_handler.summarize_manifest(report_meta)
                return mock_delay

    # Manifest fully processed: summarization is queued with the expected metadata.
    mock_summarize_reports = summarize_with(FakeManifest(num_processed_files=2, num_total_files=2))
    mock_summarize_reports.assert_called_with([expected_meta])

    # Manifest not yet complete: nothing is queued.
    mock_summarize_reports = summarize_with(FakeManifest(num_processed_files=1, num_total_files=2))
    mock_summarize_reports.assert_not_called()

def test_extract_payload(self):
"""Test to verify extracting payload is successful."""

Expand All @@ -462,6 +516,33 @@ def test_extract_payload(self):
shutil.rmtree(fake_dir)
shutil.rmtree(fake_pvc_dir)

def test_extract_payload_dates(self):
    """Verify a payload whose manifest has start/end dates extracts into a dated directory."""
    fake_account = {"provider_uuid": uuid.uuid4(), "provider_type": "OCP", "schema_name": "testschema"}
    payload_url = "http://insights-upload.com/quarnantine/file_to_validate"
    with requests_mock.mock() as m:
        m.get(payload_url, content=self.dates_tarball)

        fake_dir = tempfile.mkdtemp()
        fake_pvc_dir = tempfile.mkdtemp()
        with patch.object(Config, "INSIGHTS_LOCAL_REPORT_DIR", fake_dir):
            with patch.object(Config, "TMP_DIR", fake_dir):
                with patch(
                    "masu.external.kafka_msg_handler.get_account_from_cluster_id", return_value=fake_account
                ):
                    with patch("masu.external.kafka_msg_handler.create_manifest_entries", return_value=1):
                        # Fix: `returns=None` is not a Mock configuration kwarg — it only set a
                        # `.returns` attribute on the MagicMock. `return_value=None` is the
                        # supported way to stub the call's result.
                        with patch("masu.external.kafka_msg_handler.record_report_status", return_value=None):
                            msg_handler.extract_payload(payload_url, "test_request_id")
                            # Expected layout: <report_dir>/<cluster_id>/<start>-<end>/ derived
                            # from the dates in payload2.tar.gz's manifest.
                            expected_path = "{}/{}/{}/".format(
                                Config.INSIGHTS_LOCAL_REPORT_DIR,
                                "5997a261-f23e-45d1-8e01-ee3c765f3aec",
                                "20210101-20210201",
                            )
                            self.assertTrue(os.path.isdir(expected_path))
        shutil.rmtree(fake_dir)
        shutil.rmtree(fake_pvc_dir)

def test_extract_payload_no_account(self):
"""Test to verify extracting payload when no provider exists."""
payload_url = "http://insights-upload.com/quarnantine/file_to_validate"
Expand Down
13 changes: 12 additions & 1 deletion koku/masu/test/processor/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,18 @@ def test_summarize_reports_processing_list(self, mock_update_summary):
report_meta["provider_type"] = Provider.PROVIDER_OCP
report_meta["provider_uuid"] = self.ocp_test_provider_uuid
report_meta["manifest_id"] = 1
reports_to_summarize = [report_meta]

# add a report with start/end dates specified
report2_meta = {}
report2_meta["start_date"] = str(DateHelper().today)
report2_meta["schema_name"] = self.schema
report2_meta["provider_type"] = Provider.PROVIDER_OCP
report2_meta["provider_uuid"] = self.ocp_test_provider_uuid
report2_meta["manifest_id"] = 2
report2_meta["start"] = str(DateHelper().yesterday)
report2_meta["end"] = str(DateHelper().today)

reports_to_summarize = [report_meta, report2_meta]

summarize_reports(reports_to_summarize)
mock_update_summary.delay.assert_called()
Expand Down
8 changes: 7 additions & 1 deletion koku/masu/util/ocp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ def get_report_details(report_directory):
payload_date: DateTime,
manifest_path: String,
uuid: String,
manifest_path: String"
manifest_path: String",
start: DateTime,
end: DateTime

"""
manifest_path = "{}/{}".format(report_directory, "manifest.json")
Expand All @@ -138,6 +140,10 @@ def get_report_details(report_directory):
payload_dict = json.load(file)
payload_dict["date"] = parser.parse(payload_dict["date"])
payload_dict["manifest_path"] = manifest_path
# parse start and end dates if in manifest
for field in ["start", "end"]:
if payload_dict.get(field):
payload_dict[field] = parser.parse(payload_dict[field])
except (OSError, IOError, KeyError) as exc:
LOG.error("Unable to extract manifest data: %s", exc)

Expand Down