Commit
Rachel Chen authored and Rachel Chen committed Dec 13, 2024
1 parent 918b769 commit b911509
Showing 3 changed files with 354 additions and 1 deletion.
2 changes: 1 addition & 1 deletion snuba/manual_jobs/runner.py
@@ -134,7 +134,7 @@ def run_job(job_spec: JobSpec) -> JobStatus:
         if not job_spec.is_async:
             current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED)
             job_logger.info("[runner] job execution finished")
-    except BaseException:
+    except BaseException as e:
         current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED)
         job_logger.error("[runner] job execution failed")
         job_logger.info(f"[runner] exception {traceback.format_exc()}")
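The hunk now binds the caught exception as `e`, though none of the visible lines use it; the usage presumably sits below the truncated hunk. As a hedged sketch only, the common pattern looks like this (`job_to_run` is an assumed name, not taken from the diff):

# Hypothetical failure path: bind the exception so the error itself lands in
# the log line alongside the formatted traceback.
try:
    job_to_run.execute(job_logger)  # assumed call, not shown in this hunk
except BaseException as e:
    current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED)
    job_logger.error(f"[runner] job execution failed: {e!r}")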
54 changes: 54 additions & 0 deletions snuba/manual_jobs/scrub_ips_from_eap_spans.py
@@ -0,0 +1,54 @@
from datetime import datetime
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec


class ScrubIpFromEAPSpans(Job):
def __init__(self, job_spec: JobSpec) -> None:
self.__validate_job_params(job_spec.params)
super().__init__(job_spec)

def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
assert params
assert isinstance(params["organization_ids"], list)
assert all([isinstance(p, int) for p in params["organization_ids"]])
self._organization_ids = params["organization_ids"]
self._start_datetime = datetime.fromisoformat(params["start_datetime"])
self._end_datetime = datetime.fromisoformat(params["end_datetime"])

def _get_query(self, cluster_name: str | None) -> str:
organization_ids = ",".join([str(p) for p in self._organization_ids])
start_datetime = self._start_datetime.isoformat()
end_datetime = self._end_datetime.isoformat()
on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
return f"""ALTER TABLE eap_spans_2_local
{on_cluster}
UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
WHERE organization_id IN [{organization_ids}]
AND _sort_timestamp > toDateTime('{start_datetime}')
AND _sort_timestamp <= toDateTime('{end_datetime}')"""

    def execute(self, logger: JobLogger) -> None:
        cluster = get_cluster(StorageSetKey.EAP_SPANS)
        storage_node = cluster.get_local_nodes()[0]
        connection = cluster.get_node_connection(
            ClickhouseClientSettings.CLEANUP, storage_node
        )
        if not cluster.is_single_node():
            cluster_name = cluster.get_clickhouse_cluster_name()
        else:
            cluster_name = None
        query = self._get_query(cluster_name)
        logger.info(f"Executing query: {query}")
        result = connection.execute(query=query, settings={"mutations_sync": 0})
        logger.info("complete")
        logger.info(repr(result))
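Because the ALTER is submitted with `mutations_sync: 0`, `connection.execute` returns as soon as ClickHouse accepts the mutation; the rewrite itself runs in the background. A hedged sketch of how completion could be checked afterwards, reusing the node connection from `execute` and assuming, as elsewhere in Snuba, that `execute` returns a result object with a `.results` list (`system.mutations` is a standard ClickHouse table; the polling helper itself is not part of this commit):

import time

def wait_for_mutations(connection, table: str = "eap_spans_2_local", timeout: float = 300.0) -> None:
    # Sketch only: poll system.mutations until every pending mutation on the
    # table is done, surfacing the first failure reason ClickHouse records.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        rows = connection.execute(
            query=f"""SELECT mutation_id, latest_fail_reason
                      FROM system.mutations
                      WHERE table = '{table}' AND is_done = 0"""
        ).results
        if not rows:
            return
        for _, fail_reason in rows:
            if fail_reason:
                raise RuntimeError(f"mutation failed: {fail_reason}")
        time.sleep(5)
    raise TimeoutError(f"mutations on {table} still running after {timeout}s")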
299 changes: 299 additions & 0 deletions tests/manual_jobs/test_scrub_ips_from_eap_spans.py
@@ -0,0 +1,299 @@
import random
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
    Column,
    TraceItemColumnValues,
    TraceItemTableRequest,
    TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import (
    PageToken,
    RequestMeta,
    ResponseMeta,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
    AttributeKey,
    AttributeValue,
)
from sentry_protos.snuba.v1.trace_item_filter_pb2 import (
    ExistsFilter,
    TraceItemFilter,
)

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_status import JobStatus
from snuba.manual_jobs.runner import get_job_status, run_job
from snuba.manual_jobs.scrub_ips_from_eap_spans import ScrubIpFromEAPSpans
from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable
from tests.helpers import write_raw_unprocessed_events

_RELEASE_TAG = "[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b"
_SERVER_NAME = "D23CXQ4GK2.local"


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_basic() -> None:
job_id = "abc"
run_job(
JobSpec(
job_id,
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 3, 5, 6],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
)
)

assert get_job_status(job_id) == JobStatus.FINISHED


@pytest.mark.parametrize(
("jobspec"),
[
JobSpec(
"abc",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, "b"],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
),
JobSpec(
"abc",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 2],
"start_datetime": "2024-12-01 00:00:0",
"end_datetime": "2024-12-10 00:00:00",
},
),
JobSpec(
"abc",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 2],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:0",
},
),
],
)
@pytest.mark.redis_db
def test_fail_validation(jobspec: JobSpec) -> None:
with pytest.raises(Exception):
run_job(jobspec)


@pytest.mark.redis_db
def test_generate_query() -> None:
job = ScrubIpFromEAPSpans(
JobSpec(
"bassa",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 3, 5, 6],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
)
)
assert (
job._get_query(None)
== """ALTER TABLE eap_spans_2_local
UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
WHERE organization_id IN [1,3,5,6]
AND _sort_timestamp > toDateTime('2024-12-01T00:00:00')
AND _sort_timestamp <= toDateTime('2024-12-10T00:00:00')"""
)
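
For intuition, the lambda in the UPDATE rebuilds the whole map, changing only the `user.ip` entry. A Python equivalent of the rewrite (illustrative only, not part of the commit):

def scrub_user_ip(attrs: dict[str, str]) -> dict[str, str]:
    # Python equivalent of the ClickHouse lambda:
    # (k, v) -> (k, if(k = 'user.ip', 'scrubbed', v))
    return {k: ("scrubbed" if k == "user.ip" else v) for k, v in attrs.items()}

assert scrub_user_ip({"user.ip": "127.0.0.1", "color": "red"}) == {
    "user.ip": "scrubbed",
    "color": "red",
}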


def _gen_message(
dt: datetime,
measurements: dict[str, dict[str, float]] | None = None,
tags: dict[str, str] | None = None,
) -> Mapping[str, Any]:
measurements = measurements or {}
tags = tags or {}
return {
"description": "/api/0/relays/projectconfigs/",
"duration_ms": 152,
"event_id": "d826225de75d42d6b2f01b957d51f18f",
"exclusive_time_ms": 0.228,
"is_segment": True,
"data": {
"sentry.environment": "development",
"sentry.release": _RELEASE_TAG,
"thread.name": "uWSGIWorker1Core0",
"thread.id": "8522009600",
"sentry.segment.name": "/api/0/relays/projectconfigs/",
"sentry.sdk.name": "sentry.python.django",
"sentry.sdk.version": "2.7.0",
"my.float.field": 101.2,
"my.int.field": 2000,
"my.neg.field": -100,
"my.neg.float.field": -101.2,
"my.true.bool.field": True,
"my.false.bool.field": False,
},
"measurements": {
"num_of_spans": {"value": 50.0},
"eap.measurement": {"value": random.choice([1, 100, 1000])},
**measurements,
},
"organization_id": 1,
"origin": "auto.http.django",
"project_id": 1,
"received": 1721319572.877828,
"retention_days": 90,
"segment_id": "8873a98879faf06d",
"sentry_tags": {
"category": "http",
"environment": "development",
"op": "http.server",
"platform": "python",
"release": _RELEASE_TAG,
"sdk.name": "sentry.python.django",
"sdk.version": "2.7.0",
"status": "ok",
"status_code": "200",
"thread.id": "8522009600",
"thread.name": "uWSGIWorker1Core0",
"trace.status": "ok",
"transaction": "/api/0/relays/projectconfigs/",
"transaction.method": "POST",
"transaction.op": "http.server",
"user": "ip:127.0.0.1",
},
"span_id": "123456781234567D",
"tags": {
"http.status_code": "200",
"relay_endpoint_version": "3",
"relay_id": "88888888-4444-4444-8444-cccccccccccc",
"relay_no_cache": "False",
"relay_protocol_version": "3",
"relay_use_post_or_schedule": "True",
"relay_use_post_or_schedule_rejected": "version",
"server_name": _SERVER_NAME,
"spans_over_limit": "False",
"color": random.choice(["red", "green", "blue"]),
"location": random.choice(["mobile", "frontend", "backend"]),
**tags,
},
"trace_id": uuid.uuid4().hex,
"start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
"start_timestamp_precise": dt.timestamp(),
"end_timestamp_precise": dt.timestamp() + 1,
}

def _generate_request(ts: Any, hour_ago: int) -> TraceItemTableRequest:
return TraceItemTableRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480",
),
filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
)
),
columns=[
Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="server_name")
)
],
order_by=[
TraceItemTableRequest.OrderBy(
column=Column(
key=AttributeKey(
type=AttributeKey.TYPE_STRING, name="server_name"
)
)
)
],
)

@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_span_is_scrubbed() -> None:
BASE_TIME = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - timedelta(
minutes=180
)
spans_storage = get_storage(StorageKey("eap_spans"))
start = BASE_TIME
messages = [_gen_message(start - timedelta(minutes=i)) for i in range(2)]
write_raw_unprocessed_events(spans_storage, messages)

ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
message = _generate_request(ts, hour_ago)
response = EndpointTraceItemTable().execute(message)

expected_response = TraceItemTableResponse(
column_values=[
TraceItemColumnValues(
attribute_name="server_name",
results=[AttributeValue(val_str=_SERVER_NAME) for _ in range(2)],
)
],
page_token=PageToken(offset=2),
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)
assert response == expected_response # type: ignore

run_job(
JobSpec(
"plswork",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1],
"start_datetime": str(BASE_TIME - timedelta(hours=1)),
"end_datetime": str(BASE_TIME),
},
)
)
    message = _generate_request(ts, hour_ago)
    response = EndpointTraceItemTable().execute(message)
    # The mutation only rewrites the user.ip attribute, so the server_name
    # column queried here should be unchanged after the job runs.
    assert response == expected_response  # type: ignore
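Because the job fires the mutation asynchronously, the final assertion above only checks a column the scrub never touches. A direct check of the scrubbed attribute would read the local table itself once the mutation has applied; a hedged sketch (the bucket `attr_str_1` and the map-access syntax follow the job's own query, everything else is an assumption, not part of this commit):

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey

def _fetch_user_ip_values() -> list[str]:
    # Sketch only: read the user.ip attribute straight from the local table to
    # confirm the mutation rewrote it to 'scrubbed'.
    cluster = get_cluster(StorageSetKey.EAP_SPANS)
    connection = cluster.get_node_connection(
        ClickhouseClientSettings.QUERY, cluster.get_local_nodes()[0]
    )
    rows = connection.execute(
        query="""SELECT `attr_str_1`['user.ip']
                 FROM eap_spans_2_local
                 WHERE organization_id = 1"""
    ).results
    return [value for (value,) in rows]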
