/// Transcript service: turns consensus epochs into recorder modifications that are sent to the DART.
2 /// [Documentation](https://docs.tagion.org/#/documents/architecture/transcript)
3 module tagion.services.transcript;
4 
5 @safe:
6 
7 import core.time;
8 import std.algorithm;
9 import std.array;
10 import std.exception;
11 import std.format;
12 import std.range;
13 import std.stdio;
14 import tagion.actor.actor;
15 import tagion.communication.HiRPC;
16 import tagion.crypto.SecureInterfaceNet;
17 import tagion.crypto.SecureNet;
18 import tagion.crypto.Types;
19 import tagion.dart.DARTBasic : DARTIndex, dartIndex;
20 import tagion.dart.DARTBasic;
21 import tagion.dart.Recorder;
22 import tagion.hashgraph.HashGraphBasic : EventPackage, isMajority;
23 import tagion.hibon.BigNumber;
24 import tagion.hibon.Document;
25 import tagion.hibon.HiBONJSON;
26 import tagion.hibon.HiBONRecord : HiBONRecord, isRecord;
27 import tagion.logger.Logger;
28 import tagion.logger.Logger;
29 import tagion.script.TagionCurrency;
30 import tagion.script.common;
31 import tagion.script.execute : ContractProduct;
32 import tagion.script.standardnames;
33 import tagion.services.locator;
34 import tagion.services.messages;
35 import tagion.services.options : TaskNames;
36 import tagion.services.exception;
37 import tagion.utils.JSONCommon;
38 import tagion.utils.StdTime;
39 import tagion.utils.pretend_safe_concurrency;
40 import std.process : thisProcessID;
41 import std.path : buildPath;
42 import std.file : exists;
43 import std.conv : to;
44 
45 @safe:
46 
/// Counter that a transcript task increments atomically when it stops because the
/// scheduled shutdown epoch has been reached.
static shared size_t graceful_shutdown;

/// Number of seconds added to the epoch time to get the newest timestamp a bill
/// produced in that epoch is allowed to carry.
enum BUFFER_TIME_SECONDS = 30;
49 
/// Options of the transcript service.
struct TranscriptOptions {
    /// Folder in which the service looks for its shutdown file.
    string shutdown_folder = "/tmp/";
    /// Prefix of the shutdown file name; the process id is appended to it.
    string shutdown_file_prefix = "epoch_shutdown_";

