#!/usr/bin/env python

# hwmixvolume - ALSA hardware mixer volume control applet
# Copyright (c) 2009-2010 Clemens Ladisch
# Copyright (c) 2018 Emmanuel Gil Peyrot
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.

import gi
gi.require_version('GLib', '2.0')
gi.require_version('Gtk', '3.0')
from gi.repository import GLib, Gtk
from pyalsa import alsacard, alsahcontrol

# Constants looked up once from the pyalsa tables.
INTF_PCM = alsahcontrol.interface_id['PCM']
INTF_MIXER = alsahcontrol.interface_id['MIXER']
TYPE_INTEGER = alsahcontrol.element_type['INTEGER']
EVENT_VALUE = alsahcontrol.event_mask['VALUE']
EVENT_INFO = alsahcontrol.event_mask['INFO']
EVENT_REMOVE = alsahcontrol.event_mask_remove


class Stream:
    """One stream volume control element and the widgets that display it.

    While the element is active, the stream owns a label plus one slider per
    channel inside the parent's ``scales_vbox``. The widgets are created by
    activate() and removed again by deactivate().
    """

    def __init__(self, element, parent):
        """Bind to *element* and build the widgets if it is currently active.

        element -- alsahcontrol.Element to control
        parent  -- the owning MixerWindow (provides scales_vbox, lock_check,
                   current_card and update_msg_label())
        """
        self.element = element
        # Registers this object so that its callback() method receives the
        # element's events.
        self.element.set_callback(self)
        self.parent = parent
        self.label = None
        self.scales = []
        self.adjustments = []
        # Simulate an INFO event to pick up the initial active/inactive state.
        self.callback(self.element, EVENT_INFO)

    def destroy(self):
        """Remove this stream's widgets from the window."""
        self.deactivate()

    def callback(self, e, mask):
        """Handle an element event described by the bit mask *mask*.

        The remove mask is compared for equality, before any bit tests.
        """
        if mask == EVENT_REMOVE:
            self.deactivate()
        elif (mask & EVENT_INFO) != 0:
            info = alsahcontrol.Info(self.element)
            if info.is_inactive:
                self.deactivate()
            else:
                self.activate()
        elif (mask & EVENT_VALUE) != 0:
            self.update_scales_from_ctl()

    def activate(self):
        """Create the label and one slider per channel (no-op if present)."""
        if self.label:
            return
        info = alsahcontrol.Info(self.element)
        value = alsahcontrol.Value(self.element)
        value.read()
        values = value.get_tuple(TYPE_INTEGER, info.count)
        self.label = Gtk.Label.new(self.get_label(info))
        self.label.set_single_line_mode(True)
        self.parent.scales_vbox.add(self.label)
        for i in range(info.count):
            adj = Gtk.Adjustment(value=values[i],
                                 lower=info.min, upper=info.max,
                                 step_incr=1,
                                 page_incr=(info.max-info.min+1)/8)
            # The channel index is passed along as user data.
            adj.connect('value-changed', self.update_ctl_from_scale, i)
            scale = Gtk.Scale(orientation=Gtk.Orientation.HORIZONTAL,
                              adjustment=adj)
            scale.set_draw_value(False)
            self.parent.scales_vbox.add(scale)
            self.scales.append(scale)
            self.adjustments.append(adj)
        self.parent.scales_vbox.show_all()
        self.parent.update_msg_label()

    def deactivate(self):
        """Destroy the label and sliders (no-op if not active)."""
        if not self.label:
            return
        self.label.destroy()
        for s in self.scales:
            s.destroy()
        self.label = None
        self.scales = []
        self.adjustments = []
        self.parent.update_msg_label()

    def update_scales_from_ctl(self):
        """Copy the element's current values into the slider adjustments."""
        if not self.label:
            return
        count = len(self.adjustments)
        value = alsahcontrol.Value(self.element)
        value.read()
        values = value.get_tuple(TYPE_INTEGER, count)
        for i in range(count):
            self.adjustments[i].set_value(values[i])

    def update_ctl_from_scale(self, adj, index):
        """Write the slider position of channel *index* to the element.

        adj   -- the Gtk.Adjustment whose value changed
        index -- channel number of that adjustment

        With "Lock Channels" active, every channel is set to the same value.
        """
        scale_value = adj.get_value()
        value_to_set = int(round(scale_value))
        count = len(self.adjustments)
        value = alsahcontrol.Value(self.element)
        if self.parent.lock_check.get_active():
            values = [value_to_set] * count
        else:
            # Only one channel changes: start from the current values.
            value.read()
            values = value.get_array(TYPE_INTEGER, count)
            values[index] = value_to_set
        value.set_array(TYPE_INTEGER, values)
        value.write()
        # Snap the slider to the integer that was actually written.
        if value_to_set != scale_value:
            adj.set_value(value_to_set)

    def get_label(self, info):
        """Return the text shown above the sliders.

        Preference order: command line of the owning process, then
        "PID <n>", then the element name with the " Volume" and " Playback"
        suffixes stripped.
        """
        pid = self.get_pid(info)
        if pid:
            cmdline = self.get_pid_cmdline(pid)
            if cmdline:
                return cmdline
            return "PID %d" % pid
        name = info.name
        if name.endswith(" Volume"):
            name = name[:-7]
        if name.endswith(" Playback"):
            name = name[:-9]
        return name

    def get_pid(self, info):
        """Return the owner PID listed in the substream's status file.

        Reads /proc/asound/card<C>/pcm<D>p/sub<S>/status and returns the
        integer from its "owner_pid" line, or None if the file cannot be
        read, has no such line, or the line cannot be parsed.
        """
        card = self.parent.current_card
        device = info.device
        subdevice = info.subdevice
        if subdevice == 0:
            subdevice = info.index
        filename = "/proc/asound/card%d/pcm%dp/sub%d/status" % (card, device, subdevice)
        try:
            with open(filename, "r") as f:
                for line in f:
                    if line.startswith("owner_pid"):
                        return int(line.split(':')[1].strip())
        except (IOError, ValueError, IndexError):
            # Unreadable file or malformed line: treat as "owner unknown"
            # instead of raising out of a GTK callback.
            return None
        return None

    def get_pid_cmdline(self, pid):
        """Return the command line of process *pid* with spaces between
        arguments, or None if it cannot be read.
        """
        try:
            # Read bytes: the arguments are not guaranteed to be valid text
            # in the current locale, and a decoding error would otherwise
            # escape the IOError handler.
            with open("/proc/%d/cmdline" % pid, "rb") as f:
                raw = f.read()
        except IOError:
            return None
        cmdline = raw.decode("utf-8", "replace")
        return cmdline.replace('\x00', ' ').strip()


