]> git.alsa-project.org Git - alsa-gobject.git/commitdiff
samples: add Python 3 sample script for ALSASeq
authorTakashi Sakamoto <o-takashi@sakamocchi.jp>
Sun, 14 Jun 2020 12:37:10 +0000 (21:37 +0900)
committer坂本 貴史 <o-takashi@sakamocchi.jp>
Sun, 14 Jun 2020 13:11:06 +0000 (22:11 +0900)
Signed-off-by: Takashi Sakamoto <o-takashi@sakamocchi.jp>
samples/seq [new file with mode: 0755]

diff --git a/samples/seq b/samples/seq
new file mode 100755 (executable)
index 0000000..40802b4
--- /dev/null
@@ -0,0 +1,142 @@
+#!/usr/bin/env python3
+
+import gi
+gi.require_version('GLib', '2.0')
+gi.require_version('ALSATimer', '0.0')
+gi.require_version('ALSASeq', '0.0')
+from gi.repository import GLib, ALSATimer, ALSASeq
+
+from signal import SIGINT
+
+# Register my client.
+client = ALSASeq.UserClient.new()
+client.open(0)
+
+# Register client information.
+info = ALSASeq.ClientInfo.new()
+info.set_property('name', 'focal')
+client.set_info(info)
+
+# Dump client information.
+info = client.get_info(info)
+print('The information of client:')
+print('  type: {}'.format(info.get_property('type').value_nick))
+print('  filter-attributes: {}'.format(info.get_property('filter-attributes').value_nicks))
+for prop in ('client-id', 'name', 'use-filter', 'port-count', 'lost-count',
+             'card-id', 'process-id'):
+    print('  {}: {}'.format(prop, info.get_property(prop)))
+
+# Add my port.
+info = ALSASeq.PortInfo.new()
+info.set_property('name', 'fossa')
+caps = (ALSASeq.PortCapFlag.READ |
+        ALSASeq.PortCapFlag.WRITE |
+        ALSASeq.PortCapFlag.DUPLEX |
+        ALSASeq.PortCapFlag.SUBS_READ |
+        ALSASeq.PortCapFlag.SUBS_WRITE)
+info.set_property('caps', caps)
+attrs = (ALSASeq.PortAttrFlag.MIDI_GENERIC |
+         ALSASeq.PortAttrFlag.SOFTWARE |
+         ALSASeq.PortAttrFlag.APPLICATION)
+info.set_property('attrs', attrs)
+info = client.create_port(info)
+
+# Dump port information.
+print('The information of port:')
+for prop in ('name', 'midi-channels', 'midi-voices', 'synth-voices',
+             'read-users', 'write-users', 'timestamp-overwrite', 'queue-id'):
+    print('  {}: {}'.format(prop, info.get_property(prop)))
+for prop in ('caps', 'attrs'):
+    print('  {}: {}'.format(prop, info.get_property(prop).value_nicks))
+addr = info.get_property('addr')
+print('  addr:', addr.get_client_id(), addr.get_port_id())
+port_id = addr.get_port_id()
+
+# Register my queue.
+info = ALSASeq.QueueInfo.new()
+info.set_property('name', '20.04')
+info.set_property('client-id', client.get_property('client-id'))
+info.set_property('locked', True)
+info = client.create_queue(info)
+
+# Dump queue information.
+print('The information of queue:')
+for prop in ('name', 'queue-id', 'client-id', 'locked'):
+    print('  {}: {}'.format(prop, info.get_property(prop)))
+queue_id = info.get_property('queue-id')
+
+# Prepare two events; one to start the queue, another to deliver on the queue.
+ev_cntr = ALSASeq.EventCntr.new(2)
+
+ev_cntr.set_event_type(0, ALSASeq.EventType.START)
+ev_cntr.set_tstamp_mode(0, ALSASeq.EventTimestampMode.REAL)
+ev_cntr.set_time_mode(0, ALSASeq.EventTimeMode.REL)
+ev_cntr.set_priority_mode(0, ALSASeq.EventPriorityMode.NORMAL)
+ev_cntr.set_tag(0, 0)
+ev_cntr.set_queue_id(0, ALSASeq.SpecificQueueId.DIRECT)
+ev_cntr.set_dst(0, ALSASeq.Addr.new(ALSASeq.SpecificClientId.SYSTEM,
+                                    ALSASeq.SpecificPortId.TIMER));
+ev_cntr.set_src(0, ALSASeq.Addr.new(client.get_property('client-id'), 0));
+data = ev_cntr.get_queue_data(0)
+data.set_queue_id(queue_id)
+ev_cntr.set_queue_data(0, data)
+
+ev_cntr.set_event_type(1, ALSASeq.EventType.CONTROLLER)
+ev_cntr.set_tstamp_mode(1, ALSASeq.EventTimestampMode.REAL)
+ev_cntr.set_time_mode(1, ALSASeq.EventTimeMode.REL)
+ev_cntr.set_priority_mode(1, ALSASeq.EventPriorityMode.NORMAL)
+ev_cntr.set_tag(1, 10)
+ev_cntr.set_queue_id(1, queue_id)
+ev_cntr.set_dst(1, ALSASeq.Addr.new(client.get_property('client-id'), 0));
+ev_cntr.set_src(1, ALSASeq.Addr.new(client.get_property('client-id'), 0));
+
+# Schedule the events.
+client.schedule_event(ev_cntr, 2)
+
+# Register event handler.
+def handle_event(client, ev_cntr):
+    count = ev_cntr.count_events()
+    print('{} events:'.format(count))
+    for i in range(count):
+        print('  Event {}:'.format(i))
+        print('    type:', ev_cntr.get_event_type(i).value_nick)
+        print('    tstamp-mode:', ev_cntr.get_tstamp_mode(i).value_nick)
+        print('    time-mode:', ev_cntr.get_time_mode(i).value_nick)
+        print('    length-mode:', ev_cntr.get_length_mode(i).value_nick)
+        print('    priority-mode:', ev_cntr.get_priority_mode(i).value_nick)
+        print('    tag:', ev_cntr.get_tag(i))
+        print('    queue-id:', ev_cntr.get_queue_id(i))
+        tstamp = ev_cntr.get_tstamp(i)
+        if ev_cntr.get_tstamp_mode(i) == ALSASeq.EventTimestampMode.TICK:
+            print('    tstamp:', tstamp.get_tick_time())
+        else:
+            print('    tstamp:', tstamp.get_real_time())
+        dst = ev_cntr.get_dst(i)
+        print('    dst:', dst.get_client_id(), dst.get_port_id())
+        src = ev_cntr.get_src(i)
+        print('    src:', src.get_client_id(), src.get_port_id())
+client.connect('handle-event', handle_event)
+
+# Create event dispatcher.
+dispatcher = GLib.MainLoop.new(None, False)
+src = client.create_source()
+src.attach(dispatcher.get_context())
+
+# Register UNIX signal handler.
+def handle_unix_signal(dispatcher):
+    dispatcher.quit()
+GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, SIGINT,
+                     handle_unix_signal, dispatcher)
+
+# Start to dispatch events.
+dispatcher.run()
+
+# Stop the queue.
+ev_cntr.set_event_type(0, ALSASeq.EventType.STOP)
+client.schedule_event(ev_cntr, 1)
+
+# Delete the queue.
+client.delete_queue(queue_id)
+
+# Delete the port
+client.delete_port(port_id)