-
-
Notifications
You must be signed in to change notification settings - Fork 417
/
actor.c
1337 lines (1130 loc) · 40.9 KB
/
actor.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#define PONY_WANT_ATOMIC_DEFS
#include "actor.h"
#include "../sched/scheduler.h"
#include "../sched/cpu.h"
#include "../mem/pool.h"
#include "../gc/cycle.h"
#include "../gc/trace.h"
#include "ponyassert.h"
#include <assert.h>
#include <string.h>
#include <dtrace.h>
#ifdef USE_RUNTIMESTATS
#include <stdio.h>
#endif
#ifdef USE_VALGRIND
#include <valgrind/helgrind.h>
#endif
// Default number of application messages an actor may process in one
// scheduling run before it is considered overloaded (see batch_limit_reached).
#define PONY_SCHED_BATCH 100

// Ignore padding at the end of the type.
pony_static_assert((offsetof(pony_actor_t, gc) + sizeof(gc_t)) ==
  sizeof(pony_actor_pad_t), "Wrong actor pad size!");

// Set via ponyint_actor_setnoblock(). When true, the cycle detector is not
// running (see the comments in ponyint_actor_run), so blocked actors with
// rc == 0 are garbage collected locally instead.
static bool actor_noblock = false;
// Internal flags: read and written only by the single scheduler thread
// currently running the actor, so plain (non-atomic) access is sufficient
// (see has_internal_flag/set_internal_flag below).
enum
{
  // The actor ran out of work at the end of a run (see ponyint_actor_run).
  FLAG_BLOCKED = 1 << 0,
  // A "block" message has been sent to the cycle detector and no "unblock"
  // has been sent since (see send_block/send_unblock).
  FLAG_BLOCKED_SENT = 1 << 1,
  // Marks a runtime/system actor (set via ponyint_actor_setsystem).
  FLAG_SYSTEM = 1 << 2,
  // NOTE(review): not referenced in this file — presumably set when the
  // actor is removed from scheduling; confirm in the scheduler code.
  FLAG_UNSCHEDULED = 1 << 3,
  // The actor has contacted the cycle detector at least once (set in
  // send_block, checked in ponyint_actor_run).
  FLAG_CD_CONTACTED = 1 << 4,
  // The actor's gc rc has been observed above zero at some point; if this
  // was never set, the actor may safely delete itself locally because the
  // cycle detector cannot know about it (see ponyint_actor_run).
  FLAG_RC_OVER_ZERO_SEEN = 1 << 5,
  // NOTE(review): not referenced in this file — presumably pins the actor
  // to a scheduler thread; confirm in the scheduler code.
  FLAG_PINNED = 1 << 6,
};
// Sync flags: observable across scheduler threads through the actor's
// atomic sync_flags field (see has_sync_flag_any/set_sync_flag below).
enum
{
  // The actor is slated for destruction and must not receive further
  // messages (see ponyint_actor_setpendingdestroy and pony_sendv's assert).
  SYNC_FLAG_PENDINGDESTROY = 1 << 0,
  // The actor processed a full batch without draining its queue; sending to
  // it can mute the sender (see batch_limit_reached/triggers_muting).
  SYNC_FLAG_OVERLOADED = 1 << 1,
  // Backpressure flag; like OVERLOADED it both triggers muting of senders
  // and exempts this actor from being muted itself.
  // NOTE(review): set outside this file — confirm in the backpressure API.
  SYNC_FLAG_UNDER_PRESSURE = 1 << 2,
  // The actor is muted and will not be run (see mute_actor and the muting
  // rules comment below).
  SYNC_FLAG_MUTED = 1 << 3,
};
#ifdef USE_RUNTIMESTATS
// Dump the per-actor runtime statistics counters to stdout. The actor's
// address (as an integer) is used as its identifier.
//
// Fix: the first argument was `(uintptr_t)actor` printed with "%zu", which
// expects size_t — a format/argument mismatch (undefined behavior wherever
// uintptr_t and size_t differ). Cast to size_t to match the specifier.
//
// NOTE(review): the stats fields are printed with "%ld" (long); that is only
// correct if the actorstats fields are long-sized — confirm against the
// actorstats struct and consider %zu / PRIu64 if they are size_t or
// fixed-width types.
void print_actor_stats(pony_actor_t* actor)
{
  printf("Actor stats for actor: %zu, "
    "heap memory allocated: %ld, "
    "heap memory used: %ld, "
    "heap num allocated: %ld, "
    "heap realloc counter: %ld, "
    "heap alloc counter: %ld, "
    "heap free counter: %ld, "
    "heap gc counter: %ld, "
    "system cpu: %ld, "
    "app cpu: %ld, "
    "garbage collection marking cpu: %ld, "
    "garbage collection sweeping cpu: %ld, "
    "messages sent counter: %ld, "
    "system messages processed counter: %ld, "
    "app messages processed counter: %ld\n",
    (size_t)(uintptr_t)actor,
    actor->actorstats.heap_mem_allocated,
    actor->actorstats.heap_mem_used,
    actor->actorstats.heap_num_allocated,
    actor->actorstats.heap_realloc_counter,
    actor->actorstats.heap_alloc_counter,
    actor->actorstats.heap_free_counter,
    actor->actorstats.heap_gc_counter,
    actor->actorstats.system_cpu,
    actor->actorstats.app_cpu,
    actor->actorstats.gc_mark_cpu,
    actor->actorstats.gc_sweep_cpu,
    actor->actorstats.messages_sent_counter,
    actor->actorstats.system_messages_processed_counter,
    actor->actorstats.app_messages_processed_counter
  );
}
#endif
// The sync flags of a given actor cannot be mutated from more than one actor at
// once, so these operations need not be atomic RMW.
// True when any of the bits in check_flags is set in the actor's sync
// flags. Acquire load pairs with the release stores in set/unset below.
static bool has_sync_flag_any(pony_actor_t* actor, uint8_t check_flags)
{
  uint8_t current =
    atomic_load_explicit(&actor->sync_flags, memory_order_acquire);

  return (current & check_flags) != 0;
}
// Convenience wrapper: test for a single sync flag.
static bool has_sync_flag(pony_actor_t* actor, uint8_t flag)
{
  return has_sync_flag_any(actor, flag);
}
// Set one or more bits in the actor's sync flags. A plain load/store pair
// (not an atomic RMW) is sufficient — see the comment above: the sync flags
// of a given actor are never mutated from more than one actor at once.
static void set_sync_flag(pony_actor_t* actor, uint8_t flag)
{
  uint8_t current =
    atomic_load_explicit(&actor->sync_flags, memory_order_acquire);
  current |= flag;
  atomic_store_explicit(&actor->sync_flags, current, memory_order_release);
}
// Clear one or more bits in the actor's sync flags. Like set_sync_flag,
// a non-RMW load/store pair is sufficient (single mutator at a time).
static void unset_sync_flag(pony_actor_t* actor, uint8_t flag)
{
  uint8_t current =
    atomic_load_explicit(&actor->sync_flags, memory_order_acquire);
  current &= (uint8_t)~flag;
  atomic_store_explicit(&actor->sync_flags, current, memory_order_release);
}
// The internal flags of a given actor are only ever read or written by a
// single scheduler at a time and so need not be synchronization safe (atomics).
// True when any of the given internal flag bits is set. Internal flags are
// only touched by the scheduler currently running the actor, so no atomics.
static bool has_internal_flag(pony_actor_t* actor, uint8_t flag)
{
  return (flag & actor->internal_flags) != 0;
}
// Set internal flag bits (single-scheduler access; plain write).
static void set_internal_flag(pony_actor_t* actor, uint8_t flag)
{
  actor->internal_flags |= flag;
}
// Clear internal flag bits (single-scheduler access; plain write).
static void unset_internal_flag(pony_actor_t* actor, uint8_t flag)
{
  actor->internal_flags &= (uint8_t)~flag;
}
//
// Mute/Unmute/Check mute status functions
//
// For backpressure related muting and unmuting to work correctly, the following
// rules have to be maintained.
//
// 1. Across schedulers, an actor should never be seen as muted when it is not
// in fact muted.
// 2. It's ok for a muted actor to be seen as unmuted in a transient fashion
// across actors
//
// If rule #1 is violated, we might end up deadlocking because an actor was
// muted for sending to an actor that might never be unmuted (because it isn't
// actually muted). The muted sender would then stay muted until the actor
// incorrectly seen as muted became actually muted and then unmuted.
//
// If rule #2 is violated, then a muted actor will receive from 1 to a few
// additional messages and the sender won't be muted. As this is a transient
// situation that should be shortly rectified, there's no harm done.
//
// Our handling of atomic operations in `mute_actor`
// and `unmute_actor` are to assure that both rules aren't violated.
// Mark the actor as muted (it will not be scheduled while muted) and
// report the transition to dtrace.
static void mute_actor(pony_actor_t* actor)
{
  set_sync_flag(actor, SYNC_FLAG_MUTED);
  DTRACE1(ACTOR_MUTED, (uintptr_t)actor);
}
// Clear the actor's muted state so it can be scheduled again, and report
// the transition to dtrace.
void ponyint_unmute_actor(pony_actor_t* actor)
{
  unset_sync_flag(actor, SYNC_FLAG_MUTED);
  DTRACE1(ACTOR_UNMUTED, (uintptr_t)actor);
}
// A receiver triggers muting of its senders when it is overloaded, under
// pressure, or already muted itself (see maybe_mark_should_mute).
static bool triggers_muting(pony_actor_t* actor)
{
  const uint8_t mute_triggers = SYNC_FLAG_OVERLOADED |
    SYNC_FLAG_UNDER_PRESSURE | SYNC_FLAG_MUTED;

  return has_sync_flag_any(actor, mute_triggers);
}
// Mark the actor as overloaded and report it to dtrace. The cycle detector
// is never marked overloaded.
static void actor_setoverloaded(pony_actor_t* actor)
{
  pony_assert(!ponyint_is_cycle(actor));
  set_sync_flag(actor, SYNC_FLAG_OVERLOADED);
  DTRACE1(ACTOR_OVERLOADED, (uintptr_t)actor);
}
// Clear the actor's overloaded state. If the actor is not also under
// pressure, kick off a global unmute pass so actors muted for sending to
// this one can run again.
static void actor_unsetoverloaded(pony_actor_t* actor)
{
  pony_ctx_t* ctx = pony_ctx();
  unset_sync_flag(actor, SYNC_FLAG_OVERLOADED);
  DTRACE1(ACTOR_OVERLOADED_CLEARED, (uintptr_t)actor);

  // Still under pressure: senders must stay muted.
  if(has_sync_flag(actor, SYNC_FLAG_UNDER_PRESSURE))
    return;

  ponyint_sched_start_global_unmute(ctx->scheduler->index, actor);
}
// Possibly mark the sending actor for muting after a send to `to`.
// A sender is muted only when ALL of the following hold:
//   1. the receiver is overloaded, under pressure, or muted;
//   2. the sender itself is neither overloaded nor under pressure;
//   3. the send is to another actor (not a send-to-self).
static void maybe_mark_should_mute(pony_ctx_t* ctx, pony_actor_t* to)
{
  pony_actor_t* sender = ctx->current;

  // No current actor: nothing to mute (e.g. sends from outside an actor).
  if(sender == NULL)
    return;

  // Sends to self never mute.
  if(sender == to)
    return;

  // Overloaded / under-pressure senders are exempt from muting.
  if(has_sync_flag_any(sender,
    SYNC_FLAG_OVERLOADED | SYNC_FLAG_UNDER_PRESSURE))
    return;

  // Only a troubled receiver triggers muting.
  if(!triggers_muting(to))
    return;

  ponyint_sched_mute(ctx, sender, to);
}
#ifndef PONY_NDEBUG
// Debug-only sanity check on a message chain.
// A message chain is well formed if last is reachable from first and is the
// end of the chain (its next pointer is NULL). first should also be the
// start of the chain but we can't verify that.
// Uses a two-pointer (tortoise/hare) walk — m1 advances one link per
// iteration, m2 two — so a cyclic chain makes the loop terminate (and
// return false) instead of hanging.
static bool well_formed_msg_chain(pony_msg_t* first, pony_msg_t* last)
{
  // Reject NULL endpoints and a `last` that is not the end of the chain.
  if((first == NULL) || (last == NULL) ||
    (atomic_load_explicit(&last->next, memory_order_relaxed) != NULL))
    return false;

  pony_msg_t* m1 = first;
  pony_msg_t* m2 = first;

  while((m1 != NULL) && (m2 != NULL))
  {
    if(m2 == last)
      return true;

    m2 = atomic_load_explicit(&m2->next, memory_order_relaxed);

    if(m2 == last)
      return true;

    if(m2 == NULL)
      return false;

    m1 = atomic_load_explicit(&m1->next, memory_order_relaxed);
    m2 = atomic_load_explicit(&m2->next, memory_order_relaxed);

    // The pointers meeting means the chain loops back on itself: malformed.
    if(m1 == m2)
      return false;
  }

  return false;
}
#endif
// Run a garbage collection pass on the actor's heap if the heap decides one
// is due (ponyint_heap_startgc returning false means no GC is needed).
// Mark phase traces the actor's reachable objects; sweep happens in
// ponyint_heap_endgc. Under USE_RUNTIMESTATS, cpu time is attributed to the
// misc/mark/sweep counters around each phase.
static void try_gc(pony_ctx_t* ctx, pony_actor_t* actor)
{
  if(!ponyint_heap_startgc(&actor->heap
#ifdef USE_RUNTIMESTATS
    , actor))
#else
    ))
