1 // Service for creating epochs
2 /// [Documentation](https://docs.tagion.org/#/documents/architecture/EpochCreator)
3 module tagion.services.epoch_creator;
4 
5 // tagion
6 import tagion.actor;
7 import tagion.basic.Types : Buffer;
8 import tagion.basic.basic : isinit;
9 import tagion.communication.HiRPC;
10 import tagion.crypto.SecureInterfaceNet : SecureNet;
11 import tagion.crypto.SecureNet : StdSecureNet;
12 import tagion.crypto.Types : Pubkey;
13 import tagion.crypto.random.random;
14 import tagion.gossip.AddressBook;
15 import tagion.gossip.EmulatorGossipNet;
16 import tagion.gossip.GossipNet;
17 import tagion.gossip.InterfaceNet : GossipNet;
18 import tagion.hashgraph.HashGraph;
19 import tagion.hashgraph.Refinement;
20 import tagion.hibon.Document;
21 import tagion.hibon.HiBONException;
22 import tagion.hibon.HiBONJSON;
23 import tagion.logger.Logger;
24 import tagion.services.messages;
25 import tagion.services.monitor;
26 import tagion.services.options : NetworkMode, TaskNames;
27 import tagion.utils.JSONCommon;
28 import tagion.utils.Miscellaneous : cutHex;
29 import tagion.utils.Miscellaneous : cutHex;
30 import tagion.utils.Queue;
31 import tagion.utils.Random;
32 import tagion.utils.StdTime;
33 import tagion.utils.pretend_safe_concurrency;
34 
35 // core
36 import core.time;
37 
38 // std
39 import std.algorithm;
40 import std.exception : RangePrimitive, handle;
41 import std.stdio;
42 import std.typecons : No;
43 
44 alias PayloadQueue = Queue!Document;
45 
46 @safe
47 struct EpochCreatorOptions {
48     uint timeout = 250; // timeout in msecs 
49     uint scrap_depth = 10;
50     mixin JSONCommon;
51 }
52 
53 
54 @safe
55 struct EpochCreatorService {
56 
57     void task(immutable(EpochCreatorOptions) opts,
58             immutable(NetworkMode) network_mode,
59             immutable(size_t) number_of_nodes,
60             shared(StdSecureNet) shared_net,
61             immutable(MonitorOptions) monitor_opts,
62             immutable(TaskNames) task_names) {
63 
64         const net = new StdSecureNet(shared_net);
65 
66         assert(network_mode == NetworkMode.INTERNAL, "Unsupported network mode");
67 
68         if (monitor_opts.enable) {
69             import tagion.hashgraph.Event : Event;
70             import tagion.monitor.Monitor : MonitorCallBacks;
71 
72             auto monitor_socket_tid = spawn(&monitorServiceTask, monitor_opts);
73             Event.callbacks = new MonitorCallBacks(
74                     monitor_socket_tid, monitor_opts.dataformat);
75 
76             if (!waitforChildren(Ctrl.ALIVE)) {
77                 log.warn("Monitor never started, continuing anyway");
78             }
79         }
80 
81         GossipNet gossip_net;
82         gossip_net = new EmulatorGossipNet(net.pubkey, opts.timeout.msecs);
83         Pubkey[] channels = addressbook.activeNodeChannels;
84         Random!size_t random;
85         const _seed = getRandom!size_t;
86         random.seed(_seed);
87 
88         foreach (channel; channels) {
89             gossip_net.add_channel(channel);
90         }
91         log.trace("Beginning gossip");
92 
93         auto refinement = new StdRefinement;
94         refinement.setTasknames(task_names);
95 
96         HashGraph hashgraph = new HashGraph(number_of_nodes, net, refinement, &gossip_net.isValidChannel, No.joining);
97         hashgraph.scrap_depth = opts.scrap_depth;
98 
99         PayloadQueue payload_queue = new PayloadQueue();
100         {
101             immutable buf = cast(Buffer) hashgraph.channel;
102             const nonce = cast(Buffer) net.calcHash(buf);
103             hashgraph.createEvaEvent(gossip_net.time, nonce);
104         }
105 
106         int counter = 0;
107         const(Document) payload() {
108             if (counter > 0) {
109                 log.trace("Payloads in queue=%d", counter);
110             }
111             if (payload_queue.empty) {
112                 return Document();
113             }
114             counter--;
115             return payload_queue.read;
116         }
117 
118         void receivePayload(Payload, const(Document) pload) {
119             payload_queue.write(pload);
120             counter++;
121         }
122 
123         void receiveWavefront(ReceivedWavefront, const(Document) wave_doc) {
124             import std.array;
125             import tagion.hashgraph.HashGraphBasic;
126             import tagion.hibon.HiBONRecord : isRecord;
127             import tagion.script.common : SignedContract;
128 
129             version (EPOCH_LOG) {
130                 log.trace("Received wavefront %s");
131             }
132 
133             const receiver = HiRPC.Receiver(wave_doc);
134 
135             const received_wave = receiver.params!(Wavefront)(net);
136 
137             immutable received_signed_contracts = received_wave.epacks
138                 .map!(e => e.event_body.payload)
139                 .filter!((p) => !p.empty)
140                 .filter!(p => p.isRecord!SignedContract) // Cannot explicitly return immutable container type (*) ?, need assign to immutable container
141                 .map!((doc) { immutable s = new immutable(SignedContract)(doc); return s; })
142                 .handle!(HiBONException, RangePrimitive.front,
143                         (e, r) { log("invalid SignedContract from hashgraph"); return null; }
144             )
145                 .filter!(s => !s.isinit)
146                 .array;
147 
148             if (received_signed_contracts.length != 0) {
149                 // log("would have send to collector %s", received_signed_contracts.map!(s => (*s).toPretty));
150                 locate(task_names.collector).send(consensusContract(), received_signed_contracts);
151             }
152             scope (failure) {
153                 log.fatal("WAVEFRONT\n%s\n", receiver.toPretty);
154             }
155             hashgraph.wavefront(
156                 receiver,
157                 currentTime,
158                 (const(HiRPC.Sender) return_wavefront) { gossip_net.send(receiver.pubkey, return_wavefront); },
159                 &payload);
160         }
161 
162         void timeout() {
163             const init_tide = random.value(0, 2) is 1;
164             if (!init_tide) {
165                 return;
166             }
167             hashgraph.init_tide(&gossip_net.gossip, &payload, currentTime);
168         }
169 
170         while (!thisActor.stop && !hashgraph.areWeInGraph) {
171             const received = receiveTimeout(
172                     opts.timeout.msecs,
173                     &signal,
174                     &ownerTerminated,
175                     &receiveWavefront,
176                     &unknown
177             );
178             if (received) {
179                 continue;
180             }
181             timeout();
182         }
183 
184         runTimeout(opts.timeout.msecs, &timeout, &receivePayload, &receiveWavefront);
185     }
186 
187 }