1 module tagion.communication.HandlerPool;
2 
3 import core.time;
4 import std.datetime;
5 import std.stdio;
6 import tagion.basic.Types : Buffer;
7 
/**
 * Contract for an object that can receive a response buffer.
 *
 * In this module the handler pool forwards incoming data through
 * `setResponse`, asks `alive` whether the handler should stay registered and
 * calls `close` when it drops the handler.
 */
@safe
interface ResponseHandler {
    /// Hands the received `response` buffer to the handler.
    void setResponse(Buffer response);
    /// Tells whether the handler is still usable; the pool in this module
    /// removes handlers for which this returns `false`.
    bool alive();
    /// Called by the pool in this module when the handler is removed.
    void close();

    /**
     * A response buffer tagged with the key of the handler it is meant for.
     *
     * Params:
     *     TKey = key type. Supported are basic types, arrays and types that
     *            are already immutable; any other type is rejected at
     *            compile time.
     */
    @safe
    struct Response(TKey) {
        /// Key of the addressed handler, always stored as immutable.
        immutable(TKey) key;
        /// The response payload.
        Buffer data;

        /**
         * Builds a response for `key` carrying `data`.
         *
         * Params:
         *     key  = handler key; an array with mutable elements is
         *            duplicated with `idup` so the stored key is immutable
         *     data = payload that is stored as given
         */
        this(const TKey key, Buffer data) inout {
            import std.traits : ForeachType, isArray, isBasicType;

            // The whole dispatch has to be `static if`: with a run-time
            // `else if` every branch (including the static assert) is
            // compiled, which rejects every non-basic key type.
            static if (isBasicType!TKey) {
                this.key = key;
            }
            else static if (isArray!TKey) {
                static if (is(ForeachType!TKey == immutable)) {
                    // Elements are already immutable, no copy needed.
                    this.key = key;
                }
                else {
                    this.key = key.idup;
                }
            }
            else static if (is(TKey == immutable)) {
                this.key = key;
            }
            else {
                static assert(0, "TKey " ~ TKey.stringof ~ " not supported");
            }
            this.data = data;
        }
    }
}
42 
/**
 * Interface of a pool that keeps response handlers addressed by a key.
 *
 * Params:
 *     TValue = handler type, must implement `ResponseHandler`
 *     TKey   = type of the key a handler is registered under
 */
@safe
interface HandlerPool(TValue : ResponseHandler, TKey) {
    /// Pool entry: a handler together with the time stamp used for expiry.
    protected final class ActiveHandler {
        protected TValue handler; //TODO: try immutable/const
        protected SysTime last_timestamp;

        protected const bool update_timestamp;

        /**
         * Wraps `value` and records the current time as its time stamp.
         *
         * Params:
         *     value            = handler to wrap
         *     update_timestamp = when `true` the time stamp is renewed on
         *                        every `setResponse` (for long-lived
         *                        connections)
         */
        this(TValue value, const bool update_timestamp = false) {
            this.last_timestamp = Clock.currTime();
            this.update_timestamp = update_timestamp;
            this.handler = value;
        }

        /// Renews the time stamp if the entry was created with
        /// `update_timestamp`, then forwards `response` to the handler.
        void setResponse(Buffer response) {
            if (update_timestamp) {
                last_timestamp = Clock.currTime();
            }
            handler.setResponse(response);
        }

        /// Returns: `true` when more than `dur` has passed since the time
        /// stamp was last set.
        bool isExpired(const Duration dur) {
            const elapsed = Clock.currTime() - last_timestamp;
            return elapsed > dur;
        }
    }

    /// Looks up the entry registered under `key`.
    ActiveHandler* get(const TKey key);
    /// Tells whether an entry is registered under `key`.
    bool contains(const TKey key);
    /// Registers `value` under `key`; `long_lived` is handed to the entry as
    /// its `update_timestamp` flag by the implementation in this module.
    void add(const TKey key, ref TValue value, bool long_lived = false);
    /// Drops the entry registered under `key`.
    void remove(const TKey key);
    /// Number of registered entries.
    ulong size();
    /// Tells whether the pool holds no entries.
    bool empty();
    /// Delivers `resp.data` to the handler registered under `resp.key`.
    void setResponse(immutable ResponseHandler.Response!TKey resp);
    /// Housekeeping hook; the implementation in this module uses it to drop
    /// expired and dead handlers.
    void tick();
}
81 
/**
 * Handler pool backed by a built-in associative array.
 *
 * Handlers are closed when they are removed. When a non-zero timeout is
 * given, `tick` drops handlers that are expired or no longer alive.
 */
@safe
class StdHandlerPool(TValue : ResponseHandler, TKey) : HandlerPool!(TValue, TKey) {
    protected ActiveHandler[TKey] handlers; //TODO: should be threadsafe?
    protected immutable Duration timeout;

    /**
     * Params:
     *     timeout = age after which `tick` drops a handler;
     *               `Duration.zero` turns the housekeeping in `tick` off
     */
    this(const Duration timeout = Duration.zero) {
        // Duration is a plain value type, no cast is needed here.
        this.timeout = timeout;
    }

