1 module tagion.services.monitor;
2 
3 import core.thread;
4 import std.format;
5 import std.socket;
6 import tagion.actor;
7 import tagion.actor.exceptions;
8 import tagion.basic.Types : FileExtension;
9 import tagion.basic.tagionexceptions : TagionException;
10 import tagion.hibon.Document;
11 import tagion.logger.Logger;
12 import tagion.network.ListenerSocket;
13 import tagion.utils.JSONCommon;
14 import tagion.utils.pretend_safe_concurrency;
15 
/// Options for the monitor service task.
@safe
struct MonitorOptions {
    bool enable = false; /// When enabled the Monitor is started
    ushort port = 10900; /// Monitor port
    uint timeout = 500; /// Listener timeout in msecs
    FileExtension dataformat = FileExtension.json; /// Data format selector, defaults to json
    string url = "127.0.0.1"; /// Address handed to the listener socket
    string taskname = "monitor"; /// Task name, used as the logger task name and handed to the listener socket
    // NOTE(review): presumably provides JSON load/save for these fields - confirm in tagion.utils.JSONCommon
    mixin JSONCommon;
}
26 
/**
 * Task body of the monitor service.
 *
 * Creates a ListenerSocket from the options, starts it, and then loops
 * until `thisActor.stop` is set. Each iteration waits up to 500 msecs for
 * a message; `string`, `immutable(ubyte)[]` and `Document` messages are
 * passed to the listener's `broadcast`, and a `TaskFailure` is forwarded
 * to the owner thread.
 *
 * Any Throwable escaping the body is handed to `fail`, which keeps the
 * function `nothrow`.
 *
 * Params:
 *   opts = monitor options; `url`, `port`, `timeout` and `taskname` are
 *          read here
 */
void monitorServiceTask(immutable(MonitorOptions) opts) @trusted nothrow {
    try {
        // setState(Ctrl.STARTING);
        // scope (exit) {
        //     setState(Ctrl.END);
        // }

        log.task_name = opts.taskname;

        log("SockectThread port=%d addresss=%s", opts.port, opts.url);
        auto listener_socket = ListenerSocket(opts.url,
                opts.port, opts.timeout, opts.taskname);
        // The value returned by start was never read, so it is no longer
        // bound to a local.
        listener_socket.start;

        // Stop the listener on every exit path once it has been started.
        scope (exit) {
            listener_socket.stop;
        }

        // Pass failures reported to this task on to the owner.
        void taskfailure(immutable(TaskFailure) t) @safe {
            ownerTid.send(t);
        }

        setState(Ctrl.ALIVE);
        while (!thisActor.stop) {
            pragma(msg, "REV: Should the 500.msecs be an opts");
            // The timeout bounds each wait so the stop flag is re-checked
            // even when no messages arrive.
            receiveTimeout(500.msecs, //Control the thread
                    &signal,
                    &ownerTerminated,
                    (string json) @trusted { listener_socket.broadcast(json); },
                    (immutable(ubyte)[] hibon_bytes) @trusted { listener_socket.broadcast(hibon_bytes); },
                    (Document doc) @trusted { listener_socket.broadcast(doc); },
                    &taskfailure
            );
        }
    }
    catch (Throwable t) {
        fail(t);
    }
}