/// Main node supervisor service for managing and starting other tagion services
module tagion.services.supervisor;

import core.time;
import std.file;
import std.path;
import std.stdio;
import std.typecons;
import tagion.GlobalSignals : stopsignal;
import tagion.actor;
import tagion.actor.exceptions;
import tagion.crypto.SecureNet;
import tagion.dart.DARTBasic : DARTIndex;
import tagion.dart.DARTFile;
import tagion.logger.Logger;
import tagion.services.DART;
import tagion.services.DARTInterface;
import tagion.services.TVM;
import tagion.services.collector;
import tagion.services.epoch_creator;
import tagion.services.hirpc_verifier;
import tagion.services.inputvalidator;
import tagion.services.options;
import tagion.services.replicator;
import tagion.services.transcript;
import tagion.services.TRTService;
import core.memory;

/// Supervisor actor: spawns every node service, waits for them to come alive,
/// runs until stopped and then shuts the services down again.
@safe
struct Supervisor {
    // auto failHandler = (TaskFailure tf) @trusted { log("Stoping program because Supervisor caught exception: \n%s", tf); };

    /**
     * Entry point of the supervisor task.
     *
     * Spawns the child services, blocks until all of them report `Ctrl.ALIVE`,
     * calls `run()`, and afterwards sends `Sig.STOP` to every child that is
     * still alive and waits (bounded) for them to reach `Ctrl.END`.
     *
     * Params:
     *   opts = node options; the per-service sub-options and task names are read from it
     *   shared_net = secure net handed to the services that are spawned with it
     */
    void task(immutable(Options) opts, shared(StdSecureNet) shared_net) @safe {
        immutable tn = opts.task_names;

        // Handles of all spawned children, kept so they can be stopped later.
        ActorHandle[] handles;

        handles ~= spawn!ReplicatorService(tn.replicator, opts.replicator);
        handles ~= spawn!DARTService(tn.dart, opts.dart, tn, shared_net, opts.trt.enable);

        // The TRT service is optional and only started when enabled in the options.
        if (opts.trt.enable) {
            handles ~= spawn!TRTService(tn.trt, opts.trt, tn, shared_net);
        }

        handles ~= spawn!HiRPCVerifierService(tn.hirpc_verifier, opts.hirpc_verifier, tn);

        handles ~= spawn!InputValidatorService(tn.inputvalidator, opts.inputvalidator, tn);

        // signs data
        handles ~= spawn!EpochCreatorService(tn.epoch_creator, opts.epoch_creator,
                opts.wave.network_mode, opts.wave.number_of_nodes, shared_net, opts.monitor, tn);

        // verifies signature
        handles ~= _spawn!CollectorService(tn.collector, tn);

        handles ~= _spawn!TVMService(tn.tvm, tn);

        // signs data
        handles ~= spawn!TranscriptService(tn.transcript, TranscriptOptions.init,
                opts.wave.number_of_nodes, shared_net, tn);

        handles ~= spawn(immutable(DARTInterfaceService)(opts.dart_interface, opts.trt, tn),
                tn.dart_interface);

        // Only enter the run loop once every child has reported ALIVE.
        if (waitforChildren(Ctrl.ALIVE, Duration.max)) {
            run();
        }
        else {
            log.error("Not all children became Alive");
        }

        log("Supervisor stopping services");
        foreach (handle; handles) {
            // Children that never started or already ended are not signalled.
            if (handle.state is Ctrl.ALIVE) {
                handle.send(Sig.STOP);
            }
        }

        (() @trusted { // NNG should be safe
            import nngd;

            NNGSocket input_sock = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
            input_sock.dial(opts.inputvalidator.sock_addr);
            input_sock.maxttl = 1;
            input_sock.recvtimeout = 1.msecs;
            // Send arbitrary data to the inputvalidator so it releases the socket and checks its mailbox.
            // Best effort: the results of dial/send are intentionally not checked.
            input_sock.send("End!");
        })();

        // Report the real outcome of the shutdown instead of always claiming success.
        if (waitforChildren(Ctrl.END, 10.seconds)) {
            log("All services stopped");
        }
        else {
            log.error("Not all services stopped within the timeout");
        }
    }
}