#endif
    return;

#ifdef USE_RUNTIMESTATS
  uint64_t used_cpu = ponyint_sched_cpu_used(ctx);
  ctx->schedulerstats.misc_cpu += used_cpu;
#endif

  DTRACE2(GC_START, (uintptr_t)ctx->scheduler, (uintptr_t)actor);

  // Mark phase: trace everything reachable from the actor.
  ponyint_gc_mark(ctx);

  if(actor->type->trace != NULL)
    actor->type->trace(ctx, actor);

  ponyint_mark_done(ctx);

#ifdef USE_RUNTIMESTATS
  used_cpu = ponyint_sched_cpu_used(ctx);
  ctx->schedulerstats.actor_gc_mark_cpu += used_cpu;
  actor->actorstats.gc_mark_cpu += used_cpu;
#endif

  // Sweep phase.
  ponyint_heap_endgc(&actor->heap
#ifdef USE_RUNTIMESTATS
    , actor);
#else
    );
#endif

  DTRACE2(GC_END, (uintptr_t)ctx->scheduler, (uintptr_t)actor);

#ifdef USE_RUNTIMESTATS
  used_cpu = ponyint_sched_cpu_used(ctx);
  ctx->schedulerstats.actor_gc_sweep_cpu += used_cpu;
  actor->actorstats.gc_sweep_cpu += used_cpu;
