1 module tagion.tools.subscribe;
2 
3 import core.time;
4 import nngd;
5 import std.algorithm : countUntil;
6 import std.conv;
7 import std.format;
8 import std.stdio;
9 import std.getopt;
10 import tagion.basic.Types;
11 import tagion.basic.Version;
12 import tagion.hibon.Document;
13 import tagion.hibon.HiBONJSON;
14 import tagion.services.subscription : SubscriptionServiceOptions;
15 import tagion.tools.Basic : Main;
16 import tagion.tools.revision;
17 
18 import std.exception;
19 import tagion.crypto.SecureInterfaceNet;
20 import tagion.communication.HiRPC;
21 import tagion.utils.Result;
22 
/**
 * Thin wrapper around an NNG SUB socket that subscribes to a set of tags
 * and decodes each received message into a HiRPC message Document.
 *
 * A received frame is expected to have the layout
 * `<tag bytes> '\0' <document bytes>`; everything after the first NUL byte
 * is parsed as a Document.
 */
struct Subscription {
    /// Address passed to the socket's dial call.
    string address;
    /// Tags (topic prefixes) registered on the SUB socket at construction.
    string[] tags;
    /// Handed to HiRPC.Receiver when decoding; may be null.
    /// NOTE(review): presumably a null net means no signature verification - confirm.
    SecureNet net;
    /// Upper bound on dial attempts and on receive attempts (timeouts only).
    uint max_attempts = 5;

    private NNGSocket sock;

    /**
     * Creates the SUB socket, sets a 100 second receive timeout and
     * subscribes to every tag in `_tags`. Does not dial.
     *
     * Params:
     *   _address = address to dial later
     *   _tags = tags to subscribe to
     *   _net = optional net forwarded to HiRPC.Receiver
     */
    this(string _address, string[] _tags, SecureNet _net = null) @trusted nothrow {
        address = _address;
        tags = _tags;
        net = _net;
        sock = NNGSocket(nng_socket_type.NNG_SOCKET_SUB);
        sock.recvtimeout = 100.seconds;
        foreach (tag; tags) {
            sock.subscribe(tag);
        }
    }

    /// Set once a dial has succeeded, so receive() does not dial again.
    private bool _isDial;

    /**
     * Dials `address`, retrying up to `max_attempts` times while the
     * connection is refused (sleeping 100 ms between attempts).
     *
     * Returns: a successful result on NNG_OK; otherwise an error result
     * carrying the NNG error string of the last attempt.
     */
    Result!bool dial() @trusted nothrow {
        int rc;
        foreach (_; 0 .. max_attempts) {
            rc = sock.dial(address);
            switch (rc) with (nng_errno) {
            case NNG_OK:
                _isDial = true;
                return result(true);
            case NNG_ECONNREFUSED:
                // The publisher may not be up yet; back off briefly and retry.
                nng_sleep(msecs(100));
                continue;
            default:
                // Any other error is treated as non-recoverable.
                return Result!bool(false, nng_errstr(rc));
            }
        }
        return Result!bool(false, nng_errstr(rc));
    }

    /**
     * Receives one message and decodes it.
     *
     * Dials first if no dial has succeeded yet. A receive that times out is
     * retried up to `max_attempts` times; a successful receive or any other
     * error ends the retry loop immediately.
     *
     * Returns: the HiRPC message Document on success (which may itself be a
     * HiRPC error message), or an error result describing the socket error,
     * a malformed frame, or an exception thrown while decoding.
     */
    Result!Document receive() @trusted nothrow {
        alias _Result = Result!Document;
        if (!_isDial) {
            auto d = dial;
            if (d.error) {
                return _Result(d.e);
            }
        }

        Buffer data;
        foreach (_; 0 .. max_attempts) {
            data = sock.receive!Buffer;
            // Only a timeout is worth retrying. Stopping on success is
            // essential: receiving again would overwrite (and thereby drop)
            // the message that was just read.
            if (sock.errno != nng_errno.NNG_ETIMEDOUT) {
                break;
            }
        }

        if (sock.errno != nng_errno.NNG_OK) {
            return _Result(nng_errstr(sock.errno));
        }

        if (data.length == 0) {
            return _Result("Received empty data");
        }

        // The tag is terminated by the first NUL byte in the frame.
        const ptrdiff_t index = data.countUntil(cast(ubyte) '\0');
        if (index == -1) {
            return _Result("Received data does not begin with a tag");
        }

        // index is non-negative here, so the conversion is lossless.
        const size_t doc_start = cast(size_t) index + 1;
        if (data.length <= doc_start) {
            return _Result("Received data does not contain a document");
        }

        try {
            auto _doc = Document(data[doc_start .. $]);
            auto _receiver = HiRPC.Receiver(net, _doc);
            return result(_receiver.message); // This could be a hirpc error
        }
        catch (Exception e) {
            return _Result(e);
        }
    }
}
105 
// Main comes from tagion.tools.Basic.
// NOTE(review): presumably generates the program entry point that forwards to _main - confirm.
mixin Main!(_main);

/**
 * Command-line entry point of the subscribe tool.
 *
 * Parses the options, handles `--help` and `--version`, and with `--watch`
 * subscribes to the given tags on `--address` and prints every received
 * document until the process is terminated.
 *
 * Params:
 *   args = program arguments, args[0] being the program name
 *
 * Returns: 0 on success (including help/version, or when `--watch` is not
 * given); 1 when no tags were specified or dialing failed.
 */
int _main(string[] args) {
    immutable program = args[0];

    // The default address comes from the subscription service's own defaults.
    auto default_sub_opts = SubscriptionServiceOptions();
    default_sub_opts.setDefault();
    string address = default_sub_opts.address;
    bool version_switch;
    string[] tags;
    bool watch;

    auto main_args = getopt(args,
            "v|version", "Print revision information", &version_switch,
            "address", "specify the address to subscribe to", &address,
            "w|watch", "Watch logs", &watch,
            "tag", "Which tags to subscribe to", &tags,
    );

    if (main_args.helpWanted) {
        defaultGetoptPrinter(
                format("Help information for %s\n", program),
                main_args.options
        );
        return 0;
    }
    if (version_switch) {
        revision_text.writeln;
        return 0;
    }

    if (watch) {
        // Validate the input before any socket is created.
        if (tags.length == 0) {
            stderr.writeln("No tags specified");
            return 1;
        }

        // Subscription owns its socket; no separate NNGSocket is needed here.
        auto sub = Subscription(address, tags);
        auto dialed = sub.dial;
        if (dialed.error) {
            stderr.writefln("Dial error: %s (%s)", dialed.e.message, address);
            return 1;
        }
        stderr.writefln("Listening on, %s", address);

        // Runs until the process is terminated; receive errors are reported
        // on stderr and the loop keeps going.
        while (true) {
            auto received = sub.receive;
            if (received.error) {
                stderr.writeln(received.e);
            }
            else {
                writeln(received.get.toPretty);
            }
        }
    }
    return 0;
}