/// Actor framework implementation
/// Examples: [tagion.testbench.services]
3 module tagion.actor.actor;
4 
5 @safe:
6 
7 import std.typecons;
8 import core.exception : AssertError;
9 import core.thread;
10 import core.time;
11 import std.exception;
12 import std.format : format;
13 import std.meta;
14 import std.traits;
15 import std.traits;
16 import std.typecons;
17 import std.variant : Variant;
18 import tagion.actor.exceptions;
19 import tagion.hibon.HiBONRecord;
20 import tagion.logger;
21 import tagion.utils.Result;
22 import concurrency = tagion.utils.pretend_safe_concurrency;
23 import tagion.utils.pretend_safe_concurrency;
24 import tagion.actor.exceptions;
25 
26 import tagion.hibon.HiBONRecord;
27 
/**
 * Message "atom" type: an empty struct identified only by its compile-time name.
 * Distinct names instantiate distinct types.
 * Examples:
 * ---
 * // As a type
 * Msg!"hi";
 * // As a value
 * Msg!"hi"();
 * ---
 */
struct Msg(string name) {}
40 
/// Bookkeeping for the actor running in the current thread.
private struct ActorInfo {
    /// Last reported state of every child, keyed by the child's task name
    private Ctrl[string] childrenState;
    /// When true the receive loops in this module stop iterating
    bool stop;

    /// Returns: the task name currently held by the logger
    @safe @property
    string task_name() const nothrow {
        return log.task_name;
    }

    /// Hands `name` to the logger as this task's name.
    /// Returns: the result of `log.task_name(name)`
    @safe @property
    bool task_name(string name) nothrow const {
        return log.task_name(name);
    }

}

/// The ActorInfo instance of the current thread (module-level variables are thread-local in D)
static ActorInfo thisActor;
58 
/// Setting and reading the task name of the current actor
unittest {
    enum dummy_name = "dummy_name";
    // Nothing has been registered for this thread yet
    assert(thisActor.task_name is string.init, "task_name did not start as init");
    scope (exit)
        unregister(dummy_name);

    // Setting the same name twice from the same thread succeeds both times
    assert(thisActor.task_name = dummy_name, "setting name failed");
    assert(thisActor.task_name = dummy_name, "setting name seconds time did not fallthrough");
    assert(thisActor.task_name == dummy_name, "name was not the same as we set");

    // A different thread must not be able to claim the name
    concurrency.spawn(() {
        assert(!(thisActor.task_name = dummy_name), "Should not be able to set the same task name in another tid");
    });
    assert(locate(thisActor.task_name) is thisTid, "Name not registered");
}
74 
/**
 * Request type
 * Will generate a random id if the ID type is a number
 * Params:
 *   name = name of the message atom carried by the request
 *   ID = type of the id used to pair a response with its request
 */
struct Request(string name, ID = uint) {
    /// The message atom identifying the kind of request
    Msg!name msg;
    /// Id copied into the response
    ID id;
    /// Task name of the requester, used to locate it when responding
    string task_name;

    /// Creates a request owned by the calling task.
    /// The calling task must already have a task name.
    static Request opCall() @safe nothrow {
        import std.traits : isNumeric;
        import tagion.utils.Random;

        Request!(name, ID) r;
        r.msg = Msg!name();
        static if (isNumeric!ID) {
            r.id = generateId!ID;
        }
        assert(thisActor.task_name !is string.init, "The requester is not registered as a task");
        r.task_name = thisActor.task_name;
        return r;
    }

    /// The response type matching this request
    alias Response = .Response!(name, ID);

    /// Send back some data to the task who sent the request.
    /// Nothing is sent if the requester can not be located.
    void respond(Args...)(Args args) {
        auto res = Response(msg, id);
        auto tid = locate(task_name);
        if (tid !is Tid.init) {
            // Send to the tid that was just checked; a second lookup could
            // return Tid.init if the requester unregistered in between.
            tid.send(res, args);
        }
    }
}

