From f0f1a949cc3ce45ff30f41a598672740207aab2c Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Fri, 4 Jul 2025 13:30:55 +0200 Subject: [PATCH 01/11] Add StreamRdfBatchSink as a fallback for non-streaming formats --- .../cli/util/jena/StreamRdfBatchSink.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala diff --git a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala new file mode 100644 index 0000000..433e8b6 --- /dev/null +++ b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala @@ -0,0 +1,25 @@ +package eu.neverblink.jelly.cli.util.jena + +import org.apache.jena.riot.system.StreamRDF +import org.apache.jena.rdf.model.ModelFactory +import org.apache.jena.sparql.core.Quad +import org.apache.jena.graph.Triple +import org.apache.jena.rdf.model.Model +import org.apache.jena.riot.system.StreamRDFLib +import java.io.OutputStream +import org.apache.jena.riot.Lang +import org.apache.jena.riot.RDFDataMgr + +/** A StreamRDF implementation that collects everything into a Model. When finishing, formats + * everything according to the lang, and emits it to the outputStream. This is meant to be a + * fallback for non-streaming RDF formats, as it requires all data to be loaded in memory. + */ +class StreamRdfBatchSink(val outputStream: OutputStream, val lang: Lang) extends StreamRDF: + private val model: Model = ModelFactory.createDefaultModel() + private val modelStream: StreamRDF = StreamRDFLib.graph(model.getGraph) + override def quad(quad: Quad): Unit = modelStream.quad(quad) + override def triple(triple: Triple): Unit = modelStream.triple(triple) + override def prefix(prefix: String, iri: String): Unit = modelStream.prefix(prefix, iri) + override def base(base: String): Unit = modelStream.base(base) + override def finish(): Unit = RDFDataMgr.write(outputStream, model, lang) + override def start(): Unit = () From 712cd08d7d1f325250f13a7a62d7f20d25a6134f Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Fri, 4 Jul 2025 13:33:13 +0200 Subject: [PATCH 02/11] Make jsonld and rdfxml formats use StreamRdfBatchSink --- .../jelly/cli/command/rdf/RdfFromJelly.scala | 14 ++++++++------ .../jelly/cli/command/rdf/util/RdfFormat.scala | 5 +++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala index 42336a9..40fa2e8 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala @@ -5,13 +5,14 @@ import eu.neverblink.jelly.cli.* import eu.neverblink.jelly.cli.command.rdf.util.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.* import eu.neverblink.jelly.cli.util.args.IndexRange +import eu.neverblink.jelly.cli.util.jena.StreamRdfBatchSink import eu.neverblink.jelly.convert.jena.JenaConverterFactory import eu.neverblink.jelly.core.JellyOptions import eu.neverblink.jelly.core.RdfHandler.AnyStatementHandler import eu.neverblink.jelly.core.proto.v1.RdfStreamFrame import eu.neverblink.jelly.core.proto.google.v1 as google import org.apache.jena.graph.{Node, Triple} -import org.apache.jena.riot.Lang +import org.apache.jena.riot.system.StreamRDF import org.apache.jena.riot.system.StreamRDFWriter import org.apache.jena.sparql.core.Quad @@ -58,7 +59,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ lazy val printUtil: RdfCommandPrintUtil[RdfFormat.Writeable] = RdfFromJellyPrint val defaultAction: (InputStream, OutputStream) => Unit = - jellyToLang(RdfFormat.NQuads.jenaLang, _, _) + (in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, RdfFormat.NQuads.jenaLang)) private def takeFrames: IndexRange = IndexRange(getOptions.takeFrames, "--take-frames") @@ -73,7 +74,10 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ format: RdfFormat.Writeable, ): Option[(InputStream, OutputStream) => Unit] = format match - case j: RdfFormat.Jena.Writeable => Some(jellyToLang(j.jenaLang, _, _)) + case j: RdfFormat.Jena.Writeable => + Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang))) + case j: RdfFormat.Jena.BatchWriteable => + Some((in, out) => jellyToLang(in, StreamRdfBatchSink(out, j.jenaLang))) case RdfFormat.JellyText => Some(jellyBinaryToText) /** This method reads the Jelly file, rewrites it to specified format and writes it to some output @@ -86,11 +90,9 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ * OutputStream */ private def jellyToLang( - jenaLang: Lang, inputStream: InputStream, - outputStream: OutputStream, + writer: StreamRDF, ): Unit = - val writer = StreamRDFWriter.getWriterStream(outputStream, jenaLang) // Whether the output is active at this moment var outputEnabled = false val handler = new AnyStatementHandler[Node] { diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala index 42cb8a2..a2962cc 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala @@ -18,6 +18,7 @@ object RdfFormat: object Jena: sealed trait Writeable extends Jena, RdfFormat.Writeable sealed trait Readable extends Jena, RdfFormat.Readable + sealed trait BatchWriteable extends Jena, RdfFormat.Writeable case object NQuads extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: override val fullName: String = "N-Quads" @@ -49,12 +50,12 @@ object RdfFormat: override val cliOptions: List[String] = List("jenathrift", "jena-thrift") override val jenaLang: Lang = RDFLanguages.RDFTHRIFT - case object RdfXml extends RdfFormat.Jena.Readable: + case object RdfXml extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable: override val fullName: String = "RDF/XML" override val cliOptions: List[String] = List("rdfxml", "rdf-xml") override val jenaLang: Lang = RDFLanguages.RDFXML - case object JsonLd extends RdfFormat.Jena.Readable: + case object JsonLd extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable: override val fullName: String = "JSON-LD" override val cliOptions: List[String] = List("jsonld", "json-ld") override val jenaLang: Lang = RDFLanguages.JSONLD From 7bfdce3eb49a4aefdecc8d8b67e72a6b6e4460d9 Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Mon, 7 Jul 2025 12:45:28 +0200 Subject: [PATCH 03/11] Add tests for jsonld and rdfxml output conversions --- .../cli/command/rdf/RdfFromJellySpec.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala index 1a97cad..d16a603 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala @@ -15,6 +15,8 @@ import java.nio.file.attribute.PosixFilePermissions import java.nio.file.{Files, Paths} import scala.io.Source import scala.util.Using +import org.apache.jena.riot.RDFDataMgr +import org.apache.jena.rdf.model.ModelFactory class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: @@ -217,6 +219,21 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: } } + for lang <- Seq(RdfFormat.JsonLd, RdfFormat.RdfXml) do + s"handle conversion of Jelly binary to ${lang.fullName}" when { + "input stream to output stream" in { + val input = DataGenHelper.generateJellyInputStream(testCardinality) + RdfFromJelly.setStdIn(input) + val model = DataGenHelper.generateTripleModel(testCardinality) + val (out, err) = RdfFromJelly.runTestCommand( + List("rdf", "from-jelly", "--out-format", lang.cliOptions.head), + ) + val newModel = ModelFactory.createDefaultModel() + RDFDataMgr.read(newModel, new ByteArrayInputStream(out.getBytes()), lang.jenaLang) + model.isIsomorphicWith(newModel) shouldBe true + } + } + "throw proper exception" when { "input file is not found" in { val nonExist = "non-existing-file" From ece12d1c86725a4ee336fb2cd300e91f0205d05b Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Mon, 7 Jul 2025 12:46:03 +0200 Subject: [PATCH 04/11] Remove test for unwriteable formats --- .../cli/command/rdf/RdfFromJellySpec.scala | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala index d16a603..a5c77d0 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala @@ -6,7 +6,6 @@ import eu.neverblink.jelly.cli.command.helpers.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat import eu.neverblink.jelly.core.proto.v1.{PhysicalStreamType, RdfStreamFrame} import eu.neverblink.jelly.core.{JellyOptions, JellyTranscoderFactory} -import org.apache.jena.riot.RDFLanguages import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -231,8 +230,8 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: val newModel = ModelFactory.createDefaultModel() RDFDataMgr.read(newModel, new ByteArrayInputStream(out.getBytes()), lang.jenaLang) model.isIsomorphicWith(newModel) shouldBe true + } } - } "throw proper exception" when { "input file is not found" in { @@ -356,34 +355,6 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: } } - "readable but not writable format supplied" in withFullJellyFile { j => - withEmptyJenaFile( - testCode = { q => - val exception = - intercept[ExitException] { - RdfFromJelly.runTestCommand( - List( - "rdf", - "from-jelly", - j, - "--to", - q, - "--out-format", - RdfFormat.RdfXml.cliOptions.head, - ), - ) - } - val msg = InvalidFormatSpecified( - RdfFormat.RdfXml.cliOptions.head, - RdfFromJellyPrint.validFormatsString, - ) - RdfFromJelly.getErrString should include(msg.getMessage) - exception.code should be(1) - }, - jenaLang = RDFLanguages.RDFXML, - ) - } - "invalid --take-frames argument provided" in { val e = intercept[ExitException] { RdfFromJelly.runTestCommand( From 21ba7431cfb19ec2ce7b89c3704430fe9843e340 Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Mon, 7 Jul 2025 14:40:19 +0200 Subject: [PATCH 05/11] Rename RdfFormat.Jena.Writeable to RdfFormat.Jena.StreamWriteable --- .../jelly/cli/command/rdf/RdfFromJelly.scala | 2 +- .../jelly/cli/command/rdf/util/RdfFormat.scala | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala index 40fa2e8..8f61e45 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala @@ -74,7 +74,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ format: RdfFormat.Writeable, ): Option[(InputStream, OutputStream) => Unit] = format match - case j: RdfFormat.Jena.Writeable => + case j: RdfFormat.Jena.StreamWriteable => Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang))) case j: RdfFormat.Jena.BatchWriteable => Some((in, out) => jellyToLang(in, StreamRdfBatchSink(out, j.jenaLang))) diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala index a2962cc..3a2d7b7 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala @@ -16,36 +16,36 @@ object RdfFormat: val jenaLang: Lang object Jena: - sealed trait Writeable extends Jena, RdfFormat.Writeable + sealed trait StreamWriteable extends Jena, RdfFormat.Writeable sealed trait Readable extends Jena, RdfFormat.Readable sealed trait BatchWriteable extends Jena, RdfFormat.Writeable - case object NQuads extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object NQuads extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "N-Quads" override val cliOptions: List[String] = List("nq", "nquads") override val jenaLang: Lang = RDFLanguages.NQUADS - case object NTriples extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object NTriples extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "N-Triples" override val cliOptions: List[String] = List("nt", "ntriples") override val jenaLang: Lang = RDFLanguages.NTRIPLES - case object Turtle extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object Turtle extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "Turtle" override val cliOptions: List[String] = List("ttl", "turtle") override val jenaLang: Lang = RDFLanguages.TURTLE - case object TriG extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object TriG extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "TriG" override val cliOptions: List[String] = List("trig") override val jenaLang: Lang = RDFLanguages.TRIG - case object RdfProto extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object RdfProto extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "RDF Protobuf" override val cliOptions: List[String] = List("jenaproto", "jena-proto") override val jenaLang: Lang = RDFLanguages.RDFPROTO - case object Thrift extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object Thrift extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "RDF Thrift" override val cliOptions: List[String] = List("jenathrift", "jena-thrift") override val jenaLang: Lang = RDFLanguages.RDFTHRIFT From e9847e5ee7917147fb5d9ea9bce6bdfb9c5cc158 Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Mon, 7 Jul 2025 14:58:27 +0200 Subject: [PATCH 06/11] Rename StreamRdfBatchSink to StreamRdfBatchWriter --- .../eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala | 4 ++-- .../{StreamRdfBatchSink.scala => StreamRdfBatchWriter.scala} | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename src/main/scala/eu/neverblink/jelly/cli/util/jena/{StreamRdfBatchSink.scala => StreamRdfBatchWriter.scala} (92%) diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala index 8f61e45..16c87c9 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala @@ -5,7 +5,7 @@ import eu.neverblink.jelly.cli.* import eu.neverblink.jelly.cli.command.rdf.util.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.* import eu.neverblink.jelly.cli.util.args.IndexRange -import eu.neverblink.jelly.cli.util.jena.StreamRdfBatchSink +import eu.neverblink.jelly.cli.util.jena.StreamRdfBatchWriter import eu.neverblink.jelly.convert.jena.JenaConverterFactory import eu.neverblink.jelly.core.JellyOptions import eu.neverblink.jelly.core.RdfHandler.AnyStatementHandler @@ -77,7 +77,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ case j: RdfFormat.Jena.StreamWriteable => Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang))) case j: RdfFormat.Jena.BatchWriteable => - Some((in, out) => jellyToLang(in, StreamRdfBatchSink(out, j.jenaLang))) + Some((in, out) => jellyToLang(in, StreamRdfBatchWriter(out, j.jenaLang))) case RdfFormat.JellyText => Some(jellyBinaryToText) /** This method reads the Jelly file, rewrites it to specified format and writes it to some output diff --git a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala similarity index 92% rename from src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala rename to src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala index 433e8b6..8dc09c0 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchSink.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala @@ -14,7 +14,7 @@ import org.apache.jena.riot.RDFDataMgr * everything according to the lang, and emits it to the outputStream. This is meant to be a * fallback for non-streaming RDF formats, as it requires all data to be loaded in memory. */ -class StreamRdfBatchSink(val outputStream: OutputStream, val lang: Lang) extends StreamRDF: +class StreamRdfBatchWriter(val outputStream: OutputStream, val lang: Lang) extends StreamRDF: private val model: Model = ModelFactory.createDefaultModel() private val modelStream: StreamRDF = StreamRDFLib.graph(model.getGraph) override def quad(quad: Quad): Unit = modelStream.quad(quad) From 1b040a6cd2301697ebf8b7397394c64ed23cddec Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Tue, 8 Jul 2025 11:47:41 +0200 Subject: [PATCH 07/11] Replace Model with Dataset in StreamRdfBatchWriter --- .../cli/util/jena/StreamRdfBatchWriter.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala index 8dc09c0..ecc2e8a 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala @@ -1,25 +1,30 @@ package eu.neverblink.jelly.cli.util.jena import org.apache.jena.riot.system.StreamRDF -import org.apache.jena.rdf.model.ModelFactory import org.apache.jena.sparql.core.Quad import org.apache.jena.graph.Triple -import org.apache.jena.rdf.model.Model import org.apache.jena.riot.system.StreamRDFLib import java.io.OutputStream import org.apache.jena.riot.Lang import org.apache.jena.riot.RDFDataMgr +import org.apache.jena.query.DatasetFactory +import org.apache.jena.query.Dataset /** A StreamRDF implementation that collects everything into a Model. When finishing, formats * everything according to the lang, and emits it to the outputStream. This is meant to be a * fallback for non-streaming RDF formats, as it requires all data to be loaded in memory. */ class StreamRdfBatchWriter(val outputStream: OutputStream, val lang: Lang) extends StreamRDF: - private val model: Model = ModelFactory.createDefaultModel() - private val modelStream: StreamRDF = StreamRDFLib.graph(model.getGraph) - override def quad(quad: Quad): Unit = modelStream.quad(quad) - override def triple(triple: Triple): Unit = modelStream.triple(triple) - override def prefix(prefix: String, iri: String): Unit = modelStream.prefix(prefix, iri) - override def base(base: String): Unit = modelStream.base(base) - override def finish(): Unit = RDFDataMgr.write(outputStream, model, lang) + protected val dataset: Dataset = DatasetFactory.create() + protected val datasetStream: StreamRDF = StreamRDFLib.dataset(dataset.asDatasetGraph()) + override def quad(quad: Quad): Unit = datasetStream.quad(quad) + override def triple(triple: Triple): Unit = datasetStream.triple(triple) + override def prefix(prefix: String, iri: String): Unit = datasetStream.prefix(prefix, iri) + override def base(base: String): Unit = datasetStream.base(base) + override def finish(): Unit = writeOutput() override def start(): Unit = () + def writeOutput(): Unit = + if lang == Lang.RDFXML then + RDFDataMgr.write(outputStream, dataset.getDefaultModel, lang) + else + RDFDataMgr.write(outputStream, dataset, lang) From 54037c359f5db35aca7f7b4d69004b9c52f5f66d Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Tue, 8 Jul 2025 12:29:17 +0200 Subject: [PATCH 08/11] Add support for combining frames --- .../jelly/cli/command/rdf/RdfFromJelly.scala | 23 +++++++++++++++---- .../cli/util/jena/StreamRdfBatchWriter.scala | 14 +++++++---- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala index 16c87c9..9e0fea7 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala @@ -18,6 +18,7 @@ import org.apache.jena.sparql.core.Quad import java.io.{InputStream, OutputStream} import scala.jdk.CollectionConverters.* +import eu.neverblink.jelly.cli.util.jena.StreamRdfCombiningBatchWriter object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]: override val defaultFormat: RdfFormat = RdfFormat.NQuads @@ -28,7 +29,10 @@ object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]: "If no output file is specified, the output is written to stdout.\n" + "If an error is detected, the program will exit with a non-zero code.\n" + "Otherwise, the program will exit with code 0.\n" + - "Note: this command works in a streaming manner and scales well to large files", + "Note: this command works in a streaming manner where possible and scales well to\n" + + "large files. Non-streaming formats (e.g. RDF/XML) by default work on a\n" + + "frame-by-frame basis, but they can be combined into one object with the\n" + + "--combine option.", ) @ArgsName("") case class RdfFromJellyOptions( @@ -48,6 +52,11 @@ case class RdfFromJellyOptions( IndexRange.helpText, ) takeFrames: String = "", + @HelpMessage( + "Add to combine the results into one object, when using a non-streaming output format. " + + "Ignored otherwise. Take care with input size, as this option will load everything into memory.", + ) + combine: Boolean = false, ) extends HasJellyCommandOptions object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writeable]: @@ -73,12 +82,16 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ override def matchFormatToAction( format: RdfFormat.Writeable, ): Option[(InputStream, OutputStream) => Unit] = - format match - case j: RdfFormat.Jena.StreamWriteable => + (format, getOptions.combine) match + case (j: RdfFormat.Jena.StreamWriteable, _) => Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang))) - case j: RdfFormat.Jena.BatchWriteable => + case (j: RdfFormat.Jena.BatchWriteable, true) => + Some((in, out) => + StreamRdfCombiningBatchWriter(out, j.jenaLang).runAndOutput(x => jellyToLang(in, x)), + ) + case (j: RdfFormat.Jena.BatchWriteable, false) => Some((in, out) => jellyToLang(in, StreamRdfBatchWriter(out, j.jenaLang))) - case RdfFormat.JellyText => Some(jellyBinaryToText) + case (RdfFormat.JellyText, _) => Some(jellyBinaryToText) /** This method reads the Jelly file, rewrites it to specified format and writes it to some output * stream diff --git a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala index ecc2e8a..95253a9 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala @@ -24,7 +24,13 @@ class StreamRdfBatchWriter(val outputStream: OutputStream, val lang: Lang) exten override def finish(): Unit = writeOutput() override def start(): Unit = () def writeOutput(): Unit = - if lang == Lang.RDFXML then - RDFDataMgr.write(outputStream, dataset.getDefaultModel, lang) - else - RDFDataMgr.write(outputStream, dataset, lang) + if lang == Lang.RDFXML then RDFDataMgr.write(outputStream, dataset.getDefaultModel, lang) + else RDFDataMgr.write(outputStream, dataset, lang) + def runAndOutput(runnable: StreamRDF => Unit): Unit = { + runnable(this) + writeOutput() + } + +class StreamRdfCombiningBatchWriter(outputStream: OutputStream, lang: Lang) + extends StreamRdfBatchWriter(outputStream, lang): + override def finish(): Unit = () From 517bdb4e9f1d0782baa2aacec9fe87ea91481fa2 Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Tue, 8 Jul 2025 12:31:30 +0200 Subject: [PATCH 09/11] Add tests for Dataset handling, frame handling, and frame combining. --- .../cli/command/helpers/DataGenHelper.scala | 11 +++++ .../cli/command/rdf/RdfFromJellySpec.scala | 45 +++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala b/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala index 3933bb7..df96898 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala @@ -148,3 +148,14 @@ object DataGenHelper: RDFDataMgr.write(outputStream, dataset, jenaLang) val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray) nQuadStream + + def generateJellyInputStreamDataset( + nGraphs: Int, + nTriplesPerGraph: Int, + differentiator: String, + ): ByteArrayInputStream = + val model = generateDataset(nGraphs, nTriplesPerGraph, differentiator) + val outputStream = new ByteArrayOutputStream() + RDFDataMgr.write(outputStream, model, JellyLanguage.JELLY) + val jellyStream = new ByteArrayInputStream(outputStream.toByteArray) + jellyStream diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala index a5c77d0..68b4529 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala @@ -6,6 +6,7 @@ import eu.neverblink.jelly.cli.command.helpers.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat import eu.neverblink.jelly.core.proto.v1.{PhysicalStreamType, RdfStreamFrame} import eu.neverblink.jelly.core.{JellyOptions, JellyTranscoderFactory} +import org.apache.jena.query.DatasetFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -218,9 +219,13 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: } } - for lang <- Seq(RdfFormat.JsonLd, RdfFormat.RdfXml) do - s"handle conversion of Jelly binary to ${lang.fullName}" when { - "input stream to output stream" in { + "handle conversion of Jelly binary to various formats" when { + for (lang, header) <- Seq( + (RdfFormat.JsonLd, "\\{\n {4}\"@graph\":".r), + (RdfFormat.RdfXml, " Date: Tue, 8 Jul 2025 13:23:02 +0200 Subject: [PATCH 10/11] Doc corrections --- .../eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala index 9e0fea7..9aac84a 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala @@ -31,8 +31,8 @@ object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]: "Otherwise, the program will exit with code 0.\n" + "Note: this command works in a streaming manner where possible and scales well to\n" + "large files. Non-streaming formats (e.g. RDF/XML) by default work on a\n" + - "frame-by-frame basis, but they can be combined into one object with the\n" + - "--combine option.", + "frame-by-frame basis, but they can be combined into one dataset with the\n" + + "--combine option. RDF/XML will only serialize the default model.", ) @ArgsName("") case class RdfFromJellyOptions( @@ -53,7 +53,7 @@ case class RdfFromJellyOptions( ) takeFrames: String = "", @HelpMessage( - "Add to combine the results into one object, when using a non-streaming output format. " + + "Add to combine the results into one dataset, when using a non-streaming output format. " + "Ignored otherwise. Take care with input size, as this option will load everything into memory.", ) combine: Boolean = false, From 3cc5fb7778005590af983483cb2c38c2880c4be1 Mon Sep 17 00:00:00 2001 From: niegrzybkowski Date: Tue, 8 Jul 2025 13:30:26 +0200 Subject: [PATCH 11/11] Disable RDF/XML multi-model test --- .../jelly/cli/command/rdf/RdfFromJellySpec.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala index 68b4529..903c1ad 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala @@ -248,9 +248,10 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: RDFDataMgr.read(newDataset, new ByteArrayInputStream(out.getBytes()), lang.jenaLang) newDataset.isEmpty shouldBe false dataset.getDefaultModel.isIsomorphicWith(newDataset.getDefaultModel) shouldBe true - dataset.getNamedModel("http://example.org/graph/2").isIsomorphicWith( - newDataset.getNamedModel("http://example.org/graph/2"), - ) shouldBe true + if lang != RdfFormat.RdfXml then + dataset.getNamedModel("http://example.org/graph/2").isIsomorphicWith( + newDataset.getNamedModel("http://example.org/graph/2"), + ) shouldBe true } s"multiple frames input stream to output ${lang.fullName} stream without --combine flag" in {