1 /** 
2  * New wave implementation of the tagion node
3 **/
4 module tagion.tools.neuewelle;
5 
6 import core.stdc.stdlib : exit;
7 import core.sync.event;
8 import core.sys.posix.signal;
9 import core.sys.posix.unistd;
10 import core.thread;
11 import core.time;
12 import std.algorithm : countUntil, map, uniq, equal;
13 import std.array;
14 import std.file : chdir, exists;
15 import std.format;
16 import std.getopt;
17 import std.path;
18 import std.path : baseName, dirName;
19 import std.range : iota;
20 import std.stdio;
21 import std.typecons;
22 
23 import tagion.GlobalSignals : segment_fault, stopsignal;
24 import tagion.actor;
25 import tagion.actor.exceptions;
26 import tagion.basic.Types : FileExtension, hasExtension;
27 import tagion.crypto.SecureNet;
28 import tagion.hibon.Document;
29 import tagion.logger;
30 import tagion.services.locator;
31 import tagion.services.logger;
32 import tagion.services.options;
33 import tagion.services.subscription;
34 import tagion.tools.Basic;
35 import tagion.tools.revision;
36 import tagion.tools.toolsexception;
37 import tagion.utils.Term;
38 import tagion.wave.common;
39 
/// Set once the first SIGINT has been handled; a second SIGINT then terminates the process.
/// Declared __gshared because module-level variables are thread-local by default in D,
/// while a signal may be handled on any thread: the flag has to be one process-wide instance.
__gshared bool abort = false;
import tagion.services.transcript : graceful_shutdown;

/**
 * Handler installed for SIGINT.
 *
 * The first SIGINT sets `stopsignal` so the main loop can shut the services down
 * and marks `abort`. A further SIGINT, received while `abort` is already set,
 * terminates the process with exit code 1.
 * Any signal other than SIGINT is ignored.
 *
 * NOTE(review): printf and exit are not async-signal-safe according to POSIX;
 * they are kept here to preserve the existing console output and exit behaviour.
 *
 * Params:
 *   signal = number of the signal that was delivered
 */
private extern (C)
void signal_handler(int signal) nothrow {
    try {
        if (signal != SIGINT) {
            return;
        }

        if (abort) {
            // Second request: the user does not want to wait for a graceful stop
            printf("Terminating\n");
            exit(1);
        }
        stopsignal.set;
        abort = true;
        printf("Received stop signal, telling services to stop\n");
    }
    catch (Exception e) {
        assert(0, format("DID NOT CLOSE PROPERLY \n %s", e));
    }
}
62 
mixin Main!(_main, "wave");

/**
 * Tool entry point registered through the Main mixin.
 *
 * Runs _neuewelle and turns any Exception escaping from it into a
 * reported error and a non-zero exit code.
 *
 * Params:
 *   args = command line arguments, args[0] being the program name
 * Returns: the exit code of _neuewelle, or 1 if it threw an Exception
 */
int _main(string[] args) {
    // Pessimistic default: only overwritten when _neuewelle returns normally
    int exit_code = 1;
    try {
        exit_code = _neuewelle(args);
    }
    catch (Exception ex) {
        error(ex);
    }
    return exit_code;
}
74 
/**
 * Runs the wave program: parses the command line, loads (and optionally
 * overrides/saves) the options, starts the logger services and the mode0
 * network, then waits until a stop is requested and shuts the services down.
 *
 * Params:
 *   args = command line arguments, args[0] being the program name.
 *          The first argument that has a .json extension and exists is used as
 *          config file, otherwise "tagionwave.json" is used.
 * Returns:
 *   0 on success (also for help, version, override-save and dry-run),
 *   1 if the config file could not be loaded, the program did not start or
 *   the services did not stop within the timeout.
 * Throws:
 *   Exception (for example from check) on invalid --option arguments; the caller
 *   _main converts exceptions into exit code 1.
 */
