EachPromise.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. <?php
  2. declare(strict_types=1);
  3. namespace GuzzleHttp\Promise;
  4. /**
  5. * Represents a promise that iterates over many promises and invokes
  6. * side-effect functions in the process.
  7. *
  8. * @final
  9. */
  10. class EachPromise implements PromisorInterface
  11. {
  12. private $pending = [];
  13. private $nextPendingIndex = 0;
  14. /** @var \Iterator|null */
  15. private $iterable;
  16. /** @var callable|int|null */
  17. private $concurrency;
  18. /** @var callable|null */
  19. private $onFulfilled;
  20. /** @var callable|null */
  21. private $onRejected;
  22. /** @var Promise|null */
  23. private $aggregate;
  24. /** @var bool|null */
  25. private $mutex;
  26. /**
  27. * Configuration hash can include the following key value pairs:
  28. *
  29. * - fulfilled: (callable) Invoked when a promise fulfills. The function
  30. * is invoked with three arguments: the fulfillment value, the index
  31. * position from the iterable list of the promise, and the aggregate
  32. * promise that manages all of the promises. The aggregate promise may
  33. * be resolved from within the callback to short-circuit the promise.
  34. * - rejected: (callable) Invoked when a promise is rejected. The
  35. * function is invoked with three arguments: the rejection reason, the
  36. * index position from the iterable list of the promise, and the
  37. * aggregate promise that manages all of the promises. The aggregate
  38. * promise may be resolved from within the callback to short-circuit
  39. * the promise.
  40. * - concurrency: (integer) Pass this configuration option to limit the
  41. * allowed number of outstanding concurrently executing promises,
  42. * creating a capped pool of promises. There is no limit by default.
  43. *
  44. * @param mixed $iterable Promises or values to iterate.
  45. * @param array $config Configuration options
  46. */
  47. public function __construct($iterable, array $config = [])
  48. {
  49. $this->iterable = Create::iterFor($iterable);
  50. if (isset($config['concurrency'])) {
  51. $this->concurrency = $config['concurrency'];
  52. }
  53. if (isset($config['fulfilled'])) {
  54. $this->onFulfilled = $config['fulfilled'];
  55. }
  56. if (isset($config['rejected'])) {
  57. $this->onRejected = $config['rejected'];
  58. }
  59. }
  60. /** @psalm-suppress InvalidNullableReturnType */
  61. public function promise(): PromiseInterface
  62. {
  63. if ($this->aggregate) {
  64. return $this->aggregate;
  65. }
  66. try {
  67. $this->createPromise();
  68. /** @psalm-assert Promise $this->aggregate */
  69. $this->iterable->rewind();
  70. $this->refillPending();
  71. } catch (\Throwable $e) {
  72. $this->aggregate->reject($e);
  73. }
  74. /**
  75. * @psalm-suppress NullableReturnStatement
  76. */
  77. return $this->aggregate;
  78. }
  79. private function createPromise(): void
  80. {
  81. $this->mutex = false;
  82. $this->aggregate = new Promise(function (): void {
  83. if ($this->checkIfFinished()) {
  84. return;
  85. }
  86. reset($this->pending);
  87. // Consume a potentially fluctuating list of promises while
  88. // ensuring that indexes are maintained (precluding array_shift).
  89. while ($promise = current($this->pending)) {
  90. next($this->pending);
  91. $promise->wait();
  92. if (Is::settled($this->aggregate)) {
  93. return;
  94. }
  95. }
  96. });
  97. // Clear the references when the promise is resolved.
  98. $clearFn = function (): void {
  99. $this->iterable = $this->concurrency = $this->pending = null;
  100. $this->onFulfilled = $this->onRejected = null;
  101. $this->nextPendingIndex = 0;
  102. };
  103. $this->aggregate->then($clearFn, $clearFn);
  104. }
  105. private function refillPending(): void
  106. {
  107. if (!$this->concurrency) {
  108. // Add all pending promises.
  109. while ($this->addPending() && $this->advanceIterator()) {
  110. }
  111. return;
  112. }
  113. // Add only up to N pending promises.
  114. $concurrency = is_callable($this->concurrency)
  115. ? ($this->concurrency)(count($this->pending))
  116. : $this->concurrency;
  117. $concurrency = max($concurrency - count($this->pending), 0);
  118. // Concurrency may be set to 0 to disallow new promises.
  119. if (!$concurrency) {
  120. return;
  121. }
  122. // Add the first pending promise.
  123. $this->addPending();
  124. // Note this is special handling for concurrency=1 so that we do
  125. // not advance the iterator after adding the first promise. This
  126. // helps work around issues with generators that might not have the
  127. // next value to yield until promise callbacks are called.
  128. while (--$concurrency
  129. && $this->advanceIterator()
  130. && $this->addPending()) {
  131. }
  132. }
  133. private function addPending(): bool
  134. {
  135. if (!$this->iterable || !$this->iterable->valid()) {
  136. return false;
  137. }
  138. $promise = Create::promiseFor($this->iterable->current());
  139. $key = $this->iterable->key();
  140. // Iterable keys may not be unique, so we use a counter to
  141. // guarantee uniqueness
  142. $idx = $this->nextPendingIndex++;
  143. $this->pending[$idx] = $promise->then(
  144. function ($value) use ($idx, $key): void {
  145. if ($this->onFulfilled) {
  146. ($this->onFulfilled)(
  147. $value,
  148. $key,
  149. $this->aggregate
  150. );
  151. }
  152. $this->step($idx);
  153. },
  154. function ($reason) use ($idx, $key): void {
  155. if ($this->onRejected) {
  156. ($this->onRejected)(
  157. $reason,
  158. $key,
  159. $this->aggregate
  160. );
  161. }
  162. $this->step($idx);
  163. }
  164. );
  165. return true;
  166. }
  167. private function advanceIterator(): bool
  168. {
  169. // Place a lock on the iterator so that we ensure to not recurse,
  170. // preventing fatal generator errors.
  171. if ($this->mutex) {
  172. return false;
  173. }
  174. $this->mutex = true;
  175. try {
  176. $this->iterable->next();
  177. $this->mutex = false;
  178. return true;
  179. } catch (\Throwable $e) {
  180. $this->aggregate->reject($e);
  181. $this->mutex = false;
  182. return false;
  183. }
  184. }
  185. private function step(int $idx): void
  186. {
  187. // If the promise was already resolved, then ignore this step.
  188. if (Is::settled($this->aggregate)) {
  189. return;
  190. }
  191. unset($this->pending[$idx]);
  192. // Only refill pending promises if we are not locked, preventing the
  193. // EachPromise to recursively invoke the provided iterator, which
  194. // cause a fatal error: "Cannot resume an already running generator"
  195. if ($this->advanceIterator() && !$this->checkIfFinished()) {
  196. // Add more pending promises if possible.
  197. $this->refillPending();
  198. }
  199. }
  200. private function checkIfFinished(): bool
  201. {
  202. if (!$this->pending && !$this->iterable->valid()) {
  203. // Resolve the promise if there's nothing left to do.
  204. $this->aggregate->resolve(null);
  205. return true;
  206. }
  207. return false;
  208. }
  209. }