#endif
}
// Clear this actor's blocked state and tell the cycle detector it is no
// longer blocked.
static void send_unblock(pony_actor_t* actor)
{
  // Send unblock before continuing.
  unset_internal_flag(actor, FLAG_BLOCKED | FLAG_BLOCKED_SENT);
  ponyint_cycle_unblock(actor);
}
// Tell the cycle detector that `actor` (which must be the currently-running
// actor) is blocked, running a GC pass first so a long-blocked actor does
// not sit on reclaimable heap memory.
static void send_block(pony_ctx_t* ctx, pony_actor_t* actor)
{
  pony_assert(ctx->current == actor);

  // Try and run GC because we're blocked and sending a block message
  // to the CD. This will try and free any memory the actor has in its
  // heap that wouldn't get freed otherwise until the actor is
  // destroyed or happens to receive more work via application messages
  // that eventually trigger a GC which may not happen for a long time
  // (or ever). Do this BEFORE sending the message or else we might be
  // GCing while the CD destroys us.
  pony_triggergc(ctx);
  try_gc(ctx, actor);

  // We're blocked, send block message. Record that we've both sent a block
  // and (permanently) that we've contacted the cycle detector at least once.
  set_internal_flag(actor, FLAG_BLOCKED_SENT);
  set_internal_flag(actor, FLAG_CD_CONTACTED);
  ponyint_cycle_block(actor, &actor->gc);
}
// Dispatch a single message to `actor`, handling runtime-internal message
// types inline. Returns true only for application messages (the default
// case) that were handed to the actor's dispatch function; all
// runtime/system messages return false so they don't count against the
// scheduling batch.
static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
  pony_msg_t* msg)
{
#ifdef USE_RUNTIMESTATS_MESSAGES
  ctx->schedulerstats.num_inflight_messages--;
#endif

  switch(msg->id)
  {
    // GC protocol: another actor acquired references to objects we own.
    case ACTORMSG_ACQUIRE:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msgp_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif
      pony_assert(!ponyint_is_cycle(actor));
      pony_msgp_t* m = (pony_msgp_t*)msg;

#ifdef USE_RUNTIMESTATS
      ctx->schedulerstats.mem_used_actors -= (sizeof(actorref_t)
        + ponyint_objectmap_total_mem_size(&((actorref_t*)m->p)->map));
      ctx->schedulerstats.mem_allocated_actors -= (POOL_ALLOC_SIZE(actorref_t)
        + ponyint_objectmap_total_alloc_size(&((actorref_t*)m->p)->map));
#endif

      if(ponyint_gc_acquire(&actor->gc, (actorref_t*)m->p) &&
        has_internal_flag(actor, FLAG_BLOCKED_SENT))
      {
        // send unblock if we've sent a block
        send_unblock(actor);
      }

      return false;
    }

    // GC protocol: another actor released references to objects we own.
    case ACTORMSG_RELEASE:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msgp_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif
      pony_assert(!ponyint_is_cycle(actor));
      pony_msgp_t* m = (pony_msgp_t*)msg;

#ifdef USE_RUNTIMESTATS
      ctx->schedulerstats.mem_used_actors -= (sizeof(actorref_t)
        + ponyint_objectmap_total_mem_size(&((actorref_t*)m->p)->map));
      ctx->schedulerstats.mem_allocated_actors -= (POOL_ALLOC_SIZE(actorref_t)
        + ponyint_objectmap_total_alloc_size(&((actorref_t*)m->p)->map));
#endif

      if(ponyint_gc_release(&actor->gc, (actorref_t*)m->p) &&
        has_internal_flag(actor, FLAG_BLOCKED_SENT))
      {
        // send unblock if we've sent a block
        send_unblock(actor);
      }

      return false;
    }

    // Cycle detector only: acknowledgement of a confirm request.
    case ACTORMSG_ACK:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msgi_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