int _neuewelle(string[] args) {
    immutable program = args[0];
    string bootkeys_path;
    /*
    * Boot key format expected for mode0
    * nodename0:pincode0
    * nodename1:pincode1
    * nodename2:pincode2
    * ...      : ...
    * The wallet config file of each node is looked up by inputKeys as
    * <bootkeys_path>/<nodenameX>.json
    *
    */
    File fin = stdin; /// Console input for the bootkeys
    stopsignal.initialize(true, false);

    { // Handle SIGINT
        sigaction_t sa;
        sa.sa_handler = &signal_handler;

        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        // Register the signal handler for SIGINT
        int rc = sigaction(SIGINT, &sa, null);
        assert(rc == 0, "sigaction error");
    }
    { // Handle SIGSEGV
        sigaction_t sa;
        sa.sa_sigaction = &segment_fault;
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = SA_RESTART;

        int rc = sigaction(SIGSEGV, &sa, null);
        assert(rc == 0, "sigaction error");
    }

    bool version_switch;
    bool override_switch;
    bool monitor;

    string mode0_node_opts_path;
    string[] override_options;

    auto main_args = getopt(args,
            "version", "Print revision information", &version_switch,
            "O|override", "Override the config file", &override_switch,
            "option", "Set an option", &override_options,
            "k|keys", "Path to the boot-keys in mode0", &bootkeys_path,
            "v|verbose", "Enable verbose print-out", &__verbose_switch,
            "n|dry", "Check the parameter without starting the network (dry-run)", &__dry_switch,
            "nodeopts", "Generate single node opts files for mode0", &mode0_node_opts_path,
            "m|monitor", "Enable the monitor", &monitor,
    );

    if (main_args.helpWanted) {
        defaultGetoptPrinter(
                "Help information for tagion wave program\n" ~
                format("Usage: %s <tagionwave.json>\n", program),
                main_args.options
        );
        return 0;
    }

    if (version_switch) {
        revision_text.writeln;
        return 0;
    }

    enum default_wave_config_filename = "tagionwave".setExtension(FileExtension.json);
    // Index of the first argument that names an existing .json file, or -1
    const user_config_file = args.countUntil!(file => file.hasExtension(FileExtension.json) && file.exists);
    auto config_file = (user_config_file < 0) ? default_wave_config_filename : args[user_config_file];

    Options local_options;
    if (config_file.exists) {
        try {
            local_options.load(config_file);
            log("Running with config file %s", config_file);
            const config_dir = config_file.dirName;
            // The working directory is changed below, so a relative config path
            // would no longer point at the loaded file when it is saved later
            // (--override). Resolve it against the current directory first.
            config_file = config_file.absolutePath;
            chdir(config_dir);
        }
        catch (Exception e) {
            stderr.writefln("Error loading config file %s, %s", config_file, e.msg);
            return 1;
        }
    }
    else {
        local_options = Options.defaultOptions;
        stderr.writefln("No config file exists, running with default options");
    }

    // Experimental!!
    // Each --option has the form <dotted.key.path>:<value> and replaces the
    // value found at that path in the JSON representation of the options.
    if (!override_options.empty) {
        import std.json;
        import tagion.utils.JSONCommon;

        JSONValue json = local_options.toJSON;

        // Walks down the key path and replaces the leaf, converting val to the
        // JSON type of the value that is already stored there
        void set_val(JSONValue j, string[] _key, string val) {
            if (_key.length == 1) {
                j[_key[0]] = val.toJSONType(j[_key[0]].type);
                return;
            }
            set_val(j[_key[0]], _key[1 .. $], val);
        }

        foreach (option; override_options) {
            string[] key_value = option.split(":");
            // User input must be validated in release builds as well, hence check and not assert
            check(key_value.length == 2, format("Option '%s' invalid, expected key:value", option));
            auto value = key_value[1];
            string[] key = key_value[0].split(".");
            set_val(json, key, value);
        }
        // If options does not parse as a string then some types will not be interpreted correctly
        local_options.parseJSON(json.toString);
    }

    if (override_switch) {
        local_options.save(config_file);
        writefln("Config file written to %s", config_file);
        return 0;
    }

    scope (failure) {
        log("Bye bye :(");
    }

    // Spawn logger service
    immutable logger = LoggerService(LoggerServiceOptions(LogType.Console));
    auto logger_service = spawn(logger, "logger");
    log.set_logger_task(logger_service.task_name);
    writeln("logger started: ", waitforChildren(Ctrl.ALIVE));

    ActorHandle sub_handle;
    { // Spawn logger subscription service
        immutable subopts = Options(local_options).subscription;
        sub_handle = spawn!SubscriptionService("logger_sub", subopts);
        writeln("logsub started: ", waitforChildren(Ctrl.ALIVE));
        log.registerSubscriptionTask("logger_sub");
    }

    log.task_name = baseName(program);

    locator_options = new immutable(LocatorOptions)(20, 5);
    ActorHandle[] supervisor_handles;

    switch (local_options.wave.network_mode) {
    case NetworkMode.INTERNAL:
        import tagion.wave.mode0;

        auto node_options = getMode0Options(local_options, monitor);
        auto __net = new StdSecureNet();
        __net.generateKeyPair("dart_read_pin");

        if (!isMode0BullseyeSame(node_options, __net)) {
            assert(0, "DATABASES must be booted with same bullseye - Abort");
        }

        auto nodes = inputKeys(fin, node_options, bootkeys_path);
        // inputKeys returns an empty list in dry-run mode: nothing to start
        if (nodes is Node[].init) {
            return 0;
        }

        Document doc = getHead(node_options, __net);
        // we only need to read one head since all bullseyes are the same:
        spawnMode0(node_options, supervisor_handles, nodes, doc);
        log("started mode 0 net");

        if (mode0_node_opts_path) {
            foreach (i, opt; node_options) {
                opt.save(buildPath(mode0_node_opts_path, format(opt.wave.prefix_format ~ "opts", i).setExtension(
                        FileExtension
                        .json)));
            }
        }
        break;
    default:
        assert(0, "NetworkMode not supported");
    }

    if (waitforChildren(Ctrl.ALIVE, Duration.max)) {
        log("alive");
        bool signaled;
        import tagion.utils.pretend_safe_concurrency : receiveTimeout;
        import core.atomic;

        // Poll until the stop signal is set, a task failure stops a fail_fast
        // run, or graceful_shutdown has reached the number of nodes
        do {
            signaled = stopsignal.wait(100.msecs);
            if (!signaled) {
                if (local_options.wave.fail_fast) {
                    signaled = receiveTimeout(
                            Duration.zero,
                            (TaskFailure tf) { log.fatal("Stopping because of unhandled taskfailure\n%s", tf); }
                    );
                }
                else {
                    receiveTimeout(
                            Duration.zero,
                            (TaskFailure tf) { log.error("Received an unhandled taskfailure\n%s", tf); }
                    );
                }
            }
        }
        while (!signaled && graceful_shutdown.atomicLoad() != local_options.wave.number_of_nodes);
    }
    else {
        log("Program did not start");
        return 1;
    }

    sub_handle.send(Sig.STOP);
    log("Sending stop signal to supervisor");
    foreach (supervisor; supervisor_handles) {
        supervisor.send(Sig.STOP);
    }
    logger_service.send(Sig.STOP);

    if (!waitforChildren(Ctrl.END, 5.seconds)) {
        log("Timed out before all services stopped");
        return 1;
    }
    log("Bye bye! ^.^");
    return 0;
}
298 
299 import tagion.wave.mode0 : Node;
300 import tagion.tools.wallet.WalletOptions;
301 import tagion.tools.wallet.WalletInterface;
302 
/**
 * Creates the key-pair (secure net) of every mode0 node.
 *
 * If bootkeys_path is empty a key-pair is generated for each node from the
 * supervisor task name of that node. Otherwise one line of the form
 * nodename:pincode is read from fin per attempt, the wallet config
 * <bootkeys_path>/<nodename>.json is loaded and the wallet is logged in with
 * the pincode. Each node gets number_of_retry attempts.
 *
 * Params:
 *   fin = input from which the nodename:pincode lines are read
 *   node_options = options of the nodes, one entry per node
 *   bootkeys_path = directory of the wallet config files, empty to generate keys
 * Returns:
 *   one Node per entry in node_options, or Node[].init when dry_switch is set
 * Throws:
 *   via check when the input ends before all keys are read, when the maximum
 *   number of retries is reached, or when the wallet net is not a StdSecureNet
 */
