Scio

  • Scio Examples - examples with side-by-side explanation

  • sbt new spotify/scio.g8

    # in the generated project directory; sbt-pack builds target/pack/bin
    sbt pack
    target/pack/bin/word-count --output=wc
    cat wc/part-00000-of-00001.txt



    import com.spotify.scio._

    /*
    sbt "runMain [PACKAGE].WordCount
    --project=[PROJECT] --runner=DataflowRunner --zone=[ZONE]
    --input=gs://dataflow-samples/shakespeare/kinglear.txt
    --output=gs://[BUCKET]/[PATH]/wordcount"
    */

    object WordCount {
      def main(cmdlineArgs: Array[String]): Unit = {
        val (sc, args) = ContextAndArgs(cmdlineArgs)

        val exampleData = "gs://dataflow-samples/shakespeare/kinglear.txt"
        val input = args.getOrElse("input", exampleData)
        val output = args("output")

        sc.textFile(input)
          .map(_.trim)
          .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
          .countByValue
          .map(t => t._1 + ": " + t._2)
          .saveAsTextFile(output)

        // close() submits the job; waitUntilFinish() blocks until it completes
        val result = sc.close().waitUntilFinish()
      }
    }





    https://spotify.github.io/scio/Scio,-Beam-and-Dataflow.html

    Scio to Apache Beam

    Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts. Also see this comparison between Scio, Scalding and Spark APIs.

    Scio aims to be a thin wrapper on top of Beam while offering an idiomatic Scala-style API.

    Basics

    ScioContext, PipelineOptions and ScioResult

    • Beam uses PipelineOptions and its subclasses to parse command line arguments. Users have to extend the interface for their application-level arguments.
    • ScioContext has a parseArguments method that takes an Array[String] of command line arguments, parses Beam-specific ones into a PipelineOptions and application-specific ones into an Args, and returns the two as a (PipelineOptions, Args).
    • ContextAndArgs is a shortcut that creates a (ScioContext, Args) directly; see the sketch after this list.
    • ScioResult can be used to access accumulator values and job state.
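
    A minimal sketch of how these pieces fit together, assuming the same Scio version as the WordCount example above (where sc.close() submits the job):

      import com.spotify.scio._

      object MinimalJob {
        def main(cmdlineArgs: Array[String]): Unit = {
          // Beam-specific flags (e.g. --runner) are parsed into PipelineOptions,
          // everything else (e.g. --input, --output) into Args
          val (sc, args) = ContextAndArgs(cmdlineArgs)

          sc.textFile(args("input"))
            .saveAsTextFile(args("output"))

          // the ScioResult exposes job state and metrics
          val result = sc.close().waitUntilFinish()
          println(result.state)
        }
      }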

    IO

    • Most IO Read transforms are implemented as methods on ScioContext, e.g. avroFile, textFile, bigQueryTable.
    • Most IO Write transforms are implemented as methods on SCollection, e.g. saveAsAvroFile, saveAsTextFile, saveAsBigQueryTable.
    • These IO operations also detect when the ScioContext is running in a JobTest and manage test IO in memory.
    • Write transforms also return a ClosedTap. Once the job completes you can open the Tap via the ScioResult. Tap abstracts away the logic of reading the dataset directly as an Iterator[T] or re-opening it in another ScioContext, and its data is available once the job finishes. This can be used for lightweight pipeline orchestration, e.g. WordCountOrchestration.scala; see the sketch after this list.
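
    A sketch of the Tap round trip, assuming a Scio version where saveAsTextFile returns a ClosedTap[String] and jobs are submitted with sc.run() (TapRoundTrip is an illustrative name):

      import com.spotify.scio._
      import com.spotify.scio.io.ClosedTap

      object TapRoundTrip {
        def main(cmdlineArgs: Array[String]): Unit = {
          val (sc, args) = ContextAndArgs(cmdlineArgs)

          // the write returns a ClosedTap handle for the output dataset
          val closedTap: ClosedTap[String] =
            sc.textFile(args("input")).saveAsTextFile(args("output"))

          val result = sc.run().waitUntilFinish()

          // open the Tap via the ScioResult and read the data back
          result.tap(closedTap).value.take(10).foreach(println)
        }
      }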

    ByKey operations

    • Beam ByKey transforms require PCollection[KV[K, V]] inputs, while Scio uses SCollection[(K, V)].
    • Hence every ByKey transform in PairSCollectionFunctions converts the Scala (K, V) to KV[K, V] before the transform and back afterwards, so no KV appears in user code (see the sketch after this list). These are lightweight wrappers, however, and the JVM should be able to optimize them.
    • PairSCollectionFunctions also converts java.lang.Iterable[V] and java.util.List[V] to scala.Iterable[V] in some cases.
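
    For illustration, a word count fragment built on a ByKey transform; wordCounts is a hypothetical helper, and Beam's KV type never appears:

      import com.spotify.scio._
      import com.spotify.scio.values.SCollection

      object ByKeyExample {
        // hypothetical helper: counts words using only Scala tuples
        def wordCounts(sc: ScioContext, input: String): SCollection[(String, Long)] =
          sc.textFile(input)
            .flatMap(_.split("\\s+").filter(_.nonEmpty))
            .map(word => (word, 1L))
            .reduceByKey(_ + _) // PairSCollectionFunctions converts to KV and back
      }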

    Coders

    • Beam/Dataflow uses Coder for (de)serializing elements in a PCollection during shuffle. There are built-in coders for Java primitive types, collections, and common GCP types like Avro, Protobuf, BigQuery TableRow and Datastore Entity.
    • PCollection uses Guava's TypeToken and reflection to work around Java type erasure and retrieve type information of elements. This may not always work, but there is a PCollection#setCoder method to override the coder.
    • Twitter's chill library uses Kryo to (de)serialize data. Chill includes serializers for common Scala types and can also automatically derive serializers for arbitrary objects. Scio falls back to KryoAtomicCoder when a built-in coder isn't available.
    • A coder is non-deterministic if Coder#verifyDeterministic throws an exception. Any data type with such a coder cannot be used as a key in ByKey operations. However, KryoAtomicCoder assumes all types are deterministic for simplicity, so it is up to the user to avoid non-deterministic types, e.g. tuples or case classes with doubles, as keys (see the sketch after this list).
    • Avro GenericRecord requires a schema during deserialization (for serialization it is available via GenericRecord#getSchema), and AvroCoder requires one during initialization as well. This is not possible in KryoAtomicCoder, i.e. when nesting a GenericRecord inside a Scala type, so KryoAtomicCoder instead serializes the schema before every record so that records can round-trip safely. This is not optimal, but it is the only way short of requiring the user to handcraft a custom coder.
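
    A small illustration of the determinism check, using standard Beam coders directly:

      import org.apache.beam.sdk.coders.{Coder, DoubleCoder, StringUtf8Coder}

      object DeterminismCheck {
        // true if the coder is safe to use for keys in ByKey operations
        def isDeterministic(coder: Coder[_]): Boolean =
          try {
            coder.verifyDeterministic() // throws NonDeterministicException otherwise
            true
          } catch {
            case _: Coder.NonDeterministicException => false
          }

        def main(args: Array[String]): Unit = {
          println(isDeterministic(StringUtf8Coder.of())) // true
          println(isDeterministic(DoubleCoder.of()))     // false: floating point
                                                         // encodings are not
                                                         // guaranteed deterministic
        }
      }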