#endif
      pony_assert(ponyint_is_cycle(actor));
      DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
      actor->type->dispatch(ctx, actor, msg);
      return false;
    }

    // Cycle detector asks us to confirm we are still blocked.
    case ACTORMSG_CONF:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msgi_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
#endif
      pony_assert(!ponyint_is_cycle(actor));
      if(has_internal_flag(actor, FLAG_BLOCKED_SENT))
      {
        // We've sent a block message, send confirm.
        pony_msgi_t* m = (pony_msgi_t*)msg;
        ponyint_cycle_ack(m->i);
      }

      return false;
    }

    // Cycle detector asks whether we are blocked.
    case ACTORMSG_ISBLOCKED:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msg_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msg_t);
#endif
      // this actor should not already be marked as pendingdestroy
      // or else we could end up double freeing it
      // this assertion is to ensure this invariant is not
      // accidentally broken due to code changes
      pony_assert(!ponyint_actor_pendingdestroy(actor));

      pony_assert(!ponyint_is_cycle(actor));
      if(has_internal_flag(actor, FLAG_BLOCKED)
        && !has_internal_flag(actor, FLAG_BLOCKED_SENT)
        && (actor->gc.rc > 0))
      {
        // We're blocked, send block message if:
        // - the actor hasn't already sent one
        // - the actor aren't a zombie aka rc == 0
        //
        // Sending multiple "i'm blocked" messages to the cycle detector
        // will result in actor potentially being freed more than once.
        send_block(ctx, actor);
      }

      return false;
    }

    // Cycle detector only: an actor reports itself blocked.
    case ACTORMSG_BLOCK:
    {
      // runtimestats messages tracked in cycle detector

      pony_assert(ponyint_is_cycle(actor));
      DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
      actor->type->dispatch(ctx, actor, msg);
      return false;
    }

    // Cycle detector only: an actor reports itself unblocked.
    case ACTORMSG_UNBLOCK:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msgp_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif
      pony_assert(ponyint_is_cycle(actor));
      DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
      actor->type->dispatch(ctx, actor, msg);
      return false;
    }

    // Cycle detector only: notification that an actor was destroyed.
    case ACTORMSG_DESTROYED:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msgp_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif
      pony_assert(ponyint_is_cycle(actor));
      DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
      actor->type->dispatch(ctx, actor, msg);
      return false;
    }

    // Cycle detector only: request to check blocked actors.
    case ACTORMSG_CHECKBLOCKED:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= sizeof(pony_msg_t);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msg_t);
