1 /// Main node supervisor service for managing and starting other tagion services
2 module tagion.services.supervisor;
3 
4 import core.time;
5 import std.file;
6 import std.path;
7 import std.stdio;
8 import std.typecons;
9 import tagion.GlobalSignals : stopsignal;
10 import tagion.actor;
11 import tagion.actor.exceptions;
12 import tagion.crypto.SecureNet;
13 import tagion.dart.DARTBasic : DARTIndex;
14 import tagion.dart.DARTFile;
15 import tagion.logger.Logger;
16 import tagion.services.DART;
17 import tagion.services.DARTInterface;
18 import tagion.services.TVM;
19 import tagion.services.collector;
20 import tagion.services.epoch_creator;
21 import tagion.services.hirpc_verifier;
22 import tagion.services.inputvalidator;
23 import tagion.services.options;
24 import tagion.services.replicator;
25 import tagion.services.transcript;
26 import tagion.services.TRTService;
27 import core.memory;
28 
/**
 * Supervisor actor for the node.
 *
 * Its `task` spawns the node's services, waits for them to report
 * `Ctrl.ALIVE`, and afterwards asks the ones that are still alive to stop.
 */
@safe
struct Supervisor {
    // Disabled custom fail handler, kept for reference:
    // auto failHandler = (TaskFailure tf) @trusted { log("Stoping program because Supervisor caught exception: \n%s", tf); };

    /**
     * Spawns all services, runs the supervisor, then shuts the services down.
     *
     * Params:
     *   opts = node options; supplies the task names and the per-service options
     *   shared_net = secure net passed on to the services that are spawned with it
     */
    void task(immutable(Options) opts, shared(StdSecureNet) shared_net) @safe {
        immutable names = opts.task_names;

        // Dials the input validator's socket address and sends arbitrary data,
        // so that the input validator releases the socket and checks its mailbox.
        void pokeInputValidator() @trusted { // NNG should be safe
            import core.time;
            import nngd;

            NNGSocket wake_sock = NNGSocket(nng_socket_type.NNG_SOCKET_REQ);
            wake_sock.dial(opts.inputvalidator.sock_addr);
            wake_sock.maxttl = 1;
            wake_sock.recvtimeout = 1.msecs;
            wake_sock.send("End!");
        }

        // Handles of every spawned service, in spawn order.
        ActorHandle[] services;

        services ~= spawn!ReplicatorService(names.replicator, opts.replicator);
        services ~= spawn!DARTService(names.dart, opts.dart, names, shared_net, opts.trt.enable);

        // The TRT service is optional and only started when enabled in the options.
        if (opts.trt.enable) {
            services ~= spawn!TRTService(names.trt, opts.trt, names, shared_net);
        }

        services ~= spawn!HiRPCVerifierService(names.hirpc_verifier, opts.hirpc_verifier, names);

        services ~= spawn!InputValidatorService(names.inputvalidator, opts.inputvalidator, names);

        // signs data
        services ~= spawn!EpochCreatorService(
                names.epoch_creator,
                opts.epoch_creator,
                opts.wave.network_mode,
                opts.wave.number_of_nodes,
                shared_net,
                opts.monitor,
                names);

        // verifies signature
        services ~= _spawn!CollectorService(names.collector, names);

        services ~= _spawn!TVMService(names.tvm, names);

        // signs data
        services ~= spawn!TranscriptService(
                names.transcript,
                TranscriptOptions.init,
                opts.wave.number_of_nodes,
                shared_net,
                names);

        services ~= spawn(
                immutable(DARTInterfaceService)(opts.dart_interface, opts.trt, names),
                names.dart_interface);

        // Wait (without a time limit) for the children to become alive before running.
        const all_alive = waitforChildren(Ctrl.ALIVE, Duration.max);
        if (all_alive) {
            run();
        }
        else {
            log.error("Not all children became Alive");
        }

        log("Supervisor stopping services");
        foreach (service; services) {
            // Only services that report ALIVE are sent the stop signal.
            if (service.state !is Ctrl.ALIVE) {
                continue;
            }
            service.send(Sig.STOP);
        }

        pokeInputValidator();

        // Give the children up to 10 seconds to reach END; the result is not checked.
        waitforChildren(Ctrl.END, 10.seconds);
        log("All services stopped");
    }
}