module tagion.tools.collider.schedule;

import core.thread;
import std.algorithm;
import std.array;
import std.datetime.systime;
import std.file : exists, mkdirRecurse;
import std.format;
import std.path : buildNormalizedPath, setExtension;
import std.process;
import std.range;
import std.stdio;
import std.traits;
import std.path;
import std.typecons : Tuple, tuple;
import tagion.hibon.HiBONJSON;
import tagion.tools.Basic : dry_switch, verbose_switch;
import tagion.tools.collider.BehaviourOptions;
import tagion.tools.collider.trace : ScheduleTrace;
import tagion.tools.toolsexception;
import tagion.utils.JSONCommon;
import tagion.utils.envexpand;

/// A single schedulable unit: the stages it belongs to, the extra
/// environment variables and the arguments passed to the spawned command.
@safe
struct RunUnit {
    /// Stages in which this unit is run.
    string[] stages;
    /// Extra environment variables; values are expanded with `envExpand` before use.
    string[string] envs;
    /// Extra command line arguments; each one is expanded with `envExpand`.
    string[] args;
    /// NOTE(review): not read anywhere in the code visible in this module.
    double timeout;
    mixin JSONCommon;
}

/// The complete schedule: a table of run units indexed by unit name.
@safe
struct Schedule {
    RunUnit[string] units;
    mixin JSONCommon;
    mixin JSONConfig;
    /// Returns: a sorted range of the distinct stage names used by all units.
    auto stages() const pure nothrow {
        return units
            .byValue
            .map!(u => u.stages)
            .join
            .dup
            .sort
            .uniq;

    }
}

/// Book-keeping record for one job slot. A slot is free while `pid` is `Pid.init`.
alias Runner = Tuple!(
        //  ProcessPipes, "pipe",
        Pid, "pid",
        File, "fout",
        RunUnit, "unit",
        string, "name",
        string, "stage",
        SysTime, "time",
        long, "jobid",
);

// Names of the environment variables read or set for each spawned job.
enum TEST_STAGE = "TEST_STAGE";
enum COLLIDER_ROOT = "COLLIDER_ROOT";
enum BDD_LOG = "BDD_LOG";
enum BDD_RESULTS = "BDD_RESULTS";

/// Runs the units of a `Schedule` for the selected stages, keeping at most
/// `jobs` child processes alive at the same time.
@safe
struct ScheduleRunner {
    Schedule schedule;
    const(string[]) stages;
    const uint jobs;
    ScheduleTrace report;
    const BehaviourOptions opts;
    const bool cov_enable;
    @disable this();
    /**
     * Params:
     *   schedule   = the schedule to execute
     *   stages     = stages to run (must not be empty)
     *   jobs       = number of job slots (must be greater than zero)
     *   opts       = behaviour options (`opts.silent` suppresses the progress line)
     *   cov_enable = when true jobs are started through a shell with extra
     *                `--DRT-covopt` flags
     *   report     = optional trace object; calls forwarded by `opDispatch`
     *                are dropped when it is null
     */
    this(
            ref Schedule schedule,
            const(string[]) stages,
            const uint jobs,
            const BehaviourOptions opts,
            const bool cov_enable,
            ScheduleTrace report = null) pure nothrow
    in (jobs > 0)
    in (stages.length > 0)
    do {
        this.schedule = schedule;
        this.stages = stages;
        this.jobs = jobs;
        this.opts = opts;
        this.report = report;
        this.cov_enable = cov_enable;
    }

    /// Suspends the calling thread for `val`.
    static void sleep(Duration val) nothrow @nogc @trusted {
        Thread.sleep(val);
    }

    /// Forwards `this.<op>(args)` to `report.<op>(args)` when a report object
    /// is set; does nothing otherwise.
    void opDispatch(string op, Args...)(Args args) {
        if (report) {
            enum code = format(q{report.%s(args);}, op);
            mixin(code);
        }
    }

