From 26ebf0e25766406c5610ecbcc0bb921a22e22495 Mon Sep 17 00:00:00 2001 From: Takashi Sakamoto Date: Sun, 14 Jun 2020 21:37:10 +0900 Subject: [PATCH] samples: add Python 3 sample script for ALSASeq Signed-off-by: Takashi Sakamoto --- samples/seq | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100755 samples/seq diff --git a/samples/seq b/samples/seq new file mode 100755 index 0000000..40802b4 --- /dev/null +++ b/samples/seq @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 + +import gi +gi.require_version('GLib', '2.0') +gi.require_version('ALSATimer', '0.0') +gi.require_version('ALSASeq', '0.0') +from gi.repository import GLib, ALSATimer, ALSASeq + +from signal import SIGINT + +# Register my client. +client = ALSASeq.UserClient.new() +client.open(0) + +# Register client information. +info = ALSASeq.ClientInfo.new() +info.set_property('name', 'focal') +client.set_info(info) + +# Dump client information. +info = client.get_info(info) +print('The information of client:') +print(' type: {}'.format(info.get_property('type').value_nick)) +print(' filter-attributes: {}'.format(info.get_property('filter-attributes').value_nicks)) +for prop in ('client-id', 'name', 'use-filter', 'port-count', 'lost-count', + 'card-id', 'process-id'): + print(' {}: {}'.format(prop, info.get_property(prop))) + +# Add my port. +info = ALSASeq.PortInfo.new() +info.set_property('name', 'fossa') +caps = (ALSASeq.PortCapFlag.READ | + ALSASeq.PortCapFlag.WRITE | + ALSASeq.PortCapFlag.DUPLEX | + ALSASeq.PortCapFlag.SUBS_READ | + ALSASeq.PortCapFlag.SUBS_WRITE) +info.set_property('caps', caps) +attrs = (ALSASeq.PortAttrFlag.MIDI_GENERIC | + ALSASeq.PortAttrFlag.SOFTWARE | + ALSASeq.PortAttrFlag.APPLICATION) +info.set_property('attrs', attrs) +info = client.create_port(info) + +# Dump port information. +print('The information of port:') +for prop in ('name', 'midi-channels', 'midi-voices', 'synth-voices', + 'read-users', 'write-users', 'timestamp-overwrite', 'queue-id'): + print(' {}: {}'.format(prop, info.get_property(prop))) +for prop in ('caps', 'attrs'): + print(' {}: {}'.format(prop, info.get_property(prop).value_nicks)) +addr = info.get_property('addr') +print(' addr:', addr.get_client_id(), addr.get_port_id()) +port_id = addr.get_port_id() + +# Register my queue. +info = ALSASeq.QueueInfo.new() +info.set_property('name', '20.04') +info.set_property('client-id', client.get_property('client-id')) +info.set_property('locked', True) +info = client.create_queue(info) + +# Dump queue information. +print('The information of queue:') +for prop in ('name', 'queue-id', 'client-id', 'locked'): + print(' {}: {}'.format(prop, info.get_property(prop))) +queue_id = info.get_property('queue-id') + +# Prepare two events; one to start the queue, another to deliver on the queue. +ev_cntr = ALSASeq.EventCntr.new(2) + +ev_cntr.set_event_type(0, ALSASeq.EventType.START) +ev_cntr.set_tstamp_mode(0, ALSASeq.EventTimestampMode.REAL) +ev_cntr.set_time_mode(0, ALSASeq.EventTimeMode.REL) +ev_cntr.set_priority_mode(0, ALSASeq.EventPriorityMode.NORMAL) +ev_cntr.set_tag(0, 0) +ev_cntr.set_queue_id(0, ALSASeq.SpecificQueueId.DIRECT) +ev_cntr.set_dst(0, ALSASeq.Addr.new(ALSASeq.SpecificClientId.SYSTEM, + ALSASeq.SpecificPortId.TIMER)); +ev_cntr.set_src(0, ALSASeq.Addr.new(client.get_property('client-id'), 0)); +data = ev_cntr.get_queue_data(0) +data.set_queue_id(queue_id) +ev_cntr.set_queue_data(0, data) + +ev_cntr.set_event_type(1, ALSASeq.EventType.CONTROLLER) +ev_cntr.set_tstamp_mode(1, ALSASeq.EventTimestampMode.REAL) +ev_cntr.set_time_mode(1, ALSASeq.EventTimeMode.REL) +ev_cntr.set_priority_mode(1, ALSASeq.EventPriorityMode.NORMAL) +ev_cntr.set_tag(1, 10) +ev_cntr.set_queue_id(1, queue_id) +ev_cntr.set_dst(1, ALSASeq.Addr.new(client.get_property('client-id'), 0)); +ev_cntr.set_src(1, ALSASeq.Addr.new(client.get_property('client-id'), 0)); + +# Schedule the events. +client.schedule_event(ev_cntr, 2) + +# Register event handler. +def handle_event(client, ev_cntr): + count = ev_cntr.count_events() + print('{} events:'.format(count)) + for i in range(count): + print(' Event {}:'.format(i)) + print(' type:', ev_cntr.get_event_type(i).value_nick) + print(' tstamp-mode:', ev_cntr.get_tstamp_mode(i).value_nick) + print(' time-mode:', ev_cntr.get_time_mode(i).value_nick) + print(' length-mode:', ev_cntr.get_length_mode(i).value_nick) + print(' priority-mode:', ev_cntr.get_priority_mode(i).value_nick) + print(' tag:', ev_cntr.get_tag(i)) + print(' queue-id:', ev_cntr.get_queue_id(i)) + tstamp = ev_cntr.get_tstamp(i) + if ev_cntr.get_tstamp_mode(i) == ALSASeq.EventTimestampMode.TICK: + print(' tstamp:', tstamp.get_tick_time()) + else: + print(' tstamp:', tstamp.get_real_time()) + dst = ev_cntr.get_dst(i) + print(' dst:', dst.get_client_id(), dst.get_port_id()) + src = ev_cntr.get_src(i) + print(' src:', src.get_client_id(), src.get_port_id()) +client.connect('handle-event', handle_event) + +# Create event dispatcher. +dispatcher = GLib.MainLoop.new(None, False) +src = client.create_source() +src.attach(dispatcher.get_context()) + +# Register UNIX signal handler. +def handle_unix_signal(dispatcher): + dispatcher.quit() +GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, SIGINT, + handle_unix_signal, dispatcher) + +# Start to dispatch events. +dispatcher.run() + +# Stop the queue. +ev_cntr.set_event_type(0, ALSASeq.EventType.STOP) +client.schedule_event(ev_cntr, 1) + +# Delete the queue. +client.delete_queue(queue_id) + +# Delete the port +client.delete_port(port_id) -- 2.47.3