From: Takashi Sakamoto Date: Sun, 14 Jun 2020 12:36:32 +0000 (+0900) Subject: samples: add Python 3 sample script for ALSARawmidi X-Git-Tag: v0.1.0~41 X-Git-Url: https://git.alsa-project.org/?a=commitdiff_plain;h=e59c7d96f80c455ee275926b4cf768b85b496a2e;p=alsa-gobject.git samples: add Python 3 sample script for ALSARawmidi Signed-off-by: Takashi Sakamoto --- diff --git a/samples/rawmidi b/samples/rawmidi new file mode 100755 index 0000000..53c6bfe --- /dev/null +++ b/samples/rawmidi @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 + +import gi +gi.require_version('GLib', '2.0') +gi.require_version('ALSACtl', '0.0') +gi.require_version('ALSARawmidi', '0.0') +from gi.repository import GLib, ALSACtl, ALSARawmidi + +from sys import exit +from signal import SIGINT + +# Open ALSA Rawmidi chracter device in which a pair of input/output substreams +# is available in the same subdevice ID. +for card_id in ALSACtl.get_card_id_list(): + for device_id in ALSARawmidi.get_device_id_list(card_id): + outputs = ALSARawmidi.get_subdevice_id_list(card_id, device_id, + ALSARawmidi.StreamDirection.OUTPUT) + inputs = ALSARawmidi.get_subdevice_id_list(card_id, device_id, + ALSARawmidi.StreamDirection.OUTPUT) + for subdevice_id in outputs: + if subdevice_id in inputs: + pair = ALSARawmidi.StreamPair.new() + modes = (ALSARawmidi.StreamPairInfoFlag.OUTPUT | + ALSARawmidi.StreamPairInfoFlag.INPUT) + pair.open(card_id, device_id, subdevice_id, modes, 0) + break + else: + continue + break + else: + continue + break +else: + print('ALSA Rawmidi devices not found.') + exit(1) + +# Retrieve the information of substream for both directions. +for direction in (ALSARawmidi.StreamDirection.OUTPUT, + ALSARawmidi.StreamDirection.INPUT): + info = pair.get_substream_info(direction) + print('The information of {} substream:'.format(direction.value_nick)) + print(' direction:', info.get_property('direction').value_nick) + for prop in ('device-id', 'subdevice-id', 'card-id', 'id', 'name', + 'subdevice-name', 'subdevices-count', 'subdevices-avail'): + print(' {}: {}'.format(prop, info.get_property(prop))) + print(' flags:', info.get_property('flags').value_nicks) + +# Configure the substream for both directions. +params = ALSARawmidi.SubstreamParams.new() +for direction in (ALSARawmidi.StreamDirection.OUTPUT, + ALSARawmidi.StreamDirection.INPUT): + params.set_property('buffer-size', 1024) + params.set_property('avail-min', 1) + params.set_property('active-sensing', False) + pair.set_substream_params(direction, params) + print('The parameter of {} substream.'.format(direction.value_nick)) + for prop in ('buffer-size', 'avail-min', 'active-sensing'): + print(' {}: {}'.format(prop, params.get_property(prop))) + +# Register event handler. +def handle_messages(pair): + print('Event:') + status = ALSARawmidi.SubstreamStatus.new() + + status = pair.get_substream_status(ALSARawmidi.StreamDirection.INPUT, status) + avail = status.get_property('avail') + buf = pair.read_from_substream([0] * avail) + print(' {} bytes read.'.format(len(buf))) + + status = pair.get_substream_status(ALSARawmidi.StreamDirection.OUTPUT, status) + avail = status.get_property('avail') + if avail < len(buf): + buf = buf[:avail] + pair.write_to_substream(buf) + print(' {} bytes written.'.format(len(buf))) +pair.connect('handle-messages', handle_messages) + +# Create event dispatcher. +dispatcher = GLib.MainLoop.new(None, False) +src = pair.create_source() +src.attach(dispatcher.get_context()) + +# Register UNIX signal handler. +def handle_unix_signal(dispatcher): + dispatcher.quit() +GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, SIGINT, + handle_unix_signal, dispatcher) + +# Start to dispatch events. +dispatcher.run() + +# Clean intermediate buffers. +pair.drain_substream(ALSARawmidi.StreamDirection.INPUT) +pair.drain_substream(ALSARawmidi.StreamDirection.OUTPUT)