Distributed Encoding with Pachyderm

Lokesh Poovaragan
6 min read · Apr 25, 2021

The Why

You have 1750+ movie files that you want to encode, and Handbrake guesstimates it will take about 23 days on your weak i3 laptop (and what if there is a power cut and you have to start over?!). Then you suddenly remember you have $300 in free credits on GCP. Now armed with a k8s cluster of a few Nodes, you look for a paradigm to manage Jobs on k8s and find Pachyderm…

realistically though, more like 30 minutes thanks to Pachyderm

The How

If this is your first time trying out Pachyderm, make sure you’ve gone through the local installation, or check out PachHub, which gives you a free cluster for 4 hours. Completing the beginner tutorial would also help.

First, we need a bucket to hold all of the videos that are yet to be encoded. For this, we create a Repo. It is conceptually similar to a Git repo, but here we tend to use repos as buckets (more on this later):

pachctl create repo videos

Next, let’s add a video so that we can test whether the converter pipeline works:

pachctl put file videos@master:aventador.mp4 -f aventador.mp4

To check if it got uploaded successfully, run

pachctl list file videos@master

You should get something like:

NAME           TYPE SIZE     
/aventador.mp4 file 158.8MiB

You can follow along on this by checking out my repo, under simpleencoder

Now we need a pipeline. A Pipeline operates on any files that are added to a repo; in our case, we can have a “converter” pipeline that acts on videos added to the “videos” repo.

Let the hackery commence

This script takes an input file and forks a background task to begin the encoding process. Most of the parameters you might want to give to ffmpeg can be edited in Line 11 of this script: video codecs, audio codecs, constant quality settings, presets, scaling aspect ratios, etc.
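As a rough illustration of the shape such a script can take (this is a sketch, not the script from the repo), the snippet below builds an ffmpeg command for one input file and launches it as a child process. The codec, preset, and CRF values, the `.mkv` output name, and the function names `build_encode_command` and `encode` are all assumptions made for the example.

```python
import subprocess
from pathlib import PurePosixPath


def build_encode_command(src, out_dir="/pfs/out"):
    """Return the ffmpeg argument list that encodes `src` into an .mkv under `out_dir`."""
    src = PurePosixPath(src)
    dst = PurePosixPath(out_dir) / (src.stem + ".mkv")
    return [
        "ffmpeg", "-y",
        "-i", str(src),
        "-c:v", "libx264",    # video codec (assumed for this sketch)
        "-preset", "medium",  # speed/size trade-off (assumed)
        "-crf", "23",         # constant quality setting (assumed)
        "-c:a", "aac",        # audio codec (assumed)
        str(dst),
    ]


def encode(src, out_dir="/pfs/out"):
    """Start ffmpeg as a child process and wait for it to exit.

    Waiting matters: the datum should only count as done once the
    output file has been fully written.
    """
    process = subprocess.Popen(build_encode_command(src, out_dir))
    return process.wait()


# Only print the command here; encode() needs ffmpeg and the /pfs directories.
print(build_encode_command("/pfs/videos/aventador.mp4"))
```

Keeping the command construction in its own function means the ffmpeg parameters live in one place and can be inspected without actually running an encode.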

Don’t worry if you don’t understand the /pfs directories; they will begin to make sense once you combine the script with the pipeline specification:

Here we declare the name of the pipeline and a description.

The input is the source repo, and a Glob lets you specify how you want to break the work up into manageable pieces, which Pachyderm refers to as Datums. In our case, we say: in the source folder /, pick every * file as an input to the pipeline. Pachyderm therefore brings aventador.mp4 to /pfs/videos/aventador.mp4 on the container that is about to process this Datum. Once aventador.mp4 has been downloaded to the container, Pachyderm runs the entrypoint cmd, and Python creates the ffmpeg background process, which writes its result to /pfs/out. This is another special directory in Pachyderm: whatever lands there is committed to an output repo that is automatically created with the same name as your Pipeline (converter).

The transform cmd is akin to K8s’s cmd (an entrypoint cmd), and the transform image is the name of your Docker image (it is pulled from Docker Hub by default unless you specify otherwise).
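To make the pieces described above concrete, here is a minimal sketch that assembles a pipeline specification with the same fields (name, description, input repo and glob, transform cmd and image) and prints it as JSON. The description text, the script path `/converter.py`, and the image name are hypothetical placeholders, not the values from the repo.

```python
import json


def converter_spec(image):
    """Build a pipeline spec with the fields described in the text."""
    return {
        "pipeline": {"name": "converter"},
        "description": "Encodes every video added to the videos repo.",  # placeholder text
        "input": {
            "pfs": {
                "repo": "videos",  # source repo, mounted at /pfs/videos
                "glob": "/*",      # one datum per top-level file
            }
        },
        "transform": {
            "cmd": ["python3", "/converter.py"],  # hypothetical entrypoint
            "image": image,                        # hypothetical image name
        },
    }


print(json.dumps(converter_spec("<YourDockerUsernameHere>/converter"), indent=2))
```

Redirecting the printed JSON into a file gives something of the same shape as the converter.json passed to pachctl create pipeline.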

Special thanks to jrottenberg for his jrottenberg/ffmpeg image, without which most of my time would have gone into compiling FFMPEG from source

Putting it all together, from the folder that contains the three files above (simpleencoder in my repo), run:

pachctl create pipeline -f converter.json --build --username <YourDockerUsernameHere>

Wait a couple of seconds (or a few minutes if you changed ffmpeg parameters) and run:

pachctl list file converter@master

You should see something like:

NAME           TYPE SIZE     
/aventador.mkv file 158.8MiB

You can collect your encoded video with:

pachctl get file converter@master:aventador.mkv > aventador.mkv

and ls

aventador.mkv  aventador.mp4

And there you have it! Your encoded video!

Bravo! You have now built yourself a “works even if there’s a power cut” solution for encoding your movies in a distributed fashion!

All I had to do at this point was write a bash script that runs pachctl put file for each of my files against the videos repo, and I would get encoded videos in the converter repo! Sweet! But…
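The upload loop mentioned above was a bash script; the same idea can be sketched in Python, assuming a local folder of videos and the `pachctl put file` form used earlier in this post. The helper names `put_file_commands` and `upload_folder` and the list of video extensions are made up for the illustration.

```python
import subprocess
from pathlib import Path

VIDEO_SUFFIXES = {".mp4", ".mkv", ".avi", ".mov"}  # assumed set of extensions


def put_file_commands(paths, repo="videos", branch="master"):
    """Return one `pachctl put file` command per local video path."""
    commands = []
    for path in paths:
        path = Path(path)
        if path.suffix.lower() in VIDEO_SUFFIXES:
            target = f"{repo}@{branch}:{path.name}"
            commands.append(["pachctl", "put", "file", target, "-f", str(path)])
    return commands


def upload_folder(folder):
    """Upload every video in `folder`; requires pachctl and a running cluster."""
    for command in put_file_commands(sorted(Path(folder).iterdir())):
        subprocess.run(command, check=True)


# Print the commands for two example files instead of running them.
for command in put_file_commands(["aventador.mp4", "notes.txt"]):
    print(" ".join(command))
```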

seriously, I LOVE Distributed Computing ❤

The Here We Go Again

When you add a bunch of videos at once, each Node, depending on how beefy it is, can process either a single video or multiple videos in parallel. Here’s a rundown of the CPU resource utilization if you’re interested.

As I was looking into this, I came across ClusterCode. ClusterCode splits each file into 120-second chunks and then encodes them on multiple machines. This means that if your cluster is idle and a single video is submitted, the encoding job speeds up roughly in proportion to the number of Nodes in your cluster (possibly more if you run more than one job per Node!). By splitting the task into workable chunks and letting each node process them, you make better use of the maximum CPU throughput of your k8s cluster. In the end, we merge the encoded chunks into a single file and you’re good to go!
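A quick back-of-the-envelope calculation shows why chunking helps. The sketch below counts the 120-second chunks for a video and the number of encoding "rounds" needed when each worker handles one chunk at a time. It is an idealized estimate that ignores splitting, merging, and data transfer overhead; the function names are illustrative.

```python
import math


def chunk_count(duration_s, chunk_s=120):
    """Number of chunks a video of `duration_s` seconds is split into."""
    return math.ceil(duration_s / chunk_s)


def encoding_rounds(duration_s, workers, chunk_s=120):
    """Rounds needed if every worker encodes one chunk per round."""
    return math.ceil(chunk_count(duration_s, chunk_s) / workers)


two_hours = 2 * 60 * 60
print(chunk_count(two_hours))                 # 60 chunks
print(encoding_rounds(two_hours, workers=1))  # 60 rounds on a single worker
print(encoding_rounds(two_hours, workers=8))  # 8 rounds on eight workers
```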

Here’s how we would do that on Pachyderm, follow along on the splitencoder folder in my repo

Step 1: The Map

To split each file into chunks, we use ffmpeg with a different set of arguments

and the pipeline configuration:

We read /* (every file) from /pfs/videos and write to a folder named after the file, so we can track intermediate job state under each folder. This step creates the chunks that will be used in the next step.
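One way to do this kind of split is ffmpeg's segment muxer. The sketch below is an assumption about how the step could be written, not the script from the repo: it builds a command that stream-copies a video into roughly 120-second chunks inside a per-video folder. The chunk naming pattern and the choice to name the folder after the file stem are hypothetical.

```python
from pathlib import PurePosixPath


def build_split_command(src, out_root="/pfs/out", chunk_s=120):
    """Return (chunk_dir, ffmpeg args) for splitting `src` into chunks.

    With `-c copy` nothing is re-encoded, so ffmpeg cuts at keyframes
    and chunk lengths are only approximately `chunk_s` seconds.
    """
    src = PurePosixPath(src)
    chunk_dir = PurePosixPath(out_root) / src.stem  # folder named after the video
    pattern = chunk_dir / "chunk_%04d.mkv"          # zero-padded so names sort in order
    command = [
        "ffmpeg", "-i", str(src),
        "-c", "copy",
        "-f", "segment",
        "-segment_time", str(chunk_s),
        "-reset_timestamps", "1",
        str(pattern),
    ]
    return str(chunk_dir), command


# The chunk directory has to exist before ffmpeg runs (e.g. via os.makedirs).
chunk_dir, command = build_split_command("/pfs/videos/aventador.mp4")
print(chunk_dir)
print(" ".join(command))
```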

Create the pipeline with:

pachctl create pipeline -f splitter.json --build --username <YourDockerUsernameHere>

Step 2: The Encoding

This step is not that different from regular encoding, except that each job only works on a 120-second chunk instead of the entire video.

And the pipeline configuration:

Here, notice that we use /*/* as the glob pattern. This translates to: for every movie folder, pick every chunk and use it as the input to an encoding job. This pipeline’s output feeds the merger. The output structure is the same as the input structure; the only difference is that the chunks on the output side have been encoded.
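To see how the glob depth changes what counts as one datum, here is a simplified simulation. It only handles patterns made of `/` and `*` and is not how Pachyderm matches globs internally; the file names and the `group_into_datums` helper are hypothetical.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def group_into_datums(files, glob):
    """Group file paths by the path prefix a '/*'-style glob would match."""
    depth = len([part for part in glob.strip("/").split("/") if part])
    datums = defaultdict(list)
    for file_path in files:
        parts = PurePosixPath(file_path).parts[1:]  # drop the leading "/"
        if len(parts) < depth:
            continue  # path is too shallow to match this glob
        datums["/" + "/".join(parts[:depth])].append(file_path)
    return dict(datums)


files = [
    "/aventador/chunk_0000.mkv",
    "/aventador/chunk_0001.mkv",
    "/huracan/chunk_0000.mkv",
]

print(len(group_into_datums(files, "/*/*")))  # 3: one datum per chunk
print(len(group_into_datums(files, "/*")))    # 2: one datum per movie folder
print(len(group_into_datums(files, "/")))     # 1: the whole repo as a single datum
```

The deeper the glob, the smaller each datum, and the more work there is to spread across workers.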

Create the pipeline with:

pachctl create pipeline -f converter.json --build --username <YourDockerUsernameHere>

On a survey that asked people what gives people feelings of power

Step 3: The Reduce

This step merges the encoded chunks and outputs a single file. We do this with yet another set of arguments to ffmpeg.

Lastly, the pipeline configuration:

Here, the glob pattern used is /*/, which translates to: return each folder that contains encoded chunks, so that all the chunks inside that folder can be merged together to form a single file.
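One common way to join chunks without re-encoding is ffmpeg's concat demuxer, which reads a list file naming the inputs in order. The sketch below assumes that approach and zero-padded chunk names; it is not taken from the repo, and `concat_list` and `build_merge_command` are illustrative names.

```python
from pathlib import PurePosixPath


def concat_list(chunks):
    """Body of a concat-demuxer list file, with chunks in name order."""
    return "".join(f"file '{chunk}'\n" for chunk in sorted(chunks))


def build_merge_command(list_file, chunk_dir, out_root="/pfs/out"):
    """ffmpeg args that join the listed chunks into <folder name>.mkv."""
    dst = PurePosixPath(out_root) / (PurePosixPath(chunk_dir).name + ".mkv")
    return [
        "ffmpeg",
        "-f", "concat",
        "-safe", "0",  # allow absolute paths in the list file
        "-i", str(list_file),
        "-c", "copy",  # chunks are already encoded, so just copy the streams
        str(dst),
    ]


chunks = [
    "/pfs/converter/aventador/chunk_0001.mkv",
    "/pfs/converter/aventador/chunk_0000.mkv",
]
print(concat_list(chunks), end="")
print(" ".join(build_merge_command("/tmp/chunks.txt", "/pfs/converter/aventador")))
```

Sorting by name only gives the right order because the chunk numbers are zero-padded.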

Create the pipeline with:

pachctl create pipeline -f merger.json --build --username <YourDockerUsernameHere>

Wait a couple of seconds (or a few minutes if you changed ffmpeg parameters in the encoding step) and run:

pachctl list file merger@master

You should see something like:

NAME           TYPE SIZE     
/aventador.mkv file 158.8MiB

Kudos! You have now successfully split your file into smaller chunks, gotten them encoded independently, and merged them back together, all in less than 100 lines of code! To kick your converter pipeline into overdrive and better utilize your cluster, tweak the Parallelism spec in Pachyderm. Briefly, there are two options you can go with:

  1. Constant: Creates a constant number of workers that you specify on the cluster
  2. Coefficient: If you specify 0.5 and have 10 nodes in your cluster, it creates 5 workers (only 5 workers across 10 nodes); setting it to 2 creates 20 workers (2 on each Node)
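The arithmetic behind the two options can be sketched as follows. The dictionary mirrors the shape of a `parallelism_spec` entry with either a `constant` or a `coefficient` key; the rounding behaviour for fractional results is an assumption of this sketch, not something confirmed by the text above.

```python
import math


def worker_count(parallelism_spec, nodes):
    """Workers created for a given parallelism spec and cluster size."""
    if "constant" in parallelism_spec:
        return parallelism_spec["constant"]
    # Rounding down with a minimum of one worker is an assumption.
    return max(1, math.floor(parallelism_spec["coefficient"] * nodes))


print(worker_count({"constant": 4}, nodes=10))       # 4
print(worker_count({"coefficient": 0.5}, nodes=10))  # 5
print(worker_count({"coefficient": 2}, nodes=10))    # 20
```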

Final Thoughts

Pachyderm was built with the Provenance of a DAG (Directed Acyclic Graph) in mind, and building your pipelines this way gets you a unique feature as a side effect. With the help of the repos it created, Pachyderm tracks all the files produced by the splitting step, tracks the output of the encoding jobs in the converter pipeline, retries failed jobs automatically, and finally, once all the datums have succeeded, submits the merge task. The result is reproducibility and fault tolerance in every step of the DAG.

Another bonus is that when you submit a job by uploading a new file to the videos repo, Pachyderm diffs your commit against its previous commit (HEAD~1). Based on the filename and contents, it knows you probably don’t want to reprocess a job that has already succeeded, so it skips it.
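The idea of skipping work based on filename and contents can be illustrated with a small sketch. This is only a toy model of the concept, not Pachyderm's implementation: each file is identified by a hash of its path and bytes, and only files whose hash was not seen in the previous commit are scheduled. The function names are made up.

```python
import hashlib


def datum_key(path, content):
    """Identify a datum by its path and contents."""
    digest = hashlib.sha256()
    digest.update(path.encode("utf-8"))
    digest.update(b"\0")  # separator so path and content cannot run together
    digest.update(content)
    return digest.hexdigest()


def datums_to_process(previous_commit, current_commit):
    """Paths in the current commit that are new or changed since the previous one.

    Both arguments map a file path to its bytes.
    """
    done = {datum_key(path, data) for path, data in previous_commit.items()}
    return [
        path
        for path, data in sorted(current_commit.items())
        if datum_key(path, data) not in done
    ]


previous = {"/aventador.mp4": b"first video"}
current = {"/aventador.mp4": b"first video", "/huracan.mp4": b"second video"}
print(datums_to_process(previous, current))  # ['/huracan.mp4']
```

A file that keeps its name but changes its contents gets a different key, so it would be processed again.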

If you change a parameter in the encoding step, it affects the output of all the steps downstream (merger). New videos you upload will use the newer parameters, but to re-encode videos you had already uploaded, you can add --reprocess to the end of the update pipeline command. That forces Pachyderm to rerun steps 2 and 3 on all the chunks available to the converter, and then uses merger to merge the freshly encoded chunks.

Thanks for reading!

— Loki

Lokesh Poovaragan

theycallmeloki.com, Developer Advocate at Dra.gd, loves Cake and all things pertaining to remarkable Developer Experience