#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 Benjamin Tissoires
# Copyright (c) 2017 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

import fcntl
import functools
import libevdev
import os

try:
    import pyudev
except ImportError:
    raise ImportError("UHID is not supported due to missing pyudev dependency")

import logging

import hidtools.hid as hid
from hidtools.uhid import UHIDDevice
from hidtools.util import BusType

from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union

logger = logging.getLogger("hidtools.device.base_device")


class SysfsFile(object):
    """Thin accessor for a single sysfs attribute file.

    Every access re-opens the file, so values are never cached.
    """

    def __init__(self, path):
        self.path = path

    def __set_value(self, value):
        # sysfs attributes are written as one newline-terminated value
        with open(self.path, "w") as f:
            return f.write(f"{value}\n")

    def __get_value(self):
        with open(self.path) as f:
            return f.read().strip()

    @property
    def int_value(self) -> int:
        """The file content parsed as a base-10 integer."""
        return int(self.__get_value())

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """The file content with surrounding whitespace stripped."""
        return self.__get_value()

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)


class LED(object):
    """Wraps the ``brightness``/``max_brightness`` files of a sysfs directory."""

    def __init__(self, sys_path):
        # max_brightness is read once at construction time; brightness is
        # re-read from (and written to) sysfs on every access.
        self.max_brightness = SysfsFile(sys_path / "max_brightness").int_value
        self.__brightness = SysfsFile(sys_path / "brightness")

    @property
    def brightness(self) -> int:
        """Current brightness as read from sysfs."""
        return self.__brightness.int_value

    @brightness.setter
    def brightness(self, value: int) -> None:
        self.__brightness.int_value = value


class PowerSupply(object):
    """Represents Linux power_supply_class sysfs nodes."""

    def __init__(self, sys_path):
        self._capacity = SysfsFile(sys_path / "capacity")
        self._status = SysfsFile(sys_path / "status")
        self._type = SysfsFile(sys_path / "type")

    @property
    def capacity(self) -> int:
        """Content of the ``capacity`` file as an integer."""
        return self._capacity.int_value

    @property
    def status(self) -> str:
        """Content of the ``status`` file."""
        return self._status.str_value

    @property
    def type(self) -> str:
        """Content of the ``type`` file."""
        return self._type.str_value


