FFmpeg with Scala: real-time video

Real-time video processing is essential for applications like live event broadcasting, video surveillance, and interactive streaming. Scala, combined with FFmpeg, provides a powerful and efficient solution for handling these demanding tasks.
Challenges in real-time video processing
Real-time video processing involves handling continuous streams of video data with minimal latency. Key challenges include:
- Maintaining low latency
- Ensuring high throughput
- Handling various input sources (RTSP, RTMP)
- Managing resource-intensive operations efficiently
Setting up Scala and FFmpeg
Compatibility matrix
Before getting started, ensure you're using compatible versions:
Component | Minimum Version | Recommended Version |
---|---|---|
Scala | 3.3.0 | 3.6.3 |
Scala CLI | 1.0.0 | 1.6.2 |
FFmpeg | 4.4 | 6.0 |
Installing Scala CLI
Scala CLI provides a modern, convenient way to work with Scala:
# For Ubuntu/Debian
curl -sSLf https://scala-cli.virtuslab.org/get | sh
source ~/.profile # Required after installation
# For macOS
brew install Virtuslab/scala-cli/scala-cli
# For Windows
winget install virtuslab.scalacli
Installing FFmpeg
FFmpeg is required for video processing:
# Ubuntu/Debian
sudo apt update
sudo apt install ffmpeg
# Current version in Ubuntu repositories: 6.0
# macOS (homebrew)
brew install ffmpeg
# Verify installation
ffmpeg -version
FFmpeg basics
FFmpeg is a versatile multimedia framework capable of transcoding, resizing, watermarking, and streaming video. Here are some basic commands:
- Transcoding:
ffmpeg -i input.mp4 -c:v libx264 output.mp4
- Resizing:
ffmpeg -i input.mp4 -vf scale=1280:720 output.mp4
- Adding watermark:
ffmpeg -i input.mp4 -i watermark.png -filter_complex "overlay=10:10" output.mp4
Integrating FFmpeg with Scala
Scala can execute FFmpeg commands using the Process API. Here's a robust implementation with proper error handling:
import scala.sys.process._
import scala.util.{Try, Success, Failure}
def runFFmpeg(args: Seq[String]): Either[String, Int] = {
Try(Process("ffmpeg" +: args).!) match {
case Success(0) => Right(0)
case Success(code) => Left(s"FFmpeg failed with exit code $code")
case Failure(e) => Left(s"FFmpeg execution failed: ${e.getMessage}")
}
}
// Example usage
val result = runFFmpeg(Seq("-i", "input.mp4", "-c:v", "libx264", "output.mp4"))
result match {
case Right(_) => println("Processing successful")
case Left(error) => println(s"Error: $error")
}
Real-time video stream processing
Example 1: reading from rtsp/rtmp
To read from a network source with proper error handling:
def processStream(streamUrl: String, outputPath: String): Either[String, Unit] = {
val args = Seq(
"-i", streamUrl,
"-c:v", "copy",
"-c:a", "copy",
outputPath
)
runFFmpeg(args) match {
case Right(_) => Right(())
case Left(error) => Left(s"Stream processing failed: $error")
}
}
// Example usage
val streamUrl = "rtsp://example.com/live"
processStream(streamUrl, "output.mp4")
Example 2: adding timestamp overlay
Real-time timestamp overlay with error handling:
def addTimestamp(streamUrl: String, outputPath: String): Either[String, Unit] = {
val args = Seq(
"-i", streamUrl,
"-vf", "drawtext=text='%{localtime}':fontcolor=white:fontsize=24:x=10:y=10",
"-c:v", "libx264",
"-c:a", "copy",
outputPath
)
runFFmpeg(args) match {
case Right(_) => Right(())
case Left(error) => Left(s"Timestamp overlay failed: $error")
}
}
Example 3: output to HLS or MPEG-DASH
Streaming output with proper configuration:
def createHLSStream(streamUrl: String, outputPath: String): Either[String, Unit] = {
val args = Seq(
"-i", streamUrl,
"-c:v", "libx264",
"-c:a", "aac",
"-hls_time", "4",
"-hls_playlist_type", "event",
"-hls_segment_filename", s"${outputPath}_%03d.ts",
s"$outputPath.m3u8"
)
runFFmpeg(args) match {
case Right(_) => Right(())
case Left(error) => Left(s"HLS streaming failed: $error")
}
}
Error handling and resource management
When working with external processes, proper resource management is crucial:
import scala.sys.process._
import scala.util.{Try, Success, Failure}
import java.io.{File, IOException}
def processVideoWithCleanup(input: String, output: String): Either[String, Unit] = {
val tempDir = new File("temp_processing")
tempDir.mkdir()
try {
val args = Seq(
"-i", input,
"-c:v", "libx264",
s"${tempDir.getPath}/temp_output.mp4"
)
runFFmpeg(args) match {
case Right(_) =>
// Move from temp to final location
val tempFile = new File(s"${tempDir.getPath}/temp_output.mp4")
val outputFile = new File(output)
if (tempFile.renameTo(outputFile)) Right(())
else Left("Failed to move processed file to final location")
case Left(error) => Left(error)
}
} catch {
case e: IOException => Left(s"I/O error: ${e.getMessage}")
case e: Exception => Left(s"Unexpected error: ${e.getMessage}")
} finally {
// Clean up temp files
if (tempDir.exists()) {
tempDir.listFiles().foreach(_.delete())
tempDir.delete()
}
}
}
Testing FFmpeg integrations
Testing is essential for robust video processing applications:
import org.scalatest.funsuite.AnyFunSuite
class FFmpegIntegrationTest extends AnyFunSuite {
test("ffmpeg is installed and accessible") {
val result = runFFmpeg(Seq("-version"))
assert(result.isRight)
}
test("can process a test video file") {
val testFile = "test_resources/sample.mp4"
val outputFile = "test_output/result.mp4"
val result = runFFmpeg(Seq(
"-i", testFile,
"-t", "5", // Process only first 5 seconds
"-c:v", "libx264",
outputFile
))
assert(result.isRight)
assert(new File(outputFile).exists())
}
}
Optimizing performance
Use Scala's concurrency features to improve performance:
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
def processMultipleStreams(streams: List[(String, String)]): Future[List[Either[String, Unit]]] = {
val futures = streams.map { case (input, output) =>
Future {
val args = Seq(
"-i", input,
"-c:v", "libx264",
output
)
runFFmpeg(args) match {
case Right(_) => Right(())
case Left(error) => Left(s"Processing $input failed: $error")
}
}
}
Future.sequence(futures)
}
// Example usage
val streams = List(
("rtsp://camera1.example.com/live", "camera1_output.mp4"),
("rtsp://camera2.example.com/live", "camera2_output.mp4")
)
val results = Await.result(processMultipleStreams(streams), 10.minutes)
results.foreach {
case Right(_) => println("Stream processed successfully")
case Left(error) => println(s"Error: $error")
}
Troubleshooting common issues
When working with FFmpeg and Scala, you might encounter these common issues:
-
FFmpeg not found in PATH: Ensure FFmpeg is properly installed and accessible in your system's PATH.
-
Stream connection failures: Network issues can cause RTSP/RTMP connections to fail. Implement retry logic:
def processStreamWithRetry(streamUrl: String, output: String, maxRetries: Int = 3): Either[String, Unit] = {
var attempts = 0
var result: Either[String, Unit] = Left("Not attempted yet")
while (attempts < maxRetries && result.isLeft) {
attempts += 1
result = processStream(streamUrl, output)
if (result.isLeft && attempts < maxRetries) {
println(s"Attempt $attempts failed, retrying...")
Thread.sleep(2000) // Wait before retry
}
}
result
}
- Resource exhaustion: Video processing is resource-intensive. Monitor system resources and implement throttling if needed.
Advanced techniques
FFmpeg supports custom filters and effects. For example, applying a blur filter with proper error handling:
def applyBlurFilter(input: String, output: String, blurAmount: Int = 10): Either[String, Unit] = {
val args = Seq(
"-i", input,
"-vf", s"boxblur=$blurAmount",
"-c:v", "libx264",
output
)
runFFmpeg(args) match {
case Right(_) => Right(())
case Left(error) => Left(s"Blur filter failed: $error")
}
}
// Example usage
applyBlurFilter("input.mp4", "output_blurred.mp4", 15)
By leveraging Scala's robust features and FFmpeg's powerful capabilities, developers can build efficient and reliable real-time video processing applications. Transloadit leverages FFmpeg in several of its robots, such as /video/encode and /video/adaptive.