1 module tagion.tools.collider.schedule;
2 
3 import core.thread;
4 import std.algorithm;
5 import std.array;
6 import std.datetime.systime;
7 import std.file : exists, mkdirRecurse;
8 import std.format;
9 import std.path : buildNormalizedPath, setExtension;
10 import std.process;
11 import std.range;
12 import std.stdio;
13 import std.traits;
14 import std.path;
15 import std.typecons : Tuple, tuple;
16 import tagion.hibon.HiBONJSON;
17 import tagion.tools.Basic : dry_switch, verbose_switch;
18 import tagion.tools.collider.BehaviourOptions;
19 import tagion.tools.collider.trace : ScheduleTrace;
20 import tagion.tools.toolsexception;
21 import tagion.utils.JSONCommon;
22 import tagion.utils.envexpand;
23 
24 @safe
25 struct RunUnit {
26     string[] stages;
27     string[string] envs;
28     string[] args;
29     double timeout;
30     mixin JSONCommon;
31 }
32 
33 @safe
34 struct Schedule {
35     RunUnit[string] units;
36     mixin JSONCommon;
37     mixin JSONConfig;
38     auto stages() const pure nothrow {
39         return units
40             .byValue
41             .map!(u => u.stages)
42             .join
43             .dup
44             .sort
45             .uniq;
46 
47     }
48 }
49 
50 alias Runner = Tuple!(
51         //  ProcessPipes, "pipe",
52         Pid, "pid",
53         File, "fout",
54         RunUnit, "unit",
55         string, "name",
56         string, "stage",
57         SysTime, "time",
58         long, "jobid",
59 );
60 
61 enum TEST_STAGE = "TEST_STAGE";
62 enum COLLIDER_ROOT = "COLLIDER_ROOT";
63 enum BDD_LOG = "BDD_LOG";
64 enum BDD_RESULTS = "BDD_RESULTS";
65 @safe
66 struct ScheduleRunner {
67     Schedule schedule;
68     const(string[]) stages;
69     const uint jobs;
70     ScheduleTrace report;
71     const BehaviourOptions opts;
72     const bool cov_enable;
73     @disable this();
74     this(
75             ref Schedule schedule,
76             const(string[]) stages,
77     const uint jobs,
78     const BehaviourOptions opts,
79     const bool cov_enable,
80     ScheduleTrace report = null) pure nothrow
81     in (jobs > 0)
82     in (stages.length > 0)
83     do {
84         this.schedule = schedule;
85         this.stages = stages;
86         this.jobs = jobs;
87         this.opts = opts;
88         this.report = report;
89         this.cov_enable = cov_enable;
90     }
91 
92     static void sleep(Duration val) nothrow @nogc @trusted {
93         Thread.sleep(val);
94     }
95 
96     void opDispatch(string op, Args...)(Args args) {
97         if (report) {
98             enum code = format(q{report.%s(args);}, op);
99             mixin(code);
100         }
101     }
102 
103     void progress(Args...)(const string fmt, Args args) @trusted {
104         if (!opts.silent) {
105             import tagion.utils.Term;
106 
107             writef(CLEAREOL ~ fmt ~ "\r", args);
108             stdout.flush;
109         }
110     }
111 
112     void setEnv(ref string[string] env, string stage) {
113         if (stage) {
114             env[TEST_STAGE] = stage;
115         }
116         if (COLLIDER_ROOT in env) {
117             env[BDD_LOG] = buildNormalizedPath(env[COLLIDER_ROOT], stage);
118             env[BDD_RESULTS] = buildNormalizedPath(env[COLLIDER_ROOT], stage, "results");
119             if (!env[BDD_LOG].exists) {
120                 env[BDD_LOG].mkdirRecurse;
121             }
122             if (!env[BDD_RESULTS].exists) {
123                 env[BDD_RESULTS].mkdirRecurse;
124             }
125         }
126     }
127 
128     static void kill(Pid pid) @trusted {
129         try {
130 
131             
132 
133                 .kill(pid); //.ifThown!ProcessException;
134         }
135         catch (ProcessException e) {
136             // ignore
137         }
138     }
139 
140     void showEnv(const(string[string]) env, const(RunUnit) unit) {
141         if (verbose_switch) {
142             writeln("Environment:");
143             env.byKeyValue
144                 .each!(e => writefln("%s = %s", e.key, e.value));
145             return;
146         }
147         if (dry_switch) {
148             writeln("Collider environment:");
149             const env_list = [COLLIDER_ROOT, BDD_LOG, BDD_RESULTS, TEST_STAGE] ~ unit.envs.keys;
150             env_list
151                 .each!(name => writefln("%s = %s", name, env.get(name, null)));
152         }
153 
154     }
155 
156     int run(scope const(char[])[] args) {
157         alias Stage = Tuple!(RunUnit, "unit", string, "name", string, "stage");
158         auto schedule_list = stages
159             .map!(stage => schedule.units
160                     .byKeyValue
161                     .filter!(unit => unit.value.stages.canFind(stage))
162                     .map!(unit => Stage(unit.value, unit.key, stage)))
163             .joiner;
164         if (schedule_list.empty) {
165             writefln("None of the stage %s available", stages);
166             writefln("Available stages %s", schedule.stages);
167             return 1;
168         }
169         auto runners = new Runner[jobs];
170 
171         void batch(
172                 const ptrdiff_t job_index,
173                 const SysTime time,
174                 const(char[][]) cmd,
175         const(string) log_filename,
176         const(string[string]) env) {
177             static uint job_count;
178             scope (exit) {
179                 job_count++;
180             }
181             if (dry_switch) {
182                 const line_length = cmd.map!(c => c.length).sum;
183                 writefln("%-(%s%)", '#'.repeat(max(min(line_length, 30), 80)));
184                 writefln("%d] %-(%s %)", job_count, cmd);
185                 writefln("Log file %s", log_filename);
186                 writefln("Unit = %s", schedule_list.front.unit.toJSON.toPrettyString);
187             }
188             else {
189                 auto fout = File(log_filename, "w");
190                 auto _stdin = (() @trusted => stdin)();
191 
192                 Pid pid;
193                 if (!cov_enable) {
194                     pid = spawnProcess(cmd, _stdin, fout, fout, env);
195                 }
196                 else {
197                     const cov_path = buildPath(environment.get(BDD_LOG, "logs"), "cov").relativePath;
198                     const cov_flags = format(" --DRT-covopt=\"dstpath:%s merge:1\"", cov_path);
199                     mkdirRecurse(cov_path);
200                     // For some reason the drt cov flags don't work when spawned as a process 
201                     // so we just run it in a shell
202                     pid = spawnShell(cmd.join(" ") ~ cov_flags, _stdin, fout, fout, env);
203                 }
204 
205                 writefln("%d] %-(%s %) # pid=%d", job_index, cmd,
206                         pid.processID);
207                 runners[job_index] = Runner(
208                         pid,
209                         fout,
210                         schedule_list.front.unit,
211                         schedule_list.front.name,
212                         schedule_list.front.stage,
213                         time,
214                         job_index
215                 );
216             }
217             showEnv(env, schedule_list.front.unit);
218         }
219 
220         void teminate(ref Runner runner) {
221             this.stopped(runner);
222             runner = Runner.init;
223         }
224 
225         uint count;
226         static immutable progress_meter = [
227             "|",
228             "/",
229             "-",
230             "\\",
231         ];
232 
233         while (!schedule_list.empty || runners.any!(r => r.pid !is r.pid.init)) {
234             if (!schedule_list.empty) {
235                 const job_index = runners.countUntil!(r => r.pid is r.pid.init);
236                 if (job_index >= 0) {
237                     try {
238                         auto time = Clock.currTime;
239                         auto env = environment.toAA;
240                         schedule_list.front.unit.envs.byKeyValue
241                             .each!(e => env[e.key] = envExpand(e.value, env));
242                         const cmd = args ~
243                             schedule_list.front.name ~
244                             schedule_list.front.unit.args
245                                 .map!(arg => envExpand(arg, env))
246                                 .array;
247                         setEnv(env, schedule_list.front.stage);
248                         check((BDD_RESULTS in env) !is null,
249                                 format("Environment variable %s or %s must be defined", BDD_RESULTS, COLLIDER_ROOT));
250                         const log_filename = buildNormalizedPath(env[BDD_RESULTS],
251                         schedule_list.front.name).setExtension("log");
252                         batch(job_index, time, cmd, log_filename, env);
253                         schedule_list.popFront;
254                     }
255                     catch (Exception e) {
256                         writefln("Error %s", e.msg);
257                         runners[job_index].fout.writeln("Error: %s", e.msg);
258                         runners[job_index].fout.close;
259                         kill(runners[job_index].pid);
260                         runners[job_index] = Runner.init;
261                     }
262                 }
263             }
264 
265             runners
266                 .filter!(r => r.pid !is r.pid.init)
267                 .filter!(r => tryWait(r.pid).terminated)
268                 .each!((ref r) => teminate(r));
269             progress("%s Running jobs %s",
270                     progress_meter[count % progress_meter.length],
271                     runners
272                     .enumerate
273                     .filter!(r => r.value.pid !is r.value.pid.init)
274                     .map!(r => r.index),
275             );
276             count++;
277             sleep(100.msecs);
278         }
279         progress("Done");
280         return 0;
281     }
282 }