Core.pm 13 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. # ex:ts=8 sw=4:
  2. # $OpenBSD: Core.pm,v 1.83 2016/05/08 09:31:38 espie Exp $
  3. #
  4. # Copyright (c) 2010-2013 Marc Espie <espie@openbsd.org>
  5. #
  6. # Permission to use, copy, modify, and distribute this software for any
  7. # purpose with or without fee is hereby granted, provided that the above
  8. # copyright notice and this permission notice appear in all copies.
  9. #
  10. # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  11. # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  12. # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  13. # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  14. # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  15. # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  16. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  17. use strict;
  18. use warnings;
  19. use DPB::Util;
  20. # here, a "core" is an entity responsible for scheduling cpu, such as
  21. # running a job, which is a collection of tasks.
  22. #
  23. # in DPB terms, to run something AND WAIT FOR IT in an asynchronous way,
  24. # you must schedule it on a core, which gives you a process id that's
  25. # registered
  26. #
  27. # the "abstract core" part only sees about registering/unregistering cores,
  28. # and having a global event handler that gets run whenever possible.
  29. package DPB::Core::Abstract;
  30. use POSIX ":sys_wait_h";
  31. use OpenBSD::Error;
  32. use DPB::Util;
  33. use DPB::Job;
  # Names of every host a core has been created for (filled in by new);
  # needed to decide whether an affinity request can still be honoured.
  my %allhosts;

  # matches_affinity($self, $v)
  # $v->{affinity} names the host $v would like to run on.
  # Returns 1 when this core is on that host, or when no core was ever
  # created for that host; returns 0 otherwise (wrong host, avoid it).
  sub matches_affinity {
      my ($self, $v) = @_;
      my $wanted = $v->{affinity};

      my $same_host  = $self->hostname eq $wanted;
      my $host_known = defined $allhosts{$wanted};

      return ($same_host || !$host_known) ? 1 : 0;
  }
  # Note that we play dangerously: these two registries only hold cores
  # that are currently running something; other code may keep further
  # cores around.
  my $running = {};
  my $special = {};

  # repositories(): list of every registry that reap_kid and cleanup
  # have to look through.
  sub repositories {
      return ($running, $special);
  }
  # Callbacks to run each time handle_events is invoked.
  my @extra_stuff = ();

  # register_event($class, $code): add the code reference $code to the
  # list of global event handlers.
  sub register_event {
      my ($class, $code) = @_;
      push(@extra_stuff, $code);
  }

  # handle_events(@args): run every registered handler, in registration
  # order.  Each handler receives its own copy of the arguments this
  # method was called with: the previous "&$code;" form shared a single
  # @_ between all handlers, so one handler shifting its arguments
  # silently stripped them from every handler that ran after it.
  sub handle_events {
      for my $code (@extra_stuff) {
          $code->(@_);
      }
  }
  # is_alive($self): a core is alive exactly when its host says so.
  sub is_alive {
      my ($self) = @_;
      my $host = $self->host;
      return $host->is_alive;
  }
  # shell($self): the shell object used to run commands for this core.
  # When the core carries a {user}, the host's shell is narrowed with
  # run_as($user); otherwise the host's shell is returned unchanged.
  sub shell {
      my ($self) = @_;
      my $user = $self->{user};

      return $self->host->shell unless $user;
      return $self->host->shell->run_as($user);
  }
  # new($class, $host, $prop): build a core bound to the DPB::Host object
  # created from ($host, $prop), and record the core's host name in
  # %allhosts so affinity checks know that host exists.
  sub new {
      my ($class, $host, $prop) = @_;

      my %fields = (host => DPB::Host->new($host, $prop));
      my $self = bless \%fields, $class;

      $allhosts{$self->hostname} = 1;
      return $self;
  }
  # clone($self): a fresh core of the same class, built from this core's
  # host name and property object.
  sub clone {
      my ($self) = @_;
      my $class = ref($self);
      my $copy = $class->new($self->hostname, $self->prop);
      return $copy;
  }
  # Read-only accessors.  The host object lives on the core itself; the
  # property hash is the host's {prop}; everything else is a single key
  # of that property hash.

  # host($self): the host object this core belongs to.
  sub host {
      my ($self) = @_;
      return $self->{host};
  }

  # prop($self): the property hash stored on the host.
  sub prop {
      my ($self) = @_;
      return $self->host->{prop};
  }

  # sf($self): the {sf} property.
  sub sf {
      my ($self) = @_;
      return $self->prop->{sf};
  }

  # stuck_timeout($self): the {stuck_timeout} property.
  sub stuck_timeout {
      my ($self) = @_;
      return $self->prop->{stuck_timeout};
  }

  # fetch_timeout($self): the {fetch_timeout} property.
  sub fetch_timeout {
      my ($self) = @_;
      return $self->prop->{fetch_timeout};
  }

  # memory($self): the {memory} property.
  sub memory {
      my ($self) = @_;
      return $self->prop->{memory};
  }

  # parallel($self): the {parallel} property.
  sub parallel {
      my ($self) = @_;
      return $self->prop->{parallel};
  }
  # hostname($self): short name of the host, as reported by the host object.
  sub hostname {
      my ($self) = @_;
      return $self->host->name;
  }

  # lockname($self): "host:" followed by the core's host name.
  sub lockname {
      my ($self) = @_;
      return join('', "host:", $self->hostname);
  }

  # logname(@args): same value as this package's hostname.
  # This is a plain function call, not a method call, so a hostname
  # defined in a subclass is not consulted here.
  # NOTE(review): confirm that bypassing method dispatch is intended.
  sub logname {
      return hostname(@_);
  }

  # print_parent: deliberately does nothing.
  sub print_parent {
      return;
  }

  # fullhostname($self): full name of the host, as reported by the host object.
  sub fullhostname {
      my ($self) = @_;
      return $self->host->fullname;
  }
  # register($self, $pid): remember $pid as the process this core waits
  # for, and index the core under that pid in its repository.
  sub register {
      my ($self, $pid) = @_;
      $self->{pid} = $pid;
      my $repo = $self->repository;
      return $repo->{$pid} = $self;
  }
  # unregister($self, $status): drop the core from its repository, forget
  # its pid and record $status as the core's {status}.
  # Returns the core itself so calls can be chained.
  sub unregister {
      my ($self, $status) = @_;
      my $pid = $self->{pid};

      delete $self->repository->{$pid};
      delete $self->{pid};
      $self->{status} = $status;

      return $self;
  }
  # terminate($self): wait (blocking) for the process registered on this
  # core, then unregister the core with that process's exit status.
  # Returns the core, or undef when no pid is registered.
  sub terminate {
      my ($self) = @_;
      return undef unless defined $self->{pid};

      waitpid($self->{pid}, 0);
      $self->unregister($?);
      return $self;
  }
  # reap_kid($class, $kid): $kid is the result of a waitpid call.
  # When it is a real pid, the first repository that knows it has the
  # matching core unregistered (with the current $?) and continued.
  # Always returns $kid unchanged.
  sub reap_kid {
      my ($class, $kid) = @_;
      return $kid unless defined $kid && $kid > 0;

      for my $repo ($class->repositories) {
          my $core = $repo->{$kid};
          next unless defined $core;
          $core->unregister($?)->continue;
          last;
      }
      return $kid;
  }
  # reap($class): run the global event handlers, then collect every child
  # that has already exited, without blocking.
  # Returns the number of children reaped.
  sub reap {
      my ($class) = @_;
      $class->handle_events;

      my $reaped = 0;
      while ($class->reap_kid(waitpid(-1, WNOHANG)) > 0) {
          $reaped++;
      }
      return $reaped;
  }
  # reap_wait($class): block until some child exits, then hand its pid
  # to reap_kid and return what reap_kid returns.
  sub reap_wait {
      my ($class) = @_;
      my $kid = waitpid(-1, 0);
      return $class->reap_kid($kid);
  }
  # cleanup($class): send SIGINT to every pid found in any repository.
  # The effective uid is set to 0 for the duration of the call.
  sub cleanup {
      my ($class) = @_;
      local $> = 0;

      my @pids = map { keys %$_ } $class->repositories;
      for my $pid (@pids) {
          kill INT => $pid;
      }
  }
  # debug_dump($self): short description of the core for debugging;
  # at this level, just the host name.
  sub debug_dump {
      my ($self) = @_;
      return $self->hostname;
  }
  # Hook cleanup into OpenBSD::Handler so the pids still present in the
  # repositories get interrupted whenever the handler runs.
  OpenBSD::Handler->register(
      sub {
          __PACKAGE__->cleanup;
      });
  220. # this is a core that can run jobs
  221. package DPB::Core::WithJobs;
  222. our @ISA = qw(DPB::Core::Abstract);
  # fh($self): the {fh} entry of the current task.
  sub fh {
      my ($self) = @_;
      return $self->task->{fh};
  }

  # job($self): the job currently attached to this core.
  sub job {
      my ($self) = @_;
      return $self->{job};
  }

  # debug_dump($self): "hostname:jobdump", where jobdump is the job's own
  # debug_dump.
  sub debug_dump {
      my ($self) = @_;
      return join(':', $self->hostname, $self->job->debug_dump);
  }

  # task($self): the task the current job is working on (the job's {task}).
  sub task {
      my ($self) = @_;
      return $self->job->{task};
  }
  # terminate($self): end the current task, if there is one, then let the
  # parent class wait for and unregister the process.  When the parent
  # class reports that a process was actually reaped, the job is
  # finalized.
  sub terminate {
      my ($self) = @_;

      my $task = $self->task;
      $task->end if $task;

      if ($self->SUPER::terminate) {
          $self->job->finalize($self);
      }
  }
  # run_task($core): fork the current task.
  #  - fork failure: report through DPB::Util->die_bang;
  #  - parent: let the task do its parent-side processing, then register
  #    the child's pid on the core;
  #  - child: reset $DB::inhibit_exit and every signal handler, run the
  #    task, and exit 0 on success or 1 on failure.
  sub run_task {
      my ($core) = @_;
      my $pid = $core->task->fork($core);

      if (!defined $pid) {
          DPB::Util->die_bang("Oops: task ".$core->task->name." couldn't start");
      } elsif ($pid) {
          # parent
          $core->task->process($core);
          $core->register($pid);
      } else {
          # child
          $DB::inhibit_exit = 0;
          $SIG{$_} = 'DEFAULT' for keys %SIG;
          exit($core->task->run($core) ? 0 : 1);
      }
  }
  # continue($core): called once the core's process has been reaped.
  # The finished task is finalized; a true result moves on to the job's
  # next task, a false one finalizes the whole job.
  sub continue {
      my ($core) = @_;
      return $core->task->finalize($core)
          ? $core->start_task
          : $core->job->finalize($core);
  }
  # start_task($core): ask the job for its next task and store it as the
  # job's current {task}.  A defined task is run; when the job has
  # nothing left, the job is finalized instead.
  sub start_task {
      my ($core) = @_;

      my $next = $core->job->next_task($core);
      $core->job->{task} = $next;

      return defined $next
          ? $core->run_task
          : $core->job->finalize($core);
  }
  # mark_ready($self): detach the finished job from the core so it can be
  # reused.  A core that still has a pid registered is not finished:
  # that is reported through DPB::Util->die.
  # Returns the core.
  sub mark_ready {
      my ($self) = @_;

      if ($self->{pid}) {
          # Data::Dumper is loaded so the core can be dumped while debugging.
          require Data::Dumper;
          DPB::Util->die("Marking ready an incomplete process");
      }

      delete $self->{job};
      return $self;
  }
  302. use Time::HiRes qw(time);
  # start_job($core, $job): attach $job to the core, stamp the start
  # time, reset the status to 0 and launch the job's first task.
  sub start_job {
      my ($core, $job) = @_;

      $core->{status}  = 0;
      $core->{started} = time();
      $core->{job}     = $job;

      return $core->start_task;
  }
  # success($self): reset the host's count of consecutive failures.
  sub success {
      my ($self) = @_;
      my $host = $self->host;
      $host->{consecutive_failures} = 0;
  }

  # failure($self): add one to the host's count of consecutive failures.
  sub failure {
      my ($self) = @_;
      my $host = $self->host;
      $host->{consecutive_failures}++;
  }
  # start_clock($class, $tm): start the DPB::Core::Clock core, handing it
  # $tm.
  sub start_clock {
      my ($class, $tm) = @_;
      return DPB::Core::Clock->start($tm);
  }
  326. package DPB::Core;
  327. our @ISA = qw(DPB::Core::WithJobs);
  # Pool of idle cores, handed out by get/get_affinity/get_compatible.
  my $available = [];

  # Used to remove cores from the build: host name => list of cores that
  # became ready while "$logdir/stop-<host>" existed.
  my %stopped;

  # Directory in which the stop-<host> files are looked up (set_logdir).
  my $logdir;
  # Concurrency figure written by the previous log_concurrency call.
  my $lastcount = 0;

  # log_concurrency($class, $time, $fh)
  # Count the running cores, adding for each one its pending {swallow}
  # slots and the number of cores it has already {swallowed}.  When the
  # total differs from the last one logged, write "<pid> <time> <total>"
  # to $fh.
  #
  # The repository is walked with values rather than each: each resumes
  # from wherever the hash iterator was left, so an interrupted walk
  # elsewhere made this count silently incomplete.
  sub log_concurrency {
      my ($class, $time, $fh) = @_;

      my $count = 0;
      for my $core (values %{$class->repository}) {
          $count++;
          $count += $core->{swallow} if defined $core->{swallow};
          $count += scalar(@{$core->{swallowed}})
              if defined $core->{swallowed};
      }

      if ($count != $lastcount) {
          print $fh "$$ $time $count\n";
          $lastcount = $count;
      }
  }
  # set_logdir($class, $dir): remember the directory in which the
  # stop-<host> files are looked up.
  sub set_logdir {
      my ($class, $dir) = @_;
      $logdir = $dir;
  }
  # is_local($self): whether the core's host reports itself as localhost.
  sub is_local {
      my ($self) = @_;
      my $host = $self->host;
      return $host->is_localhost;
  }
  # Extra callbacks whose output is appended by report and important.
  my @extra_report;
  my @extra_important;

  # register_report($self, $code, $important): queue $code for report
  # and $important for important.
  sub register_report {
      my ($self, $code, $important) = @_;
      push @extra_report, $code;
      push @extra_important, $important;
  }
  # repository(): build cores are tracked in the $running registry.
  sub repository {
      return $running;
  }
  # walk_same_host_jobs($self, $sub)
  # Call $sub->($pid, $job) for every registered core that runs on the
  # same host as $self and whose job has a {v} (only "real" jobs are of
  # interest for now).
  #
  # The pids are snapshotted before the walk.  The previous each-based
  # loop shared the hash iterator with the callback: any keys/values/each
  # on the repository inside $sub restarted the walk, visiting jobs twice
  # or looping forever.
  sub walk_same_host_jobs {
      my ($self, $sub) = @_;

      my $repo = $self->repository;
      my $hostname = $self->hostname;

      for my $pid (keys %$repo) {
          my $core = $repo->{$pid};
          # the callback may have unregistered this core meanwhile
          next if !defined $core;
          next if $core->hostname ne $hostname;
          # XXX only interested in "real" jobs now
          next if !defined $core->job->{v};
          $sub->($pid, $core->job);
      }
  }
  # same_host_jobs($self): list of the jobs that walk_same_host_jobs
  # visits, in visiting order.
  sub same_host_jobs {
      my ($self) = @_;
      my @jobs;
      my $collect = sub {
          my ($pid, $job) = @_;
          push @jobs, $job;
      };
      $self->walk_same_host_jobs($collect);
      return @jobs;
  }
  # wake_jobs($self)
  # Look at every registered core running a real job (one with a {v}):
  #  - a job with {wakemeup} makes its core the waker for that host;
  #  - a job with {locked} marks the host as still sleeping.
  # Afterwards, on each host that has a waker and is not sleeping, the
  # waker's job gets wake_others($core).
  sub wake_jobs {
      my ($self) = @_;
      my (%alarm, %sleepin);

      for my $core (values %{$self->repository}) {
          my $job = $core->job;
          next if !defined $job->{v};
          $alarm{$core->hostname} = $core if $core->job->{wakemeup};
          $sleepin{$core->hostname} = 1 if $core->job->{locked};
      }

      for my $host (keys %alarm) {
          next if $sleepin{$host};
          my $core = $alarm{$host};
          $core->job->wake_others($core);
      }
  }
  # one_core($core, $time): one-line description of a running core.
  # Layout: [N*][~]jobname[+] [pid][ on host]<watched>
  #   N*       number of cores involved when the core swallowed others
  #   ~        the core is squiggled
  #   +        the core has {inmem} set
  #   on host  only for hosts that are not localhost
  #   watched  whatever the job's watched($time, $core) returns
  sub one_core {
      my ($core, $time) = @_;
      my $hostname = $core->hostname;
      my $label = $core->job->name;

      $label = '~'.$label if $core->{squiggle};
      $label = (scalar(@{$core->{swallowed}})+1).'*'.$label
          if defined $core->{swallowed};
      $label .= '+' if $core->{inmem};
      $label .= " [$core->{pid}]";
      $label .= " on ".$hostname
          unless DPB::Host->name_is_localhost($hostname);
      $label .= $core->job->watched($time, $core) if $core->job;

      return $label;
  }
  # report(@args): text describing every running core, oldest first (one
  # line per core, see one_core), followed by the output of each callback
  # queued with register_report.
  #
  # Each callback is called with a copy of report's own arguments.  The
  # previous "&$a;" form shared one @_ between all callbacks, so a
  # callback shifting its arguments changed what the next ones received;
  # it also used a lexical named $a, the sort variable.
  sub report {
      my $current = time();
      my @by_start = sort {$a->{started} <=> $b->{started}} values %$running;
      my $s = join("\n", map {one_core($_, $current)} @by_start)."\n";
      for my $code (@extra_report) {
          $s .= $code->(@_);
      }
      return $s;
  }
  # important(@args): like report, restricted to the cores whose job
  # answers true to really_watch($current), followed by the output of the
  # "important" callbacks queued with register_report.
  #
  # register_report stores undef when it is given no "important"
  # callback; such entries are skipped instead of dying on an undefined
  # code reference.  Each callback gets a copy of important's arguments
  # rather than sharing one @_ with the other callbacks.
  sub important {
      my $current = time();
      my $s = '';
      for my $core (values %$running) {
          next unless $core->job->really_watch($current);
          $s .= one_core($core, $current)."\n";
      }
      for my $code (@extra_important) {
          next unless defined $code;
          $s .= $code->(@_);
      }
      return $s;
  }
  # mark_ready($self): the core finished its job.  After the parent
  # class's bookkeeping, the core either goes back to the available pool
  # or, when "$logdir/stop-<hostname>" exists, is parked in %stopped until
  # avail notices that file is gone.
  # Returns the core.
  sub mark_ready {
      my ($self) = @_;
      $self->SUPER::mark_ready;

      my $hostname = $self->hostname;
      my $stopfile = "$logdir/stop-$hostname";

      if (-e $stopfile) {
          push @{$stopped{$hostname}}, $self;
      } else {
          $self->mark_available($self);
      }
      return $self;
  }
  # avail($self): number of cores in the available pool.
  # Before counting, every host whose "$logdir/stop-<host>" file no longer
  # exists has its parked cores made available again.
  sub avail {
      my ($self) = @_;

      for my $h (keys %stopped) {
          next if -e "$logdir/stop-$h";
          for my $core (@{$stopped{$h}}) {
              $self->mark_available($core);
          }
          delete $stopped{$h};
      }

      my $pool = $self->available;
      return scalar(@$pool);
  }
  # available(): the pool (array reference) of idle build cores.
  sub available {
      return $available;
  }
  # can_swallow($core, $n): let $core absorb up to $n further cores.
  # The core records how many it may still take ({swallow}), starts an
  # empty {swallowed} list, notes {realjobs} as $n+1 and advertises
  # itself in its host's {swallow} table.
  sub can_swallow {
      my ($core, $n) = @_;

      $core->{swallow} = $n;
      $core->{swallowed} = [];
      $core->{realjobs} = $n + 1;
      $core->host->{swallow}{$core} = $core;

      # try to reswallow freed things right away.
      if (@$available > 0) {
          my $idle = $available;
          $available = [];
          $core->mark_available(@$idle);
      }
  }
  # unswallow($self): undo can_swallow.  Does nothing for a core that has
  # no {swallowed} list; otherwise all swallowing state is removed and
  # the swallowed cores are handed to mark_available.
  sub unswallow {
      my ($self) = @_;
      return unless defined $self->{swallowed};

      # first prevent the recursive call from taking us into account
      my $released = delete $self->{swallowed};
      delete $self->host->{swallow}{$self};
      delete @{$self}{qw(swallow realjobs)};

      # then free up our swallowed jobs
      $self->mark_available(@$released);
  }
  # mark_available($self, @cores): give cores back.
  # For each core: whatever it swallowed is released first.  Then, if its
  # host has a core that still swallows, the core is unsquiggled and
  # appended to that core's {swallowed} list (a swallower that is now
  # full is removed from the host's {swallow} table).  Any other core is
  # pushed onto the available pool.
  sub mark_available {
      my $self = shift;

      CORE: for my $core (@_) {
          # a core that swallowed stuff releases it first
          $core->unswallow;

          # let the core be swallowed if its host has takers
          if ($core->can_be_swallowed) {
              for my $eater (values %{$core->host->{swallow}}) {
                  $core->unsquiggle;
                  push @{$eater->{swallowed}}, $core;
                  delete $core->host->{swallow}{$eater}
                      if --$eater->{swallow} == 0;
                  next CORE;
              }
          }

          push @{$self->available}, $core;
      }
  }
  # running(): number of cores currently registered as running (0 when
  # none).
  # The count is taken from the keys: a bare hash in scalar context only
  # yields the key count on perl 5.26 and later; older perls return a
  # bucket-usage string such as "3/8", which is true but not a count.
  sub running {
      return scalar(keys %$running);
  }
  # get($self): take the best core out of the available pool.
  #
  # With more than one core to choose from, the pool is first sorted:
  # highest sf first when DPB::HostProperties->has_sf, otherwise cores of
  # the host with the most idle cores first.  The head of the pool is
  # then removed and returned, after its squiggle state has been settled:
  #  - a core for which may_unsquiggle is true is returned as is;
  #  - else, if the core is not squiggled and its host still has
  #    {wantsquiggles}: a fractional want (< 1) is granted with that
  #    probability and then zeroed, a larger want gives up one unit and
  #    sets the core's {squiggle} to 1.
  #
  # The pool is held in a lexical; the previous code stored it in the
  # package global $a, the variable sort uses for its comparisons.
  #
  # NOTE(review): the pool is assumed not to be empty -- callers
  # presumably check avail first; confirm.
  sub get {
      my $self = shift;
      my $avail = $self->available;

      if (@$avail > 1) {
          if (DPB::HostProperties->has_sf) {
              @$avail = sort {$b->sf <=> $a->sf} @$avail;
          } else {
              my %cores;
              for my $c (@$avail) {
                  $cores{$c->hostname}++;
              }
              @$avail = sort {$cores{$b->hostname} <=> $cores{$a->hostname}} @$avail;
          }
      }

      my $core = shift @$avail;
      if ($core->may_unsquiggle) {
          return $core;
      }

      my $host = $core->host;
      if (!$core->{squiggle} && $host->{wantsquiggles}) {
          if ($host->{wantsquiggles} < 1) {
              if (rand() <= $host->{wantsquiggles}) {
                  $core->{squiggle} = $host->{wantsquiggles};
                  $host->{wantsquiggles} = 0;
              }
          } else {
              $host->{wantsquiggles}--;
              $core->{squiggle} = 1;
          }
      }
      return $core;
  }
  # can_be_swallowed($core): true when the core's host has a {swallow}
  # table at all, even an empty one.
  sub can_be_swallowed {
      my ($core) = @_;
      my $host = $core->host;
      return defined $host->{swallow};
  }
  # may_unsquiggle($core): only concerns cores holding a fractional
  # squiggle (0 < squiggle < 1).  Such a core is unsquiggled with
  # probability 1 - squiggle, in which case 1 is returned.
  # Returns 0 in every other case.
  sub may_unsquiggle {
      my ($core) = @_;
      my $share = $core->{squiggle};

      return 0 unless $share && $share < 1;
      return 0 unless rand() >= $share;

      $core->unsquiggle;
      return 1;
  }
  # unsquiggle($core): hand the core's squiggle, if it has one, back to
  # the host's {wantsquiggles} budget and drop it from the core.
  # Returns the core.
  sub unsquiggle {
      my ($core) = @_;
      if (my $share = $core->{squiggle}) {
          $core->host->{wantsquiggles} += $share;
          delete $core->{squiggle};
      }
      return $core;
  }
  # get_affinity($self, $v): remove from the available pool, and return,
  # the first core running on host $v->{affinity}.  The cores skipped on
  # the way are put back at the end of the pool.
  # Returns undef when no core of that host is available.
  sub get_affinity {
      my ($self, $v) = @_;
      my $wanted = $v->{affinity};
      my @skipped;

      while (@$available > 0) {
          my $core = shift @$available;
          if ($core->hostname eq $wanted) {
              push @$available, @skipped;
              return $core;
          }
          push @skipped, $core;
      }

      $available = \@skipped;
      return undef;
  }
  # get_compatible($self, $v): remove from the available pool, and
  # return, the first core whose properties do not report $v as
  # taint-incompatible.  The cores skipped on the way are put back at the
  # end of the pool.
  # Returns undef when no such core is available.
  sub get_compatible {
      my ($self, $v) = @_;
      my @skipped;

      while (@$available > 0) {
          my $core = shift @$available;
          unless ($core->prop->taint_incompatible($v)) {
              push @$available, @skipped;
              return $core;
          }
          push @skipped, $core;
      }

      $available = \@skipped;
      return undef;
  }
  # Every core created through new (new_noreg does not add to it).
  my @all_cores;

  # all_sf(): reference to the ascending list of the sf values of all
  # cores that are still alive.
  sub all_sf {
      my @factors = map { $_->is_alive ? $_->sf : () } @all_cores;
      return [sort {$a <=> $b} @factors];
  }
  # new($class, $host, $prop): build the core through the parent class
  # and remember it in @all_cores.
  sub new {
      my $class = shift;
      my ($host, $prop) = @_;

      my $core = $class->SUPER::new($host, $prop);
      push @all_cores, $core;
      return $core;
  }
  # new_noreg($class, $host, $prop): like new, but the core is not
  # recorded in @all_cores.
  sub new_noreg {
      my $class = shift;
      my ($host, $prop) = @_;
      return $class->SUPER::new($host, $prop);
  }
  # start_pipe($self, $code, $name): wrap $code and $name in a
  # DPB::Job::Pipe and start that job on this core.
  sub start_pipe {
      my ($self, $code, $name) = @_;
      my @job = DPB::Job::Pipe->new($code, $name);
      return $self->start_job(@job);
  }
  # Cores for jobs that are kept apart from the build cores: they are
  # tracked in the $special registry.
  package DPB::Core::Special;
  our @ISA = ('DPB::Core::WithJobs');

  # repository(): the registry special cores are tracked in.
  sub repository {
      return $special;
  }
  # A build core whose host name is whatever the hostname command prints.
  package DPB::Core::Local;
  our @ISA = ('DPB::Core');

  # Cached output of the hostname command.
  my $host;

  # hostname(): the output of `hostname`, without its trailing newline;
  # the command is run on the first call and the result reused.
  sub hostname {
      unless (defined $host) {
          $host = `hostname`;
          chomp($host);
      }
      return $host;
  }
  # Local cores kept in a pool of their own ($fetchcores) instead of the
  # build pool.
  package DPB::Core::Fetcher;
  our @ISA = ('DPB::Core::Local');

  # The pool of idle fetcher cores.
  my $fetchcores = [];

  # available(): the fetcher pool (array reference).
  sub available {
      return $fetchcores;
  }

  # may_unsquiggle(): always 1.
  sub may_unsquiggle {
      return 1;
  }

  # can_be_swallowed(): always 0.
  sub can_be_swallowed {
      return 0;
  }

  # new($class, $host, $prop): build the core through the parent class
  # and set its {user} to $prop->{fetch_user}.
  sub new {
      my $class = shift;
      my ($host, $prop) = @_;

      my $core = $class->SUPER::new($host, $prop);
      $core->{user} = $prop->{fetch_user};
      return $core;
  }
  # A special core running a job named 'clock', whose forked task sleeps
  # for the reporter's timeout and then exits.
  package DPB::Core::Clock;
  our @ISA = ('DPB::Core::Special');

  # start($class, $reporter): create a core on 'localhost' and start on
  # it a DPB::Job::Infinite named 'clock', built around a DPB::Task::Fork
  # that sleeps for $reporter->timeout seconds and exits with status 0.
  sub start {
      my ($class, $reporter) = @_;
      my $core = $class->new('localhost');

      my $tick = sub {
          sleep($reporter->timeout);
          exit(0);
      };
      my $task = DPB::Task::Fork->new($tick);
      my $job = DPB::Job::Infinite->new($task, 'clock');

      return $core->start_job($job);
  }
  704. 1;