1 /** 2 * New wave implementation of the tagion node 3 **/ 4 module tagion.tools.neuewelle; 5 6 import core.stdc.stdlib : exit; 7 import core.sync.event; 8 import core.sys.posix.signal; 9 import core.sys.posix.unistd; 10 import core.thread; 11 import core.time; 12 import std.algorithm : countUntil, map, uniq, equal; 13 import std.array; 14 import std.file : chdir, exists; 15 import std.format; 16 import std.getopt; 17 import std.path; 18 import std.path : baseName, dirName; 19 import std.range : iota; 20 import std.stdio; 21 import std.typecons; 22 23 import tagion.GlobalSignals : segment_fault, stopsignal; 24 import tagion.actor; 25 import tagion.actor.exceptions; 26 import tagion.basic.Types : FileExtension, hasExtension; 27 import tagion.crypto.SecureNet; 28 import tagion.hibon.Document; 29 import tagion.logger; 30 import tagion.services.locator; 31 import tagion.services.logger; 32 import tagion.services.options; 33 import tagion.services.subscription; 34 import tagion.tools.Basic; 35 import tagion.tools.revision; 36 import tagion.tools.toolsexception; 37 import tagion.utils.Term; 38 import tagion.wave.common; 39 40 static abort = false; 41 import tagion.services.transcript : graceful_shutdown; 42 43 private extern (C) 44 void signal_handler(int signal) nothrow { 45 try { 46 if (signal !is SIGINT) { 47 return; 48 } 49 50 if (abort) { 51 printf("Terminating\n"); 52 exit(1); 53 } 54 stopsignal.set; 55 abort = true; 56 printf("Received stop signal, telling services to stop\n"); 57 } 58 catch (Exception e) { 59 assert(0, format("DID NOT CLOSE PROPERLY \n %s", e)); 60 } 61 } 62 63 mixin Main!(_main, "wave"); 64 65 int _main(string[] args) { 66 try { 67 return _neuewelle(args); 68 } 69 catch (Exception e) { 70 error(e); 71 return 1; 72 } 73 } 74 75 int _neuewelle(string[] args) { 76 immutable program = args[0]; 77 string bootkeys_path; 78 /* 79 * Boot key format expected for mode0 80 * nodename0:pincode0 81 * nodename1:pincode1 82 * nodename2:pincode2 83 * ... : ... 84 * The directory where the wallet config_file should be placed is 85 * <bootkeys_path>/<nodenameX>/wallet.json 86 * 87 */ 88 File fin = stdin; /// Console input for the bootkeys 89 stopsignal.initialize(true, false); 90 91 { // Handle sigint 92 sigaction_t sa; 93 sa.sa_handler = &signal_handler; 94 95 sigemptyset(&sa.sa_mask); 96 sa.sa_flags = 0; 97 // Register the signal handler for SIGINT 98 int rc = sigaction(SIGINT, &sa, null); 99 assert(rc == 0, "sigaction error"); 100 } 101 { // Handle sigv 102 sigaction_t sa; 103 sa.sa_sigaction = &segment_fault; 104 sigemptyset(&sa.sa_mask); 105 sa.sa_flags = SA_RESTART; 106 107 int rc = sigaction(SIGSEGV, &sa, null); 108 assert(rc == 0, "sigaction error"); 109 } 110 111 bool version_switch; 112 bool override_switch; 113 bool monitor; 114 115 string mode0_node_opts_path; 116 string[] override_options; 117 118 auto main_args = getopt(args, 119 "version", "Print revision information", &version_switch, 120 "O|override", "Override the config file", &override_switch, 121 "option", "Set an option", &override_options, 122 "k|keys", "Path to the boot-keys in mode0", &bootkeys_path, 123 "v|verbose", "Enable verbose print-out", &__verbose_switch, 124 "n|dry", "Check the parameter without starting the network (dry-run)", &__dry_switch, 125 "nodeopts", "Generate single node opts files for mode0", &mode0_node_opts_path, 126 "m|monitor", "Enable the monitor", &monitor, 127 ); 128 129 if (main_args.helpWanted) { 130 defaultGetoptPrinter( 131 "Help information for tagion wave program\n" ~ 132 format("Usage: %s <tagionwave.json>\n", program), 133 main_args.options 134 ); 135 return 0; 136 } 137 138 if (version_switch) { 139 revision_text.writeln; 140 return 0; 141 } 142 143 enum default_wave_config_filename = "tagionwave".setExtension(FileExtension.json); 144 const user_config_file = args.countUntil!(file => file.hasExtension(FileExtension.json) && file.exists); 145 auto config_file = (user_config_file < 0) ? default_wave_config_filename : args[user_config_file]; 146 147 Options local_options; 148 if (config_file.exists) { 149 try { 150 local_options.load(config_file); 151 log("Running with config file %s", config_file); 152 chdir(config_file.dirName); 153 } 154 catch (Exception e) { 155 stderr.writefln("Error loading config file %s, %s", config_file, e.msg); 156 return 1; 157 } 158 } 159 else { 160 local_options = Options.defaultOptions; 161 stderr.writefln("No config file exits, running with default options"); 162 } 163 164 // Experimental!! 165 if (!override_options.empty) { 166 import std.json; 167 import tagion.utils.JSONCommon; 168 169 JSONValue json = local_options.toJSON; 170 171 void set_val(JSONValue j, string[] _key, string val) { 172 if (_key.length == 1) { 173 j[_key[0]] = val.toJSONType(j[_key[0]].type); 174 return; 175 } 176 set_val(j[_key[0]], _key[1 .. $], val); 177 } 178 179 foreach (option; override_options) { 180 string[] key_value = option.split(":"); 181 assert(key_value.length == 2, format("Option '%s' invalid, missing key=value", option)); 182 auto value = key_value[1]; 183 string[] key = key_value[0].split("."); 184 set_val(json, key, value); 185 } 186 // If options does not parse as a string then some types will not be interpreted correctly 187 local_options.parseJSON(json.toString); 188 } 189 190 if (override_switch) { 191 local_options.save(config_file); 192 writefln("Config file written to %s", config_file); 193 return 0; 194 } 195 196 scope (failure) { 197 log("Bye bye :("); 198 } 199 200 // Spawn logger service 201 immutable logger = LoggerService(LoggerServiceOptions(LogType.Console)); 202 auto logger_service = spawn(logger, "logger"); 203 log.set_logger_task(logger_service.task_name); 204 writeln("logger started: ", waitforChildren(Ctrl.ALIVE)); 205 206 ActorHandle sub_handle; 207 { // Spawn logger subscription service 208 immutable subopts = Options(local_options).subscription; 209 sub_handle = spawn!SubscriptionService("logger_sub", subopts); 210 writeln("logsub started: ", waitforChildren(Ctrl.ALIVE)); 211 log.registerSubscriptionTask("logger_sub"); 212 } 213 214 log.task_name = baseName(program); 215 216 locator_options = new immutable(LocatorOptions)(20, 5); 217 ActorHandle[] supervisor_handles; 218 219 switch (local_options.wave.network_mode) { 220 case NetworkMode.INTERNAL: 221 import tagion.wave.mode0; 222 223 auto node_options = getMode0Options(local_options, monitor); 224 auto __net = new StdSecureNet(); 225 __net.generateKeyPair("dart_read_pin"); 226 227 if (!isMode0BullseyeSame(node_options, __net)) { 228 assert(0, "DATABASES must be booted with same bullseye - Abort"); 229 } 230 231 auto nodes = inputKeys(fin, node_options, bootkeys_path); 232 if (nodes is Node[].init) { 233 return 0; 234 } 235 236 Document doc = getHead(node_options, __net); 237 // we only need to read one head since all bullseyes are the same: 238 spawnMode0(node_options, supervisor_handles, nodes, doc); 239 log("started mode 0 net"); 240 241 if (mode0_node_opts_path) { 242 foreach (i, opt; node_options) { 243 opt.save(buildPath(mode0_node_opts_path, format(opt.wave.prefix_format ~ "opts", i).setExtension( 244 FileExtension 245 .json))); 246 } 247 } 248 break; 249 default: 250 assert(0, "NetworkMode not supported"); 251 } 252 253 if (waitforChildren(Ctrl.ALIVE, Duration.max)) { 254 log("alive"); 255 bool signaled; 256 import tagion.utils.pretend_safe_concurrency : receiveTimeout; 257 import core.atomic; 258 259 do { 260 signaled = stopsignal.wait(100.msecs); 261 if (!signaled) { 262 if (local_options.wave.fail_fast) { 263 signaled = receiveTimeout( 264 Duration.zero, 265 (TaskFailure tf) { log.fatal("Stopping because of unhandled taskfailure\n%s", tf); } 266 ); 267 } 268 else { 269 receiveTimeout( 270 Duration.zero, 271 (TaskFailure tf) { log.error("Received an unhandled taskfailure\n%s", tf); } 272 ); 273 } 274 } 275 } 276 while (!signaled && graceful_shutdown.atomicLoad() != local_options.wave.number_of_nodes); 277 } 278 else { 279 log("Program did not start"); 280 return 1; 281 } 282 283 sub_handle.send(Sig.STOP); 284 log("Sending stop signal to supervisor"); 285 foreach (supervisor; supervisor_handles) { 286 supervisor.send(Sig.STOP); 287 } 288 logger_service.send(Sig.STOP); 289 290 // supervisor_handle.send(Sig.STOP); 291 if (!waitforChildren(Ctrl.END, 5.seconds)) { 292 log("Timed out before all services stopped"); 293 return 1; 294 } 295 log("Bye bye! ^.^"); 296 return 0; 297 } 298 299 import tagion.wave.mode0 : Node; 300 import tagion.tools.wallet.WalletOptions; 301 import tagion.tools.wallet.WalletInterface; 302 303 Node[] inputKeys(File fin, const(Options[]) node_options, string bootkeys_path) { 304 auto by_line = fin.byLine; 305 enum number_of_retry = 3; 306 307 Node[] nodes; 308 foreach (i, opts; node_options) { 309 StdSecureNet net; 310 scope (exit) { 311 net = null; 312 } 313 314 if (bootkeys_path.empty) { 315 net = new StdSecureNet; 316 net.generateKeyPair(opts.task_names.supervisor); 317 } 318 else { 319 WalletOptions wallet_options; 320 LoopTry: foreach (tries; 1 .. number_of_retry + 1) { 321 verbose("Input boot key %d as nodename:pincode", i); 322 const args = (by_line.front.empty) ? string[].init : by_line.front.split(":"); 323 by_line.popFront; 324 if (args.length != 2) { 325 writefln("%1$sBad format %3$s expected nodename:pincode%2$s", RED, RESET, args.front); 326 } 327 //string wallet_config_file; 328 const wallet_config_file = buildPath(bootkeys_path, args[0]).setExtension(FileExtension.json); 329 writeln("Looking for " ~ wallet_config_file); 330 verbose("Wallet path %s", wallet_config_file); 331 if (!wallet_config_file.exists) { 332 writefln("%1$sBoot key file %3$s not found%2$s", RED, RESET, wallet_config_file); 333 writefln("Try another node name"); 334 } 335 else { 336 verbose("Load config"); 337 wallet_options.load(wallet_config_file); 338 auto wallet_interface = WalletInterface(wallet_options); 339 verbose("Load wallet"); 340 wallet_interface.load; 341 342 const loggedin = wallet_interface.secure_wallet.login(args[1]); 343 if (wallet_interface.secure_wallet.isLoggedin) { 344 verbose("%1$sNode %3$s successfull%2$s", GREEN, RESET, args[0]); 345 net = cast(StdSecureNet) wallet_interface.secure_wallet.net.clone; 346 break LoopTry; 347 } 348 else { 349 writefln("%1$sWrong pincode bootkey %3$s node %4$s%2$s", RED, RESET, i, args[0]); 350 } 351 } 352 check(tries < number_of_retry, format("Max number of retries is %d", number_of_retry)); 353 } 354 } 355 if (dry_switch && !bootkeys_path.empty) { 356 writefln("%1$sBoot keys correct%2$s", GREEN, RESET); 357 } 358 shared shared_net = (() @trusted => cast(shared) net)(); 359 360 nodes ~= Node(opts, shared_net, net.pubkey); 361 } 362 363 if (dry_switch) { 364 return Node[].init; 365 } 366 return nodes; 367 }