-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsecurityControl.py
151 lines (127 loc) · 5.36 KB
/
securityControl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import time
from multiprocessing import Process
from dataclasses import dataclass
from consts import (
MAX_CONTROL_PASSES,
SECURITY_CHECKED_FILE,
SECURITY_REJECTED_FILE,
MESSAGES,
LOCATIONS,
SECURITY_CHECKPOINTS_COUNT,
MAX_PASSENGERS_PER_CHECKPOINT,
)
from utils import (
ensure_files_exists,
read_passengers,
save_passengers,
timestamp,
append_passenger,
log,
terminate_process,
)
from luggageControl import get_file_name
class SecurityStation:
    """One security-control station that screens passengers in same-gender groups.

    The station's waiting queue is stored in the file named by
    ``get_file_name(station_id)``. Each call to :meth:`get_next_passengers`
    reads that queue, admits a group, screens it and writes the remaining
    queue back to the same file.
    """

    def __init__(self, station_id):
        self.station_id = station_id
        # Passengers admitted for the current screening round.
        self.current_passengers = []
        # Gender of the group currently at the station; None while it is empty.
        self.current_gender = None
        self.file_name = get_file_name(station_id)

    def can_add_passenger(self, passenger) -> bool:
        """Return True if ``passenger`` may join the group currently at the station.

        An empty station accepts anybody. Otherwise there must be a free slot
        (fewer than ``MAX_PASSENGERS_PER_CHECKPOINT`` people) and the
        passenger's ``"gender"`` must match the gender of the current group.
        """
        if not self.current_passengers:
            return True
        has_free_slot = len(self.current_passengers) < MAX_PASSENGERS_PER_CHECKPOINT
        return has_free_slot and self.current_gender == passenger["gender"]

    def add_passenger(self, passenger):
        """Add ``passenger`` to the station; the first one fixes the group's gender."""
        if not self.current_passengers:
            self.current_gender = passenger["gender"]
        self.current_passengers.append(passenger)

    def requires_skip(self, passenger, waiting_queue):
        """Record that the head of the queue was passed over, if it was.

        When ``passenger`` is not the first entry of ``waiting_queue``, the
        first entry's ``"controlPassed"`` counter is incremented in place
        (a missing counter is treated as 0). An empty queue is left untouched.

        Returns:
            The same ``waiting_queue`` list that was passed in.
        """
        if waiting_queue and waiting_queue[0]["id"] != passenger["id"]:
            head = waiting_queue[0]
            head["controlPassed"] = head.get("controlPassed", 0) + 1
        return waiting_queue

    def control(self):
        """Screen every passenger currently at the station, then empty it.

        Passengers with a truthy ``"hasDangerousItems"`` are appended to
        ``SECURITY_REJECTED_FILE`` and ``terminate_process`` is called with
        their id; everybody else is appended to ``SECURITY_CHECKED_FILE``.
        """
        for passenger in self.current_passengers:
            log(
                f"{timestamp()} - {LOCATIONS.SECURITY}: {MESSAGES.SECURITY_CONTROL_BEGIN}"
            )
            rejected = passenger["hasDangerousItems"]
            verdict = (
                MESSAGES.SECURITY_CONTROL_REJECT
                if rejected
                else MESSAGES.SECURITY_CONTROL_OK
            )
            log(
                f"{timestamp()} - {LOCATIONS.SECURITY}: Stanowisko {self.station_id}: Pasażer ID={passenger['id']} "
                f"(przepuścił: {passenger.get('controlPassed', 0)} osób) Płeć={passenger['gender']} {verdict}"
            )
            if rejected:
                append_passenger(SECURITY_REJECTED_FILE, passenger)
                # NOTE(review): presumably stops the passenger's own process;
                # confirm against utils.terminate_process.
                terminate_process(int(passenger["id"]), "security")
            else:
                append_passenger(SECURITY_CHECKED_FILE, passenger)

        self.current_passengers = []
        self.current_gender = None

    def _admit_from(self, candidates, waiting_queue, count_skips):
        """Admit every candidate that fits and remove it from ``waiting_queue``.

        When ``count_skips`` is true, admitting somebody who is not at the
        head of the queue is recorded against the head via
        :meth:`requires_skip`. Stops as soon as the station is full.

        Returns:
            The waiting queue after the admitted passengers were removed.
        """
        for passenger in candidates:
            if not self.can_add_passenger(passenger):
                continue
            if count_skips:
                waiting_queue = self.requires_skip(passenger, waiting_queue)
            self.add_passenger(passenger)
            waiting_queue.remove(passenger)
            if len(self.current_passengers) >= MAX_PASSENGERS_PER_CHECKPOINT:
                break
        return waiting_queue

    def get_next_passengers(self):
        """Pick the next group from the station's queue file and screen it.

        Admission priority:
          1. passengers already passed over at least ``MAX_CONTROL_PASSES``
             times (admitting them is not counted as a skip),
          2. VIP passengers,
          3. everybody else, in queue order.

        The remaining queue is written back to the station's file afterwards.
        """
        # NOTE(review): the queue file is read, modified and rewritten without
        # any locking visible here; confirm that writers of this file cannot
        # append between the read and the save.
        waiting_queue = read_passengers(self.file_name)

        # Split the queue into three DISJOINT priority groups. A VIP who has
        # also reached the skip limit must appear only once, otherwise they
        # would be admitted twice and the second remove() would raise
        # ValueError.
        at_skip_limit, vips, regular = [], [], []
        for candidate in waiting_queue:
            if candidate.get("controlPassed", 0) >= MAX_CONTROL_PASSES:
                at_skip_limit.append(candidate)
            elif candidate["isVIP"]:
                vips.append(candidate)
            else:
                regular.append(candidate)

        waiting_queue = self._admit_from(at_skip_limit, waiting_queue, False)
        waiting_queue = self._admit_from(vips, waiting_queue, True)
        waiting_queue = self._admit_from(regular, waiting_queue, True)

        if self.current_passengers:
            self.control()
        save_passengers(self.file_name, waiting_queue)
class SecurityCheckpoint:
    """A security checkpoint made up of several ``SecurityStation`` objects."""

    def __init__(self, num_stations: int = SECURITY_CHECKPOINTS_COUNT):
        # Stations are numbered 0 .. num_stations - 1.
        stations = []
        for station_id in range(num_stations):
            stations.append(SecurityStation(station_id))
        self.stations = stations
def process_passengers(checkpoint: SecurityCheckpoint):
    """Run one round of passenger selection and screening on every station."""
    stations = checkpoint.stations
    for current_station in stations:
        current_station.get_next_passengers()
def check_security_continuously():
    """Main security-control loop.

    Makes sure the result files exist, builds a checkpoint and then
    processes its stations once per second, forever.
    """
    result_files = [SECURITY_CHECKED_FILE, SECURITY_REJECTED_FILE]
    ensure_files_exists(result_files)

    security_checkpoint = SecurityCheckpoint()
    while True:
        process_passengers(security_checkpoint)
        time.sleep(1)
# Script entry point: run the security loop in a child process and stop it
# cleanly when the user presses Ctrl+C.
if __name__ == "__main__":
    print("Uruchamianie kontroli bezpieczeństwa...")
    process = Process(target=check_security_continuously)
    process.start()
    try:
        # The worker loops forever, so this blocks until interrupted.
        process.join()
    except KeyboardInterrupt:
        print("\nZatrzymywanie kontroli bezpieczeństwa...")
        process.terminate()
        # Wait for the child to actually exit (and reap it) before reporting
        # that the control has stopped.
        process.join()
        print("Kontrola zatrzymana.")