Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ object SseClient {
def apply(client: Client[IO], retryDelay: FiniteDuration, uri: Uri, offset: Offset): EventStream[IO] = {

def go(stream: EventStream[IO], lastEventId: EventId): Pull[IO, ServerSentEvent, Unit] =
stream.pull.uncons1.flatMap {
case Some((event, tail)) =>
Pull.output1(event) >> go(tail, event.id.getOrElse(lastEventId))
case None =>
stream.pull.uncons.flatMap {
case Some((events, tail)) =>
Pull.output(events) >> go(tail, events.last.flatMap(_.id).getOrElse(lastEventId))
case None =>
Pull.eval(IO.sleep(retryDelay)) >>
resume(lastEventId)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk

import cats.effect.IO
import cats.syntax.all.*
import ch.epfl.bluebrain.nexus.delta.kernel.Hex
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
Expand Down Expand Up @@ -52,14 +53,19 @@ final class DiskStorageSaveFile(implicit uuidf: UUIDF) {
// Stores the file while computing the hash and the file size
private def store(algorithm: DigestAlgorithm, fullPath: Path): Pipe[IO, ByteBuffer, (Long, ComputedDigest)] = {
def go(hasher: Hasher[IO], cursor: WriteCursor[IO], stream: FileData): Pull[IO, (Long, ComputedDigest), Unit] = {
stream.pull.uncons1.flatMap {
case Some((buffer, tail)) =>
val chunk = Chunk.byteBuffer(buffer)
Pull.eval(hasher.update(chunk)) >>
Pull.eval(cursor.write(chunk)).flatMap { nc =>
stream.pull.uncons.flatMap {
case Some((buffers, tail)) =>
Pull
.eval(
buffers.foldLeftM(cursor) { (accCursor, buffer) =>
val chunk = Chunk.byteBuffer(buffer)
hasher.update(chunk) >> accCursor.write(chunk)
}
)
.flatMap { nc =>
go(hasher, nc, tail)
}
case None =>
case None =>
Pull.eval(hasher.hash).flatMap { hash =>
val digest = ComputedDigest(algorithm, Hex.valueOf(hash.bytes.toArray))
Pull.eval(cursor.file.size).flatMap { size =>
Expand Down Expand Up @@ -88,7 +94,7 @@ object DiskStorageSaveFile {
_ <- Files[IO].createDirectories(dir).adaptError { e => couldNotCreateDirectory(dir, e.getMessage) }
} yield resolved -> relative

def computeLocation(
private def computeLocation(
project: ProjectRef,
volume: AbsolutePath,
filename: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.data.NonEmptyChain
import cats.effect.IO
import cats.syntax.all.*
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.OperationInOutMatchErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.config.BatchConfig
import fs2.{Pull, Stream}
Expand Down Expand Up @@ -163,39 +163,24 @@ object Operation {
*/
def apply(element: SuccessElem[In]): IO[Elem[Out]]

/**
* Checks if the provided elem has a successful element value of type `I`. If true, it will return it in Right.
* Otherwise it will return it in Left with the type `O`. This is safe because [[Elem]] is covariant.
*
* @param element
* an elem with an Elem to be tested
*/
protected def partitionSuccess[I, O](element: Elem[I]): Either[Elem[O], SuccessElem[I]] =
element match {
case _: SuccessElem[?] => Right(element.asInstanceOf[SuccessElem[I]])
case _: FailedElem | _: DroppedElem => Left(element.asInstanceOf[Elem[O]])
}

override protected[stream] def asFs2: ElemPipe[In, Out] = {
def go(s: ElemStream[In]): Pull[IO, Elem[Out], Unit] = {
s.pull.uncons1.flatMap {
case Some((head, tail)) =>
partitionSuccess(head) match {
case Right(value) =>
Pull
.eval(
apply(value)
.handleErrorWith { err =>
logger
.error(err)(s"Error while applying pipe $name on element ${value.id}")
.as(value.failed(err))
}
)
.flatMap(Pull.output1) >> go(tail)
case Left(other) =>
Pull.output1(other) >> go(tail)
}
case None => Pull.done
s.pull.uncons.flatMap {
case Some((chunk, tail)) =>
Pull
.eval(
chunk.traverse {
case s: SuccessElem[In] =>
apply(s).handleErrorWith { err =>
logger
.error(err)(s"Error while applying pipe $name on element ${s.id}")
.as(s.failed(err))
}
case other => IO.pure(other.asInstanceOf[Elem[Out]])
}
)
.flatMap(Pull.output) >> go(tail)
case None => Pull.done
}
}
in => go(in).stream
Expand Down