Real-time video processing is essential for applications like live event broadcasting, video surveillance, and interactive streaming. Scala pairs well with FFmpeg for these tasks: FFmpeg handles decoding, filtering, and encoding, while Scala handles process control, error handling, and concurrency.

Challenges in real-time video processing

Real-time video processing involves handling continuous streams of video data with minimal latency. Key challenges include:

  • Maintaining low latency
  • Ensuring high throughput
  • Handling various input sources (RTSP, RTMP)
  • Managing resource-intensive operations efficiently

Setting up Scala and FFmpeg

Compatibility matrix

Before getting started, ensure you're using compatible versions:

Component    Minimum version    Recommended version
Scala        3.3.0              3.6.3
Scala CLI    1.0.0              1.6.2
FFmpeg       4.4                6.0

Installing Scala CLI

Scala CLI provides a modern, convenient way to work with Scala:

# For Ubuntu/Debian
curl -sSLf https://scala-cli.virtuslab.org/get | sh
source ~/.profile  # Required after installation

# For macOS
brew install Virtuslab/scala-cli/scala-cli

# For Windows
winget install virtuslab.scalacli

Installing FFmpeg

FFmpeg is required for video processing:

# Ubuntu/Debian
sudo apt update
sudo apt install ffmpeg
# The packaged version depends on your Ubuntu release; check it with the verify command below

# macOS (homebrew)
brew install ffmpeg

# Verify installation
ffmpeg -version
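
The minimum version from the compatibility matrix can also be checked from code. The following is a minimal sketch, assuming the first line of the ffmpeg -version output starts with "ffmpeg version X.Y" (release builds do; development builds that report a git revision will not match). The names parseFFmpegVersion and meetsMinimum are illustrative and not part of any library:

```scala
// Extract (major, minor) from the first line of `ffmpeg -version` output.
// Returns None when the line does not contain a numeric release version.
def parseFFmpegVersion(firstLine: String): Option[(Int, Int)] = {
  val pattern = """ffmpeg version n?(\d+)\.(\d+)""".r
  pattern.findFirstMatchIn(firstLine).map(m => (m.group(1).toInt, m.group(2).toInt))
}

// Compare against the minimum from the compatibility matrix (4.4 by default).
def meetsMinimum(version: (Int, Int), minimum: (Int, Int) = (4, 4)): Boolean = {
  val (major, minor) = version
  major > minimum._1 || (major == minimum._1 && minor >= minimum._2)
}
```

A caller could feed it the first line captured from the process output and refuse to start when the result is None or below the minimum.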

FFmpeg basics

FFmpeg is a versatile multimedia framework capable of transcoding, resizing, watermarking, and streaming video. Here are some basic commands:

  • Transcoding:
ffmpeg -i input.mp4 -c:v libx264 output.mp4
  • Resizing:
ffmpeg -i input.mp4 -vf scale=1280:720 output.mp4
  • Adding watermark:
ffmpeg -i input.mp4 -i watermark.png -filter_complex "overlay=10:10" output.mp4

Integrating FFmpeg with Scala

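Scala can execute FFmpeg commands using the scala.sys.process API. The wrapper below returns an Either: Right when FFmpeg exits with code 0, and Left with a message when it exits with a non-zero code or cannot be started at all (for example, when ffmpeg is not on the PATH):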

import scala.sys.process._
import scala.util.{Try, Success, Failure}

def runFFmpeg(args: Seq[String]): Either[String, Int] = {
  Try(Process("ffmpeg" +: args).!) match {
    case Success(0) => Right(0)
    case Success(code) => Left(s"FFmpeg failed with exit code $code")
    case Failure(e) => Left(s"FFmpeg execution failed: ${e.getMessage}")
  }
}

// Example usage
val result = runFFmpeg(Seq("-i", "input.mp4", "-c:v", "libx264", "output.mp4"))
result match {
  case Right(_) => println("Processing successful")
  case Left(error) => println(s"Error: $error")
}
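
An exit code alone rarely explains why FFmpeg failed. FFmpeg writes its log output to stderr, so one option is to keep the last few stderr lines and include them in the error. This is a sketch rather than a drop-in replacement for runFFmpeg; the name runAndCapture and the keepLines parameter are assumptions made for illustration:

```scala
import scala.collection.mutable.ListBuffer
import scala.sys.process._
import scala.util.{Failure, Success, Try}

// Run a command, keep only the last `keepLines` lines of stderr,
// and attach them to the error message on a non-zero exit code.
def runAndCapture(command: Seq[String], keepLines: Int = 20): Either[String, Unit] = {
  val stderrTail = ListBuffer.empty[String]
  val logger = ProcessLogger(
    (_: String) => (), // Discard stdout
    (line: String) => stderrTail.synchronized {
      stderrTail += line
      // Drop the oldest line so long-running streams do not grow the buffer
      if (stderrTail.size > keepLines) stderrTail.remove(0)
      ()
    }
  )

  Try(Process(command).!(logger)) match {
    case Success(0) => Right(())
    case Success(code) =>
      val tail = stderrTail.synchronized(stderrTail.mkString("\n"))
      Left(s"Exit code $code\n$tail")
    case Failure(e) => Left(s"Could not start process: ${e.getMessage}")
  }
}
```

Calling runAndCapture("ffmpeg" +: args) would then surface messages such as a missing input file directly in the Left value.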

Real-time video stream processing

Example 1: reading from RTSP/RTMP

To read from a network source with proper error handling:

def processStream(streamUrl: String, outputPath: String): Either[String, Unit] = {
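  val args = Seq(
    "-y",           // Overwrite existing output; otherwise FFmpeg stops at its overwrite prompt
    "-i", streamUrl,
    "-c:v", "copy", // Copy the video stream without re-encoding
    "-c:a", "copy", // Copy the audio stream without re-encoding
    outputPath
  )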

  runFFmpeg(args) match {
    case Right(_) => Right(())
    case Left(error) => Left(s"Stream processing failed: $error")
  }
}

// Example usage
val streamUrl = "rtsp://example.com/live"
processStream(streamUrl, "output.mp4")
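
Rejecting malformed or unsupported URLs before starting FFmpeg gives faster and clearer errors than waiting for a connection failure. A minimal sketch, assuming only RTSP and RTMP (plus their TLS variants) should be accepted; validateStreamUrl and supportedSchemes are hypothetical names:

```scala
import java.net.URI
import scala.util.Try

// Schemes this sketch accepts; adjust to the sources you actually ingest.
val supportedSchemes = Set("rtsp", "rtsps", "rtmp", "rtmps")

// Return the URL unchanged when it parses and uses a supported scheme.
def validateStreamUrl(url: String): Either[String, String] =
  Try(new URI(url)).toOption.flatMap(uri => Option(uri.getScheme)) match {
    case Some(scheme) if supportedSchemes.contains(scheme.toLowerCase) => Right(url)
    case Some(scheme) => Left(s"Unsupported scheme: $scheme")
    case None => Left(s"Not a valid URL: $url")
  }
```

The result composes with the earlier function, for example validateStreamUrl(streamUrl).flatMap(url => processStream(url, "output.mp4")).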

Example 2: adding timestamp overlay

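The drawtext filter renders the current local time onto each frame. Because filtering requires re-encoding, the video is encoded with libx264 while the audio is copied unchanged: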

def addTimestamp(streamUrl: String, outputPath: String): Either[String, Unit] = {
  val args = Seq(
    "-i", streamUrl,
    "-vf", "drawtext=text='%{localtime}':fontcolor=white:fontsize=24:x=10:y=10",
    "-c:v", "libx264",
    "-c:a", "copy",
    outputPath
  )

  runFFmpeg(args) match {
    case Right(_) => Right(())
    case Left(error) => Left(s"Timestamp overlay failed: $error")
  }
}
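
The filter string is easier to adjust when it is built from parameters instead of being hard-coded. The sketch below reproduces the filter used above by default and optionally adds a background box through drawtext's box and boxcolor options; timestampFilter and its parameters are illustrative names:

```scala
// Build a drawtext filter that prints the local time.
// Defaults match the hard-coded filter string in addTimestamp.
def timestampFilter(
    fontSize: Int = 24,
    color: String = "white",
    x: Int = 10,
    y: Int = 10,
    withBox: Boolean = false
): String = {
  val base = s"drawtext=text='%{localtime}':fontcolor=$color:fontsize=$fontSize:x=$x:y=$y"
  // box and boxcolor draw a semi-transparent background behind the text
  if (withBox) s"$base:box=1:boxcolor=black@0.5" else base
}
```

The returned string can be passed as the value of the -vf argument.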

Example 3: output to HLS or MPEG-DASH

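The function below writes an HLS playlist with 4-second segments. The event playlist type keeps every segment listed, so viewers can seek back to the start of the stream. MPEG-DASH output follows the same pattern using FFmpeg's dash muxer.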

def createHLSStream(streamUrl: String, outputPath: String): Either[String, Unit] = {
  val args = Seq(
    "-i", streamUrl,
    "-c:v", "libx264",
    "-c:a", "aac",
    "-hls_time", "4",
    "-hls_playlist_type", "event",
    "-hls_segment_filename", s"${outputPath}_%03d.ts",
    s"$outputPath.m3u8"
  )

  runFFmpeg(args) match {
    case Right(_) => Right(())
    case Left(error) => Left(s"HLS streaming failed: $error")
  }
}
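
For MPEG-DASH, the argument list changes only in the muxer options and the manifest name. The following is a sketch of such an argument builder, assuming FFmpeg's dash muxer with its seg_duration option; dashArgs is a hypothetical helper, and the codec choices simply mirror the HLS example:

```scala
// Build FFmpeg arguments for MPEG-DASH output.
// outputBase is the manifest path without the .mpd extension.
def dashArgs(streamUrl: String, outputBase: String, segmentSeconds: Int = 4): Seq[String] =
  Seq(
    "-i", streamUrl,
    "-c:v", "libx264",
    "-c:a", "aac",
    "-f", "dash",                             // Select the DASH muxer explicitly
    "-seg_duration", segmentSeconds.toString, // Target segment length in seconds
    s"$outputBase.mpd"
  )
```

The resulting sequence can be handed to runFFmpeg in the same way as the HLS arguments.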

Error handling and resource management

When working with external processes, proper resource management is crucial:

import scala.sys.process._
import scala.util.{Try, Success, Failure}
import java.io.{File, IOException}

def processVideoWithCleanup(input: String, output: String): Either[String, Unit] = {
  val tempDir = new File("temp_processing")
  tempDir.mkdir()

  try {
    val args = Seq(
      "-i", input,
      "-c:v", "libx264",
      s"${tempDir.getPath}/temp_output.mp4"
    )

    runFFmpeg(args) match {
      case Right(_) =>
        // Move from temp to final location. Files.move also works across
        // filesystems and throws an IOException (handled below) on failure.
        val tempFile = new File(tempDir, "temp_output.mp4")
        java.nio.file.Files.move(
          tempFile.toPath,
          new File(output).toPath,
          java.nio.file.StandardCopyOption.REPLACE_EXISTING
        )
        Right(())

      case Left(error) => Left(error)
    }
  } catch {
    case e: IOException => Left(s"I/O error: ${e.getMessage}")
    case e: Exception => Left(s"Unexpected error: ${e.getMessage}")
  } finally {
    // Clean up temp files (listFiles returns null if the directory cannot be read)
    Option(tempDir.listFiles()).foreach(_.foreach(_.delete()))
    tempDir.delete()
  }
}
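
A fixed directory name such as temp_processing can collide when several jobs run at once. One alternative is a small loan-pattern helper that creates a unique temporary directory and always removes it afterwards. This is a sketch under that assumption; withTempDir is an illustrative name, not an existing API:

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

// Create a unique temp directory, pass it to `body`, and delete it
// (including its contents) whether `body` succeeds or throws.
def withTempDir[A](prefix: String)(body: Path => A): A = {
  val dir = Files.createTempDirectory(prefix)
  try body(dir)
  finally {
    val entries = Files.walk(dir)
    try {
      // Reverse order visits children before their parent directories
      entries
        .sorted(Comparator.reverseOrder[Path]())
        .forEach((p: Path) => { Files.deleteIfExists(p); () })
    } finally entries.close()
  }
}
```

Inside the body, FFmpeg would write to a file under the given directory, and the finished file would be moved out before the block ends.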

Testing FFmpeg integrations

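Integration tests confirm that FFmpeg is installed and that a known input can be processed. The suite below uses ScalaTest and assumes a sample file at test_resources/sample.mp4: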

import java.io.File
import org.scalatest.funsuite.AnyFunSuite

class FFmpegIntegrationTest extends AnyFunSuite {
  test("ffmpeg is installed and accessible") {
    val result = runFFmpeg(Seq("-version"))
    assert(result.isRight)
  }

  test("can process a test video file") {
    val testFile = "test_resources/sample.mp4"
    val outputFile = "test_output/result.mp4"
    // FFmpeg does not create missing output directories
    new File(outputFile).getParentFile.mkdirs()
    val result = runFFmpeg(Seq(
      "-y", // Overwrite output left over from a previous run
      "-i", testFile,
      "-t", "5", // Process only first 5 seconds
      "-c:v", "libx264",
      outputFile
    ))
    assert(result.isRight)
    assert(new File(outputFile).exists())
  }
}
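
Tests that depend on a checked-in sample file break when the file is missing. FFmpeg can generate a synthetic input through its lavfi input device and the testsrc source, which removes that dependency. A sketch of the arguments, assuming FFmpeg was built with lavfi and libx264; syntheticInputArgs is a hypothetical helper:

```scala
// Arguments that make FFmpeg render a generated test pattern to `output`
// instead of reading a sample file from disk.
def syntheticInputArgs(output: String, seconds: Int = 2): Seq[String] =
  Seq(
    "-y",                                              // Overwrite earlier test output
    "-f", "lavfi",                                     // Read from a filter graph, not a file
    "-i", s"testsrc=duration=$seconds:size=320x240:rate=25",
    "-c:v", "libx264",
    output
  )
```

In a test, runFFmpeg(syntheticInputArgs("test_output/generated.mp4")) could create the input that later assertions process.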

Optimizing performance

Each FFmpeg invocation runs as a separate operating system process, so several streams can be processed in parallel by wrapping each call in a Future:

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

def processMultipleStreams(streams: List[(String, String)]): Future[List[Either[String, Unit]]] = {
  val futures = streams.map { case (input, output) =>
    Future {
      // blocking signals the global pool that this task waits on an external process
      blocking {
        val args = Seq(
          "-i", input,
          "-c:v", "libx264",
          output
        )
        runFFmpeg(args) match {
          case Right(_) => Right(())
          case Left(error) => Left(s"Processing $input failed: $error")
        }
      }
    }
  }

  Future.sequence(futures)
}

// Example usage
val streams = List(
  ("rtsp://camera1.example.com/live", "camera1_output.mp4"),
  ("rtsp://camera2.example.com/live", "camera2_output.mp4")
)

val results = Await.result(processMultipleStreams(streams), 10.minutes)
results.foreach {
  case Right(_) => println("Stream processed successfully")
  case Left(error) => println(s"Error: $error")
}
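
Note that a timeout on Await.result only stops the waiting; the FFmpeg processes themselves keep running. For live sources that never end, it can help to hold on to the process handle and destroy it when the deadline passes. The following sketch shows one way to do that; runWithTimeout is an illustrative name, and destroying FFmpeg mid-write may leave a container such as MP4 unfinalized:

```scala
import scala.concurrent.{Await, Future, TimeoutException, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._
import scala.util.{Failure, Success, Try}

// Run a command and return its exit code, or stop it after `timeout`.
def runWithTimeout(command: Seq[String], timeout: FiniteDuration): Either[String, Int] =
  Try(Process(command).run(ProcessLogger((_: String) => ()))) match {
    case Failure(e) => Left(s"Could not start process: ${e.getMessage}")
    case Success(process) =>
      // exitValue() blocks until the process ends, so wait on another thread
      val exit = Future(blocking(process.exitValue()))
      try Right(Await.result(exit, timeout))
      catch {
        case _: TimeoutException =>
          process.destroy() // Stop the external process instead of leaking it
          Left(s"Timed out after $timeout")
      }
  }
```

Segmented outputs such as HLS tolerate this kind of forced stop better than a single MP4 file, because completed segments remain playable.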

Troubleshooting common issues

When working with FFmpeg and Scala, you might encounter these common issues:

  1. FFmpeg not found in PATH: Ensure FFmpeg is properly installed and accessible in your system's PATH.

  2. Stream connection failures: Network issues can cause RTSP/RTMP connections to fail. Implement retry logic:

def processStreamWithRetry(streamUrl: String, output: String, maxRetries: Int = 3): Either[String, Unit] = {
  var attempts = 0
  var result: Either[String, Unit] = Left("Not attempted yet")

  while (attempts < maxRetries && result.isLeft) {
    attempts += 1
    result = processStream(streamUrl, output)
    if (result.isLeft && attempts < maxRetries) {
      println(s"Attempt $attempts failed, retrying...")
      Thread.sleep(2000) // Wait before retry
    }
  }

  result
}

  3. Resource exhaustion: Video processing is resource-intensive. Monitor system resources and implement throttling if needed.
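
One simple form of throttling is to cap how many FFmpeg processes run at the same time by using a fixed-size thread pool instead of the global execution context. A minimal sketch, assuming each task blocks until its process finishes; runBounded and its parameters are hypothetical:

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Run all tasks with at most `parallelism` of them active at once.
// Results are returned in the same order as the input tasks.
def runBounded[A](tasks: List[() => A], parallelism: Int, timeout: Duration): List[A] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try Await.result(Future.sequence(tasks.map(task => Future(task()))), timeout)
  finally pool.shutdown() // Release the threads once all tasks are done or the wait fails
}
```

Each task could wrap one call such as () => processStream(url, output); a sensible parallelism value depends on CPU cores and on whether the streams are copied or re-encoded.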

Advanced techniques

FFmpeg supports custom filters and effects. For example, applying a blur filter with proper error handling:

def applyBlurFilter(input: String, output: String, blurAmount: Int = 10): Either[String, Unit] = {
  val args = Seq(
    "-i", input,
    "-vf", s"boxblur=$blurAmount",
    "-c:v", "libx264",
    output
  )

  runFFmpeg(args) match {
    case Right(_) => Right(())
    case Left(error) => Left(s"Blur filter failed: $error")
  }
}

// Example usage
applyBlurFilter("input.mp4", "output_blurred.mp4", 15)
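
Several filters can be applied in one pass by joining them with commas in a single -vf value, which avoids re-encoding the video once per filter. A small sketch of that idea; filterArgs is an illustrative helper, and the filter strings are passed through unescaped:

```scala
// Join filters into one chain, applied left to right.
// Returns no arguments at all when there is nothing to apply.
def filterArgs(filters: Seq[String]): Seq[String] =
  if (filters.isEmpty) Seq.empty
  else Seq("-vf", filters.mkString(","))
```

For example, filterArgs(Seq("scale=1280:720", "boxblur=10")) yields the arguments for resizing and then blurring in a single FFmpeg run.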

Combining Scala's typed error handling and concurrency with FFmpeg's processing capabilities gives developers a solid base for efficient and reliable real-time video applications. Transloadit uses FFmpeg in several of its robots, such as /video/encode and /video/adaptive.