Bytesrw cookbook
A few conventions and recipes for dealing with byte stream readers and writers.
The contract between a reader and its slice enumerating function. The contract between a writer and its slice iterating function.
The contract between a reader and its client. The contract between a writer and its client.
The reader filter conventions and the writer filter conventions.
Reader filters can easily be applied to strings with Bytesrw.Bytes.Reader.filter_string:
let id s =
  let filters = Bytesrw_zstd.[compress_reads (); decompress_reads ()] in
  Bytes.Reader.filter_string filters s
This can also be done with writer filters by using Bytesrw.Bytes.Writer.filter_string:
let id s =
  let filters = Bytesrw_zstd.[decompress_writes (); compress_writes ()] in
  Bytes.Writer.filter_string filters s
The pattern for checksumming streams is to apply an identity but side-effecting filter (also known as a tap) on a reader or writer and return a state value. The state value is updated whenever the resulting reader or writer is read or written.
The example below shows how to combine BLAKE3 checksumming with zstd compression.
This checksums the data before compressing it:
let blake3_and_compress ~plain =
  try
    let plain, blake3 = Bytesrw_blake3.Blake3.reads plain in
    let comp = Bytesrw_zstd.compress_reads () plain in
    let comp = Bytes.Reader.to_string comp in
    Ok (comp, Bytesrw_blake3.Blake3.value blake3)
  with
  | Bytes.Stream.Error e -> Bytes.Stream.error_to_result e
This checksums the data after decompressing it:
let decompress_and_blake3 ~comp =
  try
    let plain = Bytesrw_zstd.decompress_reads () comp in
    let r, blake3 = Bytesrw_blake3.Blake3.reads plain in
    let s = Bytes.Reader.to_string r in
    Ok (s, Bytesrw_blake3.Blake3.value blake3)
  with
  | Bytes.Stream.Error e -> Bytes.Stream.error_to_result e
By reordering the operations in the two functions above you could equally have applied the checksum to the compressed data, or even computed checksums for both the compressed and the decompressed data.
If you need to limit resource consumption, readers and writers can be bounded with Bytesrw.Bytes.Reader.limit and Bytesrw.Bytes.Writer.limit.
For example, this makes sure that the zstd decompressed size of comp does not exceed quota bytes. If it does, we still return the truncated data decompressed so far.
let limited_decompress ~quota ~comp =
  let buf = Buffer.create quota in
  try
    let plain = Bytesrw_zstd.decompress_reads () comp in
    let () = Bytes.Reader.add_to_buffer buf (Bytes.Reader.limit quota plain) in
    Ok (`Data (Buffer.contents buf))
  with
  | Bytes.Stream.Error (Bytes.Stream.Limit _quota, _) ->
      Ok (`Quota_exceeded (Buffer.contents buf))
  | Bytes.Stream.Error e -> Bytes.Stream.error_to_result e
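The same bounding pattern can be tried without a compression library. The following is a minimal sketch that assumes only the core bytesrw library is available. The name limited_read is hypothetical, and Bytes.Reader.of_string is used in the usage note to build a reader from a string.

```ocaml
open Bytesrw

(* Hypothetical helper: reads at most [quota] bytes from reader [r].
   It mirrors [limited_decompress] minus the decompression step and
   returns whatever was accumulated when the limit error is raised. *)
let limited_read ~quota r =
  let buf = Buffer.create quota in
  try
    Bytes.Reader.add_to_buffer buf (Bytes.Reader.limit quota r);
    `Data (Buffer.contents buf)
  with
  | Bytes.Stream.Error (Bytes.Stream.Limit _, _) ->
      `Quota_exceeded (Buffer.contents buf)
```

Under these assumptions, limited_read ~quota:5 (Bytes.Reader.of_string "hi") should return `Data "hi", while an 11-byte input with the same quota should take the `Quota_exceeded branch with a prefix of the input.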
Tapping streams allows you to observe the slices that fly by in a stream. The functions Bytesrw.Bytes.Reader.tap and Bytesrw.Bytes.Writer.tap can be used to tap readers and writers. Taps are just identity stream filters with a side effect.
Note that readers that result from tap will not tap the push backs that are performed on them. This is a good property if you are using taps for checksumming.
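As an illustration of the "tap plus state value" pattern used for checksumming, here is a minimal sketch of a tap that counts the bytes read from a reader. The name count_reads is hypothetical and not part of the library. The sketch assumes Bytes.Slice.length returns 0 for the end-of-data slice, so seeing that slice does not change the count.

```ocaml
open Bytesrw

(* Hypothetical helper: returns a reader that yields the same slices as
   [r], together with a counter that is updated each time a slice is
   read from the resulting reader. *)
let count_reads r =
  let count = ref 0 in
  let tap slice = count := !count + Bytes.Slice.length slice in
  Bytes.Reader.tap tap r, count

(* Reads the string [s] through the counting tap and returns both the
   data read and the number of bytes observed by the tap. *)
let read_and_count s =
  let r, count = count_reads (Bytes.Reader.of_string s) in
  let data = Bytes.Reader.to_string r in
  data, !count
```

The counter is only meaningful once the tapped reader has been consumed, which is why read_and_count dereferences it after Bytes.Reader.to_string returns.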
See also tracing streams.
Tracing streams can easily be done by tapping them with Bytesrw.Bytes.Slice.tracer. For example, the following will trace the slices of r or w on stderr.
let rtrace ~id r = Bytes.Reader.tap (Bytes.Slice.tracer ~id) r
let wtrace ~id w = Bytes.Writer.tap (Bytes.Slice.tracer ~id) w
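A possible use of rtrace is sketched below, assuming the id argument of Bytes.Slice.tracer is a string. The function traced_copy and the identifier "in" are hypothetical names chosen for illustration, and rtrace is repeated so that the block stands on its own.

```ocaml
open Bytesrw

let rtrace ~id r = Bytes.Reader.tap (Bytes.Slice.tracer ~id) r

(* Hypothetical example: reads the string [s] through a traced reader.
   The slices are reported on stderr as they are read; the returned
   data is unchanged since a tap is an identity filter. *)
let traced_copy s =
  let r = rtrace ~id:"in" (Bytes.Reader.of_string s) in
  Bytes.Reader.to_string r
```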
Here is a blueprint you can use to define your own stream error.
module Myformat : sig

  (** {1:errors Errors} *)

  type Bytesrw.Bytes.Stream.error +=
  | Error of string (** *)
  (** The type for [myformat] stream errors. *)

  (** {1:streams Streams} *)

  (* … *)
end = struct
  type Bytes.Stream.error += Error of string

  let format_error =
    let case msg = Error msg in
    let message = function Error msg -> msg | _ -> assert false in
    Bytes.Stream.make_format_error ~format:"myformat" ~case ~message

  let error e = Bytes.Stream.error format_error e
  let reader_error r e = Bytes.Reader.error format_error r e
  let writer_error w e = Bytes.Writer.error format_error w e
end
In your code you can now selectively pattern match on these errors with
try … with
| Bytes.Stream.Error (Myformat.Error msg, _) -> …
More on the design of the stream error system can be found in the design notes.