# Example multi-language pipelines
This project provides examples of Apache Beam
[multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines).
## Using Java transforms from Python
* **python/wordcount_external** - A Python pipeline that runs the Word Count workflow using three external Java
SchemaTransforms. This example demonstrates the updated `ExternalTransformProvider` API.
#### _Outdated examples:_
* **python/addprefix** - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input.
* **python/javacount** - A Python pipeline that counts words using the Java `Count.perElement()` transform.
* **python/javadatagenerator** - A Python pipeline that produces a set of strings generated from Java.
This example demonstrates the `JavaExternalTransform` API.
### Instructions for running the pipelines
#### 1) Start the expansion service
1. Download the latest `beam-examples-multi-language` JAR. Starting with Apache Beam 2.36.0,
you can find it in [the Maven Central Repository](https://search.maven.org/search?q=g:org.apache.beam).
2. Run the following command, replacing `` and `` with valid values:
`java -jar beam-examples-multi-language-.jar --javaClassLookupAllowlistFile='*'`
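
Pipelines contact the expansion service during pipeline construction, so a pipeline launched before the service has finished starting will typically fail to expand its external transforms. The following minimal Python sketch polls a port until it accepts TCP connections. It only confirms that something is listening on that port, not that it is a Beam expansion service; the helper name `wait_for_port` and its timeout defaults are hypothetical.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means some process is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (connection refused, timed out, ...): retry shortly.
            time.sleep(interval)
    return False
```

For example, calling `wait_for_port("localhost", 12345)` before launching a pipeline, where `12345` stands in for the port the expansion service was started on, returns `True` once the service is listening.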
#### 2) Set up a Python virtual environment for Beam
1. See [the Python quickstart](https://beam.apache.org/get-started/quickstart-py/)
for more information.
#### 3) Execute the Python pipeline
1. In a new shell, run a pipeline from the **python** directory using a Beam runner that supports
multi-language pipelines.
Each Python file documents the exact command for running it.
## Using Python transforms from Java
### Sklearn Mnist Classification
Performs image classification on handwritten digits from the [MNIST](https://en.wikipedia.org/wiki/MNIST_database)
database.
See the [Python RunInference examples](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference)
for context on the corresponding Python pipeline.
The Java pipeline is
[available in the Beam Java examples module](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/multilanguage/SklearnMnistClassification.java).
#### Setup
* Obtain or generate a CSV input file that contains the labels and pixel values to feed into the model, and store it in
Google Cloud Storage (GCS). An example input is available
[here](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_input.csv).
* Create a model file that contains a pickled scikit-learn model
trained on MNIST data, and store it in GCS. An example model file is available
[here](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_model).
This model was generated by running the program from
[this tutorial](https://python-course.eu/machine-learning/training-and-testing-with-mnist.php)
on the
[example input dataset](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_input.csv).
* Perform runner-specific setup by following the
[Java quickstart instructions](https://beam.apache.org/get-started/quickstart-java/#run-a-pipeline).

The following instructions run the pipeline on the Dataflow runner. For other portable runners,
adapt them according to the
[Java multi-language pipelines guide](https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/#run-with-directrunner).
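
If you generate your own input file, it helps to check each row against the layout the model expects. The sketch below assumes the layout of the example input: one digit per line, with the label first, followed by 784 comma-separated integer pixel values (a flattened 28x28 image). Verify this against the example file before relying on it; `parse_mnist_row` and `format_mnist_row` are hypothetical helpers, not part of the example pipeline.

```python
from typing import List, Tuple

# Assumption: each image is 28x28 pixels, flattened into 784 values.
NUM_PIXELS = 28 * 28


def parse_mnist_row(line: str) -> Tuple[int, List[int]]:
    """Split one CSV line into (label, pixels), assuming the label comes first."""
    fields = line.strip().split(",")
    label = int(fields[0])
    pixels = [int(value) for value in fields[1:]]
    if len(pixels) != NUM_PIXELS:
        raise ValueError(f"expected {NUM_PIXELS} pixel values, got {len(pixels)}")
    return label, pixels


def format_mnist_row(label: int, pixels: List[int]) -> str:
    """Inverse of parse_mnist_row: build one CSV line for a custom input file."""
    if len(pixels) != NUM_PIXELS:
        raise ValueError(f"expected {NUM_PIXELS} pixel values, got {len(pixels)}")
    return ",".join(str(value) for value in [label, *pixels])
```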
#### Instructions for running the Java pipeline on released Beam (Beam 2.43.0 and later).
* Generate a project from the Beam examples Maven archetype for the relevant Beam version.
```
export BEAM_VERSION=
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=$BEAM_VERSION \
-DgroupId=org.example \
-DartifactId=multi-language-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
```
* Run the pipeline.
```
export GCP_PROJECT=
export GCP_BUCKET=
export GCP_REGION=
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.SklearnMnistClassification \
-Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output" \
-Pdataflow-runner
```
* Inspect the output. Each line has data separated by a comma ",". The first item is the actual label of
the digit. The second item is the predicted label of the digit.
```
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
```
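
Because each output line holds the actual label followed by the predicted label, a quick accuracy check only needs to compare the two fields. The sketch below assumes both fields are integer digit labels, as described above; `accuracy` is a hypothetical helper, not part of the example.

```python
from typing import Iterable


def accuracy(lines: Iterable[str]) -> float:
    """Return the fraction of 'actual,predicted' lines whose two labels match."""
    total = 0
    correct = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # Skip blank lines, such as a trailing newline.
        fields = line.split(",")
        actual, predicted = int(fields[0]), int(fields[1])
        total += 1
        if actual == predicted:
            correct += 1
    if total == 0:
        raise ValueError("no prediction lines found")
    return correct / total
```

For example, save the `gsutil cat` output to a file and pass its lines to `accuracy`, or pipe the output into a small script that calls `accuracy(sys.stdin)`.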
#### Instructions for running the Java pipeline at HEAD (Beam 2.41.0 and 2.42.0).
* Activate a new virtual environment following
[these instructions](https://beam.apache.org/get-started/quickstart-py/#create-and-activate-a-virtual-environment).
* Install the Apache Beam package with GCP support, and the `scikit-learn` package (the `sklearn` PyPI package is deprecated).
```
pip install 'apache-beam[gcp]'
pip install scikit-learn
```
* Start the Python expansion service.
```
python -m apache_beam.runners.portability.expansion_service_main -p --fully_qualified_name_glob "*"
```
* Make sure that Docker is installed and available on your system.
* In a different shell, from the root of a Beam repository clone, build the Python and Java SDK container images and push them to your Docker repository.
```
export DOCKER_ROOT=
./gradlew :sdks:python:container:py39:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
docker push $DOCKER_ROOT/beam_python3.9_sdk:latest
./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest -Pjava11Home=$JAVA_HOME
docker push $DOCKER_ROOT/beam_java11_sdk:latest
```
* Run the pipeline with the following Gradle command (this guide assumes the Dataflow runner).
Note that the command overrides both the Java and the Python SDK harness containers.
```
export GCP_PROJECT=
export GCP_BUCKET=
export GCP_REGION=
export EXPANSION_SERVICE_PORT=
# This removes any existing output.
gsutil rm gs://$GCP_BUCKET/multi-language-beam/output*
./gradlew :examples:multi-language:sklearnMinstClassification --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output \
--sdkContainerImage=$DOCKER_ROOT/beam_java11_sdk:latest \
--sdkHarnessContainerImageOverrides=.*python.*,$DOCKER_ROOT/beam_python3.9_sdk:latest \
--expansionService=localhost:$EXPANSION_SERVICE_PORT \
--region=${GCP_REGION}"
```
* Inspect the output. Each line has data separated by a comma ",". The first item is the actual label
of the digit. The second item is the predicted label of the digit.
```
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
```
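
In the command above, the Java container is set with `--sdkContainerImage`, while `--sdkHarnessContainerImageOverrides` takes a regular expression and a replacement image separated by a comma: any SDK harness container image matching the expression is swapped for the replacement, which is how `.*python.*` redirects the Python environment to the image you pushed. The sketch below mimics that matching in plain Python to show which images an override affects. It is an illustration, not Beam's implementation; the default image names in the usage note are assumptions, and for `.*python.*` it makes no difference whether matching is full or partial.

```python
import re
from typing import List


def apply_image_override(images: List[str], override: str) -> List[str]:
    """Replace every image that matches the override's regex with its replacement.

    `override` has the form "REGEX,REPLACEMENT", like the value passed to
    --sdkHarnessContainerImageOverrides in the Gradle command above.
    """
    # Split on the last comma: image names contain no commas, but a regex might.
    pattern, replacement = override.rsplit(",", 1)
    regex = re.compile(pattern)
    # This sketch requires the regex to match the whole image name.
    return [replacement if regex.fullmatch(image) else image for image in images]
```

For example, with the assumed defaults `apache/beam_java11_sdk:2.42.0` and `apache/beam_python3.9_sdk:2.42.0`, the override `.*python.*,$DOCKER_ROOT/beam_python3.9_sdk:latest` replaces only the Python image and leaves the Java image unchanged.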
### Python Dataframe Wordcount
This example is covered in the [Java multi-language pipelines quickstart](https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/).
The pipeline source code is available at
[PythonDataframeWordCount.java](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java).