#endif
      pony_assert(ponyint_is_cycle(actor));
      DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
      actor->type->dispatch(ctx, actor, msg);
      return false;
    }

    // Anything else is an application message: receiving one means we are
    // not blocked, so retract any block we previously sent, then dispatch.
    default:
    {
#ifdef USE_RUNTIMESTATS_MESSAGES
      ctx->schedulerstats.mem_used_inflight_messages -= POOL_SIZE(msg->index);
      ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_SIZE(msg->index);
#endif
      pony_assert(!ponyint_is_cycle(actor));
      if(has_internal_flag(actor, FLAG_BLOCKED_SENT))
      {
        // send unblock if we've sent a block
        send_unblock(actor);
      }

      DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
      actor->type->dispatch(ctx, actor, msg);
      return true;
    }
  }
}
// Transition the actor to "muted" if message sends during the batch asked
// for it; returns true when the mute occurred (caller must stop running
// the actor and not reschedule it).
//
// Why here and not mid-behavior: since actors are not run while muted,
// actor->muted was 0 when the batch started. If any of this actor's sends
// should result in it being muted, that counter becomes greater than 0 —
// but flipping the actual "muted" state during a behavior can lead to race
// conditions that might result in a deadlock, so the transition is deferred
// to this point, after the behavior has finished. Once set, any actor
// sending to this one will also be muted unless that sender is marked as
// overloaded.
static bool maybe_should_mute(pony_actor_t* actor)
{
  if(actor->muted == 0)
    return false;

  mute_actor(actor);
  return true;
}
// Called when the actor hit its message batch limit (or stopped early while
// polling). Marks the actor overloaded — unless it already is, or we're
// polling from C code. Overloaded actors are allowed to send to other
// overloaded actors and to muted actors without being muted themselves.
// Always returns true: the actor should be rescheduled.
static bool batch_limit_reached(pony_actor_t* actor, bool polling)
{
  bool already_overloaded = has_sync_flag(actor, SYNC_FLAG_OVERLOADED);

  if(!already_overloaded && !polling)
    actor_setoverloaded(actor);

  return true;
}
// Run `actor` for one scheduling slice: pop and handle messages until the
// queue drains (up to the head snapshot taken at entry), the application
// message batch limit (PONY_SCHED_BATCH) is hit, the actor becomes muted,
// or — when `polling` — a single application message has been handled.
// Returns true when the scheduler should reschedule the actor, false when
// it must not be rescheduled (muted, or destroyed below).
bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
{
  pony_assert(!has_sync_flag(actor, SYNC_FLAG_MUTED));
  ctx->current = actor;
  size_t batch = PONY_SCHED_BATCH;

  pony_msg_t* msg;
  size_t app = 0;

  // check to see at the start of a run, and most importantly, the first time
  // we run if our GC is over 0. If it is 0 the first time we run, then the
  // actor starts as an orphan with no references to it. There is no way that
  // the cycle detector will find out about an orphan actor unless it either
  // contacts the cycle detector itself (which we don't do) or the orphan
  // contacts another and needs to participate in the cycle detection protocol.
  // if an actors rc never goes above 0, it will be able to safely delete itself
  // even in the presence of the cycle detector.
  if (!actor_noblock && actor->gc.rc > 0)
    set_internal_flag(actor, FLAG_RC_OVER_ZERO_SEEN);

  // If we have been scheduled, the head will not be marked as empty.
  // This snapshot bounds the batch: we stop once we pop this message.
  pony_msg_t* head = atomic_load_explicit(&actor->q.head, memory_order_acquire);

  while((msg = ponyint_actor_messageq_pop(&actor->q
#ifdef USE_DYNAMIC_TRACE
    , ctx->scheduler, ctx->current
#endif
    )) != NULL)
  {
#ifdef USE_RUNTIMESTATS
    uint64_t used_cpu = ponyint_sched_cpu_used(ctx);
    ctx->schedulerstats.misc_cpu += used_cpu;
#endif

    bool app_msg = handle_message(ctx, actor, msg);

    // if an actors rc never goes above 0, it will be able to safely delete
    // itself even in the presence of the cycle detector. This is one of two
    // checks to see if that invariant is in place for a given actor.
    if (!actor_noblock && actor->gc.rc > 0)
      set_internal_flag(actor, FLAG_RC_OVER_ZERO_SEEN);

#ifdef USE_RUNTIMESTATS
    used_cpu = ponyint_sched_cpu_used(ctx);
#endif

    if(app_msg)
    {
#ifdef USE_RUNTIMESTATS
      ctx->schedulerstats.actor_app_cpu += used_cpu;
      actor->actorstats.app_cpu += used_cpu;
      actor->actorstats.app_messages_processed_counter++;
#endif

      // If we handle an application message, try to gc.
      app++;
      try_gc(ctx, actor);

      // maybe mute actor; returns true if mute occurs
      if(maybe_should_mute(actor))
        return false;

      // if we've reached our batch limit
      // or if we're polling where we want to stop after one app message
      if(app == batch || polling)
        return batch_limit_reached(actor, polling);
    }
    else
    {
#ifdef USE_RUNTIMESTATS
      ctx->schedulerstats.actor_system_cpu += used_cpu;
      actor->actorstats.system_cpu += used_cpu;
      actor->actorstats.system_messages_processed_counter++;
#endif
    }

    // Stop handling a batch if we reach the head we found when we were
    // scheduled.
    if(msg == head)
      break;
  }

  // We didn't hit our app message batch limit. We now believe our queue to be
  // empty, but we may have received further messages.
  pony_assert(app < batch);
  pony_assert(!has_sync_flag(actor, SYNC_FLAG_MUTED));

  if(has_sync_flag(actor, SYNC_FLAG_OVERLOADED))
  {
    // if we were overloaded and didn't process a full batch, set ourselves as
    // no longer overloaded. Once this is done:
    // 1- sending to this actor is no longer grounds for an actor being muted
    // 2- this actor can no longer send to other actors free from muting should
    //    the receiver be overloaded or muted
    actor_unsetoverloaded(actor);
  }

  try_gc(ctx, actor);

  // If we have processed any application level messages, defer blocking.
  if(app > 0)
    return true;

  // note that we're logically blocked
  if(!has_internal_flag(actor, FLAG_BLOCKED | FLAG_SYSTEM | FLAG_BLOCKED_SENT))
  {
    set_internal_flag(actor, FLAG_BLOCKED);
  }

  if (has_internal_flag(actor, FLAG_BLOCKED))
  {
    if (actor->gc.rc == 0)
    {
      // Here, we is what we know to be true:
      //
      // - the actor is blocked
      // - the actor likely has no messages in its queue
      // - there's no references to this actor
      //
      if (actor_noblock || !has_internal_flag(actor, FLAG_RC_OVER_ZERO_SEEN))
      {
        // When 'actor_noblock` is true, the cycle detector isn't running.
        // this means actors won't be garbage collected unless we take special
        // action. Therefore if `noblock` is on, we should garbage collect the
        // actor
        //
        // When the cycle detector is running, it is still safe to locally
        // delete if our RC has never been above 0 because the cycle detector
        // can't possibly know about the actor's existence so, if it's message
        // queue is empty, doing a local delete is safe.
        if(ponyint_messageq_isempty(&actor->q))
        {
          // The actors queue is empty which means this actor is a zombie
          // and can be reaped.

          // mark the queue as empty or else destroy will hang
          bool empty = ponyint_messageq_markempty(&actor->q);

          // make sure the queue is actually empty as expected
          pony_assert(empty);

          // "Locally delete" the actor.
          ponyint_actor_setpendingdestroy(actor);
          ponyint_actor_final(ctx, actor);
          ponyint_actor_sendrelease(ctx, actor);
          ponyint_actor_destroy(actor);

          // make sure the scheduler will not reschedule this actor
          return !empty;
        }
      } else {
        // The cycle detector is running so we have to ensure that it doesn't
        // send a message to the actor while we're in the process of telling
        // it that it is safe to destroy the actor.
        //
        // Before we check if our queue is empty, we need to obtain the
        // "critical delete" atomic for this actor. The cycle detector will
        // bail from sending any messages if it can't obtain the atomic.
        // Similarly, if the actor can't obtain the atomic here, then we do not
        // attempt any "I can be destroyed" operations as the cycle detector is
        // in the process of sending us a message.
        if (ponyint_acquire_cycle_detector_critical(actor))
        {
          if(ponyint_messageq_isempty(&actor->q))
          {
            // At this point the actors queue is empty and the cycle detector
            // will not send it any more messages because we "own" the barrier
            // for sending cycle detector messages to this actor.
            ponyint_actor_setpendingdestroy(actor);

            // Tell cycle detector that this actor is a zombie and will not get
            // any more messages/work and can be reaped.
            // Mark the actor as FLAG_BLOCKED_SENT and send a BLOCKED message
            // to speed up reaping otherwise waiting for the cycle detector
            // to get around to asking if we're blocked could result in
            // unnecessary memory growth.
            //
            // We're blocked, send block message telling the cycle detector
            // to reap this actor (because its `rc == 0`).
            // This is concurrency safe because, only the cycle detector might
            // have a reference to this actor (rc is 0) so another actor can not
            // send it an application message that results this actor becoming
            // unblocked (which would create a race condition) and we've also
            // ensured that the cycle detector will not send this actor any more
            // messages (which would also create a race condition).
            send_block(ctx, actor);

            // mark the queue as empty or else destroy will hang
            bool empty = ponyint_messageq_markempty(&actor->q);

            // make sure the queue is actually empty as expected
            pony_assert(empty);

            // "give up" critical section ownership
            ponyint_release_cycle_detector_critical(actor);

            // make sure the scheduler will not reschedule this actor
            return !empty;
          } else {
            // "give up" critical section ownership
            ponyint_release_cycle_detector_critical(actor);
          }
        }
      }
    } else {
      // gc is greater than 0
      if (!actor_noblock && !has_internal_flag(actor, FLAG_CD_CONTACTED))
      {
        // The cycle detector is running and we've never contacted it ourselves,
        // so let's it know we exist in case it is unaware.
        if (ponyint_acquire_cycle_detector_critical(actor))
        {
          send_block(ctx, actor);
          // "give up" critical section ownership
          ponyint_release_cycle_detector_critical(actor);
        }
      }
    }
  }

  // If we mark the queue as empty, then it is no longer safe to do any
  // operations on this actor that aren't concurrency safe so make
  // `ponyint_messageq_markempty` the last thing we do.
  // Return true (i.e. reschedule immediately) if our queue isn't empty.
  return !ponyint_messageq_markempty(&actor->q);
}
// Free all of an actor's resources (queue, gc state, heap, the actor
// allocation itself). The actor must already be marked pending-destroy.
void ponyint_actor_destroy(pony_actor_t* actor)
{
  pony_assert(has_sync_flag(actor, SYNC_FLAG_PENDINGDESTROY));

  // Make sure the actor being destroyed has finished marking its queue
  // as empty. Otherwise, it may spuriously see that tail and head are not
  // the same and fail to mark the queue as empty, resulting in it getting
  // rescheduled.
  // Spin until the queue head pointer has its low bit set — presumably the
  // tag that ponyint_messageq_markempty sets; confirm in the messageq code.
  pony_msg_t* head = NULL;
  do
  {
    head = atomic_load_explicit(&actor->q.head, memory_order_relaxed);
  } while(((uintptr_t)head & (uintptr_t)1) != (uintptr_t)1);

  // Acquire fence orders the relaxed spin-loads above before the frees
  // below (NOTE(review): presumably paired with a release in markempty —
  // confirm against the messageq implementation).
  atomic_thread_fence(memory_order_acquire);
#ifdef USE_VALGRIND
  ANNOTATE_HAPPENS_AFTER(&actor->q.head);
#endif

  ponyint_messageq_destroy(&actor->q, false);
  ponyint_gc_destroy(&actor->gc);
  ponyint_heap_destroy(&actor->heap);

#ifdef USE_RUNTIMESTATS
  pony_ctx_t* ctx = pony_ctx();
  ctx->schedulerstats.mem_used_actors -= actor->type->size;
  ctx->schedulerstats.mem_allocated_actors -= ponyint_pool_used_size(actor->type->size);
  ctx->schedulerstats.destroyed_actors_counter++;
  print_actor_stats(actor);
#endif

  // Free variable sized actors correctly.
  ponyint_pool_free_size(actor->type->size, actor);
}
// Accessor: the actor's GC state.
gc_t* ponyint_actor_gc(pony_actor_t* actor)
{
  return &actor->gc;
}
// Accessor: the actor's heap.
heap_t* ponyint_actor_heap(pony_actor_t* actor)
{
  return &actor->heap;
}
// True when the actor has been marked for destruction (no further messages
// may be sent to it).
bool ponyint_actor_pendingdestroy(pony_actor_t* actor)
{
  return has_sync_flag(actor, SYNC_FLAG_PENDINGDESTROY);
}
// Mark the actor as slated for destruction.
void ponyint_actor_setpendingdestroy(pony_actor_t* actor)
{
  // This is thread-safe, even though the flag is set from the cycle detector.
  // The function is only called after the cycle detector has detected a true
  // cycle and an actor won't change its flags if it is part of a true cycle.
  // The synchronisation is done through the ACK message sent by the actor to
  // the cycle detector.
  set_sync_flag(actor, SYNC_FLAG_PENDINGDESTROY);
}
// Run the actor's finaliser (if any) followed by all outstanding object
// finalisers on its heap. This gets run while the cycle detector is
// handling a message, so `actor` is temporarily installed as the current
// actor and the previous one restored afterwards.
void ponyint_actor_final(pony_ctx_t* ctx, pony_actor_t* actor)
{
  pony_actor_t* previous = ctx->current;
  ctx->current = actor;

  // Actor-level finaliser, when the type declares one.
  if(actor->type->final != NULL)
    actor->type->final(actor);

  // Object-level finalisers for everything still on the heap.
  ponyint_heap_final(&actor->heap);

  ctx->current = previous;
}
// Send GC release messages for all references this actor holds.
void ponyint_actor_sendrelease(pony_ctx_t* ctx, pony_actor_t* actor)
{
  ponyint_gc_sendrelease(ctx, &actor->gc);
}
// Flag the actor as a runtime/system actor (see FLAG_SYSTEM).
void ponyint_actor_setsystem(pony_actor_t* actor)
{
  set_internal_flag(actor, FLAG_SYSTEM);
}
// Set the global "noblock" mode flag (see actor_noblock above).
void ponyint_actor_setnoblock(bool state)
{
  actor_noblock = state;
}
// Report whether "noblock" mode is enabled (see actor_noblock above).
// Fix: use a (void) prototype — in a C function definition an empty
// parameter list declares an unspecified parameter list rather than "no
// parameters". Callers are unaffected.
bool ponyint_actor_getnoblock(void)
{
  return actor_noblock;
}
// Allocate and initialise a new actor of `type`. When `orphaned` is true,
// the compiler has determined that no references to the new actor will
// exist, so its rc starts at 0 and it can be collected sooner.
PONY_API pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type,
  bool orphaned)
{
  pony_assert(type != NULL);

  // allocate variable sized actors correctly
  pony_actor_t* actor = (pony_actor_t*)ponyint_pool_alloc_size(type->size);
  memset(actor, 0, type->size);
  actor->type = type;

#ifdef USE_RUNTIMESTATS
  ctx->schedulerstats.mem_used_actors += type->size;
  ctx->schedulerstats.mem_allocated_actors += ponyint_pool_used_size(type->size);
  ctx->schedulerstats.created_actors_counter++;
#endif

  ponyint_messageq_init(&actor->q);
  ponyint_heap_init(&actor->heap);
  ponyint_gc_done(&actor->gc);

  if(ctx->current != NULL && !orphaned)
  {
    // Do not set an rc if the actor is orphaned. The compiler determined that
    // there are no references to this actor. By not setting a non-zero RC, we
    // will GC the actor sooner and lower overall memory usage.
    actor->gc.rc = GC_INC_MORE;
    ponyint_gc_createactor(ctx->current, actor);
  } else {
    // no creator, so the actor isn't referenced by anything
    actor->gc.rc = 0;
  }

  DTRACE2(ACTOR_ALLOC, (uintptr_t)ctx->scheduler, (uintptr_t)actor);
  return actor;
}
// this is currently only used for two purposes:
// * to destroy the cycle detector
// * to destroy the temporary actor created to run primitive finalisers
// that happens after the schedulers have exited and the cycle detector
// has been destroyed
//
// as a result, this does not need to be concurrency safe
// or tell the cycle detector of what it is doing
// Destroy an actor immediately, without running its finaliser (see the
// usage notes in the comment above: cycle detector / primitive-finaliser
// teardown only, so no concurrency safety is needed here).
PONY_API void ponyint_destroy(pony_ctx_t* ctx, pony_actor_t* actor)
{
  // ctx is unused; kept for API symmetry with other entry points.
  (void)ctx;

  // This destroys an actor immediately.
  // The finaliser is not called.
  ponyint_actor_setpendingdestroy(actor);
  ponyint_actor_destroy(actor);
}
// Allocate a message from the pool size class `index` and stamp it with
// message id `id`. The caller owns the message until it is sent.
PONY_API pony_msg_t* pony_alloc_msg(uint32_t index, uint32_t id)
{
#ifdef USE_RUNTIMESTATS
  pony_ctx_t* ctx = pony_ctx();
  if(ctx->current)
    ctx->current->actorstats.messages_sent_counter++;
#endif

#ifdef USE_RUNTIMESTATS_MESSAGES
  // NOTE(review): `ctx` is only declared when USE_RUNTIMESTATS is defined,
  // so this block presumably requires USE_RUNTIMESTATS to also be defined —
  // confirm the build-flag dependency.
  ctx->schedulerstats.mem_used_inflight_messages += POOL_SIZE(index);
  ctx->schedulerstats.mem_allocated_inflight_messages += POOL_SIZE(index);
  ctx->schedulerstats.num_inflight_messages++;
#endif

  pony_msg_t* msg = (pony_msg_t*)ponyint_pool_alloc(index);
  msg->index = index;
  msg->id = id;
#ifndef PONY_NDEBUG
  // Debug builds null the link so well_formed_msg_chain can walk safely.
  atomic_store_explicit(&msg->next, NULL, memory_order_relaxed);
#endif

  return msg;
}
// Allocate a message large enough to hold `size` bytes by mapping the byte
// count to its pool size class first.
PONY_API pony_msg_t* pony_alloc_msg_size(size_t size, uint32_t id)
{
  uint32_t pool_index = (uint32_t)ponyint_pool_index(size);
  return pony_alloc_msg(pool_index, id);
}
PONY_API void pony_sendv(pony_ctx_t* ctx, pony_actor_t* to, pony_msg_t* first,
pony_msg_t* last, bool has_app_msg)
{
// The function takes a prebuilt chain instead of varargs because the latter
// is expensive and very hard to optimise.
pony_assert(well_formed_msg_chain(first, last));
// Make sure we're not trying to send a message to an actor that is about
// to be destroyed.
pony_assert(!ponyint_actor_pendingdestroy(to));
if(DTRACE_ENABLED(ACTOR_MSG_SEND))
{
pony_msg_t* m = first;
while(m != last)
{
DTRACE4(ACTOR_MSG_SEND, (uintptr_t)ctx->scheduler, m->id,
(uintptr_t)ctx->current, (uintptr_t)to);
m = atomic_load_explicit(&m->next, memory_order_relaxed);
}
DTRACE4(ACTOR_MSG_SEND, (uintptr_t)ctx->scheduler, last->id,
(uintptr_t)ctx->current, (uintptr_t)to);
}
if(has_app_msg)
maybe_mark_should_mute(ctx, to);
if(ponyint_actor_messageq_push(&to->q, first, last
#ifdef USE_DYNAMIC_TRACE
, ctx->scheduler, ctx->current, to
#endif