    /// Returns: pointer to the entry stored under `key`, `null` if there is
    /// none.
    ActiveHandler* get(const TKey key) {
        return key in handlers;
    }

    /// Returns: `true` if an entry is stored under `key`.
    bool contains(const TKey key) {
        return get(key) !is null;
    }

    /**
     * Stores `value` under `key`.
     *
     * Params:
     *     key        = key to register under; must not be in use yet
     *     value      = handler to store
     *     long_lived = renew the entry's time stamp on every response
     */
    void add(const TKey key, ref TValue value, bool long_lived = false)
    in {
        assert(!contains(key)); //TODO: special case
    }
    do {
        handlers[key] = new ActiveHandler(value, long_lived);
    }

    /// Removes the entry stored under `key`, if any, and closes its handler.
    void remove(const TKey key)
    out {
        assert(!contains(key));
    }
    do {
        auto value_ptr = get(key);
        if (value_ptr is null) {
            return;
        }
        // Take the handler reference before the entry is removed so that
        // the pointer into the associative array is not used afterwards.
        auto handler = (*value_ptr).handler;
        handlers.remove(key);
        handler.close();
    }

    /// Returns: number of stored entries.
    ulong size() {
        return handlers.length;
    }

    /// Returns: `true` if no entry is stored.
    bool empty() {
        return size == 0;
    }

    /**
     * Forwards `resp.data` to the handler stored under `resp.key` and
     * removes that handler if it is no longer alive afterwards. A message is
     * printed when no handler is found. `tick` is run in every case.
     */
    void setResponse(immutable ResponseHandler.Response!TKey resp) { //TODO: scope(exit) destroy(resp.stream); ?
        auto active_connection_ptr = get(resp.key);
        if (active_connection_ptr !is null) {
            // Copy the class reference; it stays valid after a removal.
            auto active_connection = *active_connection_ptr;
            active_connection.setResponse(resp.data);
            if (!active_connection.handler.alive) {
                remove(resp.key);
            }
        }
        else {
            writeln("no respponse handler found");
        }
        tick;
    }

    /**
     * Drops every handler that is expired or not alive. Does nothing when
     * the pool was created with a zero timeout.
     */
    void tick() {
        if (timeout == Duration.zero) {
            return;
        }
        // Two passes: an associative array must not be resized while
        // foreach iterates over it, so the keys are collected first.
        TKey[] stale;
        foreach (key, activeHandler; handlers) {
            if (activeHandler.isExpired(timeout) || !activeHandler.handler.alive()) {
                stale ~= key;
            }
        }
        foreach (key; stale) {
            remove(key);
        }
    }
}
158 
// Tests for StdHandlerPool using a handler stub that records what was called.
@safe
unittest {
    import core.thread;

    // Stub handler: remembers calls and lets the test control `alive`.
    @safe
    class FakeResponseHandler : ResponseHandler {
        bool setResponseCalled = false;
        bool alived = true;
        bool closed = false;

        void setResponse(Buffer response) {
            setResponseCalled = true;
        }

        bool alive() {
            return alived;
        }

        void close() {
            closed = true;
        }
    }

    // Disabled (relies on sleeping).
    version (none) { //HandlerPool: remove handler on expired
        auto pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fake = new FakeResponseHandler();
        pool.add(0, fake);
        assert(!pool.empty);
        pool.tick();
        assert(!pool.empty);
        Thread.sleep(1.msecs);
        pool.tick();
        assert(!pool.empty);
        Thread.sleep(10.msecs);
        pool.tick();
        assert(pool.empty);
    }

    // Disabled (relies on sleeping).
    version (none) { //HandlerPool: update timestamp on set response
        auto pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fake = new FakeResponseHandler();
        pool.add(1, fake, true);
        assert(!pool.empty);
        pool.tick();
        assert(!pool.empty);
        Thread.sleep(7.msecs);
        pool.tick();
        assert(!pool.empty);
        immutable response = ResponseHandler.Response!uint(1, cast(Buffer)[]);
        pool.setResponse(response);
        assert(fake.setResponseCalled);
        Thread.sleep(7.msecs);
        pool.tick();
        assert(!pool.empty);
        Thread.sleep(5.msecs);
        pool.tick();
        assert(pool.empty);
    }

    { //HandlerPool: remove handler if not alive after set response
        auto pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fake = new FakeResponseHandler();
        pool.add(0, fake);
        assert(!pool.empty);

        // A dead handler still gets the response, then is closed and dropped.
        fake.alived = false;
        immutable response = immutable(ResponseHandler.Response!uint)(0, null);
        pool.setResponse(response);
        assert(fake.closed);
        assert(fake.setResponseCalled);
        assert(pool.empty);
    }
    { //HandlerPool: remove handler if not alive after tick
        auto pool = new StdHandlerPool!(FakeResponseHandler, uint)(10.msecs);
        auto fake = new FakeResponseHandler();
        pool.add(0, fake);
        assert(!pool.empty);
        // A live, unexpired handler survives a tick.
        pool.tick();
        assert(!pool.empty);

        // Once dead it is closed and dropped without receiving a response.
        fake.alived = false;
        pool.tick();
        assert(fake.closed);
        assert(!fake.setResponseCalled);
        assert(pool.empty);
    }
}