By Rainie Li | Software Engineer, Stream Processing Platform Team
Background
At Pinterest, stream processing allows us to unlock value from real-time data for pinners and partners. The Stream Processing Platform team is building a reliable and scalable platform to support many critical streaming applications, including real-time experiment analytics and real-time machine learning signals.
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It provides exactly-once guarantees, low latency, high throughput, and a powerful computation model. At Pinterest, we have adopted Flink as our unified stream processing engine.
Requirements
Standardize Flink Build
At Pinterest, we use Bazel as our build system. We need a standardized Bazel rule to build all Flink jobs without changing Makefiles. Once the build is done, JARs should be uploaded to remote storage automatically, instead of asking users to copy Flink JARs to YARN clusters.
Deployment and Operations History
Users used to copy Flink JARs to YARN clusters and run commands manually. This made it hard to track previous executions when we needed to recover failed jobs. We need to provide standard Flink operations such as launching a job, killing it, triggering a savepoint, and resuming it from the most recent savepoint.
Job Deduplication
Flink applications are deployed as services, so only one instance of each Flink application should be running at a time. We need to prevent users from accidentally deploying the same job twice, in which case both instances might write to the same Kafka topic. The resulting double writes to Kafka could affect downstream jobs.
Deployment Framework
We built our Flink deployment framework on top of Bazel, Hermez (internal continuous deployment platform), Job Submission Service (internal service), and YARN clusters.
Figure 1. Deployment high level architecture
Create Bazel BUILD file
The BUILD file needs to contain load("flink_release"). Users also need to insert a Bazel rule like this:
Define Hermez Deployment File
Hermez is the Pinterest Continuous Deployment System. To launch a Flink job with Hermez, users need to create a Hermez.yml file. This file specifies, among other things, which YARN cluster the Flink job should run in, which YARN parameters to use, and which resources to request. Each instance of a Flink job needs its own YAML file. For example, users who run their jobs in dev, staging, and prod environments need three different YAML files (one for each environment).
Here’s an example of a yml file:
Automatic Flink Job Building
The numbers in parentheses below refer to the steps in Figure 1 (Deployment high level architecture).
Whenever a user lands a change in the Git repo, a Jenkins job is triggered to build the Flink job JARs (1). The Jenkins job follows the flink_release rules described in the BUILD file to build the Flink JAR and upload it to the S3 bucket (3). Meanwhile, it uploads the deployment-related Hermez YAML files to Artifactory (2). Hermez monitors Artifactory; when it sees a new yml file, it displays it in the UI so that users can launch a job using that yml (5).
Flink Job Launching
When users launch a Flink job, Hermez converts the yml file into JSON and submits it to the Job Submission Service (JSS) (6). JSS is a service maintained by Pinterest that can schedule and launch Flink jobs on YARN clusters.
JSS examines the request and ensures that the Flink JARs and the Flink job state exist in S3 (7). If both checks pass, JSS first launches a shell-runner job, which executes a command on a YARN cluster (8). The shell-runner job downloads the Flink job’s JAR from S3 and then kicks off the actual Flink job using the configuration provided by JSS (9). We add the shell-runner job to keep JSS a thin layer that does not have to deal with different compute engine clients (Flink, Spark, MapReduce, etc.) or with different configurations for each cluster.
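To make steps (7) through (9) concrete, here is a minimal Python sketch of a pre-flight check followed by the commands a shell-runner might execute. It is an illustration under stated assumptions, not the JSS implementation: the function names, the `exists` callback, and the choice of `aws s3 cp` and `flink run` as the commands are all hypothetical, and the sketch assumes job state is only required when a job resumes from a savepoint or checkpoint.

```python
from typing import Callable, List, Optional


def validate_request(jar_path: str,
                     state_path: Optional[str],
                     exists: Callable[[str], bool]) -> List[str]:
    """Return the problems found; an empty list means the request can proceed."""
    problems = []
    if not exists(jar_path):
        problems.append(f"missing JAR: {jar_path}")
    # Assumption: state is only required when resuming from a savepoint/checkpoint.
    if state_path is not None and not exists(state_path):
        problems.append(f"missing state: {state_path}")
    return problems


def shell_runner_steps(jar_path: str,
                       state_path: Optional[str],
                       local_jar: str = "job.jar") -> List[List[str]]:
    """Commands a hypothetical shell-runner would execute on the cluster, in order."""
    run = ["flink", "run", "-d"]       # -d: submit in detached mode
    if state_path is not None:
        run += ["-s", state_path]      # -s: restore from the given savepoint path
    run.append(local_jar)
    # Step 1 downloads the JAR from S3; step 2 launches the Flink job.
    return [["aws", "s3", "cp", jar_path, local_jar], run]
```

Because the submission service only validates paths and hands a list of commands to the runner, it never needs a Flink, Spark, or MapReduce client of its own, which is the point of keeping it thin.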
JSS Deduplication
When resuming a Flink job, we provide several options: resuming from the most recent savepoint or checkpoint, starting from fresh state, or specifying a savepoint or checkpoint path. The job deduplication feature ensures that only one instance of a Flink job is running at a time.
Job deduplication relies on each job having a unique name when it is submitted. If an instance of the job is already running, JSS triggers a savepoint and stops that instance first, then submits the new job. If the stop request fails because the savepoint fails, the new submission fails and the running instance keeps running. If a deployment of the same job is already in progress, the new submission is rejected.
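The deduplication rules above can be sketched in a few lines of Python. This is a simplified, in-memory model for illustration only: `FakeCluster`, `SubmissionRejected`, and `submit` are hypothetical names, and a real service would talk to YARN and Flink rather than to Python sets.

```python
from dataclasses import dataclass, field


class SubmissionRejected(Exception):
    """Raised when a new submission cannot proceed."""


@dataclass
class FakeCluster:
    """In-memory stand-in for a YARN cluster (hypothetical, for illustration)."""
    running: set = field(default_factory=set)
    deploying: set = field(default_factory=set)
    savepoint_ok: bool = True

    def stop_with_savepoint(self, job_name: str) -> bool:
        # The job is stopped only if the savepoint succeeds.
        if not self.savepoint_ok:
            return False
        self.running.discard(job_name)
        return True


def submit(cluster: FakeCluster, job_name: str) -> str:
    # Reject the request if a deployment of the same job is in progress.
    if job_name in cluster.deploying:
        raise SubmissionRejected("deployment already in progress")
    cluster.deploying.add(job_name)
    try:
        replaced = False
        if job_name in cluster.running:
            # Savepoint and stop the old instance before launching the new one.
            if not cluster.stop_with_savepoint(job_name):
                raise SubmissionRejected("savepoint failed; old instance kept running")
            replaced = True
        cluster.running.add(job_name)
        return "replaced" if replaced else "launched"
    finally:
        cluster.deploying.discard(job_name)
```

Keying everything on the unique job name is what makes the check cheap: a second submission for the same name either replaces the running instance cleanly or is refused, so two writers to the same Kafka topic never coexist.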
Flink Job Configuration Hotfix
Because Flink configuration is packaged together with the Flink job binary, users used to check config changes in to the repo and rebuild the package. This whole process could take more than 10 minutes, which is a problem when we need to adjust parameters quickly during incidents. For example, when Flink jobs failed in production due to a lack of resources, we had to go through the entire build process to roll out resource config changes. After the incident was resolved, we needed to check in another change to roll back these configs. To speed up this process, we provide a hotfix feature in Hermez that overwrites Flink job configuration without a code change. Users can adjust Flink configuration values during deployment. Behind the scenes, Hermez directly overwrites these values in the ymls that it reads from Artifactory.
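Conceptually, a hotfix is an overlay applied to the parsed yml at deployment time. The Python sketch below shows one way such an overlay could work; it is an assumption about the general technique, not Hermez code. The `apply_hotfix` name and the layout of the config dictionary are hypothetical.

```python
import copy


def apply_hotfix(base: dict, overrides: dict) -> dict:
    """Return a copy of `base` with `overrides` merged in, leaving `base` untouched."""
    merged = copy.deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Merge nested sections key by key instead of replacing them wholesale.
            merged[key] = apply_hotfix(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical parsed yml; the nesting under "flink" is illustrative only.
base = {
    "cluster": "prod",
    "flink": {"parallelism.default": 4, "taskmanager.memory.process.size": "4g"},
}
patched = apply_hotfix(base, {"flink": {"taskmanager.memory.process.size": "8g"}})
```

Since the checked-in config is never modified, there is nothing to roll back in the repo once the incident is over; the next regular deployment simply uses the original values again.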
What’s Next
Reducing Deployment Latency
The current approach launches the shell-runner first, and the shell-runner then launches the Flink job on the YARN cluster. This extra hop can increase launch latency. We plan to improve this process to reduce end-to-end Flink job launch time.
Automatic Job Failover
To further improve platform and Flink application availability, we built YARN clusters in multiple AWS Availability Zones (AZs) to provide a backup when one cluster or one AZ becomes unavailable. We are also building a service that can automatically detect cluster failures and fail over the affected jobs to backup clusters in different AZs, as well as detect application failures and restart the application automatically.
Stay tuned!
Acknowledgments
Thanks to Steven Bairos-Novak and Yu Yang for their countless contributions. Thanks to Ang Zhang for updating this blog. This project is a joint effort across multiple teams at Pinterest. Thanks to the Engineering Productivity Team for Hermez support.