Distributed Encoding with Pachyderm
The Why
You have 1750+ movie files that you want to encode, and Handbrake guesstimates it would take about 23 days on your weak i3 laptop (but what if there is a power cut and you have to start over?!). You suddenly remember you have $300 in free credits on GCP; now, armed with a k8s cluster of a few nodes, you look for a paradigm to manage Jobs on k8s and find Pachyderm…
The How
If this is your first time trying out Pachyderm, make sure you’ve gone through the local installation, or check out PachHub, which gives you a free cluster for 4 hours; completing the beginner tutorial would also help.
First, we need a bucket to hold all of the videos that are yet to be encoded. For this, we create a Repo; it is conceptually similar to Git’s repos, but here we tend to use them as buckets (more on this later):
pachctl create repo videos
Next, let’s add a video so that we can test whether the converter pipeline works:
pachctl put file videos@master:aventador.mp4 -f aventador.mp4
To check if it got uploaded successfully, run
pachctl list file videos@master
You should get something like:
NAME TYPE SIZE
/aventador.mp4 file 158.8MiB
You can follow along on this by checking out my repo, under simpleencoder
Now we need a pipeline. A pipeline operates on any files that are added to a repo; in our case, we can have a “converter” pipeline that acts on videos added to the “videos” repo.
Let the hackery commence
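If you just want a feel for it, here’s a rough sketch of what such a converter script could look like; the /pfs paths are what Pachyderm mounts, while the file names, codecs, and flags are illustrative and may differ from the actual script in my repo:

# convert.py — a rough sketch of the converter step
import os
import subprocess

IN_DIR = "/pfs/videos"   # Pachyderm mounts the input repo here
OUT_DIR = "/pfs/out"     # whatever lands here becomes the pipeline's output

for name in os.listdir(IN_DIR):
    src = os.path.join(IN_DIR, name)
    dst = os.path.join(OUT_DIR, os.path.splitext(name)[0] + ".mkv")
    # tweak the ffmpeg parameters below: codecs, CRF, preset, scaling, ...
    proc = subprocess.Popen(
        ["ffmpeg", "-i", src,
         "-c:v", "libx264", "-crf", "23", "-preset", "medium",
         "-c:a", "copy",
         dst]
    )
    proc.wait()  # wait for the forked ffmpeg process to finish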
This script takes an input file and forks a background task to begin the encoding process; most of the parameters you might want to give to ffmpeg can be edited in its ffmpeg invocation: video codecs, audio codecs, constant quality settings, presets, scaling aspect ratios, and so on. Don’t worry if you don’t understand the /pfs directories; they will begin to make sense once you combine the script with the pipeline specification.
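Structurally, converter.json boils down to something like the sketch below; the cmd and image values here are placeholders (the actual spec in my repo is wired up for the --build flag, so the details differ):

{
  "pipeline": { "name": "converter" },
  "description": "Encode every video that lands in the videos repo",
  "input": {
    "pfs": {
      "repo": "videos",
      "glob": "/*"
    }
  },
  "transform": {
    "cmd": ["python3", "/convert.py"],
    "image": "<YourDockerUsernameHere>/converter:latest"
  }
}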
Here we declare the name of the pipeline and a description. The input would be the source repo, and a glob lets you specify how you want to break up the job into manageable work pieces, what Pachyderm refers to as Datums. In our case, we say that in the source folder /, every * file should be picked as an input to the pipeline, so Pachyderm would bring aventador.mp4 to the /pfs/videos/aventador.mp4 location on the container that is about to process this Datum. Once it has downloaded aventador.mp4 to the container, it runs the entrypoint cmd; Python creates the ffmpeg background fork process and writes the output to /pfs/out, another special directory in Pachyderm whose contents end up in an output repo automatically created with the same name as your pipeline (converter).
The transform cmd is akin to k8s’s cmd (an entrypoint cmd), and the transform image is the name of your Docker image (it uses Docker Hub by default unless you specify otherwise).
Special thanks to jrottenberg for his jrottenberg/ffmpeg image, without which most of my time would have gone into compiling FFmpeg from source.
Putting it all together, from the folder that contains the three files above (simpleencoder in my repo), run:
pachctl create pipeline -f converter.json --build --username <YourDockerUsernameHere>
Wait a couple of seconds (or a few minutes if you changed ffmpeg parameters) and run:
pachctl list file converter@master
You should see something like:
NAME TYPE SIZE
/aventador.mkv file 158.8MiB
You can collect your encoded video with:
pachctl get file converter@master:aventador.mkv > aventador.mkv
and ls
aventador.mkv aventador.mp4
And there you have it: your encoded video!
Bravo! You have now built yourself a “works even if there’s a power cut” solution for distributed encoding of your movies!
All I had to do at this point was write a bash script to pachctl put file(s) to my videos repo, and I would get encoded videos in the converter repo.
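Something as simple as this loop would do, assuming the movies sit in the current directory:

for f in *.mp4; do
  pachctl put file videos@master:"$f" -f "$f"
done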
Sweet! But…
The Here We Go Again
When you add a bunch of videos at once, each node, depending on how beefy it is, can process either a single video or multiple videos in parallel; here’s a rundown of the CPU resource utilization if you’re interested.
As I was looking into this, I came across ClusterCode. ClusterCode splits each file into 120-second chunks and then proceeds to encode them on multiple machines. This means that if your cluster was sitting idle and a single video was submitted, the encoding job could be sped up by roughly the number of nodes in your cluster (possibly more if you run more than one job per node!). By splitting the task into workable chunks and letting each node process them, you can better utilize the maximum CPU throughput of your k8s cluster; in the end, we merge the encoded chunks into a single file and you’re good to go!
Here’s how we would do that on Pachyderm; follow along in the splitencoder folder of my repo.
Step 1: The Map
To split each file into chunks, we use ffmpeg with a different set of arguments
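Roughly, the splitting script can lean on ffmpeg’s segment muxer to slice without re-encoding; the 120-second segment length is the important bit, while the paths and chunk names here are illustrative:

# split.py — a rough sketch of the splitting step
import os
import subprocess

IN_DIR = "/pfs/videos"
OUT_DIR = "/pfs/out"

for name in os.listdir(IN_DIR):
    # one folder per movie, so the chunks stay grouped by source file
    chunk_dir = os.path.join(OUT_DIR, os.path.splitext(name)[0])
    os.makedirs(chunk_dir, exist_ok=True)
    subprocess.run(
        ["ffmpeg", "-i", os.path.join(IN_DIR, name),
         "-c", "copy", "-map", "0",
         "-f", "segment", "-segment_time", "120", "-reset_timestamps", "1",
         os.path.join(chunk_dir, "chunk_%03d.mp4")],
        check=True,
    )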
We also need a pipeline configuration to go with it.
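Roughly, splitter.json looks like the sketch below, with cmd and image again as placeholders for what is actually in the repo:

{
  "pipeline": { "name": "splitter" },
  "description": "Split every incoming video into 120-second chunks",
  "input": {
    "pfs": {
      "repo": "videos",
      "glob": "/*"
    }
  },
  "transform": {
    "cmd": ["python3", "/split.py"],
    "image": "<YourDockerUsernameHere>/splitter:latest"
  }
}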
We read /* (every file) from /pfs/videos and write to a folder named after the file, so we can track intermediate job state under each folder; this step creates the chunks that will be used in the next step.
Create the pipeline with:
pachctl create pipeline -f splitter.json --build --username <YourDockerUsernameHere>
Step 2: The Encoding
This step is not that different from regular encoding, except that each job only operates on a 120-second chunk instead of the entire video.
And the pipeline configuration follows the same pattern.
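As a sketch, the interesting parts are the input repo (splitter) and the glob; cmd and image are placeholders as before:

{
  "pipeline": { "name": "converter" },
  "description": "Encode each 120-second chunk independently",
  "input": {
    "pfs": {
      "repo": "splitter",
      "glob": "/*/*"
    }
  },
  "transform": {
    "cmd": ["python3", "/encode.py"],
    "image": "<YourDockerUsernameHere>/converter:latest"
  }
}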
Here, notice we use /*/* as the glob pattern; this translates to: for every movie file, pick every chunk and use it as the input to an encoding job. This pipeline outputs to the merger; the input structure remains the same as the output structure, the only difference being that the chunks on the output side have been encoded.
Create the pipeline with:
pachctl create pipeline -f converter.json --build --username <YourDockerUsernameHere>
Step 3: The Reduce
This step merges the encoded chunks and outputs a single file; we do this with another set of arguments to ffmpeg.
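A rough sketch of the idea, using ffmpeg’s concat demuxer; the folder layout and names here are illustrative:

# merge.py — a rough sketch of the merging step
import os
import subprocess

IN_DIR = "/pfs/converter"   # one folder of encoded chunks per movie
OUT_DIR = "/pfs/out"

for movie in os.listdir(IN_DIR):
    chunk_dir = os.path.join(IN_DIR, movie)
    list_path = "/tmp/%s.txt" % movie
    # the concat demuxer wants a text file listing the chunks in order
    with open(list_path, "w") as listing:
        for chunk in sorted(os.listdir(chunk_dir)):
            listing.write("file '%s'\n" % os.path.join(chunk_dir, chunk))
    subprocess.run(
        ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_path,
         "-c", "copy",
         os.path.join(OUT_DIR, os.path.splitext(movie)[0] + ".mkv")],
        check=True,
    )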
Lastly, we need a pipeline configuration for the merge step.
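Sketched out, with placeholders as before:

{
  "pipeline": { "name": "merger" },
  "description": "Merge the encoded chunks of each movie back into one file",
  "input": {
    "pfs": {
      "repo": "converter",
      "glob": "/*/"
    }
  },
  "transform": {
    "cmd": ["python3", "/merge.py"],
    "image": "<YourDockerUsernameHere>/merger:latest"
  }
}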
Here, the glob pattern used is /*/, which translates to: return the folder that contains the encoded chunks, so that all the chunks present inside the folder can be merged together to form a single file.
Create the pipeline with:
pachctl create pipeline -f merger.json --build --username <YourDockerUsernameHere>
Wait a couple of seconds (or a few minutes if you changed ffmpeg parameters in the encoding step) and run:
pachctl list file merger@master
You should see something like:
NAME TYPE SIZE
/aventador.mkv file 158.8MiB
Kudos! You have now successfully split your file into smaller chunks, had them encoded independently, and merged them back together, all in less than 100 lines of code! To kick your converter pipeline into overdrive and better utilize your cluster, tweak the parallelism spec in Pachyderm. Briefly, there are two options you can go with (see the snippet after the list):
- Constant: Creates a constant number of workers that you specify on the cluster
- Coefficient: If you specify 0.5 and have 10 nodes in your cluster, it creates 5 workers (only 5 workers across the 10 nodes); setting it to 2 creates 20 workers (2 on each node)
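For example, pinning the converter pipeline to five workers is a small addition to its spec (swap in coefficient for the other behavior):

"parallelism_spec": {
  "constant": 5
}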
Final Thoughts
Pachyderm was built with provenance of a DAG (Directed Acyclic Graph) in mind, and a side effect of building your pipelines this way is a unique feature: with the help of the repos it creates, Pachyderm tracks all the different files that were created as part of the splitting step and, in the converter pipeline, tracks the output of the encoding jobs, retrying failed jobs automatically; finally, when all the datums have succeeded, it submits the merge task, leading to reproducibility and fault tolerance in all steps of the DAG.
Another bonus is that when you submit your job by uploading a new file to the videos repo, Pachyderm diffs your commit with its previous commit (HEAD~1) and knows that you might not want to reprocess a job that had already succeeded based on the filename and contents, so it skips it.
If you change a parameter in the encoding step, it affects the output of all the steps downstream (merger). New videos you upload will use the newer parameters, but to re-encode the videos you had already uploaded, you can add --reprocess to the end of the update pipeline step; that forces Pachyderm to rerun steps 2 & 3 on all the chunks available in the converter and then uses merger to merge the freshly encoded chunks.
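For example, after editing the ffmpeg parameters in converter.json (keep whatever flags you originally created the pipeline with):
pachctl update pipeline -f converter.json --reprocess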
Thanks for reading!
— Loki