#!/usr/bin/env python
"""x264 regression tests built on the digress framework.

Defines the test fixture, the compile case, the PSNR/SSIM regression case and
a factory for cases that compare x264's own reconstructed YUV output against
the output of an external decoder (JM's ``ldecod`` or ``ffmpeg``).
"""

import operator

from optparse import OptionGroup

import sys

from time import time

from digress.cli import Dispatcher as _Dispatcher
from digress.errors import ComparisonError, FailedTestError, DisabledTestError
from digress.testing import depends, comparer, Fixture, Case
from digress.comparers import compare_pass
from digress.scm import git as x264git

from subprocess import Popen, PIPE, STDOUT

import os
import re
import shlex
import inspect

from random import randrange, seed
from math import ceil

from itertools import imap, izip

# All relative paths below ("./x264", "./configure", ...) are resolved from
# the parent of the directory containing this script.
os.chdir(os.path.join(os.path.dirname(__file__), ".."))

# options

# Each entry is one group of mutually exclusive choices; a random command
# line picks exactly one choice per group ("" means "leave the option out").
OPTIONS = [
    [ "--tune %s" % t for t in ("film", "zerolatency") ],
    ("", "--intra-refresh"),
    ("", "--no-cabac"),
    ("", "--interlaced"),
    ("", "--slice-max-size 1000"),
    ("", "--frame-packing 5"),
    [ "--preset %s" % p for p in ("ultrafast",
                                  "superfast",
                                  "veryfast",
                                  "faster",
                                  "fast",
                                  "medium",
                                  "slow",
                                  "slower",
                                  "veryslow",
                                  "placebo") ]
]

# end options


