load_balancer_impl.cc
#include "source/common/upstream/load_balancer_impl.h"
#include <atomic>
#include <bitset>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/upstream.h"
#include "source/common/common/assert.h"
#include "source/common/common/logger.h"
#include "source/common/protobuf/utility.h"
#include "absl/container/fixed_array.h"
namespace Envoy {
namespace Upstream {
namespace {
static const std::string RuntimeZoneEnabled = "upstream.zone_routing.enabled";
static const std::string RuntimeMinClusterSize = "upstream.zone_routing.min_cluster_size";
static const std::string RuntimePanicThreshold = "upstream.healthy_panic_threshold";
bool tooManyPreconnects(size_t num_preconnect_picks, uint32_t healthy_hosts) {
// Currently we only allow the number of preconnected connections to equal the
// number of healthy hosts.
return num_preconnect_picks >= healthy_hosts;
}
// Distributes load between priorities based on the per priority availability and the normalized
// total availability. Load is assigned to each priority according to how available each priority
// is, adjusted for the normalized total availability.
//
// @param per_priority_load vector of loads that should be populated.
// @param per_priority_availability the percentage availability of each priority, used to determine
// how much load each priority can handle.
// @param total_load the amount of load that may be distributed. Will be updated with the amount of
// load remaining after distribution.
// @param normalized_total_availability the total availability, up to a max of 100. Used to
// scale the load when the total availability is less than 100%.
// @return the first available priority and the remaining load
std::pair<int32_t, size_t> distributeLoad(PriorityLoad& per_priority_load,
const PriorityAvailability& per_priority_availability,
size_t total_load, size_t normalized_total_availability) {
int32_t first_available_priority = -1;
for (size_t i = 0; i < per_priority_availability.get().size(); ++i) {
if (first_available_priority < 0 && per_priority_availability.get()[i] > 0) {
first_available_priority = i;
}
// Now assign as much load as possible to the high priority levels and cease assigning load
// when total_load runs out.
per_priority_load.get()[i] = std::min<uint32_t>(
total_load, per_priority_availability.get()[i] * 100 / normalized_total_availability);
total_load -= per_priority_load.get()[i];
}
return {first_available_priority, total_load};
}
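// A hand-worked sketch of distributeLoad (illustrative numbers only, not taken from any real
// configuration): with per_priority_availability = {60, 40}, total_load = 100 and
// normalized_total_availability = 100, priority 0 is assigned min(100, 60 * 100 / 100) = 60,
// leaving 40, and priority 1 is assigned min(40, 40 * 100 / 100) = 40, leaving 0. The returned
// pair is then {0, 0}: the first available priority and the remaining load.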
// Returns true if the weights of all the hosts in the HostVector are equal.
bool hostWeightsAreEqual(const HostVector& hosts) {
if (hosts.size() <= 1) {
return true;
}
const uint32_t weight = hosts[0]->weight();
for (size_t i = 1; i < hosts.size(); ++i) {
if (hosts[i]->weight() != weight) {
return false;
}
}
return true;
}
} // namespace
std::pair<uint32_t, LoadBalancerBase::HostAvailability>
LoadBalancerBase::choosePriority(uint64_t hash, const HealthyLoad& healthy_per_priority_load,
const DegradedLoad& degraded_per_priority_load) {
hash = hash % 100 + 1; // 1-100
uint32_t aggregate_percentage_load = 0;
// As with tryChooseLocalLocalityHosts, this can be refactored for efficiency
// but O(N) is good enough for now given the expected number of priorities is
// small.
// We first attempt to select a priority based on healthy availability.
for (size_t priority = 0; priority < healthy_per_priority_load.get().size(); ++priority) {
aggregate_percentage_load += healthy_per_priority_load.get()[priority];
if (hash <= aggregate_percentage_load) {
return {static_cast<uint32_t>(priority), HostAvailability::Healthy};
}
}
// If no priorities were selected due to health, we'll select a priority based on degraded
// availability.
for (size_t priority = 0; priority < degraded_per_priority_load.get().size(); ++priority) {
aggregate_percentage_load += degraded_per_priority_load.get()[priority];
if (hash <= aggregate_percentage_load) {
return {static_cast<uint32_t>(priority), HostAvailability::Degraded};
}
}
// The percentages should always add up to 100 but we have to have a return for the compiler.
IS_ENVOY_BUG("unexpected load error");
return {0, HostAvailability::Healthy};
}
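// Hand-worked example of choosePriority (illustrative values, not from the code above): with
// healthy_per_priority_load = {70, 30} and degraded_per_priority_load = {0, 0}, the hash is
// first mapped into [1, 100]; values 1-70 select {0, HostAvailability::Healthy} and values
// 71-100 select {1, HostAvailability::Healthy}. Degraded loads are only consulted when the
// healthy loads sum to less than 100.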
LoadBalancerBase::LoadBalancerBase(
const PrioritySet& priority_set, ClusterStats& stats, Runtime::Loader& runtime,
Random::RandomGenerator& random,
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
: stats_(stats), runtime_(runtime), random_(random),
default_healthy_panic_percent_(PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
common_config, healthy_panic_threshold, 100, 50)),
priority_set_(priority_set),
override_host_status_(LoadBalancerContextBase::createOverrideHostStatus(common_config)) {
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
recalculatePerPriorityState(host_set->priority(), priority_set_, per_priority_load_,
per_priority_health_, per_priority_degraded_, total_healthy_hosts_);
}
// Recalculate panic mode for all levels.
recalculatePerPriorityPanic();
priority_update_cb_ = priority_set_.addPriorityUpdateCb(
[this](uint32_t priority, const HostVector&, const HostVector&) -> void {
recalculatePerPriorityState(priority, priority_set_, per_priority_load_,
per_priority_health_, per_priority_degraded_,
total_healthy_hosts_);
recalculatePerPriorityPanic();
stashed_random_.clear();
});
}
// The following cases are handled by the
// recalculatePerPriorityState and recalculatePerPriorityPanic methods (normalized total health is
// the sum of all priorities' health values, capped at 100).
// - normalized total health is 100%. There are enough healthy hosts to handle the load.
// Do not enter panic mode, even if a specific priority has a low number of healthy hosts.
// - normalized total health is < 100%. There are not enough healthy hosts to handle the load.
// Continue distributing the load among priority sets, but turn on panic mode for a given priority
// if the number of healthy hosts in that priority set is low.
// - all host sets are in panic mode. This situation is called TotalPanic. Load distribution is
// calculated based on the number of hosts in each priority regardless of their health.
// - all hosts in all priorities are down (normalized total health is 0%). If the panic
// threshold is > 0% the cluster is in TotalPanic (see above). If the panic threshold == 0
// then priorities are not in panic, but there are no healthy hosts to route to.
// In this case just mark P=0 as the recipient of 100% of the traffic (nothing will be routed
// to P=0 anyway as there are no healthy hosts there).
void LoadBalancerBase::recalculatePerPriorityState(uint32_t priority,
const PrioritySet& priority_set,
HealthyAndDegradedLoad& per_priority_load,
HealthyAvailability& per_priority_health,
DegradedAvailability& per_priority_degraded,
uint32_t& total_healthy_hosts) {
per_priority_load.healthy_priority_load_.get().resize(priority_set.hostSetsPerPriority().size());
per_priority_load.degraded_priority_load_.get().resize(priority_set.hostSetsPerPriority().size());
per_priority_health.get().resize(priority_set.hostSetsPerPriority().size());
per_priority_degraded.get().resize(priority_set.hostSetsPerPriority().size());
total_healthy_hosts = 0;
// Determine the health of the newly modified priority level.
// Health ranges from 0-100, and is the ratio of healthy/degraded hosts to total hosts, modified
// by the overprovisioning factor.
HostSet& host_set = *priority_set.hostSetsPerPriority()[priority];
per_priority_health.get()[priority] = 0;
per_priority_degraded.get()[priority] = 0;
const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size();
if (host_count > 0) {
// Each priority level's health is the ratio of healthy hosts to the total number of hosts in the
// priority, multiplied by the overprovisioning factor of 1.4 and capped at 100%. This means that
// if all hosts are healthy, that priority's health is 100%*1.4=140%, capped at 100%, which
// results in 100%. If 80% of hosts are healthy, that priority's health is still 100%
// (80%*1.4=112%, capped at 100%).
per_priority_health.get()[priority] = std::min<uint32_t>(
100, (host_set.overprovisioningFactor() * host_set.healthyHosts().size() / host_count));
// We perform the same computation for degraded hosts.
per_priority_degraded.get()[priority] = std::min<uint32_t>(
100, (host_set.overprovisioningFactor() * host_set.degradedHosts().size() / host_count));
}
// Now that we've updated health for the changed priority level, we need to calculate percentage
// load for all priority levels.
// First, determine if the load needs to be scaled relative to availability (healthy + degraded).
// For example if there are 3 host sets with 10% / 20% / 10% health and 20% / 10% / 0% degraded
// they will get 16% / 28% / 14% load to healthy hosts and 28% / 14% / 0% load to degraded hosts
// to ensure total load adds up to 100. Note the first healthy priority is receiving 2% additional
// load due to rounding.
//
// Sum of priority levels' health and degraded values may exceed 100, so it is capped at 100 and
// referred as normalized total availability.
const uint32_t normalized_total_availability =
calculateNormalizedTotalAvailability(per_priority_health, per_priority_degraded);
if (normalized_total_availability == 0) {
// Everything is terrible. There is nothing to calculate here.
// Let recalculatePerPriorityPanic and recalculateLoadInTotalPanic deal with
// load calculation.
return;
}
// We start off with a total load of 100 and distribute it between priorities based on
// availability. We first attempt to distribute this load to healthy priorities based on healthy
// availability.
const auto first_healthy_and_remaining =
distributeLoad(per_priority_load.healthy_priority_load_, per_priority_health, 100,
normalized_total_availability);
// Using the remaining load after allocating load to healthy priorities, distribute it based on
// degraded availability.
const auto remaining_load_for_degraded = first_healthy_and_remaining.second;
const auto first_degraded_and_remaining =
distributeLoad(per_priority_load.degraded_priority_load_, per_priority_degraded,
remaining_load_for_degraded, normalized_total_availability);
// Anything that remains should just be rounding errors, so allocate that to the first available
// priority, either as healthy or degraded.
const auto remaining_load = first_degraded_and_remaining.second;
if (remaining_load != 0) {
const auto first_healthy = first_healthy_and_remaining.first;
const auto first_degraded = first_degraded_and_remaining.first;
ASSERT(first_healthy != -1 || first_degraded != -1);
// Attempt to allocate the remainder to the first healthy priority first. If no such priority
// exists, allocate to the first degraded priority.
ASSERT(remaining_load < per_priority_load.healthy_priority_load_.get().size() +
per_priority_load.degraded_priority_load_.get().size());
if (first_healthy != -1) {
per_priority_load.healthy_priority_load_.get()[first_healthy] += remaining_load;
} else {
per_priority_load.degraded_priority_load_.get()[first_degraded] += remaining_load;
}
}
// The allocated load between healthy and degraded should be exactly 100.
ASSERT(100 == std::accumulate(per_priority_load.healthy_priority_load_.get().begin(),
per_priority_load.healthy_priority_load_.get().end(), 0) +
std::accumulate(per_priority_load.degraded_priority_load_.get().begin(),
per_priority_load.degraded_priority_load_.get().end(), 0));
for (auto& host_set : priority_set.hostSetsPerPriority()) {
total_healthy_hosts += host_set->healthyHosts().size();
}
}
// Method iterates through priority levels and turns on/off panic mode.
void LoadBalancerBase::recalculatePerPriorityPanic() {
per_priority_panic_.resize(priority_set_.hostSetsPerPriority().size());
const uint32_t normalized_total_availability =
calculateNormalizedTotalAvailability(per_priority_health_, per_priority_degraded_);
const uint64_t panic_threshold = std::min<uint64_t>(
100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_));
// This is a corner case when panic is disabled and there are no hosts available.
// LoadBalancerBase::choosePriority method expects that the sum of
// load percentages always adds up to 100.
// To satisfy that requirement 100% is assigned to P=0.
// In reality no traffic will be routed to P=0 priority, because
// the panic mode is disabled and LoadBalancer will try to find
// a healthy node and none is available.
if (panic_threshold == 0 && normalized_total_availability == 0) {
per_priority_load_.healthy_priority_load_.get()[0] = 100;
return;
}
bool total_panic = true;
for (size_t i = 0; i < per_priority_health_.get().size(); ++i) {
// For each level check if it should run in panic mode. Never set panic mode if
// normalized total health is 100%, even when an individual priority level has a very low number
// of healthy hosts.
const HostSet& priority_host_set = *priority_set_.hostSetsPerPriority()[i];
per_priority_panic_[i] =
(normalized_total_availability == 100 ? false : isHostSetInPanic(priority_host_set));
total_panic = total_panic && per_priority_panic_[i];
}
// If all priority levels are in panic mode, load distribution
// is done differently.
if (total_panic) {
recalculateLoadInTotalPanic();
}
}
// The recalculateLoadInTotalPanic method is called when all priority levels
// are in panic mode. The load distribution is based NOT on the number
// of healthy hosts in a priority, but on the total number of hosts
// in each priority regardless of their health.
void LoadBalancerBase::recalculateLoadInTotalPanic() {
// First calculate the total number of hosts across all priorities, regardless of
// whether they are healthy or not.
const uint32_t total_hosts_count =
std::accumulate(priority_set_.hostSetsPerPriority().begin(),
priority_set_.hostSetsPerPriority().end(), static_cast<size_t>(0),
[](size_t acc, const std::unique_ptr<Envoy::Upstream::HostSet>& host_set) {
return acc + host_set->hosts().size();
});
if (0 == total_hosts_count) {
// Backend is empty, but load must be distributed somewhere.
per_priority_load_.healthy_priority_load_.get()[0] = 100;
return;
}
// Now iterate through all priority levels and calculate how much
// load is supposed to go to each priority. In panic mode the calculation
// is based not on the number of healthy hosts but on the total number of
// hosts in the priority.
uint32_t total_load = 100;
int32_t first_noempty = -1;
for (size_t i = 0; i < per_priority_panic_.size(); i++) {
const HostSet& host_set = *priority_set_.hostSetsPerPriority()[i];
const auto hosts_num = host_set.hosts().size();
if ((-1 == first_noempty) && (0 != hosts_num)) {
first_noempty = i;
}
const uint32_t priority_load = 100 * hosts_num / total_hosts_count;
per_priority_load_.healthy_priority_load_.get()[i] = priority_load;
per_priority_load_.degraded_priority_load_.get()[i] = 0;
total_load -= priority_load;
}
// Add the remaining load to the first non-empty priority.
per_priority_load_.healthy_priority_load_.get()[first_noempty] += total_load;
// The total load should come up to 100%.
ASSERT(100 == std::accumulate(per_priority_load_.healthy_priority_load_.get().begin(),
per_priority_load_.healthy_priority_load_.get().end(), 0));
}
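// Hand-worked sketch of the total panic distribution above (illustrative numbers only): with
// three priorities holding 1, 1 and 1 hosts, each priority initially gets 100 * 1 / 3 = 33, the
// rounding remainder of 1 goes to the first non-empty priority, and the final healthy loads are
// {34, 33, 33} with all degraded loads set to 0.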
std::pair<HostSet&, LoadBalancerBase::HostAvailability>
LoadBalancerBase::chooseHostSet(LoadBalancerContext* context, uint64_t hash) const {
if (context) {
const auto priority_loads = context->determinePriorityLoad(
priority_set_, per_priority_load_, Upstream::RetryPriority::defaultPriorityMapping);
const auto priority_and_source = choosePriority(hash, priority_loads.healthy_priority_load_,
priority_loads.degraded_priority_load_);
return {*priority_set_.hostSetsPerPriority()[priority_and_source.first],
priority_and_source.second};
}
const auto priority_and_source = choosePriority(hash, per_priority_load_.healthy_priority_load_,
per_priority_load_.degraded_priority_load_);
return {*priority_set_.hostSetsPerPriority()[priority_and_source.first],
priority_and_source.second};
}
ZoneAwareLoadBalancerBase::ZoneAwareLoadBalancerBase(
const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterStats& stats,
Runtime::Loader& runtime, Random::RandomGenerator& random,
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
: LoadBalancerBase(priority_set, stats, runtime, random, common_config),
local_priority_set_(local_priority_set),
routing_enabled_(PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
common_config.zone_aware_lb_config(), routing_enabled, 100, 100)),
min_cluster_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(common_config.zone_aware_lb_config(),
min_cluster_size, 6U)),
fail_traffic_on_panic_(common_config.zone_aware_lb_config().fail_traffic_on_panic()) {
ASSERT(!priority_set.hostSetsPerPriority().empty());
resizePerPriorityState();
priority_update_cb_ = priority_set_.addPriorityUpdateCb(
[this](uint32_t priority, const HostVector&, const HostVector&) -> void {
// Update cross priority host map for fast host searching.
cross_priority_host_map_ = priority_set_.crossPriorityHostMap();
// Make sure per_priority_state_ is as large as priority_set_.hostSetsPerPriority()
resizePerPriorityState();
// If P=0 changes, regenerate locality routing structures. Locality based routing is
// disabled at all other levels.
if (local_priority_set_ && priority == 0) {
regenerateLocalityRoutingStructures();
}
});
if (local_priority_set_) {
// Multiple priorities are unsupported for local priority sets.
// In order to support priorities correctly, one would have to make some assumptions about
// routing (all local Envoys fail over at the same time) and use all priorities when computing
// the locality routing structure.
ASSERT(local_priority_set_->hostSetsPerPriority().size() == 1);
local_priority_set_member_update_cb_handle_ = local_priority_set_->addPriorityUpdateCb(
[this](uint32_t priority, const HostVector&, const HostVector&) -> void {
ASSERT(priority == 0);
// If the set of local Envoys changes, regenerate routing for P=0 as it does priority
// based routing.
regenerateLocalityRoutingStructures();
});
}
}
void ZoneAwareLoadBalancerBase::regenerateLocalityRoutingStructures() {
ASSERT(local_priority_set_);
stats_.lb_recalculate_zone_structures_.inc();
// resizePerPriorityState should ensure these stay in sync.
ASSERT(per_priority_state_.size() == priority_set_.hostSetsPerPriority().size());
// We only do locality routing for P=0
uint32_t priority = 0;
PerPriorityState& state = *per_priority_state_[priority];
// Do not perform any calculations if we cannot perform locality routing based on non runtime
// params.
if (earlyExitNonLocalityRouting()) {
state.locality_routing_state_ = LocalityRoutingState::NoLocalityRouting;
return;
}
HostSet& host_set = *priority_set_.hostSetsPerPriority()[priority];
ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality());
const size_t num_localities = host_set.healthyHostsPerLocality().get().size();
ASSERT(num_localities > 0);
// It is worth noting that all of the percentages calculated are orthogonal from
// how much load this priority level receives, percentageLoad(priority).
//
// If the host sets are such that 20% of load is handled locally and 80% is residual, and then
// half the hosts in all host sets go unhealthy, this priority set will
// still send 20% of the incoming load to the local locality and 80% to residual.
//
// Basically, fairness across localities within a priority is guaranteed. Fairness across
// localities across priorities is not.
absl::FixedArray<uint64_t> local_percentage(num_localities);
calculateLocalityPercentage(localHostSet().healthyHostsPerLocality(), local_percentage.begin());
absl::FixedArray<uint64_t> upstream_percentage(num_localities);
calculateLocalityPercentage(host_set.healthyHostsPerLocality(), upstream_percentage.begin());
// If the local cluster has a lower (or equal) percentage of hosts in the local locality than the
// upstream cluster, we can push all of the requests directly to the upstream cluster in the same
// locality.
if (upstream_percentage[0] >= local_percentage[0]) {
state.locality_routing_state_ = LocalityRoutingState::LocalityDirect;
return;
}
state.locality_routing_state_ = LocalityRoutingState::LocalityResidual;
// If we cannot route all requests to the same locality, calculate what percentage can be routed.
// For example, if local percentage is 20% and upstream is 10%
// we can route only 50% of requests directly.
state.local_percent_to_route_ = upstream_percentage[0] * 10000 / local_percentage[0];
// Local locality does not have additional capacity (we have already routed what we could).
// Now we need to figure out how much traffic we can route cross locality and to which exact
// locality we should route. The percentage of requests routed cross locality to a specific
// locality needs to be proportional to the residual capacity that upstream locality has.
//
// residual_capacity contains capacity left in a given locality, we keep accumulating residual
// capacity to make search for sampled value easier.
// For example, if we have the following upstream and local percentage:
// local_percentage: 40000 40000 20000
// upstream_percentage: 25000 50000 25000
// Residual capacity would look like: 0 10000 5000. Now we need to sample proportionally to
// bucket sizes (residual capacity). For simplicity of finding where specific
// sampled value is, we accumulate values in residual capacity. This is what it will look like:
// residual_capacity: 0 10000 15000
// Now to find a locality to route (bucket) we could simply iterate over residual_capacity
// searching where sampled value is placed.
state.residual_capacity_.resize(num_localities);
// Local locality (index 0) does not have residual capacity as we have routed all we could.
state.residual_capacity_[0] = 0;
for (size_t i = 1; i < num_localities; ++i) {
// Only route to the localities that have additional capacity.
if (upstream_percentage[i] > local_percentage[i]) {
state.residual_capacity_[i] =
state.residual_capacity_[i - 1] + upstream_percentage[i] - local_percentage[i];
} else {
// Locality with index "i" does not have residual capacity, but we keep accumulating previous
// values to make search easier on the next step.
state.residual_capacity_[i] = state.residual_capacity_[i - 1];
}
}
}
void ZoneAwareLoadBalancerBase::resizePerPriorityState() {
const uint32_t size = priority_set_.hostSetsPerPriority().size();
while (per_priority_state_.size() < size) {
// Note for P!=0, PerPriorityState is created with NoLocalityRouting and never changed.
per_priority_state_.push_back(std::make_unique<PerPriorityState>());
}
}
bool ZoneAwareLoadBalancerBase::earlyExitNonLocalityRouting() {
// We only do locality routing for P=0.
HostSet& host_set = *priority_set_.hostSetsPerPriority()[0];
if (host_set.healthyHostsPerLocality().get().size() < 2) {
return true;
}
// lb_local_cluster_not_ok is bumped for "Local host set is not set or it is in
// panic mode for the local cluster".
if (!host_set.healthyHostsPerLocality().hasLocalLocality() ||
host_set.healthyHostsPerLocality().get()[0].empty()) {
stats_.lb_local_cluster_not_ok_.inc();
return true;
}
// The local and upstream clusters should have the same number of localities.
if (host_set.healthyHostsPerLocality().get().size() !=
localHostSet().healthyHostsPerLocality().get().size()) {
stats_.lb_zone_number_differs_.inc();
return true;
}
// Do not perform locality routing for small clusters.
const uint64_t min_cluster_size =
runtime_.snapshot().getInteger(RuntimeMinClusterSize, min_cluster_size_);
if (host_set.healthyHosts().size() < min_cluster_size) {
stats_.lb_zone_cluster_too_small_.inc();
return true;
}
return false;
}
HostStatusSet LoadBalancerContextBase::createOverrideHostStatus(
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) {
HostStatusSet override_host_status;
if (!common_config.has_override_host_status()) {
// No override host status is configured; 'Healthy' and 'Degraded' will be applied by default.
override_host_status.set(static_cast<size_t>(Host::Health::Healthy));
override_host_status.set(static_cast<size_t>(Host::Health::Degraded));
return override_host_status;
}
for (auto single_status : common_config.override_host_status().statuses()) {
switch (static_cast<envoy::config::core::v3::HealthStatus>(single_status)) {
PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
case envoy::config::core::v3::HealthStatus::UNKNOWN:
case envoy::config::core::v3::HealthStatus::HEALTHY:
override_host_status.set(static_cast<size_t>(Host::Health::Healthy));
break;
case envoy::config::core::v3::HealthStatus::UNHEALTHY:
case envoy::config::core::v3::HealthStatus::DRAINING:
case envoy::config::core::v3::HealthStatus::TIMEOUT:
override_host_status.set(static_cast<size_t>(Host::Health::Unhealthy));
break;
case envoy::config::core::v3::HealthStatus::DEGRADED:
override_host_status.set(static_cast<size_t>(Host::Health::Degraded));
break;
}
}
return override_host_status;
}
HostConstSharedPtr LoadBalancerContextBase::selectOverrideHost(const HostMap* host_map,
HostStatusSet status,
LoadBalancerContext* context) {
if (context == nullptr) {
return nullptr;
}
auto override_host = context->overrideHostToSelect();
if (!override_host.has_value()) {
return nullptr;
}
if (host_map == nullptr) {
return nullptr;
}
auto host_iter = host_map->find(override_host.value());
// The override host cannot be found in the host map.
if (host_iter == host_map->end()) {
return nullptr;
}
HostConstSharedPtr host = host_iter->second;
ASSERT(host != nullptr);
if (status[static_cast<size_t>(host->health())]) {
return host;
}
return nullptr;
}
HostConstSharedPtr ZoneAwareLoadBalancerBase::chooseHost(LoadBalancerContext* context) {
HostConstSharedPtr host = LoadBalancerContextBase::selectOverrideHost(
cross_priority_host_map_.get(), override_host_status_, context);
if (host != nullptr) {
return host;
}
const size_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1;
for (size_t i = 0; i < max_attempts; ++i) {
host = chooseHostOnce(context);
// If host selection failed or the host is accepted by the filter, return.
// Otherwise, try again.
// Note: in the future we might want to allow retrying when chooseHostOnce returns nullptr.
if (!host || !context || !context->shouldSelectAnotherHost(*host)) {
return host;
}
}
// If we didn't find anything, return the last host.
return host;
}
bool LoadBalancerBase::isHostSetInPanic(const HostSet& host_set) const {
uint64_t global_panic_threshold = std::min<uint64_t>(
100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_));
const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size();
double healthy_percent =
host_count == 0 ? 0.0 : 100.0 * host_set.healthyHosts().size() / host_count;
double degraded_percent =
host_count == 0 ? 0.0 : 100.0 * host_set.degradedHosts().size() / host_count;
// If the % of healthy hosts in the cluster is less than our panic threshold, we use all hosts.
if ((healthy_percent + degraded_percent) < global_panic_threshold) {
return true;
}
return false;
}
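// Hand-worked example of the panic check above (illustrative values): with a panic threshold of
// 50% and 10 non-excluded hosts of which 3 are healthy and 1 is degraded,
// healthy_percent + degraded_percent = 40 < 50, so the host set is considered to be in panic.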
void ZoneAwareLoadBalancerBase::calculateLocalityPercentage(
const HostsPerLocality& hosts_per_locality, uint64_t* ret) {
uint64_t total_hosts = 0;
for (const auto& locality_hosts : hosts_per_locality.get()) {
total_hosts += locality_hosts.size();
}
// TODO(snowp): Should we ignore excluded hosts here too?
size_t i = 0;
for (const auto& locality_hosts : hosts_per_locality.get()) {
ret[i++] = total_hosts > 0 ? 10000ULL * locality_hosts.size() / total_hosts : 0;
}
}
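// Illustrative example of calculateLocalityPercentage (numbers are assumptions, not taken from a
// real deployment): locality host counts of {2, 1, 1} give total_hosts = 4 and output values of
// {5000, 2500, 2500}, i.e. 50% / 25% / 25% expressed in units of 0.01%.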
uint32_t ZoneAwareLoadBalancerBase::tryChooseLocalLocalityHosts(const HostSet& host_set) const {
PerPriorityState& state = *per_priority_state_[host_set.priority()];
ASSERT(state.locality_routing_state_ != LocalityRoutingState::NoLocalityRouting);
// At this point there are guaranteed to be at least 2 localities and the local locality exists.
const size_t number_of_localities = host_set.healthyHostsPerLocality().get().size();
ASSERT(number_of_localities >= 2U);
ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality());
// Try to push all of the requests to the same locality first.
if (state.locality_routing_state_ == LocalityRoutingState::LocalityDirect) {
stats_.lb_zone_routing_all_directly_.inc();
return 0;
}
ASSERT(state.locality_routing_state_ == LocalityRoutingState::LocalityResidual);
// If we cannot route all requests to the same locality, we have already calculated how much we
// can push to the local locality; check if we can push to the local locality on this iteration.
if (random_.random() % 10000 < state.local_percent_to_route_) {
stats_.lb_zone_routing_sampled_.inc();
return 0;
}
// At this point we must route cross locality as we cannot route to the local locality.
stats_.lb_zone_routing_cross_zone_.inc();
// This is *extremely* unlikely but possible due to rounding errors when calculating
// locality percentages. In this case just select a random locality.
if (state.residual_capacity_[number_of_localities - 1] == 0) {
stats_.lb_zone_no_capacity_left_.inc();
return random_.random() % number_of_localities;
}
// Random sampling to select specific locality for cross locality traffic based on the
// additional capacity in localities.
uint64_t threshold = random_.random() % state.residual_capacity_[number_of_localities - 1];
// This could potentially be optimized to O(log(N)) where N is the number of localities.
// A linear scan should be faster for smaller N; in most scenarios N will be small.
// TODO(htuch): is there a bug here when threshold == 0? Seems like we pick
// local locality in that situation. Probably should start iterating at 1.
int i = 0;
while (threshold > state.residual_capacity_[i]) {
i++;
}
return i;
}
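// Hand-worked sketch of the residual-capacity sampling above (illustrative values): with
// accumulated residual_capacity_ = {0, 10000, 15000}, a sampled threshold of 7000 stops at
// index 1 (7000 > 0 but 7000 <= 10000) and a threshold of 12000 stops at index 2. As the TODO
// above notes, a threshold of exactly 0 would stop at index 0, the local locality.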
absl::optional<ZoneAwareLoadBalancerBase::HostsSource>
ZoneAwareLoadBalancerBase::hostSourceToUse(LoadBalancerContext* context, uint64_t hash) const {
auto host_set_and_source = chooseHostSet(context, hash);
// The second argument tells us which availability we should target from the selected host set.
const auto host_availability = host_set_and_source.second;
auto& host_set = host_set_and_source.first;
HostsSource hosts_source;
hosts_source.priority_ = host_set.priority();
// If the selected host set has insufficient healthy hosts, return all hosts (unless we should
// fail traffic on panic, in which case return no host).
if (per_priority_panic_[hosts_source.priority_]) {
stats_.lb_healthy_panic_.inc();
if (fail_traffic_on_panic_) {
return absl::nullopt;
} else {
hosts_source.source_type_ = HostsSource::SourceType::AllHosts;
return hosts_source;
}
}
// If we're doing locality weighted balancing, pick locality.
absl::optional<uint32_t> locality;
if (host_availability == HostAvailability::Degraded) {
locality = host_set.chooseDegradedLocality();
} else {
locality = host_set.chooseHealthyLocality();
}
if (locality.has_value()) {
auto source_type = localitySourceType(host_availability);
if (!source_type) {
return absl::nullopt;
}
hosts_source.source_type_ = source_type.value();
hosts_source.locality_index_ = locality.value();
return hosts_source;
}
// If we've latched that we can't do locality-based routing, return healthy or degraded hosts
// for the selected host set.
if (per_priority_state_[host_set.priority()]->locality_routing_state_ ==
LocalityRoutingState::NoLocalityRouting) {
auto source_type = sourceType(host_availability);
if (!source_type) {
return absl::nullopt;
}
hosts_source.source_type_ = source_type.value();
return hosts_source;
}
// Determine if the load balancer should do zone based routing for this pick.
if (!runtime_.snapshot().featureEnabled(RuntimeZoneEnabled, routing_enabled_)) {
auto source_type = sourceType(host_availability);
if (!source_type) {
return absl::nullopt;
}
hosts_source.source_type_ = source_type.value();
return hosts_source;
}
if (isHostSetInPanic(localHostSet())) {
stats_.lb_local_cluster_not_ok_.inc();
// If the local Envoy instances are in global panic, and we should not fail traffic, do
// not do locality based routing.
if (fail_traffic_on_panic_) {
return absl::nullopt;
} else {
auto source_type = sourceType(host_availability);
if (!source_type) {
return absl::nullopt;
}
hosts_source.source_type_ = source_type.value();
return hosts_source;
}
}
auto source_type = localitySourceType(host_availability);
if (!source_type) {
return absl::nullopt;
}
hosts_source.source_type_ = source_type.value();
hosts_source.locality_index_ = tryChooseLocalLocalityHosts(host_set);
return hosts_source;
}
const HostVector& ZoneAwareLoadBalancerBase::hostSourceToHosts(HostsSource hosts_source) const {
const HostSet& host_set = *priority_set_.hostSetsPerPriority()[hosts_source.priority_];
switch (hosts_source.source_type_) {
case HostsSource::SourceType::AllHosts:
return host_set.hosts();
case HostsSource::SourceType::HealthyHosts:
return host_set.healthyHosts();
case HostsSource::SourceType::DegradedHosts:
return host_set.degradedHosts();
case HostsSource::SourceType::LocalityHealthyHosts:
return host_set.healthyHostsPerLocality().get()[hosts_source.locality_index_];
case HostsSource::SourceType::LocalityDegradedHosts:
return host_set.degradedHostsPerLocality().get()[hosts_source.locality_index_];
}
PANIC_DUE_TO_CORRUPT_ENUM;
}
EdfLoadBalancerBase::EdfLoadBalancerBase(
const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterStats& stats,
Runtime::Loader& runtime, Random::RandomGenerator& random,
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config,
const absl::optional<envoy::config::cluster::v3::Cluster::SlowStartConfig> slow_start_config,
TimeSource& time_source)
: ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random,
common_config),
seed_(random_.random()),
slow_start_window_(slow_start_config.has_value()
? std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
slow_start_config.value().slow_start_window()))
: std::chrono::milliseconds(0)),
aggression_runtime_(
slow_start_config.has_value() && slow_start_config.value().has_aggression()
? absl::optional<Runtime::Double>({slow_start_config.value().aggression(), runtime})
: absl::nullopt),
time_source_(time_source), latest_host_added_time_(time_source_.monotonicTime()),
slow_start_min_weight_percent_(slow_start_config.has_value()
? PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(
slow_start_config.value(), min_weight_percent, 10) /
100.0
: 0.1) {
// We fully recompute the schedulers for a given host set here on membership change, which is
// consistent with what other LB implementations do (e.g. thread aware).
// The downside of a full recompute is that time complexity is O(n * log n),
// so we will need to do better at delta tracking to scale (see
// https://github.com/envoyproxy/envoy/issues/2874).
priority_update_cb_ = priority_set.addPriorityUpdateCb(
[this](uint32_t priority, const HostVector&, const HostVector&) { refresh(priority); });
member_update_cb_ = priority_set.addMemberUpdateCb(
[this](const HostVector& hosts_added, const HostVector&) -> void {
if (isSlowStartEnabled()) {
recalculateHostsInSlowStart(hosts_added);
}
});
}
void EdfLoadBalancerBase::initialize() {
for (uint32_t priority = 0; priority < priority_set_.hostSetsPerPriority().size(); ++priority) {
refresh(priority);
}
}
void EdfLoadBalancerBase::recalculateHostsInSlowStart(const HostVector& hosts) {
auto current_time = time_source_.monotonicTime();
// TODO(nezdolik): linear scan can be improved with using flat hash set for hosts in slow start.
for (const auto& host : hosts) {
auto host_create_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(current_time - host->creationTime());
// Check if host existence time is within slow start window.
if (host->creationTime() > latest_host_added_time_ &&
host_create_duration <= slow_start_window_ &&
host->health() == Upstream::Host::Health::Healthy) {
latest_host_added_time_ = host->creationTime();
}
}
}
void EdfLoadBalancerBase::refresh(uint32_t priority) {
const auto add_hosts_source = [this](HostsSource source, const HostVector& hosts) {
// Nuke existing scheduler if it exists.
auto& scheduler = scheduler_[source] = Scheduler{};
refreshHostSource(source);
if (isSlowStartEnabled()) {
recalculateHostsInSlowStart(hosts);
}
// Check if the original host weights are equal and no hosts are in slow start mode; in that
// case EDF creation is skipped. When all original weights are equal and no hosts are in slow
// start mode we can rely on the unweighted host pick to do optimal round robin and least-loaded
// host selection with lower memory and CPU overhead.
if (hostWeightsAreEqual(hosts) && noHostsAreInSlowStart()) {
// Skip edf creation.
return;
}
scheduler.edf_ = std::make_unique<EdfScheduler<const Host>>();
// Populate scheduler with host list.
// TODO(mattklein123): We must build the EDF schedule even if all of the hosts are currently
// weighted 1. This is because currently we don't refresh host sets if only weights change.
// We should probably change this to refresh at all times. See the comment in
// BaseDynamicClusterImpl::updateDynamicHostList about this.
for (const auto& host : hosts) {
// We use a fixed weight here. While the weight may change without
// notification, this will only be stale until this host is next picked,
// at which point it is reinserted into the EdfScheduler with its new
// weight in chooseHost().
scheduler.edf_->add(hostWeight(*host), host);
}
// Cycle through hosts to achieve the intended offset behavior.
// TODO(htuch): Consider how we can avoid biasing towards earlier hosts in the schedule across
// refreshes for the weighted case.
if (!hosts.empty()) {
for (uint32_t i = 0; i < seed_ % hosts.size(); ++i) {
auto host =
scheduler.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); });
}
}
};
// Populate EdfSchedulers for each valid HostsSource value for the host set at this priority.
const auto& host_set = priority_set_.hostSetsPerPriority()[priority];
add_hosts_source(HostsSource(priority, HostsSource::SourceType::AllHosts), host_set->hosts());
add_hosts_source(HostsSource(priority, HostsSource::SourceType::HealthyHosts),
host_set->healthyHosts());
add_hosts_source(HostsSource(priority, HostsSource::SourceType::DegradedHosts),
host_set->degradedHosts());
for (uint32_t locality_index = 0;
locality_index < host_set->healthyHostsPerLocality().get().size(); ++locality_index) {
add_hosts_source(
HostsSource(priority, HostsSource::SourceType::LocalityHealthyHosts, locality_index),
host_set->healthyHostsPerLocality().get()[locality_index]);
}
for (uint32_t locality_index = 0;
locality_index < host_set->degradedHostsPerLocality().get().size(); ++locality_index) {
add_hosts_source(
HostsSource(priority, HostsSource::SourceType::LocalityDegradedHosts, locality_index),
host_set->degradedHostsPerLocality().get()[locality_index]);
}
}
bool EdfLoadBalancerBase::isSlowStartEnabled() {
return slow_start_window_ > std::chrono::milliseconds(0);
}
bool EdfLoadBalancerBase::noHostsAreInSlowStart() {
if (!isSlowStartEnabled()) {
return true;
}
auto current_time = time_source_.monotonicTime();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
current_time - latest_host_added_time_) <= slow_start_window_) {
return false;
}
return true;
}
HostConstSharedPtr EdfLoadBalancerBase::peekAnotherHost(LoadBalancerContext* context) {
if (tooManyPreconnects(stashed_random_.size(), total_healthy_hosts_)) {
return nullptr;
}
const absl::optional<HostsSource> hosts_source = hostSourceToUse(context, random(true));
if (!hosts_source) {
return nullptr;
}
auto scheduler_it = scheduler_.find(*hosts_source);
// We should always have a scheduler for any return value from
// hostSourceToUse() via the construction in refresh().
ASSERT(scheduler_it != scheduler_.end());
auto& scheduler = scheduler_it->second;
// As has been commented in both EdfLoadBalancerBase::refresh and
// BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to determine
// whether to use EDF or do unweighted (fast) selection. EDF is non-null iff the original
// weights of 2 or more hosts differ.
if (scheduler.edf_ != nullptr) {
return scheduler.edf_->peekAgain([this](const Host& host) { return hostWeight(host); });
} else {
const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source);
if (hosts_to_use.empty()) {
return nullptr;
}
return unweightedHostPeek(hosts_to_use, *hosts_source);
}
}
HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* context) {
const absl::optional<HostsSource> hosts_source = hostSourceToUse(context, random(false));
if (!hosts_source) {
return nullptr;
}
auto scheduler_it = scheduler_.find(*hosts_source);
// We should always have a scheduler for any return value from
// hostSourceToUse() via the construction in refresh().
ASSERT(scheduler_it != scheduler_.end());
auto& scheduler = scheduler_it->second;
// As has been commented in both EdfLoadBalancerBase::refresh and
// BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to determine
// whether to use EDF or do unweighted (fast) selection. EDF is non-null iff the original
// weights of 2 or more hosts differ.
if (scheduler.edf_ != nullptr) {
auto host = scheduler.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); });
return host;
} else {
const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source);
if (hosts_to_use.empty()) {
return nullptr;
}
return unweightedHostPick(hosts_to_use, *hosts_source);
}
}
double EdfLoadBalancerBase::applyAggressionFactor(double time_factor) {
if (aggression_ == 1.0 || time_factor == 1.0) {
return time_factor;
} else {
return std::pow(time_factor, 1.0 / aggression_);
}
}
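// Illustrative example (assumed values): with aggression_ = 2.0, a host halfway through its slow
// start window has time_factor = 0.5 and applyAggressionFactor returns
// pow(0.5, 1.0 / 2.0) ~= 0.71, so a larger aggression ramps new hosts up to full weight faster.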
double EdfLoadBalancerBase::applySlowStartFactor(double host_weight, const Host& host) {
auto host_create_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.monotonicTime() - host.creationTime());
if (host_create_duration < slow_start_window_ &&
host.health() == Upstream::Host::Health::Healthy) {
aggression_ = aggression_runtime_ != absl::nullopt ? aggression_runtime_.value().value() : 1.0;
if (aggression_ <= 0.0 || std::isnan(aggression_)) {