Node[] inputKeys(File fin, const(Options[]) node_options, string bootkeys_path) {
    auto by_line = fin.byLine;
    enum number_of_retry = 3;

    Node[] nodes;
    foreach (i, opts; node_options) {
        StdSecureNet net;
        scope (exit) {
            net = null;
        }

        if (bootkeys_path.empty) {
            net = new StdSecureNet;
            net.generateKeyPair(opts.task_names.supervisor);
        }
        else {
            WalletOptions wallet_options;
            LoopTry: foreach (tries; 1 .. number_of_retry + 1) {
                verbose("Input boot key %d as nodename:pincode", i);
                check(!by_line.empty, format("Input ended before boot key %d was read", i));
                const(char)[] line = by_line.front;
                // byLine reuses its buffer, so the slices taken from the line are
                // only valid until popFront: consume the line when this attempt is done
                scope (exit) {
                    by_line.popFront;
                }
                const(char)[][] args;
                if (!line.empty) {
                    args = line.split(":");
                }
                if (args.length != 2) {
                    // Only the first field is echoed so a pincode is not printed
                    const(char)[] first_field = args.empty ? null : args[0];
                    writefln("%1$sBad format %3$s expected nodename:pincode%2$s", RED, RESET, first_field);
                }
                else {
                    const wallet_config_file = buildPath(bootkeys_path, args[0]).setExtension(FileExtension.json);
                    writeln("Looking for " ~ wallet_config_file);
                    verbose("Wallet path %s", wallet_config_file);
                    if (!wallet_config_file.exists) {
                        writefln("%1$sBoot key file %3$s not found%2$s", RED, RESET, wallet_config_file);
                        writefln("Try another node name");
                    }
                    else {
                        verbose("Load config");
                        wallet_options.load(wallet_config_file);
                        auto wallet_interface = WalletInterface(wallet_options);
                        verbose("Load wallet");
                        wallet_interface.load;

                        wallet_interface.secure_wallet.login(args[1]);
                        if (wallet_interface.secure_wallet.isLoggedin) {
                            verbose("%1$sNode %3$s successfull%2$s", GREEN, RESET, args[0]);
                            net = cast(StdSecureNet) wallet_interface.secure_wallet.net.clone;
                            // A failed downcast yields null, which would be dereferenced below
                            check(net !is null, format("Wallet net of node %s is not a StdSecureNet", args[0]));
                            break LoopTry;
                        }
                        else {
                            writefln("%1$sWrong pincode bootkey %3$s node %4$s%2$s", RED, RESET, i, args[0]);
                        }
                    }
                }
                // Reached only when this attempt failed
                check(tries < number_of_retry, format("Max number of retries is %d", number_of_retry));
            }
        }
        if (dry_switch && !bootkeys_path.empty) {
            writefln("%1$sBoot keys correct%2$s", GREEN, RESET);
        }
        shared shared_net = (() @trusted => cast(shared) net)();

        nodes ~= Node(opts, shared_net, net.pubkey);
    }

    if (dry_switch) {
        return Node[].init;
    }
    return nodes;
}