    /// Writes a single-line progress message (terminated by a carriage
    /// return, not a newline) unless `opts.silent` is set.
    void progress(Args...)(const string fmt, Args args) @trusted {
        if (!opts.silent) {
            import tagion.utils.Term;

            writef(CLEAREOL ~ fmt ~ "\r", args);
            stdout.flush;
        }
    }

    /**
     * Adds the stage specific variables to `env`.
     *
     * `TEST_STAGE` is set when `stage` is non-null. When `COLLIDER_ROOT` is
     * present, `BDD_LOG` and `BDD_RESULTS` are derived from it and both
     * directories are created if they do not exist.
     */
    void setEnv(ref string[string] env, string stage) {
        if (stage) {
            env[TEST_STAGE] = stage;
        }
        if (COLLIDER_ROOT in env) {
            env[BDD_LOG] = buildNormalizedPath(env[COLLIDER_ROOT], stage);
            env[BDD_RESULTS] = buildNormalizedPath(env[COLLIDER_ROOT], stage, "results");
            if (!env[BDD_LOG].exists) {
                env[BDD_LOG].mkdirRecurse;
            }
            if (!env[BDD_RESULTS].exists) {
                env[BDD_RESULTS].mkdirRecurse;
            }
        }
    }

    /// Best-effort kill of `pid`. A null `pid` (free job slot) is ignored and
    /// a `ProcessException` from `std.process.kill` is swallowed.
    static void kill(Pid pid) @trusted {
        if (pid is null) {
            // Nothing was spawned for this slot; std.process.kill would
            // dereference the null reference.
            return;
        }
        try {
            .kill(pid);
        }
        catch (ProcessException e) {
            // ignore
        }
    }

    /// Prints the whole environment in verbose mode, or only the collider
    /// variables and the unit's own variables in dry-run mode.
    void showEnv(const(string[string]) env, const(RunUnit) unit) {
        if (verbose_switch) {
            writeln("Environment:");
            env.byKeyValue
                .each!(e => writefln("%s = %s", e.key, e.value));
            return;
        }
        if (dry_switch) {
            writeln("Collider environment:");
            const env_list = [COLLIDER_ROOT, BDD_LOG, BDD_RESULTS, TEST_STAGE] ~ unit.envs.keys;
            env_list
                .each!(name => writefln("%s = %s", name, env.get(name, null)));
        }

    }