class MixerWindow(Gtk.Window):
    """Main window: a card selector, the "Lock Channels" check box and a
    scrollable list of stream volume sliders.
    """

    card_numbers = alsacard.card_list()
    current_card = -1
    hcontrol = None
    scales_vbox = None
    msg_label = None
    streams = []
    hctl_sources = []

    def __init__(self):
        """Build the widgets, open the first card (if any) and show the window."""
        Gtk.Window.__init__(self)
        # Per-instance lists, so the class-level defaults are never shared.
        self.streams = []
        self.hctl_sources = []
        self.connect('destroy', lambda w: Gtk.main_quit())
        self.set_title("Hardware Mixer Volumes")

        vbox = Gtk.Grid()
        vbox.set_orientation(Gtk.Orientation.VERTICAL)
        self.add(vbox)

        hbox = Gtk.Grid()
        vbox.add(hbox)

        label = Gtk.Label.new_with_mnemonic("_Sound Card: ")
        hbox.add(label)

        combo = Gtk.ComboBoxText()
        combo.set_hexpand(True)
        for i in self.card_numbers:
            entry = "%d: %s" % (i, alsacard.card_get_name(i))
            combo.append_text(entry)
        if len(self.card_numbers) > 0:
            combo.set_active(0)
        combo.connect('changed', self._on_card_changed)
        hbox.add(combo)
        label.set_mnemonic_widget(combo)

        self.lock_check = Gtk.CheckButton.new_with_mnemonic(label="_Lock Channels")
        self.lock_check.set_active(True)
        vbox.add(self.lock_check)

        scrollwin = Gtk.ScrolledWindow()
        scrollwin.set_policy(hscrollbar_policy=Gtk.PolicyType.NEVER,
                             vscrollbar_policy=Gtk.PolicyType.AUTOMATIC)
        scrollwin.set_shadow_type(Gtk.ShadowType.NONE)
        scrollwin.set_vexpand(True)
        vbox.add(scrollwin)

        self.scales_vbox = Gtk.Grid()
        self.scales_vbox.set_orientation(Gtk.Orientation.VERTICAL)
        scrollwin.add(self.scales_vbox)

        # Use throw-away widgets to obtain the height of one label plus one
        # slider; negative results are clamped to zero.
        label = Gtk.Label()
        label.set_single_line_mode(True)
        line_height = max(label.get_size_request().height, 0)
        label.destroy()
        scale = Gtk.Scale(orientation=Gtk.Orientation.HORIZONTAL)
        scale.set_draw_value(False)
        line_height += max(scale.get_size_request().height, 0)
        scale.destroy()
        # always have space for at least four sliders
        scrollwin.set_size_request(width=-1, height=line_height*4+4)

        # TODO: select the default card or the first card with stream controls
        if len(self.card_numbers) > 0:
            self.change_card(self.card_numbers[0])
        self.update_msg_label()

        self.show_all()

    def _on_card_changed(self, combo):
        """Switch to the card selected in *combo*; ignore "no selection"."""
        index = combo.get_active()
        # get_active() is -1 when nothing is selected; indexing with it would
        # silently pick the last card.
        if index < 0:
            return
        self.change_card(self.card_numbers[index])

    def change_card(self, cardnum):
        """Stop watching the current card and show the streams of *cardnum*."""
        for s in self.hctl_sources:
            GLib.source_remove(s)
        self.hctl_sources = []

        self.hcontrol = self.open_hcontrol_for_card(cardnum)
        for s in self.streams:
            s.destroy()
        self.streams = []
        self.current_card = cardnum
        if not self.hcontrol:
            self.update_msg_label()
            return

        for elem_id in self.hcontrol.list():
            if not self.is_stream_elem(elem_id):
                continue
            elem = alsahcontrol.Element(self.hcontrol, elem_id[0])
            info = alsahcontrol.Info(elem)
            if not self.is_stream_info(info):
                continue
            stream = Stream(elem, self)
            self.streams.append(stream)

        # Let the GLib main loop call us whenever the control device has
        # events pending.
        for fd, condition in self.hcontrol.poll_fds:
            self.hctl_sources.append(
                GLib.io_add_watch(fd, 0, GLib.IOCondition(condition),
                                  self.hctl_io_callback))

        self.update_msg_label()

        self.scales_vbox.show_all()

    def update_msg_label(self):
        """Show, update or remove the placeholder message.

        The message is wanted while scales_vbox holds fewer than two
        children; its text depends on whether the card has any stream
        controls at all.
        """
        needs_msg = len(self.scales_vbox.get_children()) < 2
        has_msg = self.msg_label
        if has_msg and not needs_msg:
            self.msg_label.destroy()
            self.msg_label = None
        elif needs_msg:
            if len(self.streams) > 0:
                msg = "There are no open streams."
            else:
                msg = "This card does not have stream controls."
            if not has_msg:
                self.msg_label = Gtk.Label.new(msg)
                self.msg_label.set_vexpand(True)
                self.scales_vbox.add(self.msg_label)
                self.scales_vbox.show_all()
            elif self.msg_label.get_text() != msg:
                self.msg_label.set_text(msg)

    def open_hcontrol_for_card(self, cardnum):
        """Open the control device "hw:CARD=<cardnum>" in non-blocking mode.

        Returns the alsahcontrol.HControl object. On failure, shows a modal
        error dialog and returns None.
        """
        devname = "hw:CARD=" + str(cardnum)
        try:
            hc = alsahcontrol.HControl(name=devname,
                                       mode=alsahcontrol.open_mode['NONBLOCK'])
        except Exception as err:
            # Not a bare "except:", so KeyboardInterrupt and SystemExit still
            # propagate.
            dlg = Gtk.MessageDialog(self,
                                    Gtk.DialogFlags.MODAL | Gtk.DialogFlags.DESTROY_WITH_PARENT,
                                    Gtk.MessageType.ERROR,
                                    Gtk.ButtonsType.OK,
                                    "Cannot open sound card control device.")
            # Show the underlying error text below the main message.
            dlg.format_secondary_text(str(err))
            dlg.run()
            dlg.destroy()
            return None
        return hc

    def is_stream_elem(self, id):
        """Return True if element id tuple *id* names a known stream volume.

        Compares id[1] with the interface constants and id[4] with the
        element names listed below.
        """
        return ((id[1] == INTF_PCM and
                 id[4] in ("PCM Playback Volume", "EMU10K1 PCM Volume"))
                or
                (id[1] == INTF_MIXER and
                 id[4] == "VIA DXS Playback Volume"))

    def is_stream_info(self, info):
        """Return True if *info* describes a readable, writable integer element."""
        return info.is_readable and info.is_writable and info.type == TYPE_INTEGER

    def hctl_io_callback(self, source, condition):
        """GLib I/O watch callback: let the hcontrol process pending events."""
        self.hcontrol.handle_events()
        # Returning True keeps the watch installed.
        return True


def main():
    """Create the mixer window and run the GTK main loop."""
    MixerWindow()
    Gtk.main()


main()