/// [Documentation documents/architecture/Collector](https://docs.tagion.org/#/documents/architecture/Collector)
2 module tagion.services.collector;
3 @safe:
4 
5 import std.exception;
6 import std.typecons;
7 import tagion.actor.actor;
8 import tagion.basic.Types;
9 import tagion.communication.HiRPC;
10 import tagion.crypto.SecureInterfaceNet;
11 import tagion.crypto.SecureNet;
12 import tagion.crypto.Types;
13 import tagion.dart.Recorder : Archive, RecordFactory;
14 import tagion.dart.DARTBasic : DARTIndex;
15 import tagion.hibon.Document;
16 import tagion.hibon.HiBONException : HiBONRecordException;
17 import tagion.hibon.HiBONRecord;
18 import tagion.logger.Logger;
19 import tagion.script.common;
20 import tagion.script.execute;
21 import tagion.services.messages;
22 import tagion.services.options : TaskNames;
23 import tagion.utils.pretend_safe_concurrency;
24 
/// Option block for the collector service.
/// Declares no fields of its own; its only content is the `JSONCommon` mixin.
struct CollectorOptions {
    // Imported at struct scope: a template mixin body is evaluated in the scope
    // where it is mixed in, so names from this module are visible to it here.
    import tagion.utils.JSONCommon;

    // NOTE(review): presumably supplies the JSON load/save helpers shared by the
    // service option structs -- confirm against tagion.utils.JSONCommon.
    mixin JSONCommon;
}
30 
/// Name of the log topic on which the collector reports rejected inputs.
enum string reject_collector = "reject/collector";
33 
/**
 * Collector Service actor
 *
 * Looks up the DART archives referenced by a signed contract (its `reads` and
 * `inputs`), verifies the contract against the collected inputs and forwards
 * the resulting CollectedSignedContract to the TVM.
 *
 * Receives:
 *  (dartReadRR.Response, immutable(RecordFactory.Recorder))
 *  (inputContract, immutable(SignedContract)*)
 *  (consensusContract, immutable(SignedContract*)[])
 *  (inputHiRPC, immutable(HiRPC.Receiver))
 * Sends:
 *  (dartReadRR, immutable(DARTIndex)[]) to TaskNames.dart
 *  (consensusContract(), immutable(CollectedSignedContract)*) to TaskNames.tvm 
 *  (signedContract(), immutable(CollectedSignedContract)*) to TaskNames.tvm 
**/
struct CollectorService {
    /// Handle used to send read requests to the DART service.
    ActorHandle dart_handle;
    /// Handle used to send collected contracts to the TVM service.
    ActorHandle tvm_handle;

    /// Binds the actor handles to the task names given in `tn`.
    this(immutable(TaskNames) tn) nothrow {
        dart_handle = ActorHandle(tn.dart);
        tvm_handle = ActorHandle(tn.tvm);
    }

    // Per-request bookkeeping, all keyed by the id of the dartReadRR request.
    /// Contracts waiting for their DART responses.
    immutable(SignedContract)*[uint] contracts;
    /// true when the contract arrived via consensus_signed_contract.
    bool[uint] is_consensus_contract;
    /// Documents returned for the contract's `reads` indices.
    immutable(Document)[][uint] reads;

    /// Log topic used for every rejected input.
    Topic reject = Topic(reject_collector);
    SecureNet net;

    /// Actor entry point: creates the secure net and enters the message loop.
    void task() {
        net = new StdSecureNet;
        assert(net !is null, "No secure net");
        run(&receive_recorder, &signed_contract, &consensus_signed_contract, &rpc_contract);
    }

    /**
     * Registers the contract under `req.id` and sends the read calls to the dart service.
     *
     * A contract whose number of signatures differs from its number of inputs is
     * rejected on the `reject` topic and nothing is sent.
     *
     * Params:
     *   req = request whose id keys all bookkeeping for this contract
     *   s_contract = the signed contract to collect archives for
     */
    void read_indices(dartReadRR req, immutable(SignedContract)* s_contract) {
        if (s_contract.signs.length != s_contract.contract.inputs.length) {
            // The callers register is_consensus_contract[req.id] before calling us.
            // No DART response will ever arrive for a rejected contract, so the
            // entry has to be dropped here or it would stay in the table forever.
            is_consensus_contract.remove(req.id);
            log(reject, "contract_mismatch_signature_length", Document.init);
            return;
        }

        contracts[req.id] = s_contract;
        scope (failure) {
            // Drop every table entry for this id, including is_consensus_contract.
            clean(req.id);
        }
        log("Set the signed_contract %s", (req.id in contracts) !is null);
        if (s_contract.contract.reads !is DARTIndex[].init) {
            log("sending contract read request to dart");
            dart_handle.send(req, (*s_contract).contract.reads);
        }

        log("sending contract input request to dart");
        dart_handle.send(req, (*s_contract).contract.inputs);
    }

