See More

# Example multi-language pipelines This project provides examples of Apache Beam [multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines): ## Using Java transforms from Python * **python/wordcount_external** - A Python pipeline that runs the Word Count workflow using three external Java SchemaTransforms. This example demonstrates the updated `ExternalTransformProvider` API. #### _Outdated examples:_ * **python/addprefix** - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input. * **python/javacount** - A Python pipeline that counts words using the Java `Count.perElement()` transform. * **python/javadatagenerator** - A Python pipeline that produces a set of strings generated from Java. This example demonstrates the `JavaExternalTransform` API. ### Instructions for running the pipelines #### 1) Start the expansion service 1. Download the latest 'beam-examples-multi-language' JAR. Starting with Apache Beam 2.36.0, you can find it in [the Maven Central Repository](https://search.maven.org/search?q=g:org.apache.beam). 2. Run the following command, replacing `` and `` with valid values: `java -jar beam-examples-multi-language-.jar --javaClassLookupAllowlistFile='*'` #### 2) Set up a Python virtual environment for Beam 1. See [the Python quickstart](https://beam.apache.org/get-started/quickstart-py/) for more information. #### 3) Execute the Python pipeline 1. In a new shell, run a pipeline in the **python** directory using a Beam runner that supports multi-language pipelines. The Python files contain details about the actual commands to run. ## Using Python transforms from Java ### Sklearn Mnist Classification Performs image classification on handwritten digits from the [MNIST](https://en.wikipedia.org/wiki/MNIST_database) database. Please see [here](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) for context and information regarding the corresponding Python pipeline. Please note that the Java pipeline is [available in the Beam Java examples module](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/multilanguage/SklearnMnistClassification.java). #### Setup * Obtain/generate a csv input file that contains labels and pixels to feed into the model and store it in GCS. An example input is available [here](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_input.csv). * Create a model file that contains the pickled file of a scikit-learn model trained on MNIST data and store it in GCS. An example model file is available [here](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_model). This model was generated by by running the program given [here](https://python-course.eu/machine-learning/training-and-testing-with-mnist.php) on the [example input dataset](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_input.csv). * Perform Beam runner specific setup according to instructions [here](https://beam.apache.org/get-started/quickstart-java/#run-a-pipeline). Following instructions are for running the pipeline with the Dataflow runner. For other portable runners, please modify the instructions according to the guidelines [here](https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/#run-with-directrunner) #### Instructions for running the Java pipeline on released Beam (Beam 2.43.0 and later). * Checkout the Beam examples Maven archetype for the relevant Beam version. ``` export BEAM_VERSION= mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=$BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=multi-language-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false ``` * Run the pipeline. ``` export GCP_PROJECT= export GCP_BUCKET= export GCP_REGION= mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.SklearnMnistClassification \ -Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \ --region=$GCP_REGION \ --gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \ --output=gs://$GCP_BUCKET/multi-language-beam/output" \ -Pdataflow-runner ``` * Inspect the output. Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit. ``` gsutil cat gs://$GCP_BUCKET/multi-language-beam/output* ``` #### Instructions for running the Java pipeline at HEAD (Beam 2.41.0 and 2.42.0). * Activate a new virtual environment following [these instructions](https://beam.apache.org/get-started/quickstart-py/#create-and-activate-a-virtual-environment). * 2. Install Apache Beam package with gcp support and the `sklearn` package. ``` pip install apache-beam[gcp] pip install sklearn ``` * Startup the expansion service ``` python -m apache_beam.runners.portability.expansion_service_main -p --fully_qualified_name_glob "*" ``` * Make sure that Docker is installed and available on your system. * In a different shell, build and push Python and Java Docker containers. ``` export DOCKER_ROOT= ./gradlew :sdks:python:container:py39:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest docker push $DOCKER_ROOT/beam_python3.9_sdk:latest ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest -Pjava11Home=$JAVA_HOME docker push $DOCKER_ROOT/beam_java11_sdk:latest ``` * Run the pipeline using the following Gradle command (this guide assumes Dataflow runner). Note that we override both the Java and Python SDK harness containers here. ``` export GCP_PROJECT= export GCP_BUCKET= export GCP_REGION= export EXPANSION_SERVICE_PORT= # This removes any existing output. gsutil rm gs://$GCP_BUCKET/multi-language-beam/output* ./gradlew :examples:multi-language:sklearnMinstClassification --args=" \ --runner=DataflowRunner \ --project=$GCP_PROJECT \ --gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \ --output=gs://$GCP_BUCKET/multi-language-beam/output \ --sdkContainerImage=$DOCKER_ROOT/beam_java11_sdk:latest \ --sdkHarnessContainerImageOverrides=.*python.*,$DOCKER_ROOT/beam_python3.9_sdk:latest \ --expansionService=localhost:$EXPANSION_SERVICE_PORT \ --region=${GCP_REGION}" ``` * Inspect the output. Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit. ``` gsutil cat gs://$GCP_BUCKET/multi-language-beam/output* ``` ### Python Dataframe Wordcount This example is covered in the [Java multi-language pipelines quickstart](https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/). The pipeline source code is available at [PythonDataframeWordCount.java](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java).