Streams—The definitive guide

Learn how to use readable, writable, and transform streams with the Streams API.


The Streams API allows you to programmatically access streams of data received over the network or created by whatever means locally and process them with JavaScript. Streaming involves breaking down a resource that you want to receive, send, or transform into small chunks, and then processing these chunks bit by bit. While streaming is something browsers do anyway when receiving assets like HTML or videos to be shown on webpages, this capability has never been available to JavaScript before fetch with streams was introduced in 2015.

Previously, if you wanted to process a resource of some kind (be it a video, or a text file, etc.), you would have to download the entire file, wait for it to be deserialized into a suitable format, and then process it. With streams being available to JavaScript, this all changes. You can now process raw data with JavaScript progressively as soon as it is available on the client, without needing to generate a buffer, string, or blob. This unlocks a number of use cases, some of which I list below:

  • Video effects: piping a readable video stream through a transform stream that applies effects in real time.
  • Data (de)compression: piping a file stream through a transform stream that selectively (de)compresses it.
  • Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform stream that translates bitmaps into PNGs. If installed inside the fetch handler of a service worker, this allows you to transparently polyfill new image formats like AVIF.

Core concepts #

Before I go into details on the various types of streams, let me introduce some core concepts.

Chunks #

A chunk is a single piece of data that is written to or read from a stream. It can be of any type; streams can even contain chunks of different types. Most of the time, a chunk will not be the most atomic unit of data for a given stream. For example, a byte stream might contain chunks consisting of 16 KiB Uint8Array units, instead of single bytes.

Readable streams #

A readable stream represents a source of data from which you can read. In other words, data comes out of a readable stream. Concretely, a readable stream is an instance of the ReadableStream class.

Writable streams #

A writable stream represents a destination for data into which you can write. In other words, data goes into a writable stream. Concretely, a writable stream is an instance of the WritableStream class.

Transform streams #

A transform stream consists of a pair of streams: a writable stream, known as its writable side, and a readable stream, known as its readable side. A real-world metaphor for this would be a simultaneous interpreter who translates from one language to another on the fly. In a manner specific to the transform stream, writing to the writable side results in new data being made available for reading from the readable side. Concretely, any object with a writable property and a readable property can serve as a transform stream. However, the standard TransformStream class makes it easier to create such a pair that is properly entangled.

Pipe chains #

Streams are primarily used by piping them to each other. A readable stream can be piped directly to a writable stream, using the readable stream's pipeTo() method, or it can be piped through one or more transform streams first, using the readable stream's pipeThrough() method. A set of streams piped together in this way is referred to as a pipe chain.
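As a minimal sketch (assuming readableStream, transformStream, and writableStream instances like the ones constructed later in this article), a complete pipe chain reads:

await readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream);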

Backpressure #

Once a pipe chain is constructed, it will propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow is called backpressure.
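Pipe chains handle backpressure for you, but you can observe the signal yourself when writing manually. The following sketch (assuming a writableStream like the ones created later in this article, and code running inside an async function) waits on the writer's ready promise, which only resolves while the internal queue is below the high water mark:

const writer = writableStream.getWriter();
for (const chunk of ['a', 'b', 'c']) {
  // `ready` resolves once the queue is below the high water mark,
  // which is how backpressure reaches this producer.
  await writer.ready;
  console.log('Desired queue size:', writer.desiredSize);
  writer.write(chunk);
}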

Teeing #

A readable stream can be teed (named after the shape of an uppercase 'T') using its tee() method. This will lock the stream, that is, make it no longer directly usable; however, it will create two new streams, called branches, which can be consumed independently. Teeing also is important because streams cannot be rewound or restarted; more about this later.

[Diagram: a pipe chain in which a readable stream from a fetch() call is piped through a transform stream, whose output is teed; one branch is streamed to the browser and the other to the service worker cache.]
A pipe chain.

The mechanics of a readable stream #

A readable stream is a data source represented in JavaScript by a ReadableStream object that flows from an underlying source. The ReadableStream() constructor creates and returns a readable stream object from the given handlers. There are two types of underlying source:

  • Push sources constantly push data at you once you have accessed them, and it is up to you to start, pause, or cancel access to the stream. Examples include live video streams, server-sent events, or WebSockets.
  • Pull sources require you to explicitly request data from them once connected to. Examples include HTTP operations via fetch() or XMLHttpRequest calls.

Stream data is read sequentially in small pieces called chunks. The chunks placed in a stream are said to be enqueued. This means they are waiting in a queue ready to be read. An internal queue keeps track of the chunks that have not yet been read.

A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk, and compares the total size of all chunks in the queue to a specified number, known as the high water mark.
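You usually do not have to write a queuing strategy from scratch: the built-in CountQueuingStrategy and ByteLengthQueuingStrategy classes cover the two common cases. A sketch of how they could be passed to a stream constructor (the handler object is elided here):

// Count every chunk as 1; signal backpressure beyond 10 queued chunks.
const countStrategy = new CountQueuingStrategy({ highWaterMark: 10 });

// Measure chunks by byteLength; signal backpressure beyond 1,024 queued bytes.
const byteStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1_024 });

const readableStream = new ReadableStream({ /* … */ }, countStrategy);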

The chunks inside the stream are read by a reader. This reader retrieves the data one chunk at a time, allowing you to do whatever kind of operation you want to do on it. The reader plus the other processing code that goes along with it is called a consumer.

The next construct in this context is called a controller. Each readable stream has an associated controller that, as the name suggests, allows you to control the stream.

Only one reader can read a stream at a time; when a reader is created and starts reading a stream (that is, becomes an active reader), it is locked to it. If you want another reader to take over reading your stream, you typically need to release the first reader before you do anything else (although you can tee streams).
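As a short sketch of what such a handover could look like:

const readerA = readableStream.getReader();
console.log(await readerA.read());
// Release the lock so that another reader can be acquired.
readerA.releaseLock();

const readerB = readableStream.getReader();
// readerB now continues where readerA left off.
console.log(await readerB.read());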

Creating a readable stream #

You create a readable stream by calling its constructor ReadableStream(). The constructor has an optional argument underlyingSource, which represents an object with methods and properties that define how the constructed stream instance will behave.

The underlyingSource #

The underlyingSource can use the following optional, developer-defined methods:

  • start(controller): Called immediately when the object is constructed. The method can access the stream source, and do anything else required to set up the stream functionality. If this process is to be done asynchronously, the method can return a promise to signal success or failure. The controller parameter passed to this method is a ReadableStreamDefaultController.
  • pull(controller): Can be used to control the stream as more chunks are fetched. It is called repeatedly as long as the stream's internal queue of chunks is not full, up until the queue reaches its high water mark. If the result of calling pull() is a promise, pull() will not be called again until said promise fulfills. If the promise rejects, the stream will become errored.
  • cancel(reason): Called when the stream consumer cancels the stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

The ReadableStreamDefaultController supports the following methods:

  • ReadableStreamDefaultController.close() closes the associated stream.
  • ReadableStreamDefaultController.enqueue() enqueues a given chunk in the associated stream.
  • ReadableStreamDefaultController.error() causes any future interactions with the associated stream to error.
/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */
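The other two controller methods follow the same pattern. Here is a sketch of a start() method that enqueues a single chunk and then closes the stream, with error() shown commented out as the alternative:

/* … */
start(controller) {
  controller.enqueue('The only chunk!');
  // No more data to come: close the stream…
  controller.close();
  // …or, if something had gone wrong, error it instead:
  // controller.error(new Error('Something went wrong.'));
},
/* … */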

The queuingStrategy #

The second, likewise optional, argument of the ReadableStream() constructor is queuingStrategy. It is an object that optionally defines a queuing strategy for the stream, which takes two parameters:

  • highWaterMark: A non-negative number indicating the high water mark of the stream using this queuing strategy.
  • size(chunk): A function that computes and returns the finite non-negative size of the given chunk value. The result is used to determine backpressure, manifesting via the appropriate ReadableStreamDefaultController.desiredSize property. It also governs when the underlying source's pull() method is called.
const readableStream = new ReadableStream(
  {
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

The getReader() and read() methods #

To read from a readable stream, you need a reader, which will be a ReadableStreamDefaultReader. The getReader() method of the ReadableStream interface creates a reader and locks the stream to it. While the stream is locked, no other reader can be acquired until this one is released.

The read() method of the ReadableStreamDefaultReader interface returns a promise providing access to the next chunk in the stream's internal queue. It fulfills or rejects with a result depending on the state of the stream. The different possibilities are as follows:

  • If a chunk is available, the promise will be fulfilled with an object of the form
    { value: chunk, done: false }.
  • If the stream becomes closed, the promise will be fulfilled with an object of the form
    { value: undefined, done: true }.
  • If the stream becomes errored, the promise will be rejected with the relevant error.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

The locked property #

You can check if a readable stream is locked by accessing its ReadableStream.locked property.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Readable stream code samples #

The code sample below shows all the steps in action. You first create a ReadableStream that in its underlyingSource argument (that is, the TimestampSource class) defines a start() method. This method tells the stream's controller to enqueue() a timestamp every second during ten seconds. Finally, it tells the controller to close() the stream. You consume this stream by creating a reader via the getReader() method and calling read() until the stream is done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done` - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchronous iteration #

Checking upon each read() loop iteration if the stream is done may not be the most convenient API. Luckily there will soon be a better way to do this: asynchronous iteration.

for await (const chunk of stream) {
  console.log(chunk);
}

A workaround to use asynchronous iteration today is to implement the behavior with a helper function. This allows you to use the feature in your code as shown in the snippet below.

function streamAsyncIterator(stream) {
  // Get a lock on the stream:
  const reader = stream.getReader();

  return {
    next() {
      // Stream reads already resolve with {done, value}, so
      // we can just call read:
      return reader.read();
    },
    return() {
      // Release the lock if the iterator terminates.
      reader.releaseLock();
      return {};
    },
    // for-await calls this on whatever it's passed, so
    // iterators tend to return themselves.
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

async function example() {
  const response = await fetch(url);
  for await (const chunk of streamAsyncIterator(response.body)) {
    console.log(chunk);
  }
}

Teeing a readable stream #

The tee() method of the ReadableStream interface tees the current readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances. This allows two readers to read a stream simultaneously. You might do this, for example, in a service worker if you want to fetch a response from the server and stream it to the browser, but also stream it to the service worker cache. Since a response body cannot be consumed more than once, you need two copies to do this. To cancel the stream, you then need to cancel both resulting branches. Teeing a stream will generally lock it for the duration, preventing other readers from locking it.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Readable byte streams #

For streams representing bytes, an extended version of the readable stream is provided to handle bytes efficiently, in particular by minimizing copies. Byte streams allow for bring-your-own-buffer (BYOB) readers to be acquired. The default implementation can give a range of different outputs such as strings or array buffers in the case of WebSockets, whereas byte streams guarantee byte output. In addition, BYOB readers have stability benefits. This is because if a buffer detaches, it can guarantee that one does not write into the same buffer twice, hence avoiding race conditions. BYOB readers can reduce the number of times the browser needs to run garbage collection, because it can reuse buffers.

Creating a readable byte stream #

You can create a readable byte stream by passing an additional type parameter to the ReadableStream() constructor.

new ReadableStream({ type: 'bytes' });

The underlyingSource #

The underlying source of a readable byte stream is given a ReadableByteStreamController to manipulate. Its ReadableByteStreamController.enqueue() method takes a chunk argument whose value is an ArrayBufferView. The property ReadableByteStreamController.byobRequest returns the current BYOB pull request, or null if there is none. Finally, the ReadableByteStreamController.desiredSize property returns the desired size to fill the controlled stream's internal queue.
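As a sketch of how these pieces could fit together in a pull() handler, serving a BYOB request when one is pending and falling back to enqueue() otherwise (getNextBytes() and getNextChunk() are hypothetical helpers, not part of the API):

/* … */
pull(controller) {
  const byobRequest = controller.byobRequest;
  if (byobRequest) {
    // Write directly into the consumer-supplied buffer, then report
    // how many bytes were written.
    const bytesWritten = getNextBytes(byobRequest.view); // Hypothetical helper.
    byobRequest.respond(bytesWritten);
  } else {
    // No BYOB request pending: enqueue a freshly allocated chunk.
    controller.enqueue(getNextChunk()); // Hypothetical helper.
  }
},
/* … */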

The queuingStrategy #

The second, likewise optional, argument of the ReadableStream() constructor is queuingStrategy. It is an object that optionally defines a queuing strategy for the stream, which takes one parameter:

  • highWaterMark: A non-negative number of bytes indicating the high water mark of the stream using this queuing strategy. This is used to determine backpressure, manifesting via the appropriate ReadableByteStreamController.desiredSize property. It also governs when the underlying source's pull() method is called.

The getReader() and read() methods #

You can then get access to a ReadableStreamBYOBReader by setting the mode parameter accordingly: ReadableStream.getReader({ mode: "byob" }). This allows for more precise control over buffer allocation in order to avoid copies. To read from the byte stream, you need to call ReadableStreamBYOBReader.read(view), where view is an ArrayBufferView.

Readable byte stream code sample #

const reader = readableStream.getReader({ mode: 'byob' });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log('The first 1024 bytes, or less:', buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
      await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

The following function returns readable byte streams that allow for efficient zero-copy reading of a randomly generated array. Instead of using a predetermined chunk size of 1,024, it attempts to fill the developer-supplied buffer, allowing for full control.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      // Fill the provided view with random bytes in place.
      crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

The mechanics of a writable stream #

A writable stream is a destination into which you can write data, represented in JavaScript by a WritableStream object. This serves as an abstraction over the top of an underlying sink—a lower-level I/O sink into which raw data is written.

The data is written to the stream via a writer, one chunk at a time. A chunk can take a multitude of forms, just like the chunks in a reader. You can use whatever code you like to produce the chunks ready for writing; the writer plus the associated code is called a producer.

When a writer is created and starts writing to a stream (an active writer), it is said to be locked to it. Only one writer can write to a writable stream at one time. If you want another writer to start writing to your stream, you typically need to release it, before you then attach another writer to it.

An internal queue keeps track of the chunks that have been written to the stream but not yet been processed by the underlying sink.

A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk, and compares the total size of all chunks in the queue to a specified number, known as the high water mark.

The last construct is called a controller. Each writable stream has an associated controller that allows you to control the stream (for example, to abort it).

Creating a writable stream #

The WritableStream interface of the Streams API provides a standard abstraction for writing streaming data to a destination, known as a sink. This object comes with built-in backpressure and queuing. You create a writable stream by calling its constructor WritableStream(). It has an optional underlyingSink parameter, which represents an object with methods and properties that define how the constructed stream instance will behave.

The underlyingSink #

The underlyingSink can include the following optional, developer-defined methods. The controller parameter passed to some of the methods is a WritableStreamDefaultController.

  • start(controller): This method is called immediately when the object is constructed. The contents of this method should aim to get access to the underlying sink. If this process is to be done asynchronously, it can return a promise to signal success or failure.
  • write(chunk, controller): This method will be called when a new chunk of data (specified in the chunk parameter) is ready to be written to the underlying sink. It can return a promise to signal success or failure of the write operation. This method will be called only after previous writes have succeeded, and never after the stream is closed or aborted.
  • close(controller): This method will be called if the app signals that it has finished writing chunks to the stream. The contents should do whatever is necessary to finalize writes to the underlying sink, and release access to it. If this process is asynchronous, it can return a promise to signal success or failure. This method will be called only after all queued-up writes have succeeded.
  • abort(reason): This method will be called if the app signals that it wishes to abruptly close the stream and put it in an errored state. It can clean up any held resources, much like close(), but abort() will be called even if writes are queued up. Those chunks will be thrown away. If this process is asynchronous, it can return a promise to signal success or failure. The reason parameter contains a DOMString describing why the stream was aborted.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

The WritableStreamDefaultController interface of the Streams API represents a controller allowing control of a WritableStream's state during set up, as more chunks are submitted for writing, or at the end of writing. When constructing a WritableStream, the underlying sink is given a corresponding WritableStreamDefaultController instance to manipulate. The WritableStreamDefaultController has only one method: WritableStreamDefaultController.error(), which causes any future interactions with the associated stream to error. WritableStreamDefaultController also supports a signal property which returns an instance of AbortSignal, allowing a WritableStream operation to be stopped if needed.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */
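The signal property is useful for interrupting slow writes when the stream gets aborted. A sketch, where slowOperation() stands in for a hypothetical function that accepts an AbortSignal:

const writableStream = new WritableStream({
  async write(chunk, controller) {
    // Forward the stream's AbortSignal so that a call to
    // writableStream.abort() interrupts an in-flight write.
    await slowOperation(chunk, { signal: controller.signal });
  },
});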

The queuingStrategy #

The second, likewise optional, argument of the WritableStream() constructor is queuingStrategy. It is an object that optionally defines a queuing strategy for the stream, which takes two parameters:

  • highWaterMark: A non-negative number indicating the high water mark of the stream using this queuing strategy.
  • size(chunk): A function that computes and returns the finite non-negative size of the given chunk value. The result is used to determine backpressure, manifesting via the appropriate WritableStreamDefaultWriter.desiredSize property.

The getWriter() and write() methods #

To write to a writable stream, you need a writer, which will be a WritableStreamDefaultWriter. The getWriter() method of the WritableStream interface returns a new instance of WritableStreamDefaultWriter and locks the stream to that instance. While the stream is locked, no other writer can be acquired until the current one is released.

The write() method of the WritableStreamDefaultWriter interface writes a passed chunk of data to a WritableStream and its underlying sink, then returns a promise that resolves to indicate the success or failure of the write operation. Note that what "success" means is up to the underlying sink; it might indicate that the chunk has been accepted, and not necessarily that it is safely saved to its ultimate destination.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

The locked property #

You can check if a writable stream is locked by accessing its WritableStream.locked property.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Writable stream code sample #

The code sample below shows all steps in action.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Piping a readable stream to a writable stream #

A readable stream can be piped to a writable stream through the readable stream's pipeTo() method. ReadableStream.pipeTo() pipes the current ReadableStream to a given WritableStream and returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write().
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Creating a transform stream #

The TransformStream interface of the Streams API represents a set of transformable data. You create a transform stream by calling its constructor TransformStream(), which creates and returns a transform stream object from the given handlers. The TransformStream() constructor accepts as its first argument an optional JavaScript object representing the transformer. Such objects can contain any of the following methods:

The transformer #

  • start(controller): This method is called immediately when the object is constructed. Typically this is used to enqueue prefix chunks, using controller.enqueue(). Those chunks will be read from the readable side but do not depend on any writes to the writable side. If this initial process is asynchronous, for example because it takes some effort to acquire the prefix chunks, the function can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the TransformStream() constructor.
  • transform(chunk, controller): This method is called when a new chunk originally written to the writable side is ready to be transformed. The stream implementation guarantees that this function will be called only after previous transforms have succeeded, and never before start() has completed or after flush() has been called. This function performs the actual transformation work of the transform stream. It can enqueue the results using controller.enqueue(). This permits a single chunk written to the writable side to result in zero or multiple chunks on the readable side, depending on how many times controller.enqueue() is called. If the process of transforming is asynchronous, this function can return a promise to signal success or failure of the transformation. A rejected promise will error both the readable and writable sides of the transform stream. If no transform() method is supplied, the identity transform is used, which enqueues chunks unchanged from the writable side to the readable side.
  • flush(controller): This method is called after all chunks written to the writable side have been transformed by successfully passing through transform(), and the writable side is about to be closed. Typically this is used to enqueue suffix chunks to the readable side, before that too becomes closed. If the flushing process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated to the caller of stream.writable.write(). Additionally, a rejected promise will error both the readable and writable sides of the stream. Throwing an exception is treated the same as returning a rejected promise.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

The writableStrategy and readableStrategy queueing strategies #

The second and third optional parameters of the TransformStream() constructor are optional writableStrategy and readableStrategy queueing strategies. They are defined as outlined in the readable and the writable stream sections respectively.
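For illustration, here is a sketch with made-up high water marks, using the built-in CountQueuingStrategy:

const transformStream = new TransformStream(
  {
    transform(chunk, controller) {
      controller.enqueue(chunk);
    },
  },
  // writableStrategy: let up to four chunks queue up on the writable side.
  new CountQueuingStrategy({ highWaterMark: 4 }),
  // readableStrategy: signal backpressure beyond two queued chunks.
  new CountQueuingStrategy({ highWaterMark: 2 }),
);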

Transform stream code sample #

The following code sample shows a simple transform stream in action.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Piping a readable stream through a transform stream #

The pipeThrough() method of the ReadableStream interface provides a chainable way of piping the current stream through a transform stream or any other writable/readable pair. Piping a stream will generally lock it for the duration of the piping, preventing other readers from locking it.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called upon read when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // Or controller.error();
  },
  cancel(reason) {
    // Called when rs.cancel(reason) is called.
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

The next code sample (a bit contrived) shows how you could implement a "shouting" version of fetch() that uppercases all text by consuming the returned response promise as a stream and uppercasing chunk by chunk. The advantage of this approach is that you do not need to wait for the whole document to be downloaded, which can make a huge difference when dealing with large files.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    },
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body)),
);

Browser support and polyfill #

Support for the Streams API in browsers varies. Be sure to check Can I use for detailed compatibility data. Note that some browsers only have partial implementations of certain features, so be sure to check the data thoroughly.
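One possible way to feature-detect the parts you rely on before using them (a sketch, not an exhaustive check):

if (typeof ReadableStream === 'undefined' ||
    !('pipeThrough' in ReadableStream.prototype)) {
  // Load the polyfill or fall back to non-streaming code.
}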

The good news is that there is a reference implementation available and a polyfill targeted at production use.

Demo #

The demo below shows readable, writable, and transform streams in action. It also includes examples of pipeThrough() and pipeTo() pipe chains, and also demonstrates tee(). You can optionally run the demo in its own window or view the source code.

Useful streams available in the browser #

There are a number of useful streams built right into the browser. You can easily create a ReadableStream from a blob. The Blob interface's stream() method returns a ReadableStream which upon reading returns the data contained within the blob. Also recall that a File object is a specific kind of a Blob, and can be used in any context that a blob can.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

The streaming variants of TextDecoder.decode() and TextEncoder.encode() are called TextDecoderStream and TextEncoderStream respectively.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Compressing or decompressing a file is easy with the CompressionStream and DecompressionStream transform streams respectively. The code sample below shows how you can download the Streams spec, compress (gzip) it right in the browser, and write the compressed file directly to disk.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

The File System Access API's FileSystemWritableFileStream and the experimental fetch() request streams are examples of writable streams in the wild.
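As a hedged sketch of a streaming upload with request streams (the URL is hypothetical, and in Chromium the experimental duplex option is required for request bodies that are streams):

// Experimental: stream a request body instead of buffering it.
const stream = new Blob(['hello world']).stream();
const response = await fetch('https://example.com/upload', {
  method: 'POST',
  body: stream,
  duplex: 'half', // Required for streaming request bodies in Chromium.
});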

The Serial API makes heavy use of both readable and writable streams.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Finally, the WebSocketStream API integrates streams with the WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Useful resources #

  • Streams specification
  • Accompanying demos
  • Streams polyfill
  • 2016—the year of web streams
  • Async iterators and generators
  • Stream Visualizer

Acknowledgements #

This article was reviewed by Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, and Adam Rice. Jake Archibald's blog posts have helped me a lot in understanding streams. Some of the code samples are inspired by GitHub user @bellbind's explorations and parts of the prose build heavily on the MDN Web Docs on Streams. The Streams Standard's authors have done a tremendous job on writing this spec. Hero image by Ryan Lara on Unsplash.
