1 /// Consensus HashGraph main object 
2 module tagion.hashgraph.HashGraph;
3 
4 import std.algorithm;
5 import std.array : array;
6 import std.conv;
7 import std.exception : assumeWontThrow;
8 import std.format;
9 import std.range;
10 import std.stdio;
11 import std.typecons : Flag, No, Yes;
12 import tagion.basic.Debug : __format;
13 import tagion.basic.Types : Buffer;
14 import tagion.communication.HiRPC;
15 import tagion.crypto.SecureInterfaceNet;
16 import tagion.crypto.Types : Privkey, Pubkey, Signature;
17 import tagion.gossip.InterfaceNet;
18 import tagion.hashgraph.Event;
19 import tagion.hashgraph.HashGraphBasic;
20 import tagion.hashgraph.RefinementInterface;
21 import tagion.hashgraph.Round;
22 import tagion.hibon.Document : Document;
23 import tagion.hibon.HiBON : HiBON;
24 import tagion.hibon.HiBONRecord : isHiBONRecord;
25 import tagion.logger.Logger;
26 import tagion.utils.BitMask;
27 import tagion.utils.StdTime;
28 
29 // debug
30 import tagion.basic.Debug;
31 import tagion.hibon.HiBONJSON;
32 import tagion.utils.Miscellaneous : cutHex;
33 
34 @safe
35 class HashGraph {
36     enum default_scrap_depth = 10;
37     //bool print_flag;
38     int scrap_depth = default_scrap_depth;
39     import tagion.basic.ConsensusExceptions;
40 
41     bool __debug_print;
42 
43     protected alias check = Check!HashGraphConsensusException;
44     //   protected alias consensus=consensusCheckArguments!(HashGraphConsensusException);
45     import tagion.logger.Statistic;
46 
47     immutable size_t node_size; /// Number of active nodes in the graph
48     immutable(string) name; // Only used for debugging
49     Statistic!uint witness_search_statistic;
50     Statistic!uint strong_seeing_statistic;
51     Statistic!uint received_order_statistic;
52     Statistic!uint mark_received_statistic;
53     Statistic!uint order_compare_statistic;
54     Statistic!uint rare_order_compare_statistic;
55     Statistic!uint epoch_events_statistic;
56     Statistic!uint wavefront_event_package_statistic;
57     Statistic!uint wavefront_event_package_used_statistic;
58     Statistic!uint live_events_statistic;
59     Statistic!uint live_witness_statistic;
60     Statistic!long epoch_delay_statistic;
61     BitMask _excluded_nodes_mask;
62     private {
63         Node[Pubkey] _nodes; // List of participating _nodes T
64         uint event_id;
65         sdt_t last_epoch_time;
66         Flag!"joining" _joining;
67     }
68     Refinement refinement;
69     protected Node _owner_node;
70     const(Node) owner_node() const pure nothrow @nogc {
71         return _owner_node;
72     }
73 
74     Flag!"joining" joining() const pure nothrow @nogc {
75         return _joining;
76     }
77 
78     /**
79  * Get a map of all the nodes currently handled by the graph 
80  * Returns: 
81  */
82     const(Node[Pubkey]) nodes() const pure nothrow @nogc {
83         return _nodes;
84     }
85 
86     const HiRPC hirpc;
87 
88     @nogc
89     const(BitMask) excluded_nodes_mask() const pure nothrow {
90         return _excluded_nodes_mask;
91     }
92 
93     void excluded_nodes_mask(const(BitMask) mask) pure nothrow {
94         _excluded_nodes_mask = mask;
95     }
96 
97     package Round.Rounder _rounds; /// The rounder hold the round in the queue both decided and undecided rounds
98 
99     alias ValidChannel = bool delegate(const Pubkey channel);
100     const ValidChannel valid_channel; /// Valiates of a node at channel is valid
101     /**
102  * Creates a graph with node_size nodes
103  * Params:
104  *   node_size = number of nodes handles byt the graph
105  *   net = Securety element handles hash function, signing and signature validation
106  *   valid_channel = call-back to check if a node is valid
107  *   epoch_callback = call-back which is called when an epoch has been produced
108  *   epack_callback = call-back call if when a package has been added to the cache.
109  *   name = used for debuging label the node name
110  */
111     this(const size_t node_size,
112             const SecureNet net,
113             Refinement refinement,
114             const ValidChannel valid_channel,
115             const Flag!"joining" joining,
116             string name = null)
117     in (node_size >= 4)
118     do {
119         hirpc = HiRPC(net);
120         this.node_size = node_size;
121         this._owner_node = getNode(hirpc.net.pubkey);
122         this.refinement = refinement;
123         this.refinement.setOwner(this);
124         this.valid_channel = valid_channel;
125 
126         this._joining = joining;
127         this.name = name;
128         _rounds = Round.Rounder(this);
129     }
130 
131     void initialize_witness(const(immutable(EventPackage)*[]) epacks)
132     in {
133         assert(_nodes.length > 0 && (channel in _nodes),
134                 "Owen Eva event needs to be create before witness can be initialized");
135         assert(_owner_node !is null);
136     }
137     do {
138         version (EPOCH_LOG) {
139             log("INITTING WITNESSES %s", _owner_node.channel.cutHex);
140         }
141         Node[Pubkey] recovered_nodes;
142         scope (success) {
143             void init_event(immutable(EventPackage*) epack) {
144                 auto event = new Event(epack, this);
145                 _event_cache[event.fingerprint] = event;
146                 event.witness_event(node_size);
147                 version (EPOCH_LOG) {
148                     log("init_event time %s", event.event_body.time);
149                 }
150                 _rounds.last_round.add(event);
151                 front_seat(event);
152                 event._round_received = _rounds.last_round;
153             }
154 
155             _rounds.erase;
156             _rounds = Round.Rounder(this);
157             _rounds.last_decided_round = _rounds.last_round;
158             (() @trusted { _event_cache.clear; })();
159             init_event(_owner_node.event.event_package);
160             // front_seat(owen_event);
161             foreach (epack; epacks) {
162                 if (epack.pubkey != channel) {
163                     init_event(epack);
164                 }
165             }
166             foreach (channel, recovered_node; recovered_nodes) {
167                 if (!(channel in _nodes)) {
168                     if (recovered_node.event) {
169                         init_event(recovered_node.event.event_package);
170                     }
171                 }
172             }
173 
174             _nodes.byValue
175                 .map!(n => n.event)
176                 .each!(e => e.initializeOrder);
177         }
178         scope (failure) {
179             _nodes = recovered_nodes;
180         }
181         recovered_nodes = _nodes;
182         _nodes = null;
183         check(isMajority(cast(uint) epacks.length), ConsensusFailCode.HASHGRAPH_EVENT_INITIALIZE);
184         // consensus(epacks.length)
185         //     .check(epacks.length <= node_size, ConsensusFailCode.HASHGRAPH_EVENT_INITIALIZE);
186         // getNode(channel); // Make sure that node_id == 0 is owner node
187         foreach (epack; epacks) {
188             if (epack.pubkey != channel) {
189                 check(!(epack.pubkey in _nodes), ConsensusFailCode.HASHGRAPH_DUBLICATE_WITNESS);
190                 auto node = getNode(epack.pubkey);
191             }
192         }
193     }
194 
195     @nogc
196     const(Round.Rounder) rounds() const pure nothrow {
197         return _rounds;
198     }
199 
200     bool areWeInGraph() const pure nothrow {
201         return _rounds.last_decided_round !is null;
202     }
203 
204     final Pubkey channel() const pure nothrow {
205         return hirpc.net.pubkey;
206     }
207 
208     const(Pubkey[]) channels() const pure nothrow {
209         return _nodes.keys;
210     }
211 
212     bool not_used_channels(const(Pubkey) selected_channel) {
213         if (selected_channel == channel) {
214             return false;
215         }
216         const node = _nodes.get(selected_channel, null);
217         if (node) {
218             return node.state is ExchangeState.NONE;
219         }
220         return true;
221     }
222 
223     alias GraphResonse = const(Pubkey) delegate(
224             GossipNet.ChannelFilter channel_filter,
225             GossipNet.SenderCallBack sender) @safe;
226     alias GraphPayload = const(Document) delegate() @safe;
227 
228     void init_tide(
229             const(GraphResonse) responde,
230             const(GraphPayload) payload,
231             lazy const sdt_t time) {
232         const(HiRPC.Sender) payload_sender() @safe {
233             const doc = payload();
234             immutable epack = event_pack(time, null, doc);
235 
236             const registrated = registerEventPackage(epack);
237 
238             assert(registrated, "Should not fail here");
239             const sender = hirpc.wavefront(tidalWave);
240             return sender;
241         }
242 
243         const(HiRPC.Sender) sharp_sender() @safe {
244             version (EPOCH_LOG) {
245                 log("SENDING sharp sender: %s", owner_node.channel.cutHex);
246             }
247 
248             const sharp_wavefront = sharpWave();
249             const sender = hirpc.wavefront(sharp_wavefront);
250             return sender;
251         }
252 
253         if (areWeInGraph) {
254             const send_channel = responde(
255                     &not_used_channels,
256                     &payload_sender);
257             if (send_channel !is Pubkey.init) {
258                 getNode(send_channel).state = ExchangeState.INIT_TIDE;
259             }
260         }
261         else {
262             const send_channel = responde(
263                     &not_used_channels,
264                     &sharp_sender);
265         }
266     }
267 
268     immutable(EventPackage)* event_pack(
269             lazy const sdt_t time,
270             const(Event) father_event,
271             const Document doc) {
272 
273         const mother_event = getNode(channel).event;
274 
275         immutable ebody = EventBody(doc, mother_event, father_event, time);
276 
277         immutable result = new immutable(EventPackage)(hirpc.net, ebody);
278         return result;
279     }
280 
281     immutable(EventPackage*) eva_pack(lazy const sdt_t time, const Buffer nonce) {
282         const payload = EvaPayload(channel, nonce);
283         immutable eva_event_body = EventBody(payload.toDoc, null, null, time);
284         immutable epack = new immutable(EventPackage)(hirpc.net, eva_event_body);
285         return epack;
286     }
287 
288     Event createEvaEvent(lazy const sdt_t time, const Buffer nonce) {
289         immutable eva_epack = eva_pack(time, nonce);
290         auto eva_event = new Event(eva_epack, this);
291 
292         _event_cache[eva_event.fingerprint] = eva_event;
293         front_seat(eva_event);
294         // set_strongly_seen_mask(eva_event);
295         return eva_event;
296     }
297 
298     alias EventPackageCache = immutable(EventPackage)*[Buffer]; /// EventPackages received from another node in the hashgraph.
299     alias EventCache = Event[Buffer]; /// Events already connected to this hashgraph. 
300 
301     protected {
302         EventCache _event_cache;
303     }
304 
305     void eliminate(scope const(Buffer) fingerprint) pure nothrow {
306         _event_cache.remove(fingerprint);
307     }
308 
309     @nogc
310     size_t number_of_registered_event() const pure nothrow {
311         return _event_cache.length;
312     }
313 
314     // function not used
315     @nogc
316     bool isRegistered(scope const(ubyte[]) fingerprint) const pure nothrow {
317         return (fingerprint in _event_cache) !is null;
318     }
319 
320     Topic topic = Topic("hashgraph_event");
321     package void epoch(Event[] event_collection, const Round decided_round) {
322         refinement.epoch(event_collection, decided_round);
323         if (scrap_depth > 0) {
324             live_events_statistic(Event.count);
325             log(topic, live_events_statistic.stringof, live_events_statistic);
326             live_witness_statistic(Event.Witness.count);
327             log(topic, live_witness_statistic.stringof, live_events_statistic);
328             _rounds.dustman;
329         }
330     }
331 
332     /++
333      @return true if the event package has been register correct
334      +/
335     Event registerEventPackage(
336             immutable(EventPackage*) event_pack)
337     in (event_pack.fingerprint !in _event_cache,
338         format("Event %(%02x%) has already been registerd",
339             event_pack.fingerprint))
340     do {
341         if (valid_channel(event_pack.pubkey)) {
342             auto event = new Event(event_pack, this);
343             _event_cache[event.fingerprint] = event;
344             refinement.epack(event_pack);
345             event.connect(this);
346             return event;
347         }
348         return null;
349     }
350 
351     class Register {
352         private EventPackageCache event_package_cache;
353 
354         this(const Wavefront received_wave) pure nothrow {
355             uint count_events;
356             scope (exit) {
357                 wavefront_event_package_statistic(count_events);
358                 wavefront_event_package_used_statistic(cast(uint) event_package_cache.length);
359             }
360             foreach (e; received_wave.epacks) {
361                 count_events++;
362                 if (!(e.fingerprint in event_package_cache || e.fingerprint in _event_cache)) {
363                     event_package_cache[e.fingerprint] = e;
364                 }
365             }
366         }
367 
368         final Event lookup(const(Buffer) fingerprint) {
369             if (fingerprint in _event_cache) {
370                 return _event_cache[fingerprint];
371             }
372 
373             if (fingerprint in event_package_cache) {
374                 immutable event_pack = event_package_cache[fingerprint];
375                 if (valid_channel(event_pack.pubkey)) {
376                     auto event = new Event(event_pack, this.outer);
377                     _event_cache[fingerprint] = event;
378                     return event;
379                 }
380             }
381             return null;
382         }
383 
384         // function not used
385         final bool isCached(scope const(Buffer) fingerprint) const pure nothrow {
386             return (fingerprint in event_package_cache) !is null;
387         }
388 
389         final Event register(const(Buffer) fingerprint) {
390             Event event;
391 
392             if (!fingerprint) {
393                 return event;
394             }
395 
396             // event either from event_package_cache or event_cache.
397             event = lookup(fingerprint);
398             Event.check(_joining || event !is null, ConsensusFailCode.EVENT_MISSING_IN_CACHE);
399             if (event !is null) {
400                 event.connect(this.outer);
401             }
402             return event;
403         }
404     }
405 
406     protected Register _register;
407 
408     package final Event register(const(Buffer) fingerprint) {
409         if (_register) {
410             return _register.register(fingerprint);
411         }
412 
413         return _event_cache.get(fingerprint, null);
414     }
415 
416     /++
417      Returns:
418      The front event of the send channel
419      +/
420     const(Event) register_wavefront(const Wavefront received_wave, const Pubkey from_channel) {
421         _register = new Register(received_wave);
422 
423         scope (exit) {
424             log(topic, wavefront_event_package_statistic.stringof, wavefront_event_package_statistic);
425             log(topic, wavefront_event_package_used_statistic.stringof, wavefront_event_package_statistic);
426             _register = null;
427         }
428 
429         Event front_seat_event;
430         foreach (fingerprint; _register.event_package_cache.byKey) {
431             auto registered_event = register(fingerprint);
432             if (registered_event.channel == from_channel) {
433                 if (front_seat_event is null) {
434                     front_seat_event = registered_event;
435                 }
436                 else if (higher(registered_event.altitude, front_seat_event.altitude)) {
437                     front_seat_event = registered_event;
438                 }
439             }
440         }
441 
442         return front_seat_event;
443     }
444 
445     @HiRPCMethod const(HiRPC.Sender) wavefront(
446             const Wavefront wave,
447             const uint id = 0) {
448         return hirpc.wavefront(wave, id);
449     }
450 
451     /++ to synchronize two _nodes A and B
452      +  1)
453      +  Node A send it's wave front to B
454      +  This is done via the waveFront function
455      +  2)
456      +  B collects all the events it has which is are in front of the
457      +  wave front of A.
458      +  This is done via the waveFront function
459      +  B send the all the collected event to B including B's wave font of all
460      +  the node which B know it leads in,
461      +  The wave from is collect via the waveFront function by adding the remaining tides
462      +  3)
463      +  A send the rest of the event which is in front of B's wave-front
464      +/
465     const(Wavefront) tidalWave() pure {
466         Tides tides;
467         foreach (pkey, n; _nodes) {
468             if (n.isOnline) {
469                 tides[pkey] = n.altitude;
470                 assert(n._event.isFront);
471             }
472         }
473         return Wavefront(tides);
474     }
475 
476     const(Wavefront) buildWavefront(const ExchangeState state, const Tides tides = null) {
477         if (state is ExchangeState.NONE || state is ExchangeState.BREAKING_WAVE) {
478             return Wavefront(null, null, state);
479         }
480 
481         immutable(EventPackage)*[] result;
482         Tides owner_tides;
483         foreach (n; _nodes) {
484             if (n.channel in tides) {
485                 const other_altitude = tides[n.channel];
486                 foreach (e; n[]) {
487                     if (!higher(e.altitude, other_altitude)) {
488                         owner_tides[n.channel] = e.altitude;
489                         break;
490                     }
491                     result ~= e.event_package;
492 
493                 }
494             }
495         }
496         if (result.length == 0) {
497             return Wavefront(null, null, state);
498         }
499         return Wavefront(result, owner_tides, state);
500     }
501 
502     /** 
503      * 
504      * Params:
505      *   received_wave = The sharp received wave
506      * Returns: either coherent if in graph or rippleWave
507      */
508     const(Wavefront) sharpResponse(const Wavefront received_wave)
509     in {
510         assert(received_wave.state is ExchangeState.SHARP);
511     }
512     do {
513         if (areWeInGraph) {
514             // writefln("sharp response ingraph:true");
515             immutable(EventPackage)*[] result = _rounds.last_decided_round
516                 .events
517                 .filter!((e) => (e !is null))
518                 .map!((e) => cast(immutable(EventPackage)*) e.event_package)
519                 .array;
520             return Wavefront(result, null, ExchangeState.COHERENT);
521         }
522 
523         // if we are not in graph ourselves, we put the delta information
524         // in and return a RIPPLE.
525         auto received_epacks = received_wave
526             .epacks
527             .map!((e) => cast(immutable(EventPackage)*) e)
528             .array
529             .sort!((a, b) => a.fingerprint < b.fingerprint);
530         auto own_epacks = _nodes.byValue
531             .map!((n) => n[])
532             .joiner
533             .map!((e) => cast(immutable(EventPackage)*) e.event_package)
534             .array
535             .sort!((a, b) => a.fingerprint < b.fingerprint);
536         import std.algorithm.setops : setDifference;
537 
538         auto changes = setDifference!((a, b) => a.fingerprint < b.fingerprint)(received_epacks, own_epacks);
539 
540         version (EPOCH_LOG) {
541             log("owner_epacks %s", own_epacks.length);
542         }
543         if (!changes.empty) {
544             // delta received from sharp should be added to our own node. 
545             version (EPOCH_LOG) {
546                 log("changes found");
547             }
548             foreach (epack; changes) {
549                 const epack_node = getNode(epack.pubkey);
550                 auto first_event = new Event(epack, this);
551                 if (epack_node.event is null) {
552                     check(first_event.isEva, ConsensusFailCode.GOSSIPNET_FIRST_EVENT_MUST_BE_EVA);
553                 }
554                 _event_cache[first_event.fingerprint] = first_event;
555                 front_seat(first_event);
556             }
557         }
558 
559         auto result = setDifference!((a, b) => a.fingerprint < b.fingerprint)(own_epacks, received_epacks).array;
560 
561         const state = ExchangeState.RIPPLE;
562         return Wavefront(result, Tides.init, state);
563 
564     }
565 
566     /** 
567      * First time it is called we only send our own eva since this is all we know.
568      * Later we send everything it knows.  
569      * Returns: the wavefront for a node that either wants to join or is booting.
570      */
571     const(Wavefront) sharpWave() {
572         auto result = _nodes.byValue
573             .filter!((n) => (n._event !is null))
574             .map!((n) => cast(immutable(EventPackage)*) n._event.event_package)
575             .array;
576 
577         return Wavefront(result, null, ExchangeState.SHARP);
578     }
579 
580     void wavefront(
581             const HiRPC.Receiver received,
582             lazy const(sdt_t) time,
583             void delegate(const(HiRPC.Sender) send_wave) @safe response,
584             const(Document) delegate() @safe payload) {
585 
586         alias consensus = consensusCheckArguments!(GossipConsensusException);
587         immutable from_channel = received.pubkey;
588         const received_wave = received.params!(Wavefront)(hirpc.net);
589         check(valid_channel(from_channel), ConsensusFailCode.GOSSIPNET_ILLEGAL_CHANNEL);
590         auto received_node = getNode(from_channel);
591 
592         if (Event.callbacks) {
593             Event.callbacks.receive(received_wave);
594         }
595         version (EPOCH_LOG) {
596             log.trace("received_wave(%s <- %s)", received_wave.state, received_node.state);
597         }
598         scope (exit) {
599             version (EPOCH_LOG) {
600                 log.trace("next <- %s", received_node.state);
601             }
602         }
603         const(Wavefront) wavefront_response() @safe {
604             with (ExchangeState) {
605                 final switch (received_wave.state) {
606                 case NONE:
607                 case INIT_TIDE:
608                     consensus(received_wave.state)
609                         .check(false, ConsensusFailCode.GOSSIPNET_ILLEGAL_EXCHANGE_STATE);
610                     break;
611 
612                 case SHARP: ///
613                     received_node.state = NONE;
614                     received_node.sticky_state = SHARP;
615                     version (EPOCH_LOG) {
616                         log("received sharp %s", received_node.channel.cutHex);
617                     }
618                     const sharp_response = sharpResponse(received_wave);
619                     return sharp_response;
620                 case RIPPLE:
621                     received_node.state = RIPPLE;
622                     received_node.sticky_state = RIPPLE;
623 
624                     if (areWeInGraph) {
625                         break;
626                     }
627 
628                     // if we receive a ripplewave, we must add the eva events to our own graph.
629                     const received_epacks = received_wave.epacks;
630                     foreach (epack; received_epacks) {
631                         const epack_node = getNode(epack.pubkey);
632                         auto first_event = new Event(epack, this);
633                         if (epack_node.event is null) {
634                             check(first_event.isEva, ConsensusFailCode.GOSSIPNET_FIRST_EVENT_MUST_BE_EVA);
635                         }
636                         _event_cache[first_event.fingerprint] = first_event;
637                         front_seat(first_event);
638                     }
639 
640                     const contain_all =
641                         _nodes
642                             .byValue
643                             .all!((n) => n._event !is null);
644 
645                     if (contain_all && node_size == _nodes.length) {
646                         const own_epacks = _nodes
647                             .byValue
648                             .map!((n) => n[])
649                             .joiner
650                             .map!((e) => e.event_package)
651                             .array;
652                         version (EPOCH_LOG) {
653                             log("%s going to init witnesses, areweingraph %s", _owner_node.channel.cutHex, areWeInGraph);
654                         }
655                         initialize_witness(own_epacks);
656                     }
657                     break;
658                 case COHERENT:
659                     received_node.state = NONE;
660                     received_node.sticky_state = COHERENT;
661                     version (EPOCH_LOG) {
662                         log("received coherent from: %s, self %s", received_node.channel.cutHex, _owner_node.channel
663                                 .cutHex);
664                     }
665                     if (!areWeInGraph) {
666                         try {
667                             version (EPOCH_LOG) {
668                                 log("GOING to init");
669                             }
670                             initialize_witness(received_wave.epacks);
671                             _owner_node.sticky_state = COHERENT;
672                             _joining = No.joining;
673                         }
674                         catch (ConsensusException e) {
675                             // initialized witness not correct
676                         }
677                     }
678                     break;
679                 case TIDAL_WAVE: ///
680                     if (received_node.state !is NONE || !areWeInGraph || joining) {
681                         received_node.state = NONE;
682                         return buildWavefront(BREAKING_WAVE);
683                     }
684                     check(received_wave.epacks.length is 0, ConsensusFailCode
685                             .GOSSIPNET_TIDAL_WAVE_CONTAINS_EVENTS);
686                     received_node.state = received_wave.state;
687                     immutable epack = event_pack(time, null, payload());
688                     const registered = registerEventPackage(epack);
689                     assert(registered);
690 
691                     const wave = buildWavefront(FIRST_WAVE, received_wave.tides);
692 
693                     return wave;
694                 case BREAKING_WAVE:
695                     received_node.state = NONE;
696                     break;
697                 case FIRST_WAVE:
698                     if (received_node.state !is INIT_TIDE || !areWeInGraph) {
699                         received_node.state = NONE;
700                         return buildWavefront(BREAKING_WAVE);
701                     }
702                     received_node.state = NONE;
703 
704                     const from_front_seat = register_wavefront(received_wave, from_channel);
705                     immutable epack = event_pack(time, from_front_seat, payload());
706                     const registreted = registerEventPackage(epack);
707                     assert(registreted, "The event package has not been registered correct (The wave should be dumped)");
708                     return buildWavefront(SECOND_WAVE, received_wave.tides);
709                 case SECOND_WAVE:
710                     if (received_node.state !is TIDAL_WAVE || !areWeInGraph || joining) {
711                         received_node.state = NONE;
712                         return buildWavefront(BREAKING_WAVE);
713                     }
714                     received_node.state = NONE;
715                     const from_front_seat = register_wavefront(received_wave, from_channel);
716                     immutable epack = event_pack(time, from_front_seat, payload());
717                     const registrated = registerEventPackage(epack);
718                     assert(registrated, "The event package has not been registered correct (The wave should be dumped)");
719                 }
720                 return buildWavefront(NONE);
721             }
722         }
723 
724         const return_wavefront = wavefront_response;
725         if (return_wavefront.state !is ExchangeState.NONE) {
726             const sender = hirpc.wavefront(return_wavefront);
727             response(sender);
728         }
729     }
730 
731     void front_seat(Event event)
732     in {
733         assert(event, "event must be defined");
734     }
735     do {
736         getNode(event.channel).front_seat(event);
737     }
738 
739     @safe
740     class Node {
741         ExchangeState state;
742         immutable size_t node_id;
743         immutable(Pubkey) channel;
744         private bool _offline;
745         private this(const Pubkey channel, const size_t node_id) pure nothrow {
746             this.node_id = node_id;
747             this.channel = channel;
748         }
749 
750         protected ExchangeState _sticky_state = ExchangeState.RIPPLE;
751 
752         void sticky_state(const(ExchangeState) state) pure nothrow @nogc {
753 
754             if (state > _sticky_state) {
755                 _sticky_state = state;
756             }
757         }
758 
759         final bool offline() const pure nothrow @nogc {
760             return _offline;
761         }
762 
763         const(ExchangeState) sticky_state() const pure nothrow @nogc {
764             return _sticky_state;
765         }
766         /++
767          Register first event
768          +/
769         private void front_seat(Event event)
770         in {
771             assert(event.channel == channel, "Wrong channel");
772         }
773         do {
774             if (_event is null) {
775                 _event = event;
776             }
777             else if (higher(event.altitude, _event.altitude)) {
778                 // Event.check(event.mother !is null, ConsensusFailCode.EVENT_MOTHER_LESS);
779                 _event = event;
780             }
781         }
782 
783         private Event _event; /// This is the last event in this Node
784 
785         @nogc
786         const(Event) event() const pure nothrow {
787             return _event;
788         }
789 
790         @nogc pure nothrow {
791             package final Event event() {
792                 return _event;
793             }
794 
795             final bool isOnline() const {
796                 return (_event !is null);
797             }
798 
799             final int altitude() const
800             in {
801                 assert(_event !is null, "This node has no events so the altitude is not set yet");
802             }
803             out {
804                 assert(_event.isFront);
805             }
806             do {
807                 return _event.altitude;
808             }
809 
810             package Event.Range!false opSlice() {
811                 if (_event) {
812                     return _event[];
813                 }
814                 return Event.Range!false(null);
815             }
816 
817             Event.Range!true opSlice() const {
818                 if (_event) {
819                     return _event[];
820                 }
821                 return Event.Range!true(null);
822             }
823         }
824     }
825 
826     import std.traits : fullyQualifiedName;
827 
828     alias NodeRange = typeof((cast(const) _nodes).byValue);
829 
830     @nogc
831     NodeRange opSlice() const pure nothrow {
832         return _nodes.byValue;
833     }
834 
835     @nogc
836     size_t active_nodes() const pure nothrow {
837         return _nodes.length;
838     }
839 
840     @nogc
841     const(SecureNet) net() const pure nothrow {
842         return hirpc.net;
843     }
844 
845     package Node getNode(Pubkey channel) pure {
846         const next_id = next_node_id;
847         return _nodes.require(channel, new Node(channel, next_id));
848     }
849 
850     @nogc
851     bool isMajority(const size_t voting) const pure nothrow {
852         return .isMajority(voting, node_size);
853     }
854 
855     private void remove_node(Node n) nothrow
856     in {
857         assert(n !is null);
858         assert(n.channel in _nodes, __format("Node id %d is not removable because it does not exist", n
859                 .node_id));
860     }
861     do {
862         _nodes.remove(n.channel);
863     }
864 
865     bool remove_node(const Pubkey pkey) nothrow {
866         if (pkey in _nodes) {
867             _nodes.remove(pkey);
868             return true;
869         }
870         return false;
871     }
872 
873     void mark_offline(const(size_t) node_id) nothrow {
874 
875         auto mark_node = _nodes.byKeyValue
876             .filter!((pair) => !pair.value._offline)
877             .filter!((pair) => pair.value.node_id == node_id)
878             .map!(pair => pair.value);
879         if (mark_node.empty) {
880             return;
881         }
882         mark_node.front._offline = true;
883     }
884 
885     @nogc
886     uint next_event_id() pure nothrow {
887         event_id++;
888         if (event_id is event_id.init) {
889             return event_id.init + 1;
890         }
891         return event_id;
892     }
893 
894     size_t next_node_id() const pure nothrow {
895         if (_nodes.length is 0) {
896             return 0;
897         }
898         BitMask used_nodes;
899         _nodes.byValue
900             .map!(a => a.node_id)
901             .each!((n) { used_nodes[n] = true; });
902         return (~used_nodes)[].front;
903     }
904 
905     //bool disable_scrapping;
906 
907     enum max_package_size = 0x1000;
908     enum round_clean_limit = 10;
909 
910     /++
911      Dumps all events in the Hashgraph to a file
912      +/
913     void fwrite(string filename, Pubkey[string] node_labels = null) {
914         import tagion.hashgraphview.EventView;
915         import tagion.hibon.HiBONFile : fwrite;
916 
917         size_t[Pubkey] node_id_relocation;
918         if (node_labels.length) {
919             // assert(node_labels.length is _nodes.length);
920             auto names = node_labels.keys;
921             names.sort;
922             foreach (i, name; names) {
923                 node_id_relocation[node_labels[name]] = i;
924             }
925 
926         }
927         auto events = new HiBON;
928         (() @trusted {
929             foreach (n; _nodes) {
930                 const node_id = (node_id_relocation.length is 0) ? size_t.max : node_id_relocation[n.channel];
931                 n[]
932                     .filter!((e) => !e.isGrounded)
933                     .each!((e) => events[e.id] = EventView(e, node_id));
934             }
935         })();
936         auto h = new HiBON;
937         h[Params.size] = node_size;
938         h[Params.events] = events;
939         filename.fwrite(h);
940     }
941 
942 }