-
Notifications
You must be signed in to change notification settings - Fork 18
/
recommendation_server.py
187 lines (151 loc) · 7.18 KB
/
recommendation_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import os
import random
import time
from concurrent import futures
import grpc
from random import sample # Keep this import for the sample function
from time import sleep
import demo_pb2
import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.grpc import (
GrpcInstrumentorClient,
GrpcInstrumentorServer,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from logger import getJSONLogger
# JSON logger shared by the whole module; the log file name is passed to the helper.
logger = getJSONLogger('recommendationservice-server', log_filename='recommendationservice.log')
# Thread pool (10 workers) handed to grpc.server() at startup; ListRecommendations
# also reads its thread count and queue depth to report them as span attributes.
worker_pool = futures.ThreadPoolExecutor(max_workers=10)
# Module-level OpenTelemetry tracer used to open spans in the functions below.
tracer = trace.get_tracer(__name__)
def get_random_wait_time(max_time, buckets):
    """Return a random wait time between 0 and ``max_time``.

    The result is the sum of ``buckets`` independent uniform draws, each
    scaled to ``max_time / buckets``.

    Args:
        max_time: Upper bound of the returned value (result uses the same unit).
        buckets: Number of uniform draws to add up.

    Returns:
        The accumulated random value, or ``0`` when ``buckets`` is not positive.
    """
    # A negative bucket count yields 0 through the empty range below; treat a
    # count of 0 the same way instead of failing with ZeroDivisionError.
    if buckets <= 0:
        return 0

    num = 0
    val = max_time / buckets
    for _ in range(buckets):
        num += random.random() * val
    return num
def sleep_random(max_time):
    """Block the calling thread for a random duration of up to ``max_time`` ms.

    The duration comes from ``get_random_wait_time`` with four buckets and is
    converted from milliseconds to the seconds that ``time.sleep`` expects.
    """
    wait_ms = get_random_wait_time(max_time, 4)
    wait_seconds = wait_ms / 1000
    time.sleep(wait_seconds)
def mock_database_call(max_time, name, query):
    """Simulate a database query inside its own tracing span.

    Opens a span called ``name``, tags it with the statement and database
    name, sleeps for a random time of up to ``max_time`` milliseconds and, in
    roughly 5% of calls, logs an error and raises to imitate a timeout.

    Raises:
        Exception: "Database connection timeout" on a simulated failure.
    """
    with tracer.start_as_current_span(name) as db_span:
        db_span.set_attribute("db.statement", query)
        db_span.set_attribute("db.name", "recommendation")
        sleep_random(max_time)

        # Simulated failure: roughly one call in twenty times out.
        failure_roll = random.random()
        if failure_roll >= 0.05:
            return
        logger.error(f"Database error occurred during {name}: Connection timeout")
        raise Exception("Database connection timeout")
class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
    """gRPC servicer that recommends products and answers health checks."""

    def ListRecommendations(self, request, context):
        """Return up to five random catalog product ids not listed in the request.

        Args:
            request: Message whose ``product_ids`` are excluded from the result.
            context: gRPC servicer context; receives the status code and
                details when the call fails.

        Returns:
            A ``demo_pb2.ListRecommendationsResponse``. It is empty when the
            simulated catalog outage triggers (status ``UNAVAILABLE``) or when
            any exception is raised while handling the call (status
            ``INTERNAL``).
        """
        with tracer.start_as_current_span("ListRecommendationsFunction"):
            try:
                mock_database_call(
                    250,
                    "SELECT recommendation.products",
                    "SELECT * FROM products WHERE category IN (?)",
                )

                # Record thread-pool pressure on the currently active span.
                span = trace.get_current_span()
                span.set_attribute("app.python.active_threads", len(worker_pool._threads))
                span.set_attribute("app.python.pending_pool", worker_pool._work_queue.qsize())

                max_responses = 5

                # Simulate occasional warnings
                if random.random() < 0.1:  # 10% chance of warning
                    logger.warning("High number of active threads detected")

                # fetch list of products from product catalog stub
                cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
                product_ids = [product.id for product in cat_response.products]
                filtered_products = list(set(product_ids) - set(request.product_ids))
                num_return = min(max_responses, len(filtered_products))

                # Simulate occasional errors
                if random.random() < 0.05:  # 5% chance of error
                    logger.error("Failed to fetch product list: Product catalog service unavailable")
                    context.set_code(grpc.StatusCode.UNAVAILABLE)
                    context.set_details("Product catalog service is currently unavailable")
                    return demo_pb2.ListRecommendationsResponse()

                # Pick the recommendations directly from the candidate list
                # rather than sampling indices and looking them up afterwards.
                prod_list = sample(filtered_products, num_return)
                logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list))

                # build and return response
                response = demo_pb2.ListRecommendationsResponse()
                response.product_ids.extend(prod_list)
                return response
            except Exception as e:
                # RPC boundary: turn any failure into an INTERNAL status so the
                # client gets a well-formed (empty) response.
                logger.error(f"Unexpected error in ListRecommendations: {str(e)}")
                context.set_code(grpc.StatusCode.INTERNAL)
                context.set_details("An unexpected error occurred")
                return demo_pb2.ListRecommendationsResponse()

    def Check(self, request, context):
        """Health check: always report this service as SERVING."""
        return health_pb2.HealthCheckResponse(
            status=health_pb2.HealthCheckResponse.SERVING)

    def Watch(self, request, context):
        """Reject streaming health watches, which this service does not support.

        Returns:
            An empty response stream; the call finishes with gRPC status
            ``UNIMPLEMENTED``.
        """
        # UNIMPLEMENTED is a gRPC status code, not a HealthCheckResponse
        # serving status, so it is reported through the context. Watch is a
        # server-streaming RPC in the gRPC health protocol, hence the empty
        # iterator rather than a single message.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details("Watch is not implemented")
        return iter(())
