Bytesrw cookbook

A few convention and recipes for dealing with byte stream reader and writers.

Contracts and conventions

Reader and writer function contract

The contract between a reader and its slice enumerating function. The contract between a writer and its slice iterating function.

Reader and writer client contracts

The contract between a reader and its client. The contract between a writer and its client.

Reader and writer filters conventions

The reader filter conventions and the writer filter conventions.

Applying filters to strings

Reader filters can easily be applied to strings with Bytesrw.Bytes.Reader.filter_string:

let id s =
  let filters = Bytesrw_zstd.[compress_reads (); decompress_reads ()] in
  Bytes.Reader.filter_string filters s

This can also be done with writer filters by using Bytesrw.Bytes.Writer.filter_string:

let id s =
  let filters = Bytesrw_zstd.[decompress_writes (); compress_writes ()] in
  Bytes.Writer.filter_string filters s

Checksumming streams

The pattern for checksumming streams is to apply an identity but side-effecting filter (also known as a tap) on a reader or writer and return a state value. The state value is updated whenever the resulting reader or writer is read or written.

The example below shows how to combine BLAKE3 checksumming with zstd compression.

This checksums the data before compressing it:

let blake3_and_compress ~plain =
  try
    let plain, blake3 = Bytesrw_blake3.Blake3.reads plain in
    let comp = Bytesrw_zstd.compress_reads () plain in
    let comp = Bytes.Reader.to_string comp in
    Ok (comp, Bytesrw_blake3.Blake3.value blake3)
  with
  | Bytes.Stream.Error e -> Bytes.Stream.error_to_result e

This checksums the data after decompressing it:

let decompress_and_blake3 ~comp =
  try
    let plain = Bytesrw_zstd.decompress_reads () comp in
    let r, blake3 = Bytesrw_blake3.Blake3.reads plain in
    let s = Bytes.Reader.to_string r in
    Ok (s, Bytesrw_blake3.Blake3.value blake3)
  with
  | Bytes.Stream.Error e -> Bytes.Stream.error_to_result e

By re-odering the operations in the two functions above you could equally have applied the checksum on the compressed data or even have checksums for both compressed and decompressed data.

Limiting streams

If you need to limit resource consumption, readers and writers can be bounded with Bytesrw.Bytes.Reader.limit and Bytesrw.Bytes.Writer.limit.

For example this makes sure that the zstd decompressed size of comp does not exceed quota bytes. If it does we still return the truncated decompressed data so far.

let limited_decompress ~quota ~comp =
  let buf = Buffer.create quota in
  try
    let plain = Bytesrw_zstd.decompress_reads () comp in
    let () = Bytes.Reader.add_to_buffer buf (Bytes.Reader.limit quota plain) in
    Ok (`Data (Buffer.contents buf))
  with
  |  Bytes.Stream.Error (Bytes.Stream.Limit _quota, _) ->
      Ok (`Quota_exceeded (Buffer.contents buf))
  |  Bytes.Stream.Error e -> Bytes.Stream.error_to_result e

Tapping streams

Tapping streams allow to observe the slices that fly-by in a stream. The functions Bytesrw.Bytes.Reader.tap and Bytesrw.Bytes.Writer.tap can be used to tap readers and writers. Taps are just identity stream filters with a side-effect.

Note that readers that result from tap will not tap the push backs that are performed on them. This is a good property if you are using taps for checksumming.

See also tracing streams.

Tracing streams

Tracing streams can easily be done by tapping them with Bytesrw.Bytes.Slice.tracer. For example the following will trace the slices of r or w on stderr.

let rtrace ~id r = Bytes.Reader.tap (Bytes.Slice.tracer ~id) r
let wtrace ~id w = Bytes.Writer.tap (Bytes.Slice.tracer ~id) w

Adding your own stream error

Here is a blueprint you can use to define your own stream error.

module Myformat : sig

  (** {1:errors Errors} *)

  type Bytesrw.Bytes.Stream.error +=
  | Error of string (** *)
  (** The type for [myformat] stream errors. *)

  (** {1:streams Streams} *)

  (* … *)
end = struct
  type Bytes.Stream.error += Error of string

  let format_error =
    let case msg = Error msg in
    let message = function Error msg -> msg | _ -> assert false in
    Bytes.Stream.make_format_error ~format:"myformat" ~case ~message

  let error e = Bytes.Stream.error format_error e
  let reader_error r e = Bytes.Reader.error format_error r e
  let writer_error w e = Bytes.Writer.error format_error w e
end

In your code you can now selectively pattern match on these errors with

try … with
| Bytes.Stream.Error (Myformat.Error msg, _) -> …

More on the design of the stream error system can be found in the design notes.