1 /// Actor framework iplementation 2 /// Examles: [tagion.testbench.services] 3 module tagion.actor.actor; 4 5 @safe: 6 7 import std.typecons; 8 import core.exception : AssertError; 9 import core.thread; 10 import core.time; 11 import std.exception; 12 import std.format : format; 13 import std.meta; 14 import std.traits; 15 import std.traits; 16 import std.typecons; 17 import std.variant : Variant; 18 import tagion.actor.exceptions; 19 import tagion.hibon.HiBONRecord; 20 import tagion.logger; 21 import tagion.utils.Result; 22 import concurrency = tagion.utils.pretend_safe_concurrency; 23 import tagion.utils.pretend_safe_concurrency; 24 import tagion.actor.exceptions; 25 26 import tagion.hibon.HiBONRecord; 27 28 /** 29 * Message "Atom" type 30 * Examples: 31 * --- 32 * // As a type 33 * Msg!"hi"; 34 * // As a value 35 * Msg!"hi"(); 36 * --- 37 */ 38 struct Msg(string name) { 39 } 40 41 private struct ActorInfo { 42 private Ctrl[string] childrenState; 43 bool stop; 44 45 @property @safe 46 bool task_name(string name) nothrow const { 47 return log.task_name(name); 48 } 49 50 @property @safe 51 string task_name() const nothrow { 52 return log.task_name; 53 } 54 55 } 56 57 static ActorInfo thisActor; 58 59 /// 60 unittest { 61 assert(thisActor.task_name is string.init, "task_name did not start as init"); 62 enum dummy_name = "dummy_name"; 63 scope (exit) { 64 unregister(dummy_name); 65 } 66 assert(thisActor.task_name = dummy_name, "setting name failed"); 67 assert(thisActor.task_name = dummy_name, "setting name seconds time did not fallthrough"); 68 assert(thisActor.task_name == dummy_name, "name was not the same as we set"); 69 concurrency.spawn(() { 70 assert(!(thisActor.task_name = dummy_name), "Should not be able to set the same task name in another tid"); 71 }); 72 assert(locate(thisActor.task_name) is thisTid, "Name not registered"); 73 } 74 75 /* 76 * Reguest type 77 * Will generate a random id if the ID type is a number 78 */ 79 struct Request(string name, ID = uint) { 80 Msg!name msg; 81 ID id; 82 string task_name; 83 84 static Request opCall() @safe nothrow { 85 import std.traits : isNumeric; 86 import tagion.utils.Random; 87 88 Request!(name, ID) r; 89 r.msg = Msg!name(); 90 static if (isNumeric!ID) { 91 r.id = generateId!ID; 92 } 93 assert(thisActor.task_name !is string.init, "The requester is not registered as a task"); 94 r.task_name = thisActor.task_name; 95 return r; 96 } 97 98 alias Response = .Response!(name, ID); 99 100 /// Send back some data to the task who sent the request 101 void respond(Args...)(Args args) { 102 auto res = Response(msg, id); 103 auto tid = locate(task_name); 104 if (tid !is Tid.init) { 105 locate(task_name).send(res, args); 106 } 107 } 108 } 109 110 /// 111 struct Response(string name, ID = uint) { 112 Msg!name msg; 113 ID id; 114 } 115 116 /// 117 @safe 118 unittest { 119 thisActor.task_name = "req_resp"; 120 scope (exit) { 121 unregister("req_resp"); 122 } 123 alias Some_req = Request!"some_req"; 124 void some_responder(Some_req req) { 125 req.respond("hello"); 126 } 127 128 auto created_req = Some_req(); 129 some_responder(created_req); 130 int received = receiveTimeout(Duration.zero, (Some_req.Response res, string _) { 131 assert(created_req.msg == res.msg, "request msg were not the same"); 132 assert(created_req.id == res.id, "request id were not the same"); 133 }); 134 assert(received, "never received response"); 135 } 136 137 // State messages send to the supervisor 138 enum Ctrl { 139 UNKNOWN, // Unkwnown state 140 STARTING, // The actors is starting 141 ALIVE, /// The actor is running 142 END, /// The actor is stopping 143 } 144 145 // Signals send from the supervisor to the direct children 146 enum Sig { 147 STOP, 148 } 149 150 /// Control message sent to a supervisor 151 /// contains the Tid of the actor which send it and the state 152 alias CtrlMsg = Tuple!(string, "task_name", Ctrl, "ctrl"); 153 154 bool statusChildren(Ctrl ctrl) @safe nothrow { 155 foreach (val; thisActor.childrenState.byValue) { 156 if (val != ctrl) { 157 return false; 158 } 159 } 160 return true; 161 } 162 163 /* 164 * Waif for the spawned child Actors of this thread to be in Ctrl state. 165 * If it an Ctrl.END state it will free the children. 166 * Returns: true if all of the get in Ctrl state before the timeout 167 */ 168 bool waitforChildren(Ctrl state, Duration timeout = 1.seconds) @safe nothrow { 169 const begin_time = MonoTime.currTime; 170 try { 171 while (!statusChildren(state) && MonoTime.currTime - begin_time <= timeout) { 172 if (thisActor.stop && state !is Ctrl.END) { 173 return false; 174 } 175 receiveTimeout( 176 timeout / thisActor.childrenState.length, 177 &control, 178 &signal, 179 &ownerTerminated 180 181 ); 182 } 183 log("%s", thisActor.childrenState); 184 if (state is Ctrl.END) { 185 destroy(thisActor.childrenState); 186 } 187 return statusChildren(state); 188 } 189 catch (Exception e) { 190 return false; 191 } 192 } 193 194 unittest { 195 enum task_name = "child_task"; 196 assert(waitforChildren(Ctrl.ALIVE, Duration.min), "Waiting for no spawned tid, should always be true"); 197 thisActor.childrenState[task_name] = Ctrl.STARTING; 198 assert(!waitforChildren(Ctrl.ALIVE, Duration.min), "should've timed out"); 199 thisActor.childrenState[task_name] = Ctrl.ALIVE; 200 assert(waitforChildren(Ctrl.ALIVE, Duration.min)); 201 thisActor.childrenState[task_name] = Ctrl.END; 202 assert(waitforChildren(Ctrl.END)); 203 assert(thisActor.childrenState.length == 0, "childrenState should be cleaned up when checked that all of them have ended"); 204 } 205 206 /// Checks if a type has the required members to be an actor 207 enum bool isActor(A) = hasMember!(A, "task") && isCallable!(A.task) && isSafe!(A.task); 208 209 enum bool isFailHandler(F) 210 = is(F : void function(TaskFailure)) 211 || is(F : void delegate(TaskFailure)); 212 213 /// Stolen from std.concurrency; 214 template isSpawnable(F, T...) { 215 template isParamsImplicitlyConvertible(F1, F2, int i = 0) { 216 alias param1 = Parameters!F1; 217 alias param2 = Parameters!F2; 218 static if (param1.length != param2.length) 219 enum isParamsImplicitlyConvertible = false; 220 else static if (param1.length == i) 221 enum isParamsImplicitlyConvertible = true; 222 else static if (isImplicitlyConvertible!(param2[i], param1[i])) 223 enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, 224 F2, i + 1); 225 else 226 enum isParamsImplicitlyConvertible = false; 227 } 228 229 enum isSpawnable = isCallable!F && is(ReturnType!F : void) 230 && isParamsImplicitlyConvertible!(F, void function(T)) 231 && (isFunctionPointer!F || !hasUnsharedAliasing!F); 232 } 233 234 /** 235 * A "reference" to an actor that may or may not be spawned, we will never know 236 */ 237 struct ActorHandle { 238 /// the name of the possibly running task 239 string task_name; 240 241 private Tid _tid; 242 /// the tid of the spawned task 243 Tid tid() { 244 _tid = concurrency.locate(task_name); 245 return _tid; 246 } 247 248 // Get the status of the task, asserts if the calling task did not spawn it 249 Ctrl state() nothrow { 250 if ((task_name in thisActor.childrenState) !is null) { 251 return thisActor.childrenState[task_name]; 252 } 253 return Ctrl.UNKNOWN; 254 } 255 256 /// Send a message to this task 257 void send(T...)(T args) @trusted { 258 try { 259 concurrency.send(_tid, args); 260 } 261 catch (AssertError _) { 262 concurrency.send(tid, args).collectException!AssertError; 263 } 264 } 265 /// Send a message to this task 266 void prioritySend(T...)(T args) @trusted { 267 try { 268 concurrency.prioritySend(_tid, args); 269 } 270 catch (AssertError _) { 271 concurrency.prioritySend(tid, args).collectException!AssertError; 272 } 273 } 274 } 275 276 ActorHandle spawn(A, Args...)(immutable(A) actor, string name, Args args) @safe nothrow 277 if (isActor!A && isSpawnable!(typeof(A.task), Args)) { 278 279 try { 280 Tid tid; 281 tid = concurrency.spawn((immutable(A) _actor, string name, Args args) @trusted nothrow{ 282 // log.register(name); 283 thisActor.task_name = name; 284 thisActor.stop = false; 285 A actor = cast(A) _actor; 286 setState(Ctrl.STARTING); // Tell the owner that you are starting. 287 try { 288 actor.task(args); 289 // If the actor forgets to kill it's children we'll do it anyway 290 if (!statusChildren(Ctrl.END)) { 291 foreach (child_task_name, ctrl; thisActor.childrenState) { 292 if (ctrl is Ctrl.ALIVE) { 293 locate(child_task_name).send(Sig.STOP); 294 } 295 } 296 waitforChildren(Ctrl.END); 297 } 298 } 299 catch (Exception t) { 300 fail(t); 301 } // This is bad but, We catch assert per thread because there is no message otherwise, when runnning multithreaded 302 catch (AssertError e) { 303 import tagion.GlobalSignals; 304 305 log(e); 306 stopsignal.set; 307 } 308 end; 309 }, actor, name, args); 310 thisActor.childrenState[name] = Ctrl.UNKNOWN; 311 log("spawning %s", name); 312 tid.setMaxMailboxSize(int.max, OnCrowding.throwException); 313 return ActorHandle(name); 314 } 315 catch (Exception e) { 316 assert(0, format("Exception: %s", e.msg)); 317 } 318 } 319 320 ActorHandle _spawn(A, Args...)(string name, Args args) @safe nothrow 321 if (isActor!A) { 322 try { 323 Tid tid; 324 tid = concurrency.spawn((string name, Args args) @trusted nothrow{ 325 thisActor.task_name = name; 326 thisActor.stop = false; 327 try { 328 A actor = A(args); 329 setState(Ctrl.STARTING); // Tell the owner that you are starting. 330 actor.task(); 331 // If the actor forgets to kill it's children we'll do it anyway 332 if (!statusChildren(Ctrl.END)) { 333 foreach (child_task_name, ctrl; thisActor.childrenState) { 334 if (ctrl is Ctrl.ALIVE) { 335 locate(child_task_name).send(Sig.STOP); 336 } 337 } 338 waitforChildren(Ctrl.END); 339 } 340 } 341 catch (Exception t) { 342 fail(t); 343 } // This is bad but, We catch assert per thread because there is no message otherwise, when runnning multithreaded 344 catch (AssertError e) { 345 import tagion.GlobalSignals; 346 347 log(e); 348 stopsignal.set; 349 } 350 end; 351 }, name, args); 352 thisActor.childrenState[name] = Ctrl.UNKNOWN; 353 log("spawning %s", name); 354 tid.setMaxMailboxSize(int.max, OnCrowding.throwException); 355 return ActorHandle(name); 356 } 357 catch (Exception e) { 358 assert(0, format("Exception: %s", e.msg)); 359 } 360 } 361 362 ActorHandle spawn(A, Args...)(string name, Args args) @safe nothrow 363 if (isActor!A) { 364 immutable A actor; 365 return spawn(actor, name, args); 366 } 367 368 /// Nullable and nothrow wrapper around ownerTid 369 Nullable!Tid tidOwner() @safe nothrow { 370 // tid is "null" 371 Nullable!Tid tid; 372 try { 373 // Tid is assigned 374 tid = ownerTid; 375 } 376 catch (Exception _) { 377 } 378 return tid; 379 } 380 381 /// Send to the owner if there is one 382 void sendOwner(T...)(T vals) @safe { 383 if (!tidOwner.isNull) { 384 concurrency.send(tidOwner.get, vals); 385 } 386 else { 387 log.error("No owner tried to send a message to it"); 388 log.error("%s", tuple(vals)); 389 } 390 } 391 392 static Topic taskfailure = Topic("taskfailure"); 393 394 /** 395 * Send a TaskFailure up to the owner 396 * Silently fails if there is no owner 397 * Does NOT exit regular control flow 398 */ 399 void fail(Throwable t) @trusted nothrow { 400 try { 401 debug (actor) { 402 log(t); 403 } 404 immutable tf = TaskFailure(thisActor.task_name, cast(immutable) t); 405 log(taskfailure, "taskfailure", tf); // taskfailrue event 406 ownerTid.prioritySend(tf); 407 } 408 catch (Exception e) { 409 log.fatal("Failed to deliver TaskFailure: \n 410 %s\n\n 411 Because:\n 412 %s", t, e); 413 log.fatal("Stopping because we failed to deliver a TaskFailure to the supervisor"); 414 thisActor.stop = true; 415 } 416 } 417 418 /// send your state to your owner 419 void setState(Ctrl ctrl) @safe nothrow { 420 try { 421 log("setting state to %s", ctrl); 422 assert(thisActor.task_name !is string.init, "Can not set the state for a task with no name"); 423 ownerTid.prioritySend(CtrlMsg(thisActor.task_name, ctrl)); 424 } 425 catch (TidMissingException e) { 426 log.error("Failed to set state %s", e.message); 427 } 428 catch (Exception e) { 429 log.error("Failed to set state"); 430 log(e); 431 } 432 } 433 434 /// Cleanup and notify the supervisor that you have ended 435 void end() @trusted nothrow { 436 thisActor.stop = true; 437 assumeWontThrow(ThreadInfo.thisInfo.cleanup); 438 setState(Ctrl.END); 439 } 440 441 /* 442 * Params: 443 * task_name = the name of the task 444 * args = a list of message handlers for the task 445 */ 446 void run(Args...)(Args args) @safe nothrow 447 if (allSatisfy!(isSafe, Args)) { 448 // Check if a failHandler was passed as an arg 449 static if (args.length == 1 && isFailHandler!(typeof(args[$ - 1]))) { 450 enum failhandler = () @safe {}; /// Use the fail handler passed through `args` 451 } 452 else { 453 enum failhandler = (TaskFailure tf) @safe { 454 if (!tidOwner.isNull) { 455 ownerTid.prioritySend(tf); 456 } 457 }; 458 } 459 460 scope (failure) { 461 setState(Ctrl.END); 462 } 463 464 setState(Ctrl.ALIVE); // Tell the owner that you are running 465 while (!thisActor.stop) { 466 try { 467 receive( 468 args, // The message handlers you pass to your Actor template 469 failhandler, 470 &signal, 471 &control, 472 &ownerTerminated, 473 &unknown, 474 ); 475 } 476 catch (MailboxFull t) { 477 fail(t); 478 thisActor.stop = true; 479 } 480 catch (Exception t) { 481 fail(t); 482 } 483 } 484 } 485 486 /** 487 * 488 * Params: 489 * duration = the duration for the timeout 490 * timeout = delegate function to call 491 * args = normal message handlers for the task 492 */ 493 void runTimeout(Args...)(Duration duration, void delegate() @safe timeout, Args args) nothrow 494 if (allSatisfy!(isSafe, Args)) { 495 // Check if a failHandler was passed as an arg 496 static if (args.length == 1 && isFailHandler!(typeof(args[$ - 1]))) { 497 enum failhandler = () @safe {}; /// Use the fail handler passed through `args` 498 } 499 else { 500 enum failhandler = (TaskFailure tf) @safe { 501 if (!tidOwner.isNull) { 502 ownerTid.prioritySend(tf); 503 } 504 }; 505 } 506 507 scope (failure) { 508 setState(Ctrl.END); 509 } 510 511 setState(Ctrl.ALIVE); // Tell the owner that you are running 512 while (!thisActor.stop) { 513 try { 514 const message = receiveTimeout( 515 duration, 516 args, // The message handlers you pass to your Actor template 517 failhandler, 518 &signal, 519 &control, 520 &ownerTerminated, 521 &unknown, 522 ); 523 if (!message) { 524 timeout(); 525 } 526 } 527 catch (MailboxFull t) { 528 fail(t); 529 thisActor.stop = true; 530 } 531 catch (Exception t) { 532 fail(t); 533 } 534 } 535 } 536 537 void signal(Sig signal) @safe { 538 with (Sig) final switch (signal) { 539 case STOP: 540 thisActor.stop = true; 541 break; 542 } 543 } 544 545 /// Controls message sent from the children. 546 void control(CtrlMsg msg) @safe { 547 thisActor.childrenState[msg.task_name] = msg.ctrl; 548 } 549 550 /// Stops the actor if the supervisor stops 551 void ownerTerminated(OwnerTerminated) @safe { 552 log.trace("%s, Owner stopped... nothing to life for... stopping self", thisTid); 553 thisActor.stop = true; 554 } 555 556 /** 557 * The default message handler, if it's an unknown messages it will send a FAIL to the owner. 558 * Params: 559 * message = literally any message 560 */ 561 void unknown(Variant message) @trusted { 562 throw new UnknownMessage("No delegate to deal with message: %s".format(message)); 563 }