if __name__ == "__main__":
    # Entry point: configure tracing, connect to the product catalog and serve
    # gRPC requests until interrupted from the keyboard.
    logger.info("initializing recommendationservice")

    try:
        # Attempt to retrieve environment variables
        service_name = os.environ.get("SERVICE_NAME")
        pod_ip = os.environ.get("POD_IP")

        # Check if the environment variables are provided
        if service_name is None:
            raise ValueError("Environment variable 'SERVICE_NAME' is missing.")
        if pod_ip is None:
            raise ValueError("Environment variable 'POD_IP' is missing.")

        # create Resource attributes used by the OpenTelemetry SDK
        resource = Resource(attributes={
            "service.name": service_name,
            "service.version": "0.1",
            "ip": pod_ip,
        })
    except Exception as e:
        # A single handler covers both the missing-variable ValueError and any
        # other failure: log the error, then re-raise so startup aborts.
        logger.error("Error creating resource: " + str(e))
        raise

    # create the OTLP exporter to send data to an insecure OpenTelemetry Collector
    otlp_exporter = OTLPSpanExporter(
        endpoint=os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT'),
        insecure=True,
    )

    # create a Trace Provider
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

    # set the Trace Provider to be used by the OpenTelemetry SDK
    trace.set_tracer_provider(trace_provider)

    # Add OpenTelemetry auto-instrumentation hooks for gRPC client and server
    # communications. This must happen before the channel and server below
    # are created.
    GrpcInstrumentorClient().instrument()
    GrpcInstrumentorServer().instrument()

    catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '')
    if not catalog_addr:
        raise Exception('PRODUCT_CATALOG_SERVICE_ADDR environment variable not set')
    logger.info("product catalog address: " + catalog_addr)

    # Module-level on purpose: RecommendationService.ListRecommendations reads
    # product_catalog_stub as a global.
    channel = grpc.insecure_channel(catalog_addr)
    product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel)

    server = grpc.server(worker_pool)

    # add class to gRPC server (the same object serves recommendations and health)
    service = RecommendationService()
    demo_pb2_grpc.add_RecommendationServiceServicer_to_server(service, server)
    health_pb2_grpc.add_HealthServicer_to_server(service, server)

    # start server
    port = os.environ.get('PORT', "8080")
    logger.info("listening on port: " + port)
    server.add_insecure_port('[::]:' + port)
    server.start()

    # keep alive: the main thread only sleeps while the server runs
    try:
        while True:
            time.sleep(10000)
    except KeyboardInterrupt:
        server.stop(0)