473 lines
16 KiB
PHP
473 lines
16 KiB
PHP
<?php
|
|
|
|
/*
|
|
* This file is part of the Symfony package.
|
|
*
|
|
* (c) Fabien Potencier <fabien@symfony.com>
|
|
*
|
|
* For the full copyright and license information, please view the LICENSE
|
|
* file that was distributed with this source code.
|
|
*/
|
|
|
|
namespace Symfony\Component\HttpClient\Response;
|
|
|
|
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
|
|
use Symfony\Component\HttpClient\Chunk\FirstChunk;
|
|
use Symfony\Component\HttpClient\Chunk\LastChunk;
|
|
use Symfony\Component\HttpClient\Exception\TransportException;
|
|
use Symfony\Contracts\HttpClient\ChunkInterface;
|
|
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
|
|
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
|
|
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
|
|
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
|
use Symfony\Contracts\HttpClient\ResponseInterface;
|
|
|
|
/**
|
|
* Provides a single extension point to process a response's content stream.
|
|
*
|
|
* @author Nicolas Grekas <p@tchwork.com>
|
|
*/
|
|
final class AsyncResponse implements ResponseInterface, StreamableInterface
|
|
{
|
|
use CommonResponseTrait;
|
|
|
|
private const FIRST_CHUNK_YIELDED = 1;
|
|
private const LAST_CHUNK_YIELDED = 2;
|
|
|
|
private $client;
|
|
private $response;
|
|
private $info = ['canceled' => false];
|
|
private $passthru;
|
|
private $stream;
|
|
private $yieldedState;
|
|
|
|
/**
|
|
* @param ?callable(ChunkInterface, AsyncContext): ?\Iterator $passthru
|
|
*/
|
|
public function __construct(HttpClientInterface $client, string $method, string $url, array $options, ?callable $passthru = null)
|
|
{
|
|
$this->client = $client;
|
|
$this->shouldBuffer = $options['buffer'] ?? true;
|
|
|
|
if (null !== $onProgress = $options['on_progress'] ?? null) {
|
|
$thisInfo = &$this->info;
|
|
$options['on_progress'] = static function (int $dlNow, int $dlSize, array $info, ?\Closure $resolve = null) use (&$thisInfo, $onProgress) {
|
|
$onProgress($dlNow, $dlSize, $thisInfo + $info, $resolve);
|
|
};
|
|
}
|
|
$this->response = $client->request($method, $url, ['buffer' => false] + $options);
|
|
$this->passthru = $passthru;
|
|
$this->initializer = static function (self $response, ?float $timeout = null) {
|
|
if (null === $response->shouldBuffer) {
|
|
return false;
|
|
}
|
|
|
|
while (true) {
|
|
foreach (self::stream([$response], $timeout) as $chunk) {
|
|
if ($chunk->isTimeout() && $response->passthru) {
|
|
// Timeouts thrown during initialization are transport errors
|
|
foreach (self::passthru($response->client, $response, new ErrorChunk($response->offset, new TransportException($chunk->getError()))) as $chunk) {
|
|
if ($chunk->isFirst()) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
continue 2;
|
|
}
|
|
|
|
if ($chunk->isFirst()) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
};
|
|
if (\array_key_exists('user_data', $options)) {
|
|
$this->info['user_data'] = $options['user_data'];
|
|
}
|
|
if (\array_key_exists('max_duration', $options)) {
|
|
$this->info['max_duration'] = $options['max_duration'];
|
|
}
|
|
}
|
|
|
|
public function getStatusCode(): int
|
|
{
|
|
if ($this->initializer) {
|
|
self::initialize($this);
|
|
}
|
|
|
|
return $this->response->getStatusCode();
|
|
}
|
|
|
|
public function getHeaders(bool $throw = true): array
|
|
{
|
|
if ($this->initializer) {
|
|
self::initialize($this);
|
|
}
|
|
|
|
$headers = $this->response->getHeaders(false);
|
|
|
|
if ($throw) {
|
|
$this->checkStatusCode();
|
|
}
|
|
|
|
return $headers;
|
|
}
|
|
|
|
public function getInfo(?string $type = null)
|
|
{
|
|
if (null !== $type) {
|
|
return $this->info[$type] ?? $this->response->getInfo($type);
|
|
}
|
|
|
|
return $this->info + $this->response->getInfo();
|
|
}
|
|
|
|
public function toStream(bool $throw = true)
|
|
{
|
|
if ($throw) {
|
|
// Ensure headers arrived
|
|
$this->getHeaders(true);
|
|
}
|
|
|
|
$handle = function () {
|
|
$stream = $this->response instanceof StreamableInterface ? $this->response->toStream(false) : StreamWrapper::createResource($this->response);
|
|
|
|
return stream_get_meta_data($stream)['wrapper_data']->stream_cast(\STREAM_CAST_FOR_SELECT);
|
|
};
|
|
|
|
$stream = StreamWrapper::createResource($this);
|
|
stream_get_meta_data($stream)['wrapper_data']
|
|
->bindHandles($handle, $this->content);
|
|
|
|
return $stream;
|
|
}
|
|
|
|
public function cancel(): void
|
|
{
|
|
if ($this->info['canceled']) {
|
|
return;
|
|
}
|
|
|
|
$this->info['canceled'] = true;
|
|
$this->info['error'] = 'Response has been canceled.';
|
|
$this->close();
|
|
$client = $this->client;
|
|
$this->client = null;
|
|
|
|
if (!$this->passthru) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
foreach (self::passthru($client, $this, new LastChunk()) as $chunk) {
|
|
// no-op
|
|
}
|
|
|
|
$this->passthru = null;
|
|
} catch (ExceptionInterface $e) {
|
|
// ignore any errors when canceling
|
|
}
|
|
}
|
|
|
|
public function __destruct()
|
|
{
|
|
$httpException = null;
|
|
|
|
if ($this->initializer && null === $this->getInfo('error')) {
|
|
try {
|
|
self::initialize($this, -0.0);
|
|
$this->getHeaders(true);
|
|
} catch (HttpExceptionInterface $httpException) {
|
|
// no-op
|
|
}
|
|
}
|
|
|
|
if ($this->passthru && null === $this->getInfo('error')) {
|
|
$this->info['canceled'] = true;
|
|
|
|
try {
|
|
foreach (self::passthru($this->client, $this, new LastChunk()) as $chunk) {
|
|
// no-op
|
|
}
|
|
} catch (ExceptionInterface $e) {
|
|
// ignore any errors when destructing
|
|
}
|
|
}
|
|
|
|
if (null !== $httpException) {
|
|
throw $httpException;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @internal
|
|
*/
|
|
public static function stream(iterable $responses, ?float $timeout = null, ?string $class = null): \Generator
|
|
{
|
|
while ($responses) {
|
|
$wrappedResponses = [];
|
|
$asyncMap = new \SplObjectStorage();
|
|
$client = null;
|
|
|
|
foreach ($responses as $r) {
|
|
if (!$r instanceof self) {
|
|
throw new \TypeError(sprintf('"%s::stream()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', $class ?? static::class, get_debug_type($r)));
|
|
}
|
|
|
|
if (null !== $e = $r->info['error'] ?? null) {
|
|
yield $r => $chunk = new ErrorChunk($r->offset, new TransportException($e));
|
|
$chunk->didThrow() ?: $chunk->getContent();
|
|
continue;
|
|
}
|
|
|
|
if (null === $client) {
|
|
$client = $r->client;
|
|
} elseif ($r->client !== $client) {
|
|
throw new TransportException('Cannot stream AsyncResponse objects with many clients.');
|
|
}
|
|
|
|
$asyncMap[$r->response] = $r;
|
|
$wrappedResponses[] = $r->response;
|
|
|
|
if ($r->stream) {
|
|
yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);
|
|
|
|
if (!isset($asyncMap[$response])) {
|
|
array_pop($wrappedResponses);
|
|
}
|
|
|
|
if ($r->response !== $response && !isset($asyncMap[$r->response])) {
|
|
$asyncMap[$r->response] = $r;
|
|
$wrappedResponses[] = $r->response;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!$client || !$wrappedResponses) {
|
|
return;
|
|
}
|
|
|
|
foreach ($client->stream($wrappedResponses, $timeout) as $response => $chunk) {
|
|
$r = $asyncMap[$response];
|
|
|
|
if (null === $chunk->getError()) {
|
|
if ($chunk->isFirst()) {
|
|
// Ensure no exception is thrown on destruct for the wrapped response
|
|
$r->response->getStatusCode();
|
|
} elseif (0 === $r->offset && null === $r->content && $chunk->isLast()) {
|
|
$r->content = fopen('php://memory', 'w+');
|
|
}
|
|
}
|
|
|
|
if (!$r->passthru) {
|
|
if (null !== $chunk->getError() || $chunk->isLast()) {
|
|
unset($asyncMap[$response]);
|
|
} elseif (null !== $r->content && '' !== ($content = $chunk->getContent()) && \strlen($content) !== fwrite($r->content, $content)) {
|
|
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
|
|
$r->info['error'] = $chunk->getError();
|
|
$r->response->cancel();
|
|
}
|
|
|
|
yield $r => $chunk;
|
|
continue;
|
|
}
|
|
|
|
if (null !== $chunk->getError()) {
|
|
// no-op
|
|
} elseif ($chunk->isFirst()) {
|
|
$r->yieldedState = self::FIRST_CHUNK_YIELDED;
|
|
} elseif (self::FIRST_CHUNK_YIELDED !== $r->yieldedState && null === $chunk->getInformationalStatus()) {
|
|
throw new \LogicException(sprintf('Instance of "%s" is already consumed and cannot be managed by "%s". A decorated client should not call any of the response\'s methods in its "request()" method.', get_debug_type($response), $class ?? static::class));
|
|
}
|
|
|
|
foreach (self::passthru($r->client, $r, $chunk, $asyncMap) as $chunk) {
|
|
yield $r => $chunk;
|
|
}
|
|
|
|
if ($r->response !== $response && isset($asyncMap[$response])) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (null === $chunk->getError() && $chunk->isLast()) {
|
|
$r->yieldedState = self::LAST_CHUNK_YIELDED;
|
|
}
|
|
if (null === $chunk->getError() && self::LAST_CHUNK_YIELDED !== $r->yieldedState && $r->response === $response && null !== $r->client) {
|
|
throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
|
|
}
|
|
|
|
$responses = [];
|
|
foreach ($asyncMap as $response) {
|
|
$r = $asyncMap[$response];
|
|
|
|
if (null !== $r->client) {
|
|
$responses[] = $asyncMap[$response];
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
|
|
*/
|
|
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, ?\SplObjectStorage $asyncMap = null): \Generator
|
|
{
|
|
$r->stream = null;
|
|
$response = $r->response;
|
|
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
|
|
if (null === $stream = ($r->passthru)($chunk, $context)) {
|
|
if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
|
|
throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
if (!$stream instanceof \Iterator) {
|
|
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
|
|
}
|
|
$r->stream = $stream;
|
|
|
|
yield from self::passthruStream($response, $r, null, $asyncMap);
|
|
}
|
|
|
|
/**
|
|
* @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
|
|
*/
|
|
private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
|
|
{
|
|
while (true) {
|
|
try {
|
|
if (null !== $chunk && $r->stream) {
|
|
$r->stream->next();
|
|
}
|
|
|
|
if (!$r->stream || !$r->stream->valid() || !$r->stream) {
|
|
$r->stream = null;
|
|
break;
|
|
}
|
|
} catch (\Throwable $e) {
|
|
unset($asyncMap[$response]);
|
|
$r->stream = null;
|
|
$r->info['error'] = $e->getMessage();
|
|
$r->response->cancel();
|
|
|
|
yield $r => $chunk = new ErrorChunk($r->offset, $e);
|
|
$chunk->didThrow() ?: $chunk->getContent();
|
|
break;
|
|
}
|
|
|
|
$chunk = $r->stream->current();
|
|
|
|
if (!$chunk instanceof ChunkInterface) {
|
|
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
|
|
}
|
|
|
|
if (null !== $chunk->getError()) {
|
|
// no-op
|
|
} elseif ($chunk->isFirst()) {
|
|
$e = $r->openBuffer();
|
|
|
|
yield $r => $chunk;
|
|
|
|
if ($r->initializer && null === $r->getInfo('error')) {
|
|
// Ensure the HTTP status code is always checked
|
|
$r->getHeaders(true);
|
|
}
|
|
|
|
if (null === $e) {
|
|
continue;
|
|
}
|
|
|
|
$r->response->cancel();
|
|
$chunk = new ErrorChunk($r->offset, $e);
|
|
} elseif ('' !== $content = $chunk->getContent()) {
|
|
if (null !== $r->shouldBuffer) {
|
|
throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
|
|
}
|
|
|
|
if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
|
|
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
|
|
$r->info['error'] = $chunk->getError();
|
|
$r->response->cancel();
|
|
}
|
|
}
|
|
|
|
if (null !== $chunk->getError() || $chunk->isLast()) {
|
|
$stream = $r->stream;
|
|
$r->stream = null;
|
|
unset($asyncMap[$response]);
|
|
}
|
|
|
|
if (null === $chunk->getError()) {
|
|
$r->offset += \strlen($content);
|
|
|
|
yield $r => $chunk;
|
|
|
|
if (!$chunk->isLast()) {
|
|
continue;
|
|
}
|
|
|
|
$stream->next();
|
|
|
|
if ($stream->valid()) {
|
|
throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
|
|
}
|
|
|
|
$r->passthru = null;
|
|
} else {
|
|
if ($chunk instanceof ErrorChunk) {
|
|
$chunk->didThrow(false);
|
|
} else {
|
|
try {
|
|
$chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
|
|
} catch (TransportExceptionInterface $e) {
|
|
$chunk = new ErrorChunk($chunk->getOffset(), $e);
|
|
}
|
|
}
|
|
|
|
yield $r => $chunk;
|
|
$chunk->didThrow() ?: $chunk->getContent();
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
private function openBuffer(): ?\Throwable
|
|
{
|
|
if (null === $shouldBuffer = $this->shouldBuffer) {
|
|
throw new \LogicException('A chunk passthru cannot yield more than one "isFirst()" chunk.');
|
|
}
|
|
|
|
$e = $this->shouldBuffer = null;
|
|
|
|
if ($shouldBuffer instanceof \Closure) {
|
|
try {
|
|
$shouldBuffer = $shouldBuffer($this->getHeaders(false));
|
|
|
|
if (null !== $e = $this->response->getInfo('error')) {
|
|
throw new TransportException($e);
|
|
}
|
|
} catch (\Throwable $e) {
|
|
$this->info['error'] = $e->getMessage();
|
|
$this->response->cancel();
|
|
}
|
|
}
|
|
|
|
if (true === $shouldBuffer) {
|
|
$this->content = fopen('php://temp', 'w+');
|
|
} elseif (\is_resource($shouldBuffer)) {
|
|
$this->content = $shouldBuffer;
|
|
}
|
|
|
|
return $e;
|
|
}
|
|
|
|
private function close(): void
|
|
{
|
|
$this->response->cancel();
|
|
}
|
|
}
|