Apache Beam

Apache Beam is a unified programming model for defining and executing data processing pipelines:
- ETL
- batch processing
- stream (continuous) processing

You write your pipeline with one of its SDKs, e.g.
- Java
- Python
- Go

You then have it execute on one of Beam's supported runners (distributed processing back-ends), such as:
- Apache Apex
- Apache Flink
- Apache Gearpump
- Apache Samza
- Apache Spark
- Google Cloud Dataflow

It is an open-source implementation of Google's Dataflow Model paper.

Data processing concepts:
- Batch
- Stream
- Windowing
- Watermarks






Quick Start Java

Generate a sample Maven project:
$ mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.13.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false

Some of the generated files:
$ ls src/main/java/org/apache/beam/examples/
DebuggingWordCount.java	WindowedWordCount.java	common
MinimalWordCount.java	WordCount.java

Pick a runner (here: run locally with the DirectRunner) and run:
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
Check out the results:
$ ls counts*

Key Concepts:

PipelineOptions
Provides pipeline configuration, including runner-specific options.

Pipeline
Encapsulates your entire data processing task, from start to finish: reading input data, transforming data, and writing output data.

PCollection
The distributed data set that your pipeline operates on.
PCollection can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. 

PTransform
A data processing operation, or step, in a pipeline.
Takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.
- IO transforms read or write data.
- A transform creates a new output PCollection without modifying its input collection.
PCollections are the inputs and outputs for each PTransform in a pipeline; think of PCollections as variables and PTransforms as functions applied to those variables.

Invoking multiple Beam transforms is similar to method chaining, but with one slight difference: you apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection.
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
                                                        .apply([Second Transform])
                                                        .apply([Third Transform])
A transform does not consume or otherwise alter the input collection; remember that a PCollection is immutable by definition. You can therefore apply multiple transforms to the same input PCollection:
[PCollection of database table rows] = [Database Table Reader].apply([Read Transform])
[PCollection of 'A' names] = [PCollection of database table rows].apply([Transform A])
[PCollection of 'B' names] = [PCollection of database table rows].apply([Transform B])
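
A minimal Java sketch of this branching pattern. This is hypothetical: it assumes an existing Pipeline p and an input file names.txt; Filter.by is a real SDK transform used here for illustration.

// Read once; the resulting PCollection is immutable and can feed many branches.
PCollection<String> names = p.apply(TextIO.read().from("names.txt"));

// Both branches apply to the same unmodified input collection.
PCollection<String> aNames =
    names.apply(Filter.by((String name) -> name.startsWith("A")));
PCollection<String> bNames =
    names.apply(Filter.by((String name) -> name.startsWith("B")));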


Core Beam transforms
ParDo
Generic parallel processing of elements. ParDo considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection. You provide the user code in the form of a DoFn object. ParDo is similar to map in the map/reduce/shuffle scheme. Typical uses (see the sketch below):
- filtering
- per-element processing, e.g. formatting / type conversion
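
A minimal DoFn sketch; ToUpperFn is a hypothetical name, and a PCollection<String> words is assumed to exist. It shows both filtering (emit zero elements) and per-element conversion (emit one element):

static class ToUpperFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(@Element String word, OutputReceiver<String> out) {
    if (!word.isEmpty()) {            // filtering: emit nothing for empty input
      out.output(word.toUpperCase()); // processing: format/type conversion
    }
  }
}

PCollection<String> upper = words.apply(ParDo.of(new ToUpperFn()));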

GroupByKey
Similar to shuffle in the map/reduce/shuffle scheme.
GroupByKey represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values).
With unbounded PCollections, you must use either non-global windowing or an aggregation trigger in order to perform a GroupByKey or CoGroupByKey. This is because a GroupByKey or CoGroupByKey must wait for all the data with a certain key to be collected, but with unbounded collections the data is unlimited; windowing or triggers let the grouping operate on logical, finite bundles of the stream.
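
A small sketch, assuming a Pipeline p (Create.of builds an in-memory bounded PCollection):

PCollection<KV<String, Integer>> scores =
    p.apply(Create.of(KV.of("cat", 1), KV.of("dog", 5), KV.of("cat", 3)));

// Multimap (duplicate keys) becomes a uni-map: ("cat", [1, 3]), ("dog", [5])
PCollection<KV<String, Iterable<Integer>>> grouped =
    scores.apply(GroupByKey.create());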

CoGroupByKey (join)
A relational join of two or more key/value PCollections that have the same key type, as sketched below.
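
A sketch assuming two PCollection<KV<String, String>> inputs named emails and phones, keyed by user name; the join machinery lives in org.apache.beam.sdk.transforms.join:

final TupleTag<String> emailsTag = new TupleTag<>();
final TupleTag<String> phonesTag = new TupleTag<>();

// Group both collections by key; each result element holds all values
// for one key from both inputs.
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(emailsTag, emails)
        .and(phonesTag, phones)
        .apply(CoGroupByKey.create());

joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    CoGbkResult result = c.element().getValue();
    c.output(c.element().getKey()
        + " emails=" + result.getAll(emailsTag)
        + " phones=" + result.getAll(phonesTag));
  }
}));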

Combine (e.g. sum; conceptually a foldLeft)
Combines elements, either across an entire PCollection or per key, using an associative and commutative combining function.
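
A sketch; Sum.ofIntegers() and Sum.integersPerKey() are real SDK combiners, and scores reuses the collection from the GroupByKey sketch above:

// Globally: reduce the whole PCollection to a single value.
PCollection<Integer> total =
    p.apply(Create.of(1, 2, 3, 4))
        .apply(Combine.globally(Sum.ofIntegers())); // -> 10

// Per key: fold the values of each key, like a per-key foldLeft.
PCollection<KV<String, Integer>> perKeyTotals = scores.apply(Sum.integersPerKey());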

Flatten
Merges multiple PCollection objects of the same element type into a single PCollection.
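
A sketch, assuming three existing PCollection<String>s pc1, pc2, and pc3:

PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.pCollections());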

Partition
Splits one PCollection into a fixed number of smaller PCollections. The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection.
The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).
Example use: dividing a PCollection into percentile groups, as sketched below.
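
A sketch along the lines of the Beam programming guide; Student and getPercentile() are hypothetical:

// Split students into 10 percentile-based partitions. The partition count (10)
// is fixed when the pipeline graph is constructed.
PCollectionList<Student> byPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
      @Override
      public int partitionFor(Student student, int numPartitions) {
        return student.getPercentile() * numPartitions / 100;
      }
    }));

// Pull out a single partition from the resulting list.
PCollection<Student> fortieth = byPercentile.get(4);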

Your user code must fulfill at least these requirements:
- Your function object must be serializable.
- Your function object must be thread-compatible; be aware that the Beam SDKs are not thread-safe.
It is also recommended to make your function object idempotent: it can be repeated or retried as often as necessary without causing unintended side effects.

Side Input
In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform's DoFn while processing each element, as in the sketch below.
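
A sketch following the side-input pattern in the Beam programming guide; it assumes a PCollection<String> words and a PCollection<Integer> wordLengths already exist:

// Turn a computed value into a singleton view usable as a side input.
final PCollectionView<Integer> maxLengthView =
    wordLengths.apply(Max.integersGlobally().asSingletonView());

PCollection<String> shortWords =
    words.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        int maxLength = c.sideInput(maxLengthView); // read the side input per element
        if (c.element().length() <= maxLength) {
          c.output(c.element());
        }
      }
    }).withSideInputs(maxLengthView));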



A Job
1. Create a Pipeline object using a PipelineOptions object.
2. Apply IO PTransforms to read external or in-memory data into PCollections.
3. Apply PTransforms to change, filter, group, analyze, or otherwise process the elements of those PCollections.
4. Apply IO PTransforms to write the final PCollections to an external sink.


When you run your Beam driver program, the Pipeline Runner that you designate constructs a workflow graph of your pipeline based on the PCollection objects you’ve created and transforms that you’ve applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous “job” (or equivalent) on that back-end.

Beam uses windowing to divide a continuously updating unbounded PCollection into logical windows of finite size. These logical windows are determined by some characteristic associated with a data element, such as a timestamp. Aggregation transforms (such as GroupByKey and Combine) work on a per-window basis — as the data set is generated, they process each PCollection as a succession of these finite windows.

Each element in a PCollection has an associated intrinsic timestamp. The timestamp for each element is initially assigned by the Source that creates the PCollection. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to when the element was read or added.

Note: Sources that create a bounded PCollection for a fixed data set also automatically assign timestamps, but the most common behavior is to assign every element the same timestamp (Long.MIN_VALUE).
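
A sketch of both ideas. LogEntry and getTimestampMillis() are hypothetical; Window, FixedWindows, WithTimestamps, and org.joda.time's Instant/Duration are real SDK pieces:

// Assign event-time timestamps from the data itself.
PCollection<LogEntry> stamped =
    entries.apply(WithTimestamps.of((LogEntry e) -> new Instant(e.getTimestampMillis())));

// Divide the unbounded collection into fixed one-minute windows; downstream
// GroupByKey/Combine then aggregate per window instead of over all data.
PCollection<LogEntry> windowed =
    stamped.apply(Window.<LogEntry>into(FixedWindows.of(Duration.standardMinutes(1))));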

Snippets

Beam provides a command-line parser that you can use to set fields in PipelineOptions from command-line arguments.

PipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()   // will check for required command-line arguments and validate argument values.
    .create();
This interprets command-line arguments that follow the format:
--<option>=<value>
Custom options
Define an interface with getter and setter methods for each option:
public interface MyOptions extends PipelineOptions {
    @Description("Input for the pipeline")
    @Default.String("gs://my-bucket/input")
    String getInput();
    void setInput(String input);

    @Description("Output for the pipeline")
    @Default.String("gs://my-bucket/output")
    String getOutput();
    void setOutput(String output);
}

When you register your interface with PipelineOptionsFactory, --help can find your custom options interface and add it to the output of the --help command:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                                .withValidation()
                                                .as(MyOptions.class);
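
You then pass the custom options on the command line in the --<option>=<value> format; the main class name here is hypothetical:

$ mvn compile exec:java -Dexec.mainClass=org.example.MyPipeline \
    -Dexec.args="--input=gs://my-bucket/input --output=gs://my-bucket/output"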
 



 
package org.apache.beam.examples;

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

/**
* An example that counts words in Shakespeare.
*
* MinimalWordCount is the first in a series of four successively more
* detailed 'word count' examples. Here, we don't show any error-checking or
* argument processing, and focus on construction of the pipeline, which chains together the
* application of core transforms.
*
* <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
* {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
* concepts.
*
* <p>Concepts:
*
* <pre>
* 1. Reading data from text files
* 2. Specifying 'inline' transforms
* 3. Counting items in a PCollection
* 4. Writing data to text files
* </pre>
*
* <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
* can see the results in the output files in your current working directory, with names like
* "wordcounts-00001-of-00005". When running on a distributed service, you would use an appropriate
* file service.
*/
public class MinimalWordCount {

  public static void main(String[] args) {

    // Create a PipelineOptions object. This object lets us set various execution
    // options for our pipeline, such as the runner you wish to use. This example
    // will run with the DirectRunner by default, based on the class path configured
    // in its dependencies.
    PipelineOptions options = PipelineOptionsFactory.create();

    // In order to run your pipeline, you need to make the following runner-specific changes:
    //
    //   CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
    //   or FlinkRunner.
    //   CHANGE 2/3: Specify runner-required options.
    //   For BlockingDataflowRunner, set project and temp location as follows:
    //     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //     dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //     dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //     dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    //   For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    //   for more details.
    //     options.as(FlinkPipelineOptions.class)
    //         .setRunner(FlinkRunner.class);

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(options);

    // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
    // of input text files. TextIO.Read returns a PCollection where each element is one line from
    // the input text (a set of Shakespeare's texts).

    // This example reads a public data set consisting of the complete works of Shakespeare.
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

        // Concept #2: Apply a FlatMapElements transform to the PCollection of text lines.
        // This transform splits the lines in PCollection<String>, where each element is an
        // individual word in Shakespeare's collected texts.
        .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        // We use a Filter transform to avoid empty words.
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
        // transform returns a new PCollection of key/value pairs, where each key represents a
        // unique word in the text. The associated value is the occurrence count for that word.
        .apply(Count.perElement())
        // Apply a MapElements transform that formats our PCollection of word counts into a
        // printable string, suitable for writing to an output file.
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (KV<String, Long> wordCount) ->
                        wordCount.getKey() + ": " + wordCount.getValue()))
        // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
        // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
        // formatted strings) to a series of text files.
        //
        // By default, it will write to a set of files with names like wordcounts-00001-of-00005.
        .apply(TextIO.write().to("wordcounts"));

    p.run().waitUntilFinish();
  }
}




package org.apache.beam.examples;

import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
* An example that counts words in Shakespeare and includes Beam best practices.
*
* <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
* 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. After
* you've looked at this example, then see the {@link DebuggingWordCount} pipeline, for introduction
* of additional concepts.
*
* <p>For a detailed walkthrough of this example, see <a
* href="https://beam.apache.org/get-started/wordcount-example/">
* https://beam.apache.org/get-started/wordcount-example/ </a>
*
* <p>Basic concepts, also in the MinimalWordCount example: Reading text files; counting a
* PCollection; writing to text files
*
* <p>New Concepts:
*
* <pre>
* 1. Executing a Pipeline both locally and using the selected runner
* 2. Using ParDo with static DoFns defined out-of-line
* 3. Building a composite transform
* 4. Defining your own pipeline options
* </pre>
*
* <p>Concept #1: you can execute this pipeline either locally or by selecting another runner.
* These are now command-line options and not hard-coded as they were in the MinimalWordCount
* example.
*
* <p>To change the runner, specify:
*
* <pre>{@code
* --runner=YOUR_SELECTED_RUNNER
* }</pre>
*
* <p>To execute this pipeline, specify a local output file (if using the {@code DirectRunner}) or
* output prefix on a supported distributed file system.
*
* <pre>{@code
* --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
* }</pre>
*
* <p>The input file defaults to a public data set containing the text of King Lear, by William
* Shakespeare. You can override it and choose your own input with {@code --inputFile}.
*/
public class WordCount {

  /**
   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to
   * a ParDo in the pipeline.
   */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words.
      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }

  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }

  /**
   * Options supported by {@link WordCount}.
   *
   * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
   * to be processed by the command-line parser, and specify default values for them. You can then
   * access the options values in your pipeline code.
   *
   * <p>Inherits standard configuration options.
   */
  public interface WordCountOptions extends PipelineOptions {

    /**
     * By default, this example reads from a public dataset containing the text of King Lear. Set
     * this option to choose a different input file or glob.
     */
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);

    /** Set this required option to specify where to write the output. */
    @Description("Path of the file to write to")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);

    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
    // static FormatAsTextFn() to the ParDo transform.
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);

    runWordCount(options);
  }
}