    mixin JSONCommon;
}
55 
/**
 * TranscriptService actor
 *
 * Receives:
 *   - (consensusEpoch, immutable(EventPackage*)[], immutable(long), const(sdt_t)), handled by `epoch`
 *   - (producedContract, immutable(ContractProduct)*), handled by `produceContract`
 *   - (dartCheckReadRR.Response, immutable(DARTIndex)[]), handled by `createRecorder`
 *   - (dartModifyRR.Response, Fingerprint), handled by `receiveBullseye`
 * Sends:
 *   - (dartReadRR, [DARTIndex]) to the dart task during start-up (head and epoch lookup)
 *   - (dartCheckReadRR, inputs) to the dart task for the contract inputs of an epoch
 *   - (dartModifyRR, recorder, epoch number) to the dart task
 *   - (Payload, Document) to the epoch creator task, carrying this node's ConsensusVoting
**/
struct TranscriptService {
    /**
     * Entry point of the actor. Reads the current head/epoch from the DART and then
     * runs the message loop with the handlers declared below.
     * Params:
     *   opts = folder and file prefix of the shutdown file
     *   number_of_nodes = number of signatures an epoch needs before it is finished
     *   shared_net = net used for hashing, signing and DART index calculation
     *   task_names = names of the dart and epoch creator tasks
     * Throws: ServiceException if the DART does not answer the start-up reads in time
     *         or if the stored epoch has an unexpected record type.
     */
    void task(immutable(TranscriptOptions) opts,
            immutable(size_t) number_of_nodes,
            shared(StdSecureNet) shared_net,
            immutable(TaskNames) task_names) {
        const(SecureNet) net = new StdSecureNet(shared_net);
        auto rec_factory = RecordFactory(net);

        ActorHandle dart_handle = ActorHandle(task_names.dart);
        ActorHandle epoch_creator_handle = ActorHandle(task_names.epoch_creator);

        // Contract products received via produceContract, keyed by the DART index of the contract.
        immutable(ContractProduct)*[DARTIndex] products;

        // Votes and the (not yet finished) epoch they belong to.
        struct Votes {
            const(ConsensusVoting)[] votes;
            Epoch epoch;
            LockedArchives locked_archives;
        }

        // Keyed by epoch number.
        Votes[long] votes;

        // Signed contracts of an epoch waiting for the DART check-read response.
        struct EpochContracts {
            const(SignedContract)[] signed_contracts;
            sdt_t epoch_time;
        }

        // Keyed by the id of the dartCheckReadRR request (the epoch number).
        const(EpochContracts)*[long] epoch_contracts;

        const process_file_name = format("%s%d", opts.shutdown_file_prefix, thisProcessID());
        const process_file_path = buildPath(opts.shutdown_folder, process_file_name);
        log("PROCESS FILE PATH %s", process_file_path);
        // Epoch number at which to shut down; long.init (0) means no shutdown is scheduled.
        long shutdown;

        /** 
         * Get the current head and epoch
         */
        TagionGlobals last_globals = TagionGlobals(BigNumber(1000_000_000), BigNumber(0), long(10_0000), long(0));
        TagionHead last_head = TagionHead(TagionDomain, 0);

        Fingerprint previous_epoch = Fingerprint([1, 2, 3, 4]);
        long last_epoch_number = 0;
        long last_consensus_epoch = 0;

        {
            bool head_found;
            // start by reading the head
            immutable tagion_index = net.dartKey(StdNames.name, TagionDomain);
            auto dart_tid = tryLocate(task_names.dart);
            dart_tid.send(dartReadRR(), [tagion_index]);
            log("SENDING HEAD REQUEST TO DART");

            const head_received = receiveTimeout(1.seconds, (dartReadRR.Response _, immutable(RecordFactory.Recorder) head_recorder) {
                if (!head_recorder.empty) {
                    log("FOUND A TAGIONHEAD");
                    // yay we found a head!
                    last_head = TagionHead(head_recorder[].front.filed);
                    head_found = true;
                }
                else {
                    log("NO HEAD FOUND");
                }
            });
            if (!head_received) {
                throw new ServiceException("Never received a tagionhead");
            }

            if (head_found) {
                // now we locate the epoch
                immutable epoch_index = net.dartKey(StdNames.epoch, last_head.current_epoch);
                dart_tid.send(dartReadRR(), [epoch_index]);
                const epoch_received = receiveTimeout(1.seconds, (dartReadRR.Response _, immutable(RecordFactory.Recorder) epoch_recorder) {
                    if (epoch_recorder.empty) {
                        log("NO EPOCH FOUND FOR HEAD");
                        return;
                    }
                    auto doc = epoch_recorder[].front.filed;
                    if (doc.isRecord!Epoch) {
                        log("FOUND A EPOCH");
                        auto epoch = Epoch(doc);
                        last_epoch_number = epoch.epoch_number;
                        last_consensus_epoch = epoch.epoch_number;
                        last_globals = epoch.globals;
                    }
                    else if (doc.isRecord!GenesisEpoch) {
                        auto genesis_epoch = GenesisEpoch(doc);
                        last_epoch_number = genesis_epoch.epoch_number;
                        last_globals = genesis_epoch.globals;
                        log("FOUND A EPOCH");
                    }
                    else {
                        throw new ServiceException("The read epoch was not of type Epoch or GenesisEpoch");
                    }
                    previous_epoch = Fingerprint(net.calcHash(doc));
                });
                // Same policy as for the head: without an answer we would boot with the
                // default epoch number and previous fingerprint although a head exists.
                if (!epoch_received) {
                    throw new ServiceException(format("Never received epoch %s referenced by the tagionhead",
                            last_head.current_epoch));
                }
            }
        }
        log("Booting with globals: %s\n last_head: %s", last_globals.toPretty, last_head.toPretty);

        /**
         * Handler for the DART check-read response of the epoch with number `res.id`.
         * Finishes at most one fully signed epoch, adds the outputs and removes the inputs
         * of the accepted contracts, adds the new head, epoch and locked archives, and sends
         * the resulting recorder to the DART as a dartModifyRR request.
         * If a shutdown is scheduled and the last consensus epoch has reached it, only the
         * head is written and the actor is stopped.
         * Params:
         *   res = response whose id is the epoch number
         *   not_in_dart = indexes treated as already used; contracts spending them are skipped
         * Throws: ServiceException if no contracts are registered for `res.id`,
         *         ConsensusException if a vote does not verify against our bullseye.
         */
        void createRecorder(dartCheckReadRR.Response res, immutable(DARTIndex)[] not_in_dart) {
            import std.datetime : SysTime;

            DARTIndex[] used;

            auto recorder = rec_factory.recorder;
            used ~= not_in_dart;

            /*
                The vote array is already updated. We must go through all the different vote indices and update the epoch that was stored in the dart if any new votes are found.
            */

            Finished: foreach (v; votes.byKeyValue) {
                // add the new signatures to the epoch. We only want to do it if there are new signatures
                if (v.value.epoch.bullseye !is Fingerprint.init) {
                    // add the signatures to the epoch. Only add them if the signature match ours
                    foreach (single_vote; v.value.votes) {
                        // check that we have not already added the signature
                        if (v.value.epoch.signs.canFind(single_vote.signed_bullseye)) {
                            continue;
                        }
                        if (single_vote.verifyBullseye(net, v.value.epoch.bullseye)) {
                            v.value.epoch.signs ~= single_vote.signed_bullseye;
                        }
                        else {
                            import tagion.basic.ConsensusExceptions;

                            throw new ConsensusException(format("Bullseyes not the same on epoch %s", v.value.epoch
                                    .epoch_number));
                        }
                    }

                    // the epoch is finished once every node has signed it and it is the
                    // direct successor of the last finished epoch
                    if (v.value.epoch.signs.length == number_of_nodes && v.value.epoch.epoch_number == last_consensus_epoch + 1) {
                        v.value.epoch.previous = previous_epoch;
                        previous_epoch = net.calcHash(v.value.epoch);
                        last_consensus_epoch += 1;
                        recorder.insert(v.value.epoch, Archive.Type.ADD);
                        recorder.insert(v.value.locked_archives, Archive.Type.REMOVE);
                        // the AA is modified while iterating, so leave the loop immediately
                        votes.remove(v.value.epoch.epoch_number);
                        break Finished;
                    }
                }
            }

            if (shutdown !is long.init && last_consensus_epoch >= shutdown) {
                auto req = dartModifyRR();
                req.id = res.id;

                TagionHead new_head = TagionHead(
                        TagionDomain,
                        shutdown,
                );
                recorder.insert(new_head, Archive.Type.ADD);

                import core.atomic;

                dart_handle.send(req, RecordFactory.uniqueRecorder(recorder), cast(immutable) res.id);
                graceful_shutdown.atomicOp!"+="(1);
                thisActor.stop = true;
                return;
            }

            const epoch_contract = epoch_contracts.get(res.id, null);
            if (epoch_contract is null) {
                throw new ServiceException(format("unlinked epoch contract %s", res.id));
            }
            scope (exit) {
                epoch_contracts.remove(res.id);
            }

            // Newest timestamp a bill of this epoch may carry; identical for every contract.
            const max_time = sdt_t((SysTime(cast(long) epoch_contract.epoch_time) + BUFFER_TIME_SECONDS.seconds)
                    .stdTime);

            loop_signed_contracts: foreach (signed_contract; epoch_contract.signed_contracts) {
                try {
                    foreach (input; signed_contract.contract.inputs) {
                        if (used.canFind(input)) {
                            log("input already in used list");
                            continue loop_signed_contracts;
                        }
                    }

                    const contract_index = net.dartIndex(signed_contract.contract);
                    const tvm_contract_outputs = products.get(contract_index, null);
                    if (tvm_contract_outputs is null) {
                        // log before skipping; the statement order was reversed before
                        log("contract not found asserting");
                        continue loop_signed_contracts;
                    }

                    foreach (doc; tvm_contract_outputs.outputs) {
                        if (!doc.isRecord!TagionBill) {
                            continue;
                        }
                        const bill_time = TagionBill(doc).time;
                        if (bill_time > max_time) {
                            log("tagion bill timestamp too new bill_time: %s, epoch_time %s", bill_time.toText, max_time);
                            continue loop_signed_contracts;
                        }
                    }

                    recorder.insert(tvm_contract_outputs.outputs, Archive.Type.ADD);
                    recorder.insert(tvm_contract_outputs.contract.inputs, Archive.Type.REMOVE);

                    used ~= signed_contract.contract.inputs;
                    products.remove(contract_index);
                }
                catch (Exception e) {
                    log("Contract Exception %s", e);
                    continue loop_signed_contracts;
                }
            }

            /*
            Since we write all information that is known immediately, we create the epoch chain block here and make it empty.
            The following information can be added:
                epoch_number
                time
                active
                deactive
                globals
            This will be added to the DART. We also keep this in our cache in order to make the reads as few as possible.
            */
            Epoch non_voted_epoch;
            non_voted_epoch.epoch_number = res.id;
            non_voted_epoch.time = sdt_t(epoch_contract.epoch_time);
            // create the globals

            BigNumber total = last_globals.total;
            BigNumber total_burned = last_globals.total_burned;
            long number_of_bills = last_globals.number_of_bills;
            long burnt_bills = last_globals.burnt_bills;

            // Updates the running statistics with one archive; only TagionBill archives count.
            void billStatistic(const(Archive) archive) {
                if (!archive.filed.isRecord!TagionBill) {
                    return;
                }

                auto bill = TagionBill(archive.filed);

                if (archive.type == Archive.Type.REMOVE) {
                    total -= bill.value.units;
                    total_burned += bill.value.units;
                    burnt_bills += 1;
                    number_of_bills -= 1;
                }
                if (archive.type == Archive.Type.ADD) {
                    total += bill.value.units;
                    number_of_bills += 1;
                }
            }

            recorder[].each!(a => billStatistic(a));

            TagionGlobals new_globals = TagionGlobals(
                    total,
                    total_burned,
                    number_of_bills,
                    burnt_bills,
            );
            non_voted_epoch.globals = new_globals;

            TagionHead new_head = TagionHead(
                    TagionDomain,
                    res.id,
            );
            // Indexes of everything added so far; taken before head/epoch/locked archives are inserted.
            immutable(DARTIndex)[] locked_indexes = recorder[]
                .filter!(a => a.type == Archive.Type.ADD)
                .map!(a => net.dartIndex(a.filed))
                .array;

            LockedArchives outputs = LockedArchives(res.id, locked_indexes);

            if (shutdown is long.init || res.id < shutdown) {
                recorder.insert(new_head, Archive.Type.ADD);
                recorder.insert(non_voted_epoch, Archive.Type.ADD);
                recorder.insert(outputs, Archive.Type.ADD);
            }

            last_head = new_head;
            last_globals = new_globals;

            Votes new_vote;
            new_vote.epoch = non_voted_epoch;
            new_vote.locked_archives = outputs;
            votes[non_voted_epoch.epoch_number] = new_vote;

            auto req = dartModifyRR();
            req.id = res.id;

            dart_handle.send(req, RecordFactory.uniqueRecorder(recorder), cast(immutable) res.id);
        }

        /**
         * Handler for a consensus epoch. Collects the votes and signed contracts from the
         * event packages, registers the contracts under a new epoch number and asks the DART
         * to check the contract inputs (or calls createRecorder directly if there are none).
         * The epoch number used is the service's own counter `last_epoch_number`; the
         * `epoch_number` parameter is not read.
         * Params:
         *   epacks = event packages of the epoch
         *   epoch_number = unused
         *   epoch_time = time of the epoch
         */
        void epoch(consensusEpoch,
                immutable(EventPackage*)[] epacks,
                immutable(long) epoch_number,
                const(sdt_t) epoch_time) @safe {
            import std.conv : ConvException;
            import std.string : strip;
            import tagion.utils.Term;

            last_epoch_number += 1;

            log("%sEpoch round: %d time %s%s", BLUE, last_epoch_number, epoch_time, RESET);

            if (shutdown is long.init && process_file_path.exists) {
                // open the file and set the shutdown sig
                auto f = File(process_file_path, "r");
                scope (exit) {
                    f.close;
                }
                try {
                    shutdown = (() @trusted {
                        auto lines = f.byLine;
                        // an empty file schedules nothing instead of reading front of an empty range
                        return lines.empty ? long.init : lines.front.strip.to!long;
                    })();
                }
                catch (ConvException e) {
                    // a malformed file must not take the epoch handler down
                    log("Ignoring malformed shutdown file %s: %s", process_file_path, e.msg);
                }
            }
            if (shutdown !is long.init) {
                log("%sShutdown is scheduled for epoch %d%s", YELLOW, shutdown, RESET);
            }

            immutable(ConsensusVoting)[] received_votes = epacks
                .filter!(epack => epack.event_body.payload.isRecord!ConsensusVoting)
                .map!(epack => immutable(ConsensusVoting)(epack.event_body.payload))
                .array;

            // add them to the vote array
            foreach (v; received_votes) {
                if (auto pending = v.epoch in votes) {
                    pending.votes ~= v;
                }
                else {
                    log("VOTE IS INIT %s", v.epoch);
                }
            }

            auto signed_contracts = epacks
                .filter!(epack => epack.event_body.payload.isRecord!SignedContract)
                .map!(epack => immutable(SignedContract)(epack.event_body.payload))
                .array;

            auto inputs = signed_contracts
                .map!(signed_contract => signed_contract.contract.inputs)
                .join
                .array;

            auto req = dartCheckReadRR();
            req.id = last_epoch_number;
            epoch_contracts[req.id] = new const EpochContracts(signed_contracts, epoch_time);

            if (inputs.length == 0) {
                // nothing to check in the DART, build the recorder right away
                createRecorder(req.Response(req.msg, req.id), inputs);
                return;
            }

            dart_handle.send(req, inputs);
        }

        /**
         * Handler for the DART modify response: stores the bullseye on the pending epoch
         * `res.id` and sends this node's signed vote for it to the epoch creator.
         * A response for an epoch that is not pending is logged and ignored.
         */
        void receiveBullseye(dartModifyRR.Response res, Fingerprint bullseye) {
            const epoch_number = res.id;

            auto pending = epoch_number in votes;
            if (pending is null) {
                // indexing the AA with a missing key would raise a RangeError
                log("Received bullseye for unknown epoch %s", epoch_number);
                return;
            }
            pending.epoch.bullseye = bullseye;

            ConsensusVoting own_vote = ConsensusVoting(
                    epoch_number,
                    net.pubkey,
                    net.sign(bullseye)
            );

            epoch_creator_handle.send(Payload(), own_vote.toDoc);
        }

        /**
         * Handler for a produced contract: caches the product under the DART index of
         * its contract until createRecorder consumes it.
         */
        void produceContract(producedContract, immutable(ContractProduct)* product) {
            log("received ContractProduct");
            auto product_index = net.dartIndex(product.contract.sign_contract.contract);
            products[product_index] = product;
        }

        run(&epoch, &produceContract, &createRecorder, &receiveBullseye);
    }
}