/// [Documentation documents/architecture/Collector](https://docs.tagion.org/#/documents/architecture/Collector)
module tagion.services.collector;
@safe:

import std.exception;
import std.typecons;
import tagion.actor.actor;
import tagion.basic.Types;
import tagion.communication.HiRPC;
import tagion.crypto.SecureInterfaceNet;
import tagion.crypto.SecureNet;
import tagion.crypto.Types;
import tagion.dart.Recorder : Archive, RecordFactory;
import tagion.dart.DARTBasic : DARTIndex;
import tagion.hibon.Document;
import tagion.hibon.HiBONException : HiBONRecordException;
import tagion.hibon.HiBONRecord;
import tagion.logger.Logger;
import tagion.script.common;
import tagion.script.execute;
import tagion.services.messages;
import tagion.services.options : TaskNames;
import tagion.utils.pretend_safe_concurrency;

/// Options for the collector service (currently no fields; only the JSONCommon mixin).
struct CollectorOptions {
    import tagion.utils.JSONCommon;

    mixin JSONCommon;
}

/// Topic for rejected collector inputs
enum reject_collector = "reject/collector";

/**
 * Collector Service actor
 *
 * For every incoming signed contract the service asks the DART for the
 * archives referenced by the contract (`reads` and `inputs`), verifies the
 * contract against the returned inputs and forwards the collected contract
 * to the TVM.
 *
 * Sends:
 *  (dartReadRR, immutable(DARTIndex)[]) to TaskNames.dart
 *  (consensusContract(), immutable(CollectedSignedContract)*) to TaskNames.tvm
 *  (signedContract(), immutable(CollectedSignedContract)*) to TaskNames.tvm
 **/
struct CollectorService {
    ActorHandle dart_handle;
    ActorHandle tvm_handle;

    /// Resolves the DART and TVM actor handles from the configured task names.
    this(immutable(TaskNames) tn) nothrow {
        dart_handle = ActorHandle(tn.dart);
        tvm_handle = ActorHandle(tn.tvm);
    }

    // Per-request state, keyed by the id of the dartReadRR request.
    immutable(SignedContract)*[uint] contracts; /// contracts waiting for DART responses
    bool[uint] is_consensus_contract; /// whether the contract arrived via consensusContract
    immutable(Document)[][uint] reads; /// read-only archives already returned by the DART

    Topic reject = Topic(reject_collector);
    SecureNet net;

    /// Actor entry point: creates the secure net and hands the message handlers to `run`.
    void task() {
        net = new StdSecureNet;
        assert(net !is null, "No secure net");
        run(&receive_recorder, &signed_contract, &consensus_signed_contract, &rpc_contract);
    }

    /**
     * Registers the contract under `req.id` and makes the read calls to the dart service.
     *
     * The callers set `is_consensus_contract[req.id]` before calling this
     * function, so every path that gives up on the request must drop all
     * state for the id via `clean`.
     *
     * Params:
     *   req = request used both as message to the DART and as key for the bookkeeping
     *   s_contract = the signed contract whose reads/inputs should be fetched
     */
    void read_indices(dartReadRR req, immutable(SignedContract)* s_contract) {
        if (s_contract.signs.length != s_contract.contract.inputs.length) {
            // Drop the flag the caller registered for this id; otherwise every
            // rejected contract would leave an entry in is_consensus_contract.
            clean(req.id);
            log(reject, "contract_mismatch_signature_length", Document.init);
            return;
        }

        contracts[req.id] = s_contract;
        scope (failure) {
            // Remove *all* per-request state, including is_consensus_contract.
            clean(req.id);
        }
        log("Set the signed_contract %s", (req.id in contracts) !is null);
        if (s_contract.contract.reads !is DARTIndex[].init) {
            log("sending contract read request to dart");
            dart_handle.send(req, (*s_contract).contract.reads);
        }

        log("sending contract input request to dart");
        dart_handle.send(req, (*s_contract).contract.inputs);
    }

