/// Service for creating epochs
/// [Documentation](https://docs.tagion.org/#/documents/architecture/EpochCreator)
module tagion.services.epoch_creator;

// tagion
import tagion.actor;
import tagion.basic.Types : Buffer;
import tagion.basic.basic : isinit;
import tagion.communication.HiRPC;
import tagion.crypto.SecureInterfaceNet : SecureNet;
import tagion.crypto.SecureNet : StdSecureNet;
import tagion.crypto.Types : Pubkey;
import tagion.crypto.random.random;
import tagion.gossip.AddressBook;
import tagion.gossip.EmulatorGossipNet;
import tagion.gossip.GossipNet;
import tagion.gossip.InterfaceNet : GossipNet;
import tagion.hashgraph.HashGraph;
import tagion.hashgraph.Refinement;
import tagion.hibon.Document;
import tagion.hibon.HiBONException;
import tagion.hibon.HiBONJSON;
import tagion.logger.Logger;
import tagion.services.messages;
import tagion.services.monitor;
import tagion.services.options : NetworkMode, TaskNames;
import tagion.utils.JSONCommon;
import tagion.utils.Miscellaneous : cutHex;
import tagion.utils.Queue;
import tagion.utils.Random;
import tagion.utils.StdTime;
import tagion.utils.pretend_safe_concurrency;

// core
import core.time;

// std
import std.algorithm;
import std.exception : RangePrimitive, handle;
import std.stdio;
import std.typecons : No;

/// Queue of payload documents waiting to be handed to the hashgraph.
alias PayloadQueue = Queue!Document;

/// Tunable settings for the epoch creator service.
@safe
struct EpochCreatorOptions {
    /// Timeout in msecs; used both for the emulator gossip net and as the
    /// receive timeout that drives the periodic `timeout` handler.
    uint timeout = 250;
    /// Copied verbatim into `HashGraph.scrap_depth` when the graph is created.
    uint scrap_depth = 10;
    // NOTE(review): presumably adds JSON load/save support for these options — confirm in tagion.utils.JSONCommon.
    mixin JSONCommon;
}

/// Actor service that owns the hashgraph, exchanges wavefronts over the
/// gossip net and forwards signed contracts found in received events.
@safe
struct EpochCreatorService {

