PromiseStream

Build Status

ReactPHP 在Promise-land和Stream-land之间缺少的链接。

目录

用法

这个轻量级的库仅包含一些简单的功能。 所有功能都位于React\Promise\Stream命名空间下。

以下示例假定您使用与此类似的导入语句:

use React\Promise\Stream;

Stream\buffer(…);

或者,也可以用它们完整的命名空间来引用它们:

\React\Promise\Stream\buffer(…);

buffer()

buffer(ReadableStreamInterface<string> $stream, ?int $maxLength = null): PromiseInterface<string,Exception> 函数可用于创建一个Promise,它可以用流数据缓冲区解析。

$stream = accessSomeJsonStream();

Stream\buffer($stream)->then(function ($contents) {
    var_dump(json_decode($contents));
});

流关闭后,Promise将与所有连接的数据块一起解析。

如果流已经关闭,则Promise将以空字符串解析。

如果流发出错误,则Promise将拒绝。

如果Promise被取消,则Promise将被拒绝。

$maxLength参数(可选)默认为无限制。 如果设定了最大长度,并且流在结束之前发出更多数据, 则将通过OverflowException拒绝诺言。

$stream = accessSomeToLargeStream();

Stream\buffer($stream, 1024)->then(function ($contents) {
    var_dump(json_decode($contents));
}, function ($error) {
    //当流缓冲区超过最大大小时或当流发生错误时触发此处逻辑
    //在此示例中为1024字节,
});

first()

first(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface<mixed,Exception> 函数可用于创建Promise,该Promise将在设定事件首次触发时解析。

$stream = accessSomeJsonStream();

Stream\first($stream)->then(function ($chunk) {
    echo 'The first chunk arrived: ' . $chunk;
});

无论第一个事件发出什么,promise都将解析,如果该事件不传递任何数据,则使用null。 如果不传递自定义事件名称,则它将等待第一个data事件,并使用包含第一个数据块的字符串进行解析。

如果流发出错误,则promise将拒绝 - 除error事件。

一旦流关闭,promise就会被拒绝 - 除close事件。

如果流已关闭,则promise将被拒绝。

如果promise被取消,它将被拒绝。

all()

all(ReadableStreamInterface|WritableStreamInterface $stream, string $event = 'data'): PromiseInterface<array,Exception> 函数可用于创建一个Promise,该函数用一个包含所有事件数据的数组进行解析。

$stream = accessSomeJsonStream();

Stream\all($stream)->then(function ($chunks) {
    echo 'The stream consists of ' . count($chunks) . ' chunk(s)';
});

promise将使用一个数组来解析所有发出的事件,如果事件不传递任何数据,则使用null。 如果不传递自定义事件名称,则它将等待所有data事件,并使用包含所有数据块的数组进行解析。

一旦流关闭,promise将用数组解析。

如果流已经关闭,则promise将使用空数组解析。

如果流发出错误,promise将被拒绝。

如果promise被取消,它将被拒绝。

unwrapReadable()

unwrapReadable(PromiseInterface<ReadableStreamInterface,Exception> $promise): ReadableStreamInterface 函数可用于解包解析为ReadableStreamInterfacepromise

该函数立即返回一个可读的流实例(实现ReadableStreamInterface),用作将来的承诺解析的代理。 一旦Promise通过ReadableStreamInterface解析,其数据将通过管道传输到输出流。

//$promise = someFunctionWhichResolvesWithAStream();
$promise = startDownloadStream($uri);

$stream = Stream\unwrapReadable($promise);

$stream->on('data', function ($data) {
    echo $data;
});

$stream->on('end', function () {
    echo 'DONE';
});

如果promise被拒绝或通过ReadableStreamInterface以外的任何实例实现, 那么输出流将发出一个error事件并关闭:

$promise = startDownloadStream($invalidUri);

$stream = Stream\unwrapReadable($promise);

$stream->on('error', function (Exception $error) {
    echo 'Error: ' . $error->getMessage();
});

这里的$promise应该是挂起的,也就是说,在调用这个函数时,它不应该被履行或拒绝。 如果给定的承诺已经履行,并且没有使用ReadableStreamInterface实例解决,那么您将无法接收到error事件。 您可以随时close()结果流,这将尝试cancel()未完成的承诺或尝试close()基础流。

$promise = startDownloadStream($uri);

$stream = Stream\unwrapReadable($promise);

$loop->addTimer(2.0, function () use ($stream) {
    $stream->close();
});

unwrapWritable()

unwrapWritable(PromiseInterface<WritableStreamInterface,Exception> $promise): WritableStreamInterface 函数可用于解包解析为WritableStreamInterfacepromise

该函数立即返回一个可写的流实例(实现WritableStreamInterface),用作将来的承诺解析的代理。 对Promise的解析将在此实例中进行的所有写操作都缓存在内存中。 一旦Promise通过WritableStreamInterface解析,您写入代理的所有数据都将透明地转发到内部流。

//$promise = someFunctionWhichResolvesWithAStream();
$promise = startUploadStream($uri);

$stream = Stream\unwrapWritable($promise);

$stream->write('hello');
$stream->end('world');

$stream->on('close', function () {
    echo 'DONE';
});

如果给定的承诺已经履行,并且没有使用WritableStreamInterface实例解决,则输出流将发出一个error事件并关闭:

$promise = startUploadStream($invalidUri);

$stream = Stream\unwrapWritable($promise);

$stream->on('error', function (Exception $error) {
    echo 'Error: ' . $error->getMessage();
});

给定的$promise应该是挂起的,也就是说,在调用这个函数时,它不应该被履行或拒绝。 如果给定的承诺已经履行,并且没有使用WritableStreamInterface实例解决,那么您将无法接收error事件。

您可以随时close()结果流,这将尝试cancel()未完成的承诺或尝试close()基础流。

$promise = startUploadStream($uri);

$stream = Stream\unwrapWritable($promise);

$loop->addTimer(2.0, function () use ($stream) {
    $stream->close();
});

安装

推荐的安装这个库的方法是通过ComposerComposer 新手?

该项目遵循SemVer , 默认安装最新支持的版本:

$ composer require react/promise-stream:^1.2

有关版本升级的详细信息,请参见CHANGELOG

该项目旨在在任何平台上运行,因此不需要任何PHP扩展,并支持通过 PHP 7+HHVM在旧版PHP 5.3上运行。

强烈推荐在这个项目中使用PHP 7+

测试

要运行测试套件,首先需要克隆这个存储库,然后安装所有依赖项通过Composer:

$ composer install

要运行测试套件,请转到项目根目录并运行:

$ php vendor/bin/phpunit

License

MIT, see LICENSE file.

results matching ""

    No results matching ""

    results matching ""

      No results matching ""