1 module tagion.tools.subscribe; 2 3 import core.time; 4 import nngd; 5 import std.algorithm : countUntil; 6 import std.conv; 7 import std.format; 8 import std.stdio; 9 import std.getopt; 10 import tagion.basic.Types; 11 import tagion.basic.Version; 12 import tagion.hibon.Document; 13 import tagion.hibon.HiBONJSON; 14 import tagion.services.subscription : SubscriptionServiceOptions; 15 import tagion.tools.Basic : Main; 16 import tagion.tools.revision; 17 18 import std.exception; 19 import tagion.crypto.SecureInterfaceNet; 20 import tagion.communication.HiRPC; 21 import tagion.utils.Result; 22 23 struct Subscription { 24 string address; 25 string[] tags; 26 SecureNet net; 27 uint max_attempts = 5; 28 29 private NNGSocket sock; 30 this(string _address, string[] _tags, SecureNet _net = null) @trusted nothrow { 31 address = _address; 32 tags = _tags; 33 net = _net; 34 sock = NNGSocket(nng_socket_type.NNG_SOCKET_SUB); 35 sock.recvtimeout = 100.seconds; 36 foreach (tag; tags) { 37 sock.subscribe(tag); 38 } 39 } 40 41 private bool _isDial; 42 43 Result!bool dial() @trusted nothrow { 44 int rc; 45 foreach (_; 0 .. max_attempts) { 46 rc = sock.dial(address); 47 switch (rc) with (nng_errno) { 48 case NNG_OK: 49 _isDial = true; 50 return result(true); 51 case NNG_ECONNREFUSED: 52 nng_sleep(msecs(100)); 53 continue; 54 default: 55 return Result!bool(false, nng_errstr(rc)); 56 } 57 } 58 return Result!bool(false, nng_errstr(rc)); 59 } 60 61 Result!Document receive() @trusted nothrow { 62 alias _Result = Result!Document; 63 if (!_isDial) { 64 auto d = dial; 65 if (d.error) { 66 return _Result(d.e); 67 } 68 } 69 70 Buffer data; 71 foreach (_; 0 .. max_attempts) { 72 data = sock.receive!Buffer; 73 if (sock.errno != nng_errno.NNG_OK && sock.errno != nng_errno.NNG_ETIMEDOUT) { 74 break; 75 } 76 } 77 78 if (sock.errno != 0) { 79 return _Result(nng_errstr(sock.errno)); 80 } 81 82 if (data.length == 0) { 83 return _Result("Received empty data"); 84 } 85 86 long index = data.countUntil(cast(ubyte) '\0'); 87 if (index == -1) { 88 return _Result("Received data does not begin with a tag"); 89 } 90 91 if (data.length <= index + 1) { 92 return _Result("Received data does not contain a document"); 93 } 94 95 try { 96 auto _doc = Document(data[index + 1 .. $]); 97 auto _receiver = HiRPC.Receiver(net, _doc); 98 return result(_receiver.message); // This could be a hirpc error 99 } 100 catch (Exception e) { 101 return _Result(e); 102 } 103 } 104 } 105 106 mixin Main!(_main); 107 108 int _main(string[] args) { 109 immutable program = args[0]; 110 111 auto default_sub_opts = SubscriptionServiceOptions(); 112 default_sub_opts.setDefault(); 113 string address = default_sub_opts.address; 114 bool version_switch; 115 string[] tags; 116 bool watch; 117 118 auto main_args = getopt(args, 119 "v|version", "Print revision information", &version_switch, 120 "address", "specify the address to subscribe to", &address, 121 "w|watch", "Watch logs", &watch, 122 "tag", "Which tags to subscribe to", &tags, 123 ); 124 125 if (main_args.helpWanted) { 126 defaultGetoptPrinter( 127 format("Help information for %s\n", program), 128 main_args.options 129 ); 130 return 0; 131 } 132 if (version_switch) { 133 revision_text.writeln; 134 return 0; 135 } 136 137 if (watch) { 138 NNGSocket sock = NNGSocket(nng_socket_type.NNG_SOCKET_SUB); 139 sock.recvtimeout = msecs(1000); 140 141 if (tags.length == 0) { 142 stderr.writeln("No tags specified"); 143 return 1; 144 } 145 146 auto sub = Subscription(address, tags); 147 auto dialed = sub.dial; 148 if (dialed.error) { 149 stderr.writefln("Dial error: %s (%s)", dialed.e.message, address); 150 return 1; 151 } 152 stderr.writefln("Listening on, %s", address); 153 154 while (true) { 155 auto result = sub.receive; 156 if (result.error) { 157 stderr.writeln(result.e); 158 } 159 else { 160 writeln(result.get.toPretty); 161 } 162 } 163 } 164 return 0; 165 }