    /**
     * Entry point of the service task.
     *
     * Params:
     *   opts            = service options (receive/gossip timeout, scrap depth)
     *   network_mode    = must be `NetworkMode.INTERNAL`; anything else trips the assert below
     *   number_of_nodes = node count passed to the `HashGraph` constructor
     *   shared_net      = shared secure net from which a task-local `StdSecureNet` is built
     *   monitor_opts    = when `enable` is set, a monitor task is spawned and hooked into `Event.callbacks`
     *   task_names      = task names given to the refinement and used to locate the collector task
     */
    void task(immutable(EpochCreatorOptions) opts,
            immutable(NetworkMode) network_mode,
            immutable(size_t) number_of_nodes,
            shared(StdSecureNet) shared_net,
            immutable(MonitorOptions) monitor_opts,
            immutable(TaskNames) task_names) {

        const net = new StdSecureNet(shared_net);

        // NOTE(review): `assert` is removed in release builds, so this guard only protects debug builds.
        assert(network_mode == NetworkMode.INTERNAL, "Unsupported network mode");

        if (monitor_opts.enable) {
            import tagion.hashgraph.Event : Event;
            import tagion.monitor.Monitor : MonitorCallBacks;

            auto monitor_socket_tid = spawn(&monitorServiceTask, monitor_opts);
            Event.callbacks = new MonitorCallBacks(
                    monitor_socket_tid, monitor_opts.dataformat);

            // A monitor that does not report ALIVE is not fatal for the service.
            if (!waitforChildren(Ctrl.ALIVE)) {
                log.warn("Monitor never started, continuing anyway");
            }
        }

        GossipNet gossip_net;
        gossip_net = new EmulatorGossipNet(net.pubkey, opts.timeout.msecs);
        Pubkey[] channels = addressbook.activeNodeChannels;
        Random!size_t random;
        const _seed = getRandom!size_t;
        random.seed(_seed);

        // Register every currently active node channel with the gossip net.
        foreach (channel; channels) {
            gossip_net.add_channel(channel);
        }
        log.trace("Beginning gossip");

        auto refinement = new StdRefinement;
        refinement.setTasknames(task_names);

        HashGraph hashgraph = new HashGraph(number_of_nodes, net, refinement, &gossip_net.isValidChannel, No.joining);
        hashgraph.scrap_depth = opts.scrap_depth;

        PayloadQueue payload_queue = new PayloadQueue();
        {
            // The eva event nonce is derived from the hash of this node's own channel.
            immutable buf = cast(Buffer) hashgraph.channel;
            const nonce = cast(Buffer) net.calcHash(buf);
            hashgraph.createEvaEvent(gossip_net.time, nonce);
        }

        // Number of payloads written to the queue and not yet read back.
        int counter = 0;

        // Payload source handed to the hashgraph: returns the next queued
        // document, or an empty Document when nothing is queued.
        const(Document) payload() {
            if (counter > 0) {
                log.trace("Payloads in queue=%d", counter);
            }
            if (payload_queue.empty) {
                return Document();
            }
            counter--;
            return payload_queue.read;
        }

        // Message handler: queue an incoming payload for a later event.
        void receivePayload(Payload, const(Document) pload) {
            payload_queue.write(pload);
            counter++;
        }

        // Message handler: process a wavefront received from another node.
        void receiveWavefront(ReceivedWavefront, const(Document) wave_doc) {
            import std.array;
            import tagion.hashgraph.HashGraphBasic;
            import tagion.hibon.HiBONRecord : isRecord;
            import tagion.script.common : SignedContract;

            const receiver = HiRPC.Receiver(wave_doc);

            version (EPOCH_LOG) {
                // The format string previously had a `%s` with no argument;
                // log the decoded receiver so the specifier is satisfied.
                log.trace("Received wavefront %s", receiver.toPretty);
            }

            const received_wave = receiver.params!(Wavefront)(net);

            // Extract every non-empty event payload that is a SignedContract.
            // Payloads that throw HiBONException on construction are logged,
            // mapped to null and dropped by the final filter.
            immutable received_signed_contracts = received_wave.epacks
                .map!(e => e.event_body.payload)
                .filter!((p) => !p.empty)
                .filter!(p => p.isRecord!SignedContract)
                // Cannot explicitly return immutable container type (*) ?, need assign to immutable container
                .map!((doc) { immutable s = new immutable(SignedContract)(doc); return s; })
                .handle!(HiBONException, RangePrimitive.front,
                        (e, r) { log("invalid SignedContract from hashgraph"); return null; }
                )
                .filter!(s => !s.isinit)
                .array;

            if (received_signed_contracts.length != 0) {
                // log("would have send to collector %s", received_signed_contracts.map!(s => (*s).toPretty));
                locate(task_names.collector).send(consensusContract(), received_signed_contracts);
            }
            // Only guards the statements below it, i.e. the hashgraph.wavefront call.
            scope (failure) {
                log.fatal("WAVEFRONT\n%s\n", receiver.toPretty);
            }
            hashgraph.wavefront(
                    receiver,
                    currentTime,
                    (const(HiRPC.Sender) return_wavefront) { gossip_net.send(receiver.pubkey, return_wavefront); },
                    &payload);
        }

        // Invoked when no message arrived within the timeout; starts a new
        // tide only when the random draw equals 1.
        // NOTE(review): presumably fires on roughly half of the ticks — confirm whether Random.value's upper bound is exclusive.
        void timeout() {
            const init_tide = random.value(0, 2) is 1;
            if (!init_tide) {
                return;
            }
            hashgraph.init_tide(&gossip_net.gossip, &payload, currentTime);
        }

        // Start-up phase: handle wavefronts (but not payload messages, which
        // have no handler here) until the hashgraph reports areWeInGraph or
        // the actor is asked to stop.
        while (!thisActor.stop && !hashgraph.areWeInGraph) {
            const received = receiveTimeout(
                    opts.timeout.msecs,
                    &signal,
                    &ownerTerminated,
                    &receiveWavefront,
                    &unknown
            );
            if (received) {
                continue;
            }
            timeout();
        }

        // Steady state: handle payloads and wavefronts, calling timeout() on idle ticks.
        runTimeout(opts.timeout.msecs, &timeout, &receivePayload, &receiveWavefront);
    }

}