// Service for transcript
/// [Documentation](https://docs.tagion.org/#/documents/architecture/transcript)
module tagion.services.transcript;

@safe:

import core.time;
import std.algorithm;
import std.array;
import std.exception;
import std.format;
import std.range;
import std.stdio;
import tagion.actor.actor;
import tagion.communication.HiRPC;
import tagion.crypto.SecureInterfaceNet;
import tagion.crypto.SecureNet;
import tagion.crypto.Types;
import tagion.dart.DARTBasic : DARTIndex, dartIndex;
import tagion.dart.DARTBasic;
import tagion.dart.Recorder;
import tagion.hashgraph.HashGraphBasic : EventPackage, isMajority;
import tagion.hibon.BigNumber;
import tagion.hibon.Document;
import tagion.hibon.HiBONJSON;
import tagion.hibon.HiBONRecord : HiBONRecord, isRecord;
import tagion.logger.Logger;
import tagion.logger.Logger;
import tagion.script.TagionCurrency;
import tagion.script.common;
import tagion.script.execute : ContractProduct;
import tagion.script.standardnames;
import tagion.services.locator;
import tagion.services.messages;
import tagion.services.options : TaskNames;
import tagion.services.exception;
import tagion.utils.JSONCommon;
import tagion.utils.StdTime;
import tagion.utils.pretend_safe_concurrency;
import std.process : thisProcessID;
import std.path : buildPath;
import std.file : exists;
import std.conv : to;

@safe:

/// Counter that is atomically incremented by a transcript task when it executes its scheduled shutdown.
shared static size_t graceful_shutdown;
/// Number of seconds added to the epoch time to obtain the newest bill timestamp accepted in that epoch.
enum BUFFER_TIME_SECONDS = 30;

/**
 * Options for the transcript service.
 * The shutdown file that is polled on every epoch is located at
 * `buildPath(shutdown_folder, shutdown_file_prefix ~ <process id>)`.
 */
struct TranscriptOptions {
    string shutdown_folder = "/tmp/";
    string shutdown_file_prefix = "epoch_shutdown_";
    mixin JSONCommon;
}

/**
 * TranscriptService actor
 * Receives:
 *   (consensusEpoch, immutable(EventPackage*)[], immutable(long), const(sdt_t))
 *   (producedContract, immutable(ContractProduct)*)
 *   (dartCheckReadRR.Response, immutable(DARTIndex)[])
 *   (dartModifyRR.Response, Fingerprint)
 * Sends:
 *   (dartCheckReadRR, inputs) and (dartModifyRR, recorder, epoch id) to the DART task
 *   (Payload, Document) holding the node's own ConsensusVoting to the epoch creator task
 **/
