| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 | 
							- <?php
 
- namespace GuzzleHttp\Promise;
 
- /**
 
-  * Represents a promise that iterates over many promises and invokes
 
-  * side-effect functions in the process.
 
-  */
 
- class EachPromise implements PromisorInterface
 
- {
 
-     private $pending = [];
 
-     private $nextPendingIndex = 0;
 
-     /** @var \Iterator|null */
 
-     private $iterable;
 
-     /** @var callable|int|null */
 
-     private $concurrency;
 
-     /** @var callable|null */
 
-     private $onFulfilled;
 
-     /** @var callable|null */
 
-     private $onRejected;
 
-     /** @var Promise|null */
 
-     private $aggregate;
 
-     /** @var bool|null */
 
-     private $mutex;
 
-     /**
 
-      * Configuration hash can include the following key value pairs:
 
-      *
 
-      * - fulfilled: (callable) Invoked when a promise fulfills. The function
 
-      *   is invoked with three arguments: the fulfillment value, the index
 
-      *   position from the iterable list of the promise, and the aggregate
 
-      *   promise that manages all of the promises. The aggregate promise may
 
-      *   be resolved from within the callback to short-circuit the promise.
 
-      * - rejected: (callable) Invoked when a promise is rejected. The
 
-      *   function is invoked with three arguments: the rejection reason, the
 
-      *   index position from the iterable list of the promise, and the
 
-      *   aggregate promise that manages all of the promises. The aggregate
 
-      *   promise may be resolved from within the callback to short-circuit
 
-      *   the promise.
 
-      * - concurrency: (integer) Pass this configuration option to limit the
 
-      *   allowed number of outstanding concurrently executing promises,
 
-      *   creating a capped pool of promises. There is no limit by default.
 
-      *
 
-      * @param mixed $iterable Promises or values to iterate.
 
-      * @param array $config   Configuration options
 
-      */
 
-     public function __construct($iterable, array $config = [])
 
-     {
 
-         $this->iterable = Create::iterFor($iterable);
 
-         if (isset($config['concurrency'])) {
 
-             $this->concurrency = $config['concurrency'];
 
-         }
 
-         if (isset($config['fulfilled'])) {
 
-             $this->onFulfilled = $config['fulfilled'];
 
-         }
 
-         if (isset($config['rejected'])) {
 
-             $this->onRejected = $config['rejected'];
 
-         }
 
-     }
 
-     /** @psalm-suppress InvalidNullableReturnType */
 
-     public function promise()
 
-     {
 
-         if ($this->aggregate) {
 
-             return $this->aggregate;
 
-         }
 
-         try {
 
-             $this->createPromise();
 
-             /** @psalm-assert Promise $this->aggregate */
 
-             $this->iterable->rewind();
 
-             $this->refillPending();
 
-         } catch (\Throwable $e) {
 
-             /**
 
-              * @psalm-suppress NullReference
 
-              * @phpstan-ignore-next-line
 
-              */
 
-             $this->aggregate->reject($e);
 
-         } catch (\Exception $e) {
 
-             /**
 
-              * @psalm-suppress NullReference
 
-              * @phpstan-ignore-next-line
 
-              */
 
-             $this->aggregate->reject($e);
 
-         }
 
-         /**
 
-          * @psalm-suppress NullableReturnStatement
 
-          * @phpstan-ignore-next-line
 
-          */
 
-         return $this->aggregate;
 
-     }
 
-     private function createPromise()
 
-     {
 
-         $this->mutex = false;
 
-         $this->aggregate = new Promise(function () {
 
-             if ($this->checkIfFinished()) {
 
-                 return;
 
-             }
 
-             reset($this->pending);
 
-             // Consume a potentially fluctuating list of promises while
 
-             // ensuring that indexes are maintained (precluding array_shift).
 
-             while ($promise = current($this->pending)) {
 
-                 next($this->pending);
 
-                 $promise->wait();
 
-                 if (Is::settled($this->aggregate)) {
 
-                     return;
 
-                 }
 
-             }
 
-         });
 
-         // Clear the references when the promise is resolved.
 
-         $clearFn = function () {
 
-             $this->iterable = $this->concurrency = $this->pending = null;
 
-             $this->onFulfilled = $this->onRejected = null;
 
-             $this->nextPendingIndex = 0;
 
-         };
 
-         $this->aggregate->then($clearFn, $clearFn);
 
-     }
 
-     private function refillPending()
 
-     {
 
-         if (!$this->concurrency) {
 
-             // Add all pending promises.
 
-             while ($this->addPending() && $this->advanceIterator());
 
-             return;
 
-         }
 
-         // Add only up to N pending promises.
 
-         $concurrency = is_callable($this->concurrency)
 
-             ? call_user_func($this->concurrency, count($this->pending))
 
-             : $this->concurrency;
 
-         $concurrency = max($concurrency - count($this->pending), 0);
 
-         // Concurrency may be set to 0 to disallow new promises.
 
-         if (!$concurrency) {
 
-             return;
 
-         }
 
-         // Add the first pending promise.
 
-         $this->addPending();
 
-         // Note this is special handling for concurrency=1 so that we do
 
-         // not advance the iterator after adding the first promise. This
 
-         // helps work around issues with generators that might not have the
 
-         // next value to yield until promise callbacks are called.
 
-         while (--$concurrency
 
-             && $this->advanceIterator()
 
-             && $this->addPending());
 
-     }
 
-     private function addPending()
 
-     {
 
-         if (!$this->iterable || !$this->iterable->valid()) {
 
-             return false;
 
-         }
 
-         $promise = Create::promiseFor($this->iterable->current());
 
-         $key = $this->iterable->key();
 
-         // Iterable keys may not be unique, so we use a counter to
 
-         // guarantee uniqueness
 
-         $idx = $this->nextPendingIndex++;
 
-         $this->pending[$idx] = $promise->then(
 
-             function ($value) use ($idx, $key) {
 
-                 if ($this->onFulfilled) {
 
-                     call_user_func(
 
-                         $this->onFulfilled,
 
-                         $value,
 
-                         $key,
 
-                         $this->aggregate
 
-                     );
 
-                 }
 
-                 $this->step($idx);
 
-             },
 
-             function ($reason) use ($idx, $key) {
 
-                 if ($this->onRejected) {
 
-                     call_user_func(
 
-                         $this->onRejected,
 
-                         $reason,
 
-                         $key,
 
-                         $this->aggregate
 
-                     );
 
-                 }
 
-                 $this->step($idx);
 
-             }
 
-         );
 
-         return true;
 
-     }
 
-     private function advanceIterator()
 
-     {
 
-         // Place a lock on the iterator so that we ensure to not recurse,
 
-         // preventing fatal generator errors.
 
-         if ($this->mutex) {
 
-             return false;
 
-         }
 
-         $this->mutex = true;
 
-         try {
 
-             $this->iterable->next();
 
-             $this->mutex = false;
 
-             return true;
 
-         } catch (\Throwable $e) {
 
-             $this->aggregate->reject($e);
 
-             $this->mutex = false;
 
-             return false;
 
-         } catch (\Exception $e) {
 
-             $this->aggregate->reject($e);
 
-             $this->mutex = false;
 
-             return false;
 
-         }
 
-     }
 
-     private function step($idx)
 
-     {
 
-         // If the promise was already resolved, then ignore this step.
 
-         if (Is::settled($this->aggregate)) {
 
-             return;
 
-         }
 
-         unset($this->pending[$idx]);
 
-         // Only refill pending promises if we are not locked, preventing the
 
-         // EachPromise to recursively invoke the provided iterator, which
 
-         // cause a fatal error: "Cannot resume an already running generator"
 
-         if ($this->advanceIterator() && !$this->checkIfFinished()) {
 
-             // Add more pending promises if possible.
 
-             $this->refillPending();
 
-         }
 
-     }
 
-     private function checkIfFinished()
 
-     {
 
-         if (!$this->pending && !$this->iterable->valid()) {
 
-             // Resolve the promise if there's nothing left to do.
 
-             $this->aggregate->resolve(null);
 
-             return true;
 
-         }
 
-         return false;
 
-     }
 
- }
 
 
  |