Source code for wluma_als_emulator.strategies.webcam_strategy

import math
import os
import os.path
import subprocess
import tempfile
from time import sleep
from PIL import Image, ImageStat

WEBCAM_PROBES = 1


[docs]class WebcamStrategy: def __init__(self, config): self.config = config self.lux = None self.capture_command = [ "ffmpeg", "-hide_banner", "-loglevel", "panic", "-i", self.config.input, "-vframes", "1", ] self.get_sleep_time() if self.config.verbose: print(f"Sleep mode: {self.config.sleep_mode} will be use")
[docs] def get_brighness(self, screenshot_cmd): """ Return brighness from a webcam capture """ f, path = tempfile.mkstemp() os.close(f) os.remove(path) path += ".jpg" try: subprocess.run([*screenshot_cmd, path], check=True) except FileNotFoundError as e: raise (f"The binary is not found?\n{e}") result = None with Image.open(path) as im: stat = ImageStat.Stat(im) r, g, b = stat.rms result = math.sqrt(0.241 * (r ** 2) + 0.691 * (g ** 2) + 0.068 * (b ** 2)) os.remove(path) return int(100 * (result / 255))
[docs] def calculate(self): """ Fetch the brighness WEBCAM_PROBES time And return the average """ results = [] for _ in range(0, WEBCAM_PROBES): results.append(self.get_brighness(self.capture_command)) sleep(1) return int(sum(results) / float(len(results)))
[docs] def get_sleep_time(self): """ Return how many time to sleep between two loop """ self.sleep_time = self.config.sleep_time if self.config.sleep_mode == "fixed": self.sleep_time = self.config.sleep_time elif self.config.sleep_mode == "periods": self.sleep_time = self.config.get_sleep_by_periods() elif self.config.sleep_mode == "lux" and self.lux: # less refresh in the dark self.sleep_time = ( (self.config.sleep_time * 2) if self.lux <= 10 else self.config.sleep_time ) # more refresh if very bright self.sleep_time = ( (self.config.sleep_time * 0.5) if self.lux >= 80 else self.config.sleep_time ) if WEBCAM_PROBES > 1: self.sleep_time = self.sleep_time - WEBCAM_PROBES
def run(self): # get sensor value try: self.lux = self.calculate() except subprocess.CalledProcessError as e: raise (f"Can't get lux, bad input device?\n{e}") self.get_sleep_time() if self.config.verbose: print(f"lux={self.lux} | waiting {self.sleep_time} seconds...")