    /// Handles a batch of contracts tagged `consensusContract`; each contract gets its own DART request.
    void consensus_signed_contract(consensusContract, immutable(SignedContract*)[] signed_contracts) {
        foreach (s_contract; signed_contracts) {
            auto req = dartReadRR();
            is_consensus_contract[req.id] = true;
            read_indices(req, s_contract);
        }
    }

    /// Handles a single contract tagged `inputContract`.
    void signed_contract(inputContract, immutable(SignedContract)* s_contract) {
        auto req = dartReadRR();
        is_consensus_contract[req.id] = false;
        read_indices(req, s_contract);
    }

    /// Input received directly from the HiRPC verifier.
    /// The method parameters are parsed as a SignedContract; a document that
    /// fails to parse (HiBONRecordException) is logged on the reject topic.
    void rpc_contract(inputHiRPC, immutable(HiRPC.Receiver) receiver) @safe {
        immutable doc = Document(receiver.method.params);
        log("collector received receiver");
        try {
            immutable s_contract = new immutable(SignedContract)(doc);
            signed_contract(inputContract(), s_contract);
        }
        catch (HiBONRecordException e) {
            log(reject, "hirpc_invalid_signed_contract", doc);
        }
    }

    /// Removes every piece of per-request state stored for `id`.
    private void clean(uint id) {
        is_consensus_contract.remove(id);
        contracts.remove(id);
        reads.remove(id);
    }

    /**
     * Receives the read Documents from the dart and constructs the CollectedSignedContract.
     *
     * A response is classified by comparing the dart indices of the returned
     * archives with the contract's `reads` and `inputs`:
     *  - matches `reads` (and no reads stored yet): the documents are stored and
     *    the function waits for the inputs response;
     *  - matches `inputs`: the contract is verified, collected and sent to the TVM;
     *  - anything else: the request is rejected as "missing_archives".
     *
     * Responses for unknown ids (e.g. already rejected requests) are ignored.
     */
    void receive_recorder(dartReadRR.Response res, immutable(RecordFactory.Recorder) recorder) {
        import std.algorithm.iteration : map;
        import std.range;

        scope (failure) {
            clean(res.id);
        }
        log("received dartresponse");

        if (!(res.id in contracts)) {
            return;
        }

        immutable s_contract = contracts[res.id];
        auto fingerprints = recorder[].map!(a => a.dart_index).array;

        // Only treat a response as the reads response while no reads are stored
        // for this id. Without this guard a contract whose `reads` equal its
        // `inputs` would match this branch for both responses, never reach the
        // TVM and never be cleaned up.
        // NOTE(review): this relies on the reads response arriving before the
        // inputs response, as the requests are sent in that order — confirm.
        const reads_pending = (res.id in reads) is null;
        if (reads_pending && s_contract.contract.reads !is null && fingerprints == s_contract.contract.reads) {
            reads[res.id] = recorder[].map!(a => a.filed).array;
            return;
        }
        else if (fingerprints == s_contract.contract.inputs) {
            log("Received and input response");
            // This is the final response for the request; release the state on every exit path.
            scope (exit) {
                clean(res.id);
            }

            immutable inputs = recorder[].map!(a => a.filed).array;

            if (!verify(net, s_contract, inputs)) {
                log(reject, "contract_no_verify", recorder);
                return;
            }

            assert(inputs !is Document[].init, "Recorder should've contained inputs at this point");
            immutable collection =
                ((res.id in reads) !is null)
                ? new immutable(CollectedSignedContract)(s_contract, inputs, reads[res.id]) : new immutable(
                        CollectedSignedContract)(s_contract, inputs);

            log("sending to tvm");
            if (is_consensus_contract[res.id]) {
                tvm_handle.send(consensusContract(), collection);
            }
            else {
                tvm_handle.send(signedContract(), collection);
            }
            return;
        }
        else {
            clean(res.id);
            log(reject, "missing_archives", recorder);
            return;
        }
    }

}