/// Response header sent back by `Request.respond`; carries the msg and id of the request
struct Response(string name, ID = uint) {
    /// The message atom of the request being answered
    Msg!name msg;
    /// The id of the request being answered
    ID id;
}
115 
/// A request answered by a responder arrives back at the requester with the same msg and id
@safe
unittest {
    thisActor.task_name = "req_resp";
    scope (exit) {
        unregister("req_resp");
    }
    alias Some_req = Request!"some_req";
    void some_responder(Some_req req) {
        req.respond("hello");
    }

    auto created_req = Some_req();
    some_responder(created_req);
    // The result of receiveTimeout is used as a truth value, so keep it as a bool
    const bool received = receiveTimeout(Duration.zero, (Some_req.Response res, string _) {
        assert(created_req.msg == res.msg, "request msg were not the same");
        assert(created_req.id == res.id, "request id were not the same");
    });
    assert(received, "never received response");
}
136 
/// State messages sent to the supervisor
enum Ctrl {
    /// Unknown state
    UNKNOWN,
    /// The actor is starting
    STARTING,
    /// The actor is running
    ALIVE,
    /// The actor is stopping
    END,
}
144 
/// Signals sent from the supervisor to its direct children
enum Sig {
    /// Ask the receiving actor to stop
    STOP,
}
149 
/// Control message sent to a supervisor.
/// Contains the task name of the actor which sent it and the state it reports.
alias CtrlMsg = Tuple!(
        string, "task_name",
        Ctrl, "ctrl",
);
153 
/// Returns: true when every child recorded for this actor is in state `ctrl`
/// (also true when no children are recorded)
bool statusChildren(Ctrl ctrl) @safe nothrow {
    foreach (child_state; thisActor.childrenState.byValue) {
        if (child_state == ctrl) {
            continue;
        }
        return false;
    }
    return true;
}
162 
/**
 * Wait for the spawned child actors of this thread to be in the `state` Ctrl state.
 * If `state` is Ctrl.END the recorded children are freed afterwards.
 * Params:
 *   state = the state every child should reach
 *   timeout = how long to keep waiting
 * Returns: true if all of the children got into `state` before the timeout,
 *          false on timeout, on an exception, or when this actor was told to
 *          stop while waiting for a state other than Ctrl.END
 */