struct TranscriptService {
    /**
     * Actor entry point.
     * Reads the current TagionHead and epoch from the DART, then runs the message loop.
     * Params:
     *   opts = location of the shutdown file
     *   number_of_nodes = number of signatures an epoch needs before it is finished
     *   shared_net = network/crypto object the local SecureNet is created from
     *   task_names = names of the DART and epoch creator tasks
     * Throws: ServiceException if the DART does not answer the head request within one second,
     *         or if the stored epoch is neither an Epoch nor a GenesisEpoch.
     */
    void task(immutable(TranscriptOptions) opts,
            immutable(size_t) number_of_nodes,
            shared(StdSecureNet) shared_net,
            immutable(TaskNames) task_names) {
        const(SecureNet) net = new StdSecureNet(shared_net);
        auto rec_factory = RecordFactory(net);

        ActorHandle dart_handle = ActorHandle(task_names.dart);
        ActorHandle epoch_creator_handle = ActorHandle(task_names.epoch_creator);

        // Contract products received from produceContract, keyed by the dartIndex of the contract.
        immutable(ContractProduct)*[DARTIndex] products;

        struct Votes {
            const(ConsensusVoting)[] votes;
            Epoch epoch;
            LockedArchives locked_archives;
        }
        // Epochs that are written but not yet finished, keyed by epoch number.
        Votes[long] votes;

        struct EpochContracts {
            const(SignedContract)[] signed_contracts;
            sdt_t epoch_time;
        }
        // Signed contracts of an epoch waiting for the DART check-read response, keyed by request id.
        const(EpochContracts)*[long] epoch_contracts;

        const process_file_name = format("%s%d", opts.shutdown_file_prefix, thisProcessID());
        const process_file_path = buildPath(opts.shutdown_folder, process_file_name);
        log("PROCESS FILE PATH %s", process_file_path);
        // Epoch number at which to shut down; long.init (0) means no shutdown is scheduled.
        long shutdown;

        /**
         * Get the current head and epoch
         */
        TagionGlobals last_globals = TagionGlobals(BigNumber(1000_000_000), BigNumber(0), long(10_0000), long(0));
        TagionHead last_head = TagionHead(TagionDomain, 0);

        Fingerprint previous_epoch = Fingerprint([1, 2, 3, 4]);
        long last_epoch_number = 0;
        long last_consensus_epoch = 0;

        {
            bool head_found;
            // start by reading the head
            immutable tagion_index = net.dartKey(StdNames.name, TagionDomain);
            auto dart_tid = tryLocate(task_names.dart);
            dart_tid.send(dartReadRR(), [tagion_index]);
            log("SENDING HEAD REQUEST TO DART");

            auto received = receiveTimeout(1.seconds, (dartReadRR.Response _, immutable(RecordFactory.Recorder) head_recorder) {
                if (!head_recorder.empty) {
                    log("FOUND A TAGIONHEAD");
                    // yay we found a head!
                    last_head = TagionHead(head_recorder[].front.filed);
                    head_found = true;
                }
                else {
                    log("NO HEAD FOUND");
                }
            });
            if (!received) {
                throw new ServiceException("Never received a tagionhead");
            }

            if (head_found) {
                // now we locate the epoch
                immutable epoch_index = net.dartKey(StdNames.epoch, last_head.current_epoch);
                dart_tid.send(dartReadRR(), [epoch_index]);
                const epoch_received = receiveTimeout(1.seconds, (dartReadRR.Response _, immutable(RecordFactory.Recorder) epoch_recorder) {
                    if (!epoch_recorder.empty) {
                        auto doc = epoch_recorder[].front.filed;
                        if (doc.isRecord!Epoch) {
                            log("FOUND A EPOCH");
                            auto epoch = Epoch(doc);
                            last_epoch_number = epoch.epoch_number;
                            last_consensus_epoch = epoch.epoch_number;
                            last_globals = epoch.globals;
                        }
                        else if (doc.isRecord!GenesisEpoch) {
                            auto genesis_epoch = GenesisEpoch(doc);
                            last_epoch_number = genesis_epoch.epoch_number;
                            last_globals = genesis_epoch.globals;
                            log("FOUND A EPOCH");
                        }
                        else {
                            throw new ServiceException("The read epoch was not of type Epoch or GenesisEpoch");
                        }
                        previous_epoch = Fingerprint(net.calcHash(doc));
                    }
                });
                if (!epoch_received) {
                    // Booting continues with the default globals; make the missing answer visible.
                    log("No epoch response received from the DART for epoch %s", last_head.current_epoch);
                }
            }
        }
        log("Booting with globals: %s\n last_head: %s", last_globals.toPretty, last_head.toPretty);

        /**
         * Builds the recorder for epoch `res.id` and sends it to the DART as a dartModifyRR.
         * Finishes at most one fully signed epoch, applies the outputs/inputs of the epoch's
         * contracts, updates the globals and stores a new not-yet-voted epoch in `votes`.
         * If a shutdown is scheduled and reached, only the final head is written and the actor stops.
         * Params:
         *   res = response whose id is the epoch number used as key into epoch_contracts
         *   not_in_dart = indices put on the used list, so contracts with these inputs are skipped
         * Throws: ServiceException if no contracts are registered for `res.id`,
         *         ConsensusException if a received vote does not verify against our bullseye.
         */
        void createRecorder(dartCheckReadRR.Response res, immutable(DARTIndex)[] not_in_dart) {

            DARTIndex[] used;

            auto recorder = rec_factory.recorder;
            used ~= not_in_dart;

            /*
                The vote array is already updated. We must go through all the different vote indices and update the epoch that was stored in the dart if any new votes are found.
            */

            Finished: foreach (v; votes.byKeyValue) {
                // add the new signatures to the epoch. We only want to do it if there are new signatures
                if (v.value.epoch.bullseye !is Fingerprint.init) {
                    // add the signatures to the epoch. Only add them if the signature match ours
                    foreach (single_vote; v.value.votes) {
                        // check that we have not already added the signature
                        if (v.value.epoch.signs.canFind(single_vote.signed_bullseye)) {
                            continue;
                        }
                        if (single_vote.verifyBullseye(net, v.value.epoch.bullseye)) {
                            v.value.epoch.signs ~= single_vote.signed_bullseye;
                        }
                        else {
                            import tagion.basic.ConsensusExceptions;

                            throw new ConsensusException(format("Bullseyes not the same on epoch %s", v.value.epoch
                                    .epoch_number));
                        }
                    }

                    // The epoch is finished once every node has signed and it is the next epoch in the chain.
                    // NOTE(review): an earlier comment here said "majority"; the code requires all number_of_nodes signatures - confirm which is intended.
                    if (v.value.epoch.signs.length == number_of_nodes && v.value.epoch.epoch_number == last_consensus_epoch + 1) {
                        v.value.epoch.previous = previous_epoch;
                        previous_epoch = net.calcHash(v.value.epoch);
                        last_consensus_epoch += 1;
                        recorder.insert(v.value.epoch, Archive.Type.ADD);
                        recorder.insert(v.value.locked_archives, Archive.Type.REMOVE);
                        votes.remove(v.value.epoch.epoch_number);
                        // The associative array was modified, so the iteration must stop here.
                        break Finished;
                    }

                }

            }

            if (shutdown !is long.init && last_consensus_epoch >= shutdown) {
                auto req = dartModifyRR();
                req.id = res.id;

                TagionHead new_head = TagionHead(
                        TagionDomain,
                        shutdown,
                );
                recorder.insert(new_head, Archive.Type.ADD);

                import core.atomic;

                dart_handle.send(req, RecordFactory.uniqueRecorder(recorder), cast(immutable) res.id);
                graceful_shutdown.atomicOp!"+="(1);
                thisActor.stop = true;
                return;
            }

            const epoch_contract = epoch_contracts.get(res.id, null);
            if (epoch_contract is null) {
                throw new ServiceException(format("unlinked epoch contract %s", res.id));
            }
            scope (exit) {
                epoch_contracts.remove(res.id);
            }

            import std.datetime : SysTime;

            // Newest bill timestamp accepted in this epoch; it is the same for every contract of the epoch.
            const max_time = sdt_t((SysTime(cast(long) epoch_contract.epoch_time) + BUFFER_TIME_SECONDS.seconds)
                    .stdTime);

            loop_signed_contracts: foreach (signed_contract; epoch_contract.signed_contracts) {
                try {
                    foreach (input; signed_contract.contract.inputs) {
                        if (used.canFind(input)) {
                            log("input already in used list");
                            continue loop_signed_contracts;
                        }
                    }

                    const contract_index = net.dartIndex(signed_contract.contract);
                    const tvm_contract_outputs = products.get(contract_index, null);
                    if (tvm_contract_outputs is null) {
                        // Log before skipping; the message used to be placed after the continue and was never emitted.
                        log("contract not found asserting");
                        continue loop_signed_contracts;
                    }

                    foreach (doc; tvm_contract_outputs.outputs) {
                        if (!doc.isRecord!TagionBill) {
                            continue;
                        }
                        const bill_time = TagionBill(doc).time;
                        if (bill_time > max_time) {
                            log("tagion bill timestamp too new bill_time: %s, epoch_time %s", bill_time.toText, max_time);
                            continue loop_signed_contracts;
                        }
                    }

                    recorder.insert(tvm_contract_outputs.outputs, Archive.Type.ADD);
                    recorder.insert(tvm_contract_outputs.contract.inputs, Archive.Type.REMOVE);

                    used ~= signed_contract.contract.inputs;
                    products.remove(contract_index);
                }
                catch (Exception e) {
                    log("Contract Exception %s", e);
                    continue loop_signed_contracts;
                }
            }

            /*
                Since we write all information that is known immediately we create the epoch chain block here and make it empty.
                The following information can be added:
                epoch_number
                time
                active
                deactive
                globals
                This will be added to the DART. We also keep this in our cache in order to make the reads as few as possible.
            */
            Epoch non_voted_epoch;
            non_voted_epoch.epoch_number = res.id;
            non_voted_epoch.time = sdt_t(epoch_contract.epoch_time);
            // create the globals

            BigNumber total = last_globals.total;
            BigNumber total_burned = last_globals.total_burned;
            long number_of_bills = last_globals.number_of_bills;
            long burnt_bills = last_globals.burnt_bills;

            // Updates the running totals with one archive; archives that are not a TagionBill are ignored.
            void billStatistic(const(Archive) archive) {
                if (!archive.filed.isRecord!TagionBill) {
                    return;
                }

                auto bill = TagionBill(archive.filed);

                if (archive.type == Archive.Type.REMOVE) {
                    total -= bill.value.units;
                    total_burned += bill.value.units;
                    burnt_bills += 1;
                    number_of_bills -= 1;
                }
                if (archive.type == Archive.Type.ADD) {
                    total += bill.value.units;
                    number_of_bills += 1;
                }
            }

            recorder[].each!(a => billStatistic(a));

            TagionGlobals new_globals = TagionGlobals(
                    total,
                    total_burned,
                    number_of_bills,
                    burnt_bills,
            );
            non_voted_epoch.globals = new_globals;

            TagionHead new_head = TagionHead(
                    TagionDomain,
                    res.id,
            );
            // Every archive added so far is recorded in the LockedArchives of this epoch.
            immutable(DARTIndex)[] locked_indexes = recorder[]
                .filter!(a => a.type == Archive.Type.ADD)
                .map!(a => net.dartIndex(a.filed))
                .array;

            LockedArchives outputs = LockedArchives(res.id, locked_indexes);

            // Epochs at or after the scheduled shutdown epoch do not write head, epoch or locked archives.
            if (shutdown is long.init || res.id < shutdown) {
                recorder.insert(new_head, Archive.Type.ADD);
                recorder.insert(non_voted_epoch, Archive.Type.ADD);
                recorder.insert(outputs, Archive.Type.ADD);
            }

            last_head = new_head;
            last_globals = new_globals;

            Votes new_vote;
            new_vote.epoch = non_voted_epoch;
            new_vote.locked_archives = outputs;
            votes[non_voted_epoch.epoch_number] = new_vote;

            auto req = dartModifyRR();
            req.id = res.id;

            dart_handle.send(req, RecordFactory.uniqueRecorder(recorder), cast(immutable) res.id);
        }

        /**
         * Handles a consensus epoch.
         * Polls the shutdown file, stores the received votes, registers the signed contracts of
         * the epoch and asks the DART to check their inputs. If there are no inputs the recorder
         * is created right away.
         * Params:
         *   epacks = event packages of the epoch; only ConsensusVoting and SignedContract payloads are used
         *   epoch_number = not used; the service keeps its own counter (last_epoch_number)
         *   epoch_time = time stored with the contracts of this epoch
         */
        void epoch(consensusEpoch,
                immutable(EventPackage*)[] epacks,
                immutable(long) epoch_number,
                const(sdt_t) epoch_time) @safe {
            last_epoch_number += 1;
            import tagion.utils.Term;

            log("%sEpoch round: %d time %s%s", BLUE, last_epoch_number, epoch_time, RESET);

            // Only touch the filesystem while no shutdown has been scheduled yet.
            if (shutdown is long.init && process_file_path.exists) {
                // open the file and set the shutdown sig
                try {
                    auto f = File(process_file_path, "r");
                    scope (exit) {
                        f.close;
                    }
                    shutdown = (() @trusted {
                        import std.string : strip;

                        auto lines = f.byLine;
                        // An empty file schedules nothing instead of reading the front of an empty range.
                        return lines.empty ? long.init : lines.front.strip.to!long;
                    })();
                }
                catch (Exception e) {
                    // A broken shutdown file must not take the epoch handler down.
                    log("Could not read shutdown epoch from %s: %s", process_file_path, e.msg);
                }
            }
            if (shutdown !is long.init) {
                log("%sShutdown is scheduled for epoch %d%s", YELLOW, shutdown, RESET);
            }

            immutable(ConsensusVoting)[] received_votes = epacks
                .filter!(epack => epack.event_body.payload.isRecord!ConsensusVoting)
                .map!(epack => immutable(ConsensusVoting)(epack.event_body.payload))
                .array;

            // add them to the vote array
            foreach (v; received_votes) {
                if (auto known_vote = v.epoch in votes) {
                    known_vote.votes ~= v;
                }
                else {
                    log("VOTE IS INIT %s", v.epoch);
                }
            }

            auto signed_contracts = epacks
                .filter!(epack => epack.event_body.payload.isRecord!SignedContract)
                .map!(epack => immutable(SignedContract)(epack.event_body.payload))
                .array;

            auto inputs = signed_contracts
                .map!(signed_contract => signed_contract.contract.inputs)
                .join
                .array;

            auto req = dartCheckReadRR();
            req.id = last_epoch_number;
            epoch_contracts[req.id] = new const EpochContracts(signed_contracts, epoch_time);

            if (inputs.length == 0) {
                // Nothing to check in the DART, so build the recorder directly.
                createRecorder(req.Response(req.msg, req.id), inputs);
                return;
            }

            dart_handle.send(req, inputs);
        }

        /**
         * Stores the bullseye returned by the DART for epoch `res.id` and sends the node's own
         * signed vote for it to the epoch creator.
         */
        void receiveBullseye(dartModifyRR.Response res, Fingerprint bullseye) {
            const epoch_number = res.id;

            votes[epoch_number].epoch.bullseye = bullseye;

            ConsensusVoting own_vote = ConsensusVoting(
                    epoch_number,
                    net.pubkey,
                    net.sign(bullseye)
            );

            epoch_creator_handle.send(Payload(), own_vote.toDoc);
        }

        /// Caches a contract product under the dartIndex of its contract until the contract shows up in an epoch.
        void produceContract(producedContract, immutable(ContractProduct)* product) {
            log("received ContractProduct");
            auto product_index = net.dartIndex(product.contract.sign_contract.contract);
            products[product_index] = product;
        }

        run(&epoch, &produceContract, &createRecorder, &receiveBullseye);
    }
}