    /**
     * Runs every unit belonging to one of the selected stages.
     *
     * Params:
     *   args = leading part of the command line; the unit name and the
     *          expanded unit arguments are appended for each job
     * Returns: 1 if no unit matches the selected stages or if at least one
     *          unit could not be started, 0 otherwise.
     */
    int run(scope const(char[])[] args) {
        alias Stage = Tuple!(RunUnit, "unit", string, "name", string, "stage");
        // Lazy list of (unit, name, stage), ordered stage by stage.
        auto schedule_list = stages
            .map!(stage => schedule.units
                    .byKeyValue
                    .filter!(unit => unit.value.stages.canFind(stage))
                    .map!(unit => Stage(unit.value, unit.key, stage)))
            .joiner;
        if (schedule_list.empty) {
            writefln("None of the stage %s available", stages);
            writefln("Available stages %s", schedule.stages);
            return 1;
        }
        auto runners = new Runner[jobs];

        // Starts (or, in dry-run mode, only prints) the job at the front of
        // schedule_list and records it in runners[job_index].
        void batch(
                const ptrdiff_t job_index,
                const SysTime time,
                const(char[][]) cmd,
                const(string) log_filename,
                const(string[string]) env) {
            static uint job_count;
            scope (exit) {
                job_count++;
            }
            if (dry_switch) {
                const line_length = cmd.map!(c => c.length).sum;
                // Separator width follows the command length, clamped to 30..80.
                writefln("%-(%s%)", '#'.repeat(min(max(line_length, 30), 80)));
                writefln("%d] %-(%s %)", job_count, cmd);
                writefln("Log file %s", log_filename);
                writefln("Unit = %s", schedule_list.front.unit.toJSON.toPrettyString);
            }
            else {
                auto fout = File(log_filename, "w");
                auto _stdin = (() @trusted => stdin)();

                Pid pid;
                if (!cov_enable) {
                    pid = spawnProcess(cmd, _stdin, fout, fout, env);
                }
                else {
                    // NOTE(review): BDD_LOG is read from the process
                    // environment here, not from the per-job `env` filled in
                    // by setEnv - confirm that this is intended.
                    const cov_path = buildPath(environment.get(BDD_LOG, "logs"), "cov").relativePath;
                    const cov_flags = format(" --DRT-covopt=\"dstpath:%s merge:1\"", cov_path);
                    mkdirRecurse(cov_path);
                    // For some reason the drt cov flags don't work when spawned as a process
                    // so we just run it in a shell
                    pid = spawnShell(cmd.join(" ") ~ cov_flags, _stdin, fout, fout, env);
                }

                writefln("%d] %-(%s %) # pid=%d", job_index, cmd,
                        pid.processID);
                runners[job_index] = Runner(
                        pid,
                        fout,
                        schedule_list.front.unit,
                        schedule_list.front.name,
                        schedule_list.front.stage,
                        time,
                        job_index
                );
            }
            showEnv(env, schedule_list.front.unit);
        }

        // Reports the finished job (through opDispatch) and frees its slot.
        void terminate(ref Runner runner) {
            this.stopped(runner);
            runner = Runner.init;
        }

        uint count;
        // Number of units which could not be started.
        uint failed_starts;
        static immutable progress_meter = [
            "|",
            "/",
            "-",
            "\\",
        ];

        while (!schedule_list.empty || runners.any!(r => r.pid !is r.pid.init)) {
            if (!schedule_list.empty) {
                const job_index = runners.countUntil!(r => r.pid is r.pid.init);
                if (job_index >= 0) {
                    try {
                        auto time = Clock.currTime;
                        auto env = environment.toAA;
                        schedule_list.front.unit.envs.byKeyValue
                            .each!(e => env[e.key] = envExpand(e.value, env));
                        const cmd = args ~
                            schedule_list.front.name ~
                            schedule_list.front.unit.args
                                .map!(arg => envExpand(arg, env))
                                .array;
                        setEnv(env, schedule_list.front.stage);
                        check((BDD_RESULTS in env) !is null,
                                format("Environment variable %s or %s must be defined", BDD_RESULTS, COLLIDER_ROOT));
                        const log_filename = buildNormalizedPath(env[BDD_RESULTS],
                                schedule_list.front.name).setExtension("log");
                        batch(job_index, time, cmd, log_filename, env);
                        schedule_list.popFront;
                    }
                    catch (Exception e) {
                        writefln("Error %s", e.msg);
                        failed_starts++;
                        // The slot is usually still empty at this point, so
                        // the log file is only written when it was opened.
                        if (runners[job_index].fout.isOpen) {
                            runners[job_index].fout.writefln("Error: %s", e.msg);
                            runners[job_index].fout.close;
                        }
                        kill(runners[job_index].pid);
                        runners[job_index] = Runner.init;
                        // Drop the failing unit, otherwise it would be
                        // retried on every iteration of the loop.
                        schedule_list.popFront;
                    }
                }
            }

            // Free the slots of all jobs which have terminated.
            runners
                .filter!(r => r.pid !is r.pid.init)
                .filter!(r => tryWait(r.pid).terminated)
                .each!((ref r) => terminate(r));
            progress("%s Running jobs %s",
                    progress_meter[count % progress_meter.length],
                    runners
                    .enumerate
                    .filter!(r => r.value.pid !is r.value.pid.init)
                    .map!(r => r.index),
            );
            count++;
            sleep(100.msecs);
        }
        progress("Done");
        return (failed_starts > 0) ? 1 : 0;
    }
}