bool waitforChildren(Ctrl state, Duration timeout = 1.seconds) @safe nothrow {
    const begin_time = MonoTime.currTime;
    try {
        while (!statusChildren(state) && MonoTime.currTime - begin_time <= timeout) {
            if (thisActor.stop && state !is Ctrl.END) {
                return false;
            }
            // The loop is only entered when at least one child is recorded,
            // so the division below never divides by zero.
            receiveTimeout(
                    timeout / thisActor.childrenState.length,
                    &control,
                    &signal,
                    &ownerTerminated

            );
        }
        log("%s", thisActor.childrenState);
        // Take the verdict BEFORE freeing the children: an empty map would make
        // statusChildren vacuously true and hide a timeout.
        const all_in_state = statusChildren(state);
        if (state is Ctrl.END) {
            destroy(thisActor.childrenState);
        }
        return all_in_state;
    }
    catch (Exception e) {
        return false;
    }
}
193 
/// waitforChildren against a hand-filled children table
unittest {
    enum child_name = "child_task";
    // No children at all: trivially satisfied
    assert(waitforChildren(Ctrl.ALIVE, Duration.min), "Waiting for no spawned tid, should always be true");

    // A child that never leaves STARTING
    thisActor.childrenState[child_name] = Ctrl.STARTING;
    assert(!waitforChildren(Ctrl.ALIVE, Duration.min), "should've timed out");

    // The child is now in the requested state
    thisActor.childrenState[child_name] = Ctrl.ALIVE;
    assert(waitforChildren(Ctrl.ALIVE, Duration.min));

    // Waiting for END also clears the table
    thisActor.childrenState[child_name] = Ctrl.END;
    assert(waitforChildren(Ctrl.END));
    assert(thisActor.childrenState.length == 0, "childrenState should be cleaned up when checked that all of them have ended");
}
205 
/// Checks if a type has the required members to be an actor:
/// a member named `task` which is callable and @safe.
/// Evaluates to false (instead of failing to compile) for types without such a member.
template isActor(A) {
    // `&&` in an enum initializer does not short-circuit semantic analysis,
    // so each check is only instantiated once the previous one has passed.
    static if (!hasMember!(A, "task")) {
        enum bool isActor = false;
    }
    else static if (!isCallable!(A.task)) {
        enum bool isActor = false;
    }
    else {
        enum bool isActor = isSafe!(A.task);
    }
}
208 
/// True when `F` converts to a function or delegate taking a TaskFailure and returning void
template isFailHandler(F) {
    private enum asFunction = is(F : void function(TaskFailure));
    private enum asDelegate = is(F : void delegate(TaskFailure));
    enum bool isFailHandler = asFunction || asDelegate;
}
212 
/// Stolen from std.concurrency;
/// True when `F` is callable, returns void, can be called with arguments of types `T`
/// and is either a function pointer or has no unshared aliasing.
template isSpawnable(F, T...) {
    /// True when each parameter of `Source`, from index `idx` on, implicitly converts
    /// to the parameter of `Target` at the same position (and the counts agree).
    template paramsConvert(Target, Source, size_t idx = 0) {
        alias wanted = Parameters!Target;
        alias given = Parameters!Source;
        static if (wanted.length != given.length) {
            enum paramsConvert = false;
        }
        else static if (idx == wanted.length) {
            enum paramsConvert = true;
        }
        else static if (!isImplicitlyConvertible!(given[idx], wanted[idx])) {
            enum paramsConvert = false;
        }
        else {
            enum paramsConvert = paramsConvert!(Target, Source, idx + 1);
        }
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F : void)
        && paramsConvert!(F, void function(T))
        && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
233 
/**
 * A "reference" to an actor that may or may not be spawned, we will never know
 */
struct ActorHandle {
    /// the name of the possibly running task
    string task_name;

    /// Tid from the most recent lookup; Tid.init until `tid` has been called
    private Tid _tid;
    /// Looks the task up by name (on every call), caches the result and returns it
    Tid tid() {
        _tid = concurrency.locate(task_name);
        return _tid;
    }

    /// Get the status of the task as recorded by the calling task.
    /// Returns: Ctrl.UNKNOWN if the calling task has no record of this task
    Ctrl state() nothrow {
        // Single lookup: `in` yields a pointer to the value, or null
        if (const known = task_name in thisActor.childrenState) {
            return *known;
        }
        return Ctrl.UNKNOWN;
    }

    /// Send a message to this task.
    /// Tries the cached tid first; if that raises an AssertError the task is
    /// looked up again and the send retried once, ignoring a second AssertError.
    void send(T...)(T args) @trusted {
        try {
            concurrency.send(_tid, args);
        }
        catch (AssertError _) {
            concurrency.send(tid, args).collectException!AssertError;
        }
    }
    /// Send a priority message to this task.
    /// Same retry behaviour as `send`.
    void prioritySend(T...)(T args) @trusted {
        try {
            concurrency.prioritySend(_tid, args);
        }
        catch (AssertError _) {
            concurrency.prioritySend(tid, args).collectException!AssertError;
        }
    }
}
275 
/**
 * Spawn `actor` in a new thread under the task name `name`.
 * The new thread reports Ctrl.STARTING to its owner, runs `actor.task(args)`,
 * stops any children that are still alive and finally calls `end`.
 * The child is recorded in the caller's children table as Ctrl.UNKNOWN.
 * Params:
 *   actor = the actor instance to run
 *   name = the task name the new thread registers itself under
 *   args = arguments forwarded to `A.task`
 * Returns: a handle holding the task name of the spawned actor
 */
ActorHandle spawn(A, Args...)(immutable(A) actor, string name, Args args) @safe nothrow
if (isActor!A && isSpawnable!(typeof(A.task), Args)) {

    try {
        Tid tid;
        tid = concurrency.spawn((immutable(A) _actor, string name, Args args) @trusted nothrow{
            // log.register(name);
            thisActor.task_name = name;
            thisActor.stop = false;
            A actor = cast(A) _actor;
            setState(Ctrl.STARTING); // Tell the owner that you are starting.
            try {
                actor.task(args);
                // If the actor forgets to kill it's children we'll do it anyway
                if (!statusChildren(Ctrl.END)) {
                    foreach (child_task_name, ctrl; thisActor.childrenState) {
                        if (ctrl is Ctrl.ALIVE) {
                            auto child_tid = locate(child_task_name);
                            // A child recorded as ALIVE may already have unregistered;
                            // only signal children that can still be located.
                            if (child_tid !is Tid.init) {
                                child_tid.send(Sig.STOP);
                            }
                        }
                    }
                    waitforChildren(Ctrl.END);
                }
            }
            catch (Exception t) {
                fail(t);
            } // This is bad but, We catch assert per thread because there is no message otherwise, when runnning multithreaded
            catch (AssertError e) {
                import tagion.GlobalSignals;

                log(e);
                stopsignal.set;
            }
            end;
        }, actor, name, args);
        thisActor.childrenState[name] = Ctrl.UNKNOWN;
        log("spawning %s", name);
        tid.setMaxMailboxSize(int.max, OnCrowding.throwException);
        return ActorHandle(name);
    }
    catch (Exception e) {
        assert(0, format("Exception: %s", e.msg));
    }
}
319 
/**
 * Spawn an actor of type `A` in a new thread under the task name `name`.
 * Unlike `spawn`, the actor is constructed inside the new thread as `A(args)`
 * and its `task` is called without arguments.
 * The child is recorded in the caller's children table as Ctrl.UNKNOWN.
 * Params:
 *   name = the task name the new thread registers itself under
 *   args = arguments forwarded to the constructor of `A`
 * Returns: a handle holding the task name of the spawned actor
 */
ActorHandle _spawn(A, Args...)(string name, Args args) @safe nothrow
if (isActor!A) {
    try {
        Tid tid;
        tid = concurrency.spawn((string name, Args args) @trusted nothrow{
            thisActor.task_name = name;
            thisActor.stop = false;
            try {
                A actor = A(args);
                setState(Ctrl.STARTING); // Tell the owner that you are starting.
                actor.task();
                // If the actor forgets to kill it's children we'll do it anyway
                if (!statusChildren(Ctrl.END)) {
                    foreach (child_task_name, ctrl; thisActor.childrenState) {
                        if (ctrl is Ctrl.ALIVE) {
                            auto child_tid = locate(child_task_name);
                            // A child recorded as ALIVE may already have unregistered;
                            // only signal children that can still be located.
                            if (child_tid !is Tid.init) {
                                child_tid.send(Sig.STOP);
                            }
                        }
                    }
                    waitforChildren(Ctrl.END);
                }
            }
            catch (Exception t) {
                fail(t);
            } // This is bad but, We catch assert per thread because there is no message otherwise, when runnning multithreaded
            catch (AssertError e) {
                import tagion.GlobalSignals;

                log(e);
                stopsignal.set;
            }
            end;
        }, name, args);
        thisActor.childrenState[name] = Ctrl.UNKNOWN;
        log("spawning %s", name);
        tid.setMaxMailboxSize(int.max, OnCrowding.throwException);
        return ActorHandle(name);
    }
    catch (Exception e) {
        assert(0, format("Exception: %s", e.msg));
    }
}
361 
/// Spawn a default-initialized actor of type `A` under the task name `name`,
/// forwarding `args` to the overload that takes an actor instance.
ActorHandle spawn(A, Args...)(string name, Args args) @safe nothrow
if (isActor!A) {
    immutable A default_actor;
    return spawn(default_actor, name, args);
}
367 
/// Nullable and nothrow wrapper around ownerTid.
/// Returns: the owner's Tid, or a null Nullable when `ownerTid` throws
Nullable!Tid tidOwner() @safe nothrow {
    Nullable!Tid owner; // starts out "null"
    try {
        owner = ownerTid;
        return owner;
    }
    catch (Exception _) {
        // No owner available: hand back the still-null value
        return owner;
    }
}
380 
/// Send `vals` to the owner if there is one; otherwise log the values as an error
void sendOwner(T...)(T vals) @safe {
    auto owner = tidOwner;
    if (owner.isNull) {
        log.error("No owner tried to send a message to it");
        log.error("%s", tuple(vals));
        return;
    }
    concurrency.send(owner.get, vals);
}
391 
/// Log topic on which task failures are published
static Topic taskfailure = Topic("taskfailure");

/** 
 * Send a TaskFailure up to the owner.
 * If the failure can not be delivered it is logged as fatal and this actor is flagged to stop.
 * Does NOT exit regular control flow
 * Params:
 *   t = the throwable to wrap in the TaskFailure
*/
void fail(Throwable t) @trusted nothrow {
    try {
        debug (actor) {
            log(t);
        }
        immutable failure = TaskFailure(thisActor.task_name, cast(immutable) t);
        log(taskfailure, "taskfailure", failure); // taskfailure event
        ownerTid.prioritySend(failure);
    }
    catch (Exception delivery_error) {
        log.fatal("Failed to deliver TaskFailure: \n
                %s\n\n
                Because:\n
                %s", t, delivery_error);
        log.fatal("Stopping because we failed to deliver a TaskFailure to the supervisor");
        thisActor.stop = true;
    }
}
417 
/// Send your state to your owner as a priority CtrlMsg.
/// Failures to send are logged, never thrown.
void setState(Ctrl ctrl) @safe nothrow {
    try {
        log("setting state to %s", ctrl);
        assert(thisActor.task_name !is string.init, "Can not set the state for a task with no name");
        ownerTid.prioritySend(CtrlMsg(thisActor.task_name, ctrl));
    }
    catch (TidMissingException missing) {
        // No owner tid to report to
        log.error("Failed to set state %s", missing.message);
    }
    catch (Exception other) {
        log.error("Failed to set state");
        log(other);
    }
}
433 
/// Cleanup and notify the supervisor that you have ended:
/// flags this actor as stopped, runs the thread's cleanup and reports Ctrl.END.
void end() @trusted nothrow {
    thisActor.stop = true;
    assumeWontThrow(ThreadInfo.thisInfo.cleanup());
    setState(Ctrl.END);
}
440 
/**
 * The receive loop of an actor.
 * Reports Ctrl.ALIVE to the owner and then receives messages until the actor is
 * flagged to stop. Exceptions thrown while receiving are passed to `fail`;
 * a MailboxFull additionally stops the loop.
 * Params:
 *   args = a list of message handlers for the task. If the last handler is a
 *          fail handler (see isFailHandler) no default TaskFailure handler is
 *          added; otherwise received TaskFailures are forwarded to the owner.
 */
void run(Args...)(Args args) @safe nothrow
if (allSatisfy!(isSafe, Args)) {
    // Check if a failHandler was passed as the last arg.
    // `static if` short-circuits, so Args[$ - 1] is never evaluated for an empty list.
    static if (Args.length >= 1 && isFailHandler!(Args[$ - 1])) {
        enum failhandler = () @safe {}; /// Use the fail handler passed through `args`
    }
    else {
        enum failhandler = (TaskFailure tf) @safe {
            if (!tidOwner.isNull) {
                ownerTid.prioritySend(tf);
            }
        };
    }

    scope (failure) {
        setState(Ctrl.END);
    }

    setState(Ctrl.ALIVE); // Tell the owner that you are running
    while (!thisActor.stop) {
        try {
            receive(
                    args, // The message handlers you pass to your Actor template
                    failhandler,
                    &signal,
                    &control,
                    &ownerTerminated,
                    &unknown,
            );
        }
        catch (MailboxFull t) {
            fail(t);
            thisActor.stop = true;
        }
        catch (Exception t) {
            fail(t);
        }
    }
}
485 
/** 
 * The receive loop of an actor, with a callback for idle periods.
 * Reports Ctrl.ALIVE to the owner and then receives messages until the actor is
 * flagged to stop. Whenever no message arrives within `duration`, `timeout` is called.
 * Exceptions thrown while receiving (or by `timeout`) are passed to `fail`;
 * a MailboxFull additionally stops the loop.
 * Params:
 *   duration = the duration for the timeout
 *   timeout = delegate function to call
 *   args = normal message handlers for the task. If the last handler is a
 *          fail handler (see isFailHandler) no default TaskFailure handler is
 *          added; otherwise received TaskFailures are forwarded to the owner.
 */
void runTimeout(Args...)(Duration duration, void delegate() @safe timeout, Args args) nothrow
if (allSatisfy!(isSafe, Args)) {
    // Check if a failHandler was passed as the last arg.
    // `static if` short-circuits, so Args[$ - 1] is never evaluated for an empty list.
    static if (Args.length >= 1 && isFailHandler!(Args[$ - 1])) {
        enum failhandler = () @safe {}; /// Use the fail handler passed through `args`
    }
    else {
        enum failhandler = (TaskFailure tf) @safe {
            if (!tidOwner.isNull) {
                ownerTid.prioritySend(tf);
            }
        };
    }

    scope (failure) {
        setState(Ctrl.END);
    }

    setState(Ctrl.ALIVE); // Tell the owner that you are running
    while (!thisActor.stop) {
        try {
            const message = receiveTimeout(
                    duration,
                    args, // The message handlers you pass to your Actor template
                    failhandler,
                    &signal,
                    &control,
                    &ownerTerminated,
                    &unknown,
            );
            // Nothing arrived within `duration`
            if (!message) {
                timeout();
            }
        }
        catch (MailboxFull t) {
            fail(t);
            thisActor.stop = true;
        }
        catch (Exception t) {
            fail(t);
        }
    }
}
536 
/// Handles a signal from the supervisor: Sig.STOP flags this actor to stop
void signal(Sig signal) @safe {
    final switch (signal) {
    case Sig.STOP:
        thisActor.stop = true;
        break;
    }
}
544 
/// Handles a control message sent from a child:
/// records the reported state under the child's task name.
void control(CtrlMsg msg) @safe {
    const reported = msg.ctrl;
    thisActor.childrenState[msg.task_name] = reported;
}
549 
/// Stops the actor if the supervisor stops:
/// logs the event and flags this actor to stop.
void ownerTerminated(OwnerTerminated) @safe {
    log.trace("%s, Owner stopped... nothing to life for... stopping self", thisTid);
    thisActor.stop = true;
}
555 
/**
 * The default message handler: any message reaching it is treated as unknown.
 * Params:
 *   message = literally any message
 * Throws: UnknownMessage describing the message
 */
void unknown(Variant message) @trusted {
    throw new UnknownMessage(format("No delegate to deal with message: %s", message));
}