fs2-compress

Integrations for several compression algorithms with Fs2.

Usage

build.sbt

libraryDependencies += "de.lhns" %% "fs2-compress-gzip" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-zip" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-zip4j" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-tar" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-bzip2" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-zstd" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-brotli" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-brotli4j" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-lz4" % "2.2.1"

Concepts

This library introduces the following abstractions for working with several different compression algorithms and archive formats.

Compression

Compressor

The Compressor typeclass abstracts the compression of a stream of bytes.

trait Compressor[F[_]] {
  def compress: Pipe[F, Byte, Byte]
}

Passing a stream of bytes through the Compressor.compress pipe will result in a compressed stream of bytes. 🎉
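
For example (a minimal sketch; GzipCompressor is one of the implementations introduced in the Examples section below):

import cats.effect.IO
import de.lhns.fs2.compress._
import fs2.Stream

// Compress the bytes of a string with the Gzip implementation of Compressor
val gzip: Compressor[IO] = GzipCompressor.make[IO]()
val compressed: Stream[IO, Byte] =
  Stream.emits("hello, world".getBytes.toList).through(gzip.compress)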

Decompressor

The Decompressor typeclass abstracts the decompression of a stream of bytes.

trait Decompressor[F[_]] {
  def decompress: Pipe[F, Byte, Byte]
}

Passing a stream of bytes through the Decompressor.decompress pipe will result in a decompressed stream of bytes. 🎉
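
Combining the two pipes gives a round trip; a minimal sketch using the Gzip implementations from the Examples section below:

import cats.effect.IO
import de.lhns.fs2.compress._
import fs2.Stream

val gzip: Compressor[IO] = GzipCompressor.make[IO]()
val gunzip: Decompressor[IO] = GzipDecompressor.make[IO]()

// Compressing and then decompressing yields the original bytes
def roundTrip(bytes: Stream[IO, Byte]): Stream[IO, Byte] =
  bytes.through(gzip.compress).through(gunzip.decompress)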

Archives

The library also provides abstractions for working with archive formats. An archive is a collection of files and directories which may or may not also include compression, depending on the archive format.

ArchiveEntry

An ArchiveEntry represents a file or directory in an archive. It has the following signature:

case class ArchiveEntry[+Size[A] <: Option[A], Underlying](name: String, uncompressedSize: Size[Long], underlying: Underlying, ...)

The Size type parameter is used to encode whether the size of the entry is known or not. For some archive formats the size of an entry must be known in advance, and as such the relevant Archiver will require that the Size type parameter is Some.
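
For example, covariance in Size means an entry with a known size can be used wherever an entry with a possibly-unknown size is accepted, but not the other way around (a sketch; the entry name and size are illustrative):

// An entry whose size is known in advance ...
val knownSize: ArchiveEntry[Some, Unit] =
  ArchiveEntry[Some, Unit]("file.txt", uncompressedSize = Some(42L))

// ... also qualifies as an entry whose size may be unknown, since Some <: Option
val maybeKnownSize: ArchiveEntry[Option, Unit] = knownSize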

Archiver

The Archiver typeclass abstracts the creation of an archive from a stream of ArchiveEntry paired with the relevant data.

trait Archiver[F[_], Size[A] <: Option[A]] {
  def archive: Pipe[F, (ArchiveEntry[Size, Any], Stream[F, Byte]), Byte]
}

Unarchiver

The Unarchiver typeclass abstracts the extraction of an archive into a stream of ArchiveEntry paired with the relevant data.

trait Unarchiver[F[_], Size[A] <: Option[A], Underlying] {
  def unarchive: Pipe[F, Byte, (ArchiveEntry[Size, Underlying], Stream[F, Byte])]
}

Examples

The following examples do not check that the paths used are valid. For real-world applications you will probably want to add some checks to that effect.

Compression

Compression can be abstracted over using the Compressor typeclass. Adapt the following examples based on which compression algorithm you want to use.

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.io.file.{Files, Path}

// implicit def bzip2[F[_]: Async]: Compressor[F] = Bzip2Compressor.make()
// implicit def lz4[F[_]: Async]: Compressor[F] = Lz4Compressor.make()
// implicit def zstd[F[_]: Async]: Compressor[F] = ZstdCompressor.make()
implicit def gzip[F[_]: Async]: Compressor[F] = GzipCompressor.make()

def compressFile[F[_]: Async](toCompress: Path, writeTo: Path)(implicit compressor: Compressor[F]): F[Unit] =
  Files[F]
    .readAll(toCompress)
    .through(compressor.compress)
    .through(Files[F].writeAll(writeTo))
    .compile
    .drain
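
Wiring this into an application could look like the following (a sketch; the file names are illustrative and compressFile is the helper defined above):

import cats.effect.{IO, IOApp}

object CompressApp extends IOApp.Simple {
  // Compresses data.txt to data.txt.gz using the implicit GzipCompressor above
  val run: IO[Unit] = compressFile[IO](Path("data.txt"), Path("data.txt.gz"))
}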

Decompression

Similarly, decompression can be abstracted over using the Decompressor typeclass. Adapt the following examples based on which compression algorithm was used to write the source file.

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.io.file.{Files, Path}

// implicit def brotli[F[_]: Async]: Decompressor[F] = BrotliDecompressor.make()
// implicit def bzip2[F[_]: Async]: Decompressor[F] = Bzip2Decompressor.make()
// implicit def lz4[F[_]: Async]: Decompressor[F] = Lz4Decompressor.make()
// implicit def zstd[F[_]: Async]: Decompressor[F] = ZstdDecompressor.make()
implicit def gzip[F[_]: Async]: Decompressor[F] = GzipDecompressor.make()

def decompressFile[F[_]: Async](toDecompress: Path, writeTo: Path)(implicit decompressor: Decompressor[F]): F[Unit] =
  Files[F]
    .readAll(toDecompress)
    .through(decompressor.decompress)
    .through(Files[F].writeAll(writeTo))
    .compile
    .drain

Archiving

The library supports both .zip and .tar archives, with .zip available through both the native Java implementation and the zip4j library.

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.io.file.{Files, Path}

// implicit def tar[F[_]: Async]: Archiver[F, Some] = TarArchiver.make()
// implicit def zip4j[F[_]: Async]: Archiver[F, Some] = Zip4JArchiver.make()
implicit def zip[F[_]: Async]: Archiver[F, Option] = ZipArchiver.makeDeflated()

def archiveDirectory[F[_]: Async](directory: Path, writeTo: Path)(implicit archiver: Archiver[F, Option]): F[Unit] =
  Files[F]
    .list(directory)
    .evalMap { path =>
      Files[F]
        .size(path)
        .map { size =>
          // Name the entry based on the relative path between the source directory and the file
          val name = directory.relativize(path).toString
          ArchiveEntry[Some, Unit](name, uncompressedSize = Some(size)) -> Files[F].readAll(path)
        }
    }
    .through(archiver.archive)
    .through(Files[F].writeAll(writeTo))
    .compile
    .drain
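
Note that Files[F].list only lists the immediate children of the directory. For nested directory trees you could instead walk the file tree and keep only regular files; a sketch along these lines (using fs2's Files[F].walk, with an assumed helper name archiveTree):

def archiveTree[F[_]: Async](directory: Path, writeTo: Path)(implicit archiver: Archiver[F, Option]): F[Unit] =
  Files[F]
    .walk(directory)
    .evalFilter(Files[F].isRegularFile(_))
    .evalMap { path =>
      Files[F].size(path).map { size =>
        // Name the entry based on the relative path between the source directory and the file
        val name = directory.relativize(path).toString
        ArchiveEntry[Some, Unit](name, uncompressedSize = Some(size)) -> Files[F].readAll(path)
      }
    }
    .through(archiver.archive)
    .through(Files[F].writeAll(writeTo))
    .compile
    .drain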

Note that .tar doesn't compress the archive, so to create a .tar.gz file you will have to combine the archiver with the GzipCompressor.

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.io.file.{Files, Path}

implicit def gzip[F[_]: Async]: Compressor[F] = GzipCompressor.make()
implicit def tar[F[_]: Async]: Archiver[F, Some] = TarArchiver.make()

def tarAndGzip[F[_]: Async](directory: Path, writeTo: Path)(implicit archiver: Archiver[F, Some], compressor: Compressor[F]): F[Unit] =
  Files[F]
    .list(directory)
    .evalMap { path =>
      Files[F]
        .size(path)
        .map { size =>
          // Name the entry based on the relative path between the source directory and the file
          val name = directory.relativize(path).toString
          ArchiveEntry[Some, Unit](name, uncompressedSize = Some(size)) -> Files[F].readAll(path)
        }
    }
    .through(archiver.archive)
    .through(compressor.compress)
    .through(Files[F].writeAll(writeTo))
    .compile
    .drain

Unarchiving

To unarchive we use the Unarchiver typeclass matching our archive format.

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.io.file.{Files, Path}

// implicit def tar[F[_]: Async]: Unarchiver[F, Option] = TarUnarchiver.make()
// implicit def zip4j[F[_]: Async]: Unarchiver[F, Option] = Zip4JUnarchiver.make()
implicit def zip[F[_]: Async]: Unarchiver[F, Option] = ZipUnarchiver.make()

def unArchive[F[_]: Async](archive: Path, writeTo: Path)(implicit unarchiver: Unarchiver[F, Option]): F[Unit] =
  Files[F]
    .readAll(archive)
    .through(unarchiver.unarchive)
    .flatMap { case (entry, data) =>
      data.through(Files[F].writeAll(writeTo.resolve(entry.name)))
    }
    .compile
    .drain
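
Note that entry names may contain subdirectories (e.g. dir/file.txt). The sketch below (with an assumed helper name unArchiveTree) creates any missing parent directories before writing each file:

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.Stream
import fs2.io.file.{Files, Path}

def unArchiveTree[F[_]: Async](archive: Path, writeTo: Path)(implicit unarchiver: Unarchiver[F, Option]): F[Unit] =
  Files[F]
    .readAll(archive)
    .through(unarchiver.unarchive)
    .flatMap { case (entry, data) =>
      val target = writeTo.resolve(entry.name)
      // Create the parent directory (if any) before writing the file
      Stream.eval(target.parent.fold(Async[F].unit)(p => Files[F].createDirectories(p))) >>
        data.through(Files[F].writeAll(target))
    }
    .compile
    .drain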

Once again, if you have a .tar.gz file you will have to combine the Unarchiver with the GzipDecompressor.

import cats.effect.Async
import de.lhns.fs2.compress._
import fs2.io.file.{Files, Path}

implicit def gzip[F[_]: Async]: Decompressor[F] = GzipDecompressor.make()
implicit def tar[F[_]: Async]: Unarchiver[F, Option] = TarUnarchiver.make()

def unArchive[F[_]: Async](archive: Path, writeTo: Path)(implicit unarchiver: Unarchiver[F, Option], decompressor: Decompressor[F]): F[Unit] =
  Files[F]
    .readAll(archive)
    .through(decompressor.decompress)
    .through(unarchiver.unarchive)
    .flatMap { case (entry, data) =>
      data.through(Files[F].writeAll(writeTo.resolve(entry.name)))
    }
    .compile
    .drain

License

This project uses the Apache 2.0 License. See the LICENSE file.