def _locate_yuv_offset(offs, width, height):
    """Map a byte offset in a planar YUV dump to (frame, plane, macroblock).

    The layout is taken to be one full-size Y plane followed by a U and a V
    plane of a quarter of its area each, one byte per sample.
    NOTE(review): this assumes 4:2:0 subsampling with even dimensions, i.e.
    chroma planes of (width / 2) x (height / 2) -- confirm against the
    decoders' output format.

    Returns a tuple ``(frame, plane, (mb_x, mb_y))`` where ``plane`` is one of
    "Y", "U", "V" and the macroblock indices are zero-based.
    """
    chroma_width = width // 2
    y_plane_area = width * height
    chroma_plane_area = chroma_width * (height // 2)
    frame_size = y_plane_area + 2 * chroma_plane_area

    frame, pixel = divmod(offs, frame_size)

    if pixel < y_plane_area:
        plane, plane_width, block = "Y", width, 16
    elif pixel < y_plane_area + chroma_plane_area:
        # A 16x16 luma macroblock covers 8x8 samples in each chroma plane.
        plane, plane_width, block = "U", chroma_width, 8
        pixel -= y_plane_area
    else:
        plane, plane_width, block = "V", chroma_width, 8
        pixel -= y_plane_area + chroma_plane_area

    pixel_y, pixel_x = divmod(pixel, plane_width)

    return frame, plane, (pixel_x // block, pixel_y // block)


def compare_yuv_output(width, height):
    """Return a function comparing two raw YUV files of the given frame size.

    The returned callable takes two file names and raises ComparisonError
    when the files differ in size or content; for a content mismatch the
    message names the frame, macroblock and plane of the first differing
    byte. It returns None when the files are identical.
    """
    def _compare_yuv_output(file_a, file_b):
        size_a = os.path.getsize(file_a)
        size_b = os.path.getsize(file_b)

        if size_a != size_b:
            raise ComparisonError("%s is not the same size as %s" % (
                file_a,
                file_b
            ))

        BUFFER_SIZE = 8196

        offset = 0

        # Binary mode: the dumps are raw sample data, not text.
        with open(file_a, "rb") as f_a:
            with open(file_b, "rb") as f_b:
                while True:
                    chunk_a = f_a.read(BUFFER_SIZE)
                    chunk_b = f_b.read(BUFFER_SIZE)

                    if not chunk_a and not chunk_b:
                        break

                    if chunk_a != chunk_b:
                        limit = min(len(chunk_a), len(chunk_b))
                        # First index at which the two chunks disagree; if one
                        # chunk is a prefix of the other, that is its length.
                        first_diff = next(
                            (i for i in range(limit) if chunk_a[i] != chunk_b[i]),
                            limit
                        )
                        offs = offset + first_diff

                        frame, plane, macroblock = _locate_yuv_offset(offs, width, height)

                        raise ComparisonError("%s differs from %s at frame %d, " \
                                              "macroblock %s on the %s plane (offset %d)" % (
                            file_a,
                            file_b,
                            frame,
                            macroblock,
                            plane,
                            offs)
                        )

                    offset += len(chunk_a)

    return _compare_yuv_output


def program_exists(program):
    """Return the path of an executable named ``program``, or None.

    If ``program`` contains a directory component it is checked as given;
    otherwise every directory in PATH is searched in order.
    """
    def is_exe(fpath):
        # isfile() rather than exists(): a directory also passes X_OK.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)

    if fpath:
        if is_exe(program):
            return program
    else:
        # Fall back to the platform default search path when PATH is unset.
        for path in os.environ.get("PATH", os.defpath).split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None


def _remove_quietly(*paths):
    """Best-effort removal of temporary files; missing files are ignored."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            pass


class x264(Fixture):
    """Test fixture for x264; revisions are tracked through git."""
    scm = x264git


class Compile(Case):
    """Configure and build x264 in the current working directory."""

    @comparer(compare_pass)
    def test_configure(self):
        """Run ``make distclean`` then ``./configure`` with the dispatcher's flags.

        Raises FailedTestError if configure exits with a non-zero status.
        """
        # The result of distclean is deliberately ignored: it fails harmlessly
        # when the tree has never been configured.
        Popen([
            "make",
            "distclean"
        ], stdout=PIPE, stderr=STDOUT).communicate()

        configure_proc = Popen([
            "./configure"
        ] + self.fixture.dispatcher.configure, stdout=PIPE, stderr=STDOUT)

        output = configure_proc.communicate()[0]
        if configure_proc.returncode != 0:
            raise FailedTestError("configure failed: %s" % output.replace("\n", " "))

    @depends("configure")
    @comparer(compare_pass)
    def test_make(self):
        """Run ``make -j5``; raises FailedTestError on a non-zero exit status."""
        make_proc = Popen([
            "make",
            "-j5"
        ], stdout=PIPE, stderr=STDOUT)

        output = make_proc.communicate()[0]
        if make_proc.returncode != 0:
            raise FailedTestError("make failed: %s" % output.replace("\n", " "))


# Matches the input-description line x264 prints, capturing width and height.
_dimension_pattern = re.compile(r"\w+ [[]info[]]: (\d+)x(\d+)[pi] \d+:\d+ @ \d+/\d+ fps [(][vc]fr[)]")


def _YUVOutputComparisonFactory():
    """Create a fresh YUVOutputComparison case class.

    A new class is built on every call so that each generated command line
    can carry its own ``options`` and ``__name__``.
    """
    class YUVOutputComparison(Case):
        """Encode with x264 and check an external decoder reproduces its output."""
        _dimension_pattern = _dimension_pattern

        depends = [ Compile ]
        options = []

        def __init__(self):
            # Drop every test_* method that was not selected via the
            # dispatcher's yuv_tests list.
            for name, meth in inspect.getmembers(self):
                if name[:5] == "test_" and name[5:] not in self.fixture.dispatcher.yuv_tests:
                    delattr(self.__class__, name)

        def _run_x264(self):
            """Encode the test video, dumping x264's reconstruction to x264-output.yuv.

            Returns the ``(width, height)`` x264 reported for the input.
            Raises FailedTestError if x264 fails or prints no dimension line.
            """
            x264_proc = Popen([
                "./x264",
                "-o",
                "%s.264" % self.fixture.dispatcher.video,
                "--dump-yuv",
                "x264-output.yuv"
            ] + self.options + [
                self.fixture.dispatcher.video
            ], stdout=PIPE, stderr=STDOUT)

            output = x264_proc.communicate()[0]
            if x264_proc.returncode != 0:
                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))

            # search(), not match(): the dimension line need not be the very
            # first thing x264 prints.
            matches = _dimension_pattern.search(output)
            if matches is None:
                raise FailedTestError("could not determine video dimensions from x264 output: %s" % output.replace("\n", " "))

            return (int(matches.group(1)), int(matches.group(2)))

        def _decode_and_compare(self, decoder_name, decoder_cmd, decoded_file, extra_files=()):
            """Encode, decode with ``decoder_cmd`` and compare both YUV dumps.

            ``decoded_file`` is the YUV file the decoder writes; it, the
            encoded stream, x264's dump and ``extra_files`` are removed
            afterwards whether or not the test passed.
            Raises FailedTestError if a process fails or the dumps differ.
            """
            try:
                runres = self._run_x264()

                decoder_proc = Popen(decoder_cmd, stdout=PIPE, stderr=STDOUT)

                output = decoder_proc.communicate()[0]
                if decoder_proc.returncode != 0:
                    raise FailedTestError("%s did not complete properly: %s" % (
                        decoder_name,
                        output.replace("\n", " ")
                    ))

                try:
                    compare_yuv_output(*runres)("x264-output.yuv", decoded_file)
                except ComparisonError as e:
                    raise FailedTestError(e)
            finally:
                _remove_quietly(
                    "x264-output.yuv",
                    "%s.264" % self.fixture.dispatcher.video,
                    decoded_file,
                    *extra_files
                )

        @comparer(compare_pass)
        def test_jm(self):
            """Compare against JM's ldecod; disabled when ldecod is not installed."""
            if not program_exists("ldecod"): raise DisabledTestError("jm unavailable")

            self._decode_and_compare("jm", [
                "ldecod",
                "-i",
                "%s.264" % self.fixture.dispatcher.video,
                "-o",
                "jm-output.yuv"
            ], "jm-output.yuv", ("log.dec", "dataDec.txt"))

        @comparer(compare_pass)
        def test_ffmpeg(self):
            """Compare against ffmpeg; disabled when ffmpeg is not installed."""
            if not program_exists("ffmpeg"): raise DisabledTestError("ffmpeg unavailable")

            # "-vsync" and its value must be separate argv entries; passed as
            # one string ffmpeg would see a single unknown option.
            self._decode_and_compare("ffmpeg", [
                "ffmpeg",
                "-vsync",
                "0",
                "-i",
                "%s.264" % self.fixture.dispatcher.video,
                "ffmpeg-output.yuv"
            ], "ffmpeg-output.yuv")

    return YUVOutputComparison


class Regression(Case):
    """Record the quality metrics x264 reports for the test video."""
    depends = [ Compile ]

    _psnr_pattern = re.compile(r"x264 [[]info[]]: PSNR Mean Y:\d+[.]\d+ U:\d+[.]\d+ V:\d+[.]\d+ Avg:\d+[.]\d+ Global:(\d+[.]\d+) kb/s:\d+[.]\d+")
    _ssim_pattern = re.compile(r"x264 [[]info[]]: SSIM Mean Y:(\d+[.]\d+) [(]\d+[.]\d+db[)]")

    def __init__(self):
        # Make the extra x264 flags part of the case name.
        if self.fixture.dispatcher.x264:
            self.__class__.__name__ += " %s" % " ".join(self.fixture.dispatcher.x264)

    def _measure(self, flag, line_prefix, pattern, metric):
        """Run x264 with ``flag`` and return the value captured by ``pattern``.

        The first output line starting with ``line_prefix`` is parsed; group 1
        of ``pattern`` is returned as a float. The encoded stream is removed
        afterwards. Raises FailedTestError if x264 fails or the metric line is
        missing or unparsable.
        """
        try:
            x264_proc = Popen([
                "./x264",
                "-o",
                "%s.264" % self.fixture.dispatcher.video,
                flag
            ] + self.fixture.dispatcher.x264 + [
                self.fixture.dispatcher.video
            ], stdout=PIPE, stderr=STDOUT)

            output = x264_proc.communicate()[0]

            if x264_proc.returncode != 0:
                raise FailedTestError("x264 did not complete properly: %s" % output.replace("\n", " "))

            for line in output.split("\n"):
                if line.startswith(line_prefix):
                    found = pattern.match(line)
                    if found is None:
                        raise FailedTestError("could not parse %s output from x264: %s" % (metric, line))
                    return float(found.group(1))

            raise FailedTestError("no %s output caught from x264" % metric)
        finally:
            _remove_quietly("%s.264" % self.fixture.dispatcher.video)

    def test_psnr(self):
        """Return the global PSNR reported by ``x264 --psnr``."""
        return self._measure("--psnr", "x264 [info]: PSNR Mean", self._psnr_pattern, "PSNR")

    def test_ssim(self):
        """Return the mean Y SSIM reported by ``x264 --ssim``."""
        return self._measure("--ssim", "x264 [info]: SSIM Mean", self._ssim_pattern, "SSIM")


def _generate_random_commandline():
    """Pick one choice from every OPTIONS group and return the argv list.

    Empty choices contribute nothing; multi-word choices are split into
    separate arguments.
    """
    commandline = []

    for suboptions in OPTIONS:
        # randrange() is kept (rather than random.choice) so a given seed
        # keeps selecting through the same call.
        commandline.append(suboptions[randrange(0, len(suboptions))])

    return [ arg for opt in commandline for arg in shlex.split(opt) if arg ]


# Command lines already handed out, used to avoid registering duplicates.
_generated = []
fixture = x264()

fixture.register_case(Compile)
fixture.register_case(Regression)


class Dispatcher(_Dispatcher):
    """Command-line dispatcher adding the x264-specific testing options."""

    # Defaults; each is overridden on the instance by its command-line option.
    video = "akiyo_qcif.y4m"
    products = 50
    configure = []
    x264 = []
    yuv_tests = [ "jm" ]

    def _populate_parser(self):
        """Extend the inherited option parser with the x264 option group."""
        super(Dispatcher, self)._populate_parser()

        # don't do a whole lot with this
        tcase = _YUVOutputComparisonFactory()

        # Names of the available comparison tests, without the "test_" prefix.
        yuv_tests = [ name[5:] for name, meth in inspect.getmembers(tcase) if name[:5] == "test_" ]

        def store(attr, convert=None):
            # Build an optparse callback that saves the (optionally converted)
            # option value as an attribute of this dispatcher.
            def callback(option, opt, value, parser):
                setattr(self, attr, convert(value) if convert else value)
            return callback

        group = OptionGroup(self.optparse, "x264 testing-specific options")

        group.add_option(
            "-v",
            "--video",
            metavar="FILENAME",
            action="callback",
            dest="video",
            type=str,
            callback=store("video"),
            help="yuv video to perform testing on (default: %s)" % self.video
        )

        group.add_option(
            "-s",
            "--seed",
            metavar="SEED",
            action="callback",
            dest="seed",
            type=int,
            callback=store("seed"),
            help="seed for the random number generator (default: unix timestamp)"
        )

        group.add_option(
            "-p",
            "--product-tests",
            metavar="NUM",
            action="callback",
            dest="products",
            type=int,
            callback=store("products"),
            help="number of cartesian products to generate for yuv comparison testing (default: %d)" % self.products
        )

        group.add_option(
            "--configure-with",
            metavar="FLAGS",
            action="callback",
            dest="configure",
            type=str,
            callback=store("configure", shlex.split),
            help="options to run ./configure with"
        )

        group.add_option(
            "--yuv-tests",
            action="callback",
            dest="yuv_tests",
            type=str,
            callback=store("yuv_tests", lambda value: [ val.strip() for val in value.split(",") ]),
            help="select tests to run with yuv comparisons (default: %s, available: %s)" % (
                ", ".join(self.yuv_tests),
                ", ".join(yuv_tests)
            )
        )

        group.add_option(
            "--x264-with",
            metavar="FLAGS",
            action="callback",
            dest="x264",
            type=str,
            callback=store("x264", shlex.split),
            help="additional options to run ./x264 with"
        )

        self.optparse.add_option_group(group)

    def pre_dispatch(self):
        """Seed the RNG and register one comparison case per random command line.

        Exits with status 1 if a fresh, not-yet-used command line cannot be
        found within the retry limit.
        """
        if not hasattr(self, "seed"):
            self.seed = int(time())

        print("Using seed: %d" % self.seed)
        seed(self.seed)

        for _ in range(self.products):
            YUVOutputComparison = _YUVOutputComparisonFactory()

            # The user's extra flags are appended before the duplicate check
            # because _generated stores the complete command lines.
            commandline = _generate_random_commandline() + self.x264

            counter = 0

            while commandline in _generated:
                counter += 1
                commandline = _generate_random_commandline() + self.x264

                if counter > 100:
                    sys.stderr.write("Maximum command-line regeneration exceeded. " \
                                     "Try a different seed or specify fewer products to generate.\n")
                    sys.exit(1)

            _generated.append(commandline)

            YUVOutputComparison.options = commandline
            YUVOutputComparison.__name__ = ("%s %s" % (YUVOutputComparison.__name__, " ".join(commandline)))
            fixture.register_case(YUVOutputComparison)


Dispatcher(fixture).dispatch()