    /// Starts collection for each contract in `signed_contracts`, flagged as consensus contracts.
    void consensus_signed_contract(consensusContract, immutable(SignedContract*)[] signed_contracts) {
        foreach (s_contract; signed_contracts) {
            auto req = dartReadRR();
            is_consensus_contract[req.id] = true;
            read_indices(req, s_contract);
        }
    }

    /// Starts collection for a single contract, flagged as a non-consensus contract.
    void signed_contract(inputContract, immutable(SignedContract)* s_contract) {
        auto req = dartReadRR();
        is_consensus_contract[req.id] = false;
        read_indices(req, s_contract);
    }

    /// Input received directly from the HiRPC verifier.
    /// If the method params cannot be converted to a SignedContract
    /// (HiBONRecordException), the input is logged on the `reject` topic.
    void rpc_contract(inputHiRPC, immutable(HiRPC.Receiver) receiver) @safe {
        immutable doc = Document(receiver.method.params);
        log("collector received receiver");
        try {
            immutable s_contract = new immutable(SignedContract)(doc);
            signed_contract(inputContract(), s_contract);
        }
        catch (HiBONRecordException e) {
            log(reject, "hirpc_invalid_signed_contract", doc);
        }
    }

    /// Removes the entries stored for `id` from all three tables.
    private void clean(uint id) {
        is_consensus_contract.remove(id);
        contracts.remove(id);
        reads.remove(id);
    }

    /**
     * Receives the read Documents from the dart and constructs the CollectedSignedContract.
     *
     * A response whose indices match the contract's `reads` is stored and the
     * function waits for the inputs response. A response matching the `inputs`
     * is verified and, on success, sent to the TVM together with any stored reads.
     * Any other response is rejected as "missing_archives". Responses for unknown
     * ids are ignored.
     *
     * Params:
     *   res = response carrying the id of the originating dartReadRR request
     *   recorder = archives returned by the DART for that request
     */
    void receive_recorder(dartReadRR.Response res, immutable(RecordFactory.Recorder) recorder) {
        import std.algorithm.iteration : map;
        import std.range;

        scope (failure) {
            clean(res.id);
        }
        log("received dartresponse");

        if (!(res.id in contracts)) {
            return;
        }

        immutable s_contract = contracts[res.id];
        auto fingerprints = recorder[].map!(a => a.dart_index).array;

        // Only treat a response as the reads response while no reads are stored
        // for this id. Otherwise a contract whose reads equal its inputs would
        // have its inputs response taken for reads again and never be processed.
        const awaiting_reads = s_contract.contract.reads !is null && (res.id in reads) is null;
        if (awaiting_reads && fingerprints == s_contract.contract.reads) {
            reads[res.id] = recorder[].map!(a => a.filed).array;
            return;
        }
        else if (fingerprints == s_contract.contract.inputs) {
            log("Received and input response");
            // The inputs response is the last one expected for this id, so the
            // bookkeeping is released however this branch ends.
            scope (exit) {
                clean(res.id);
            }

            immutable inputs = recorder[].map!(a => a.filed).array;

            if (!verify(net, s_contract, inputs)) {
                log(reject, "contract_no_verify", recorder);
                return;
            }

            assert(inputs !is Document[].init, "Recorder should've contained inputs at this point");
            immutable collection =
                ((res.id in reads) !is null)
                ? new immutable(CollectedSignedContract)(s_contract, inputs, reads[res.id]) : new immutable(
                        CollectedSignedContract)(s_contract, inputs);

            log("sending to tvm");
            if (is_consensus_contract[res.id]) {
                tvm_handle.send(consensusContract(), collection);
            }
            else {
                tvm_handle.send(signedContract(), collection);
            }
            return;
        }
        else {
            // The DART did not return exactly the requested archives.
            clean(res.id);
            log(reject, "missing_archives", recorder);
            return;
        }
    }

}