class HIDIsReady(object):
    """
    Companion class that binds to a kernel mechanism
    and that allows to know when a uhid device is ready or not.

    See :meth:`is_ready` for details.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> bool:
        """
        Overwrite in subclasses: should return True or False whether
        the attached uhid device is ready or not.
        """
        # NOTE(review): UdevHIDIsReady.is_ready() returns a (ready, count)
        # tuple and BaseDevice indexes the result with [0]/[1], so this base
        # implementation is not usable with BaseDevice as-is. Left unchanged
        # because a tuple would be truthy for existing boolean callers.
        return False


class UdevHIDIsReady(HIDIsReady):
    """Readiness tracker fed by udev events on the ``hid`` subsystem.

    A single udev monitor is shared by all instances (class-level state) and
    registered with the UHIDDevice poll loop on first instantiation.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    # maps the numeric HID id to (currently bound, number of bind transitions)
    _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {}

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor if not done already."""
        if cls._pyudev_context is None:
            cls._pyudev_context = pyudev.Context()
            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
            cls._pyudev_monitor.filter_by("hid")
            cls._pyudev_monitor.start()

            UHIDDevice._append_fd_to_poll(
                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
            )

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """Drain pending udev events and update the per-device ready state."""
        if cls._pyudev_monitor is None:
            return
        event: pyudev.Device
        # poll() returns None once no event arrives within the timeout, which
        # is the sentinel that terminates the iteration.
        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
            if event.action not in ["bind", "remove", "unbind"]:
                # Skip uninteresting events but keep draining, so that a
                # bind/unbind already queued behind them is handled now.
                continue

            logger.debug(f"udev event: {event.action} -> {event}")

            # the sysfs path ends in ".XXXX", the HID id in hexadecimal
            hid_id = int(event.sys_path.strip().split(".")[-1], 16)

            device_ready, count = cls._uhid_devices.get(hid_id, (False, 0))

            ready = event.action == "bind"
            if not device_ready and ready:
                # only count not-ready -> ready transitions
                count += 1
            cls._uhid_devices[hid_id] = (ready, count)

    def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]:
        """Return ``(ready, bind_count)`` for the attached uhid device.

        ``(False, 0)`` is returned when no udev event was seen for it yet.
        """
        return self._uhid_devices.get(self.uhid.hid_id, (False, 0))


class EvdevMatch(object):
    """Set of event codes and input properties an evdev node must (not) have."""

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        # None sentinels instead of ``[]`` defaults: a mutable default would
        # be shared by every instance created without that argument.
        self.requires = requires if requires is not None else []
        self.excludes = excludes if excludes is not None else []
        self.req_properties = req_properties if req_properties is not None else []
        self.excl_properties = excl_properties if excl_properties is not None else []

    def is_a_match(self: "EvdevMatch", evdev: libevdev.Device) -> bool:
        """Return True when ``evdev`` satisfies all four constraint lists."""
        return (
            all(evdev.has(m) for m in self.requires)
            and not any(evdev.has(m) for m in self.excludes)
            and all(evdev.has_property(p) for p in self.req_properties)
            and not any(evdev.has_property(p) for p in self.excl_properties)
        )


class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.
    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional[libevdev.Device] = None

        self.uevents: Dict[str, str] = {}
        # all of the interesting properties are stored in the input uevent, so in the parent
        # so convert the uevent file of the parent input node into a dict
        with open(sysfs.parent / "uevent") as f:
            for line in f:
                # Split on the first "=" only: values may themselves contain
                # "=". Lines without a separator (e.g. blank) are skipped.
                key, sep, value = line.strip().partition("=")
                if not sep:
                    continue
                self.uevents[key] = value.strip('"')

        # we open all evdev nodes in order to not miss any event
        self.open()

    @property
    def name(self: "EvdevDevice") -> str:
        """The ``NAME`` entry of the parent input node's uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the device node under ``/dev/input``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: Dict[str, EvdevMatch]
    ) -> bool:
        """Tell whether this node matches the rule registered for ``application``.

        Returns False when the node is not opened. Raises AssertionError when
        ``application`` has no entry in ``matches``.
        """
        if self.libevdev is None:
            return False

        if application in matches:
            return matches[application].is_a_match(self.libevdev)

        logger.error(
            f"application '{application}' is unknown, please update/fix hid-tools"
        )
        # hid-tools likely needs an update. Raised explicitly rather than via
        # ``assert False`` so that it still triggers under ``python -O``.
        raise AssertionError(f"unknown application '{application}'")

    def open(self: "EvdevDevice") -> libevdev.Device:
        """Open the device node, wrap it in libevdev and make it non-blocking."""
        self.event_node = open(self.evdev, "rb")
        self.libevdev = libevdev.Device(self.event_node)

        assert self.libevdev.fd is not None

        fd = self.libevdev.fd.fileno()
        # O_NONBLOCK is a file *status* flag: the current value must be read
        # with F_GETFL (not F_GETFD, which returns the descriptor flags)
        # before being written back with F_SETFL.
        flag = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the libevdev file object and the underlying event node."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None


class BaseDevice(UHIDDevice):
    """UHID device that keeps track of the evdev nodes created for it."""

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        """
        :param name: device name; a default derived from the class name is
            used when None
        :param application: default application used by :meth:`get_evdev`
        :param rdesc_str: human-readable report descriptor, used only when
            ``rdesc`` is None
        :param rdesc: report descriptor
        :param input_info: ``(bus, vendor, product)`` tuple, defaults to
            ``(BusType.USB, 1, 2)``
        :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given
        """
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        self._input_nodes: Optional[List[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """First power_supply node found in sysfs for this device, or None."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if ps is None or len(ps) < 1:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One LED per ``max_brightness`` file found in sysfs for this device."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the device is reported ready and has been started."""
        return self._kernel_is_ready.is_ready()[0] and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """Second element of the readiness tuple (the bind count)."""
        return self._kernel_is_ready.is_ready()[1]

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """Evdev nodes of this device, opened and cached on first success.

        Returns an empty list (without caching) while the device is not ready.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        paths = self.walk_sysfs("input", "input/input*/event*")
        if paths is None:
            # same tolerance as power_supply_class/led_classes
            return []

        self._input_nodes = [EvdevDevice(path) for path in paths]
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        returning True will append the corresponding report to
        `self.input_nodes[type]`
        returning False  will ignore this report / type combination
        for the device.
        """
        return True

    def open(self):
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every cached evdev node, if any."""
        # Reached from __del__, which also runs when __init__ raised before
        # _input_nodes was assigned: do not assume the attribute exists.
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for e in nodes:
                e.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        self.opened = False

    def start(self, flags):
        self.started = True

    def stop(self):
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """Return the pending events of the evdev node matching ``application``.

        An empty list is returned when no node matches.
        """
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """Return the libevdev device matching ``application``, or None.

        ``application`` defaults to ``self.application``. With a single input
        node only :meth:`match_evdev_rule` is consulted; with several nodes
        the node must also match the ``application_matches`` rule.
        """
        if application is None:
            application = self.application

        if not self.input_nodes:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.uhdev.led_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready