-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpianobar_control.py
115 lines (88 loc) · 3.06 KB
/
pianobar_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import json
import re
import subprocess
import os
from threading import Timer
# Absolute path of the pianobar executable that gets launched.
PIANOBAR_BIN = '/opt/pianobar/bin/pianobar'

# pianobar's configuration directory; the control and status paths below
# both live inside it.
PIANOBAR_FOLDER = '/home/spksel/.config/pianobar'
PIANOBAR_CTL = os.path.join(PIANOBAR_FOLDER, 'ctl')
PIANOBAR_STATUS = os.path.join(PIANOBAR_FOLDER, 'status')

# Complete environment handed to the pianobar child process (only HOME).
PIANOBAR_ENV = {'HOME': '/home/spksel'}
def is_pianobar_running():
    """Return True when ``pidof pianobar`` exits with status 0.

    ``pidof`` inherits this process's stdout, so any PIDs it finds are
    printed there as a side effect.
    """
    exit_code = subprocess.call(['pidof', 'pianobar'])
    return exit_code == 0
def kill_other_pianobars():
    """Run ``killall -9 pianobar``; the command's exit status is ignored."""
    kill_command = ['killall', '-9', 'pianobar']
    subprocess.call(kill_command)
class PianoBar(object):
    """Drive a pianobar child process by writing commands to its stdin.

    The process is started with ``run()`` and stopped with ``quit()``.
    Every other command method sends a short key sequence followed by a
    newline through ``_write_pianobar``. Playback information is read
    from the JSON file at ``PIANOBAR_STATUS``.
    NOTE(review): nothing in this class writes that status file; it is
    presumably produced by a pianobar event script -- confirm.
    """

    def __init__(self):
        # Popen handle of the child; None while no child is attached.
        self.proc = None
        # Sink used for the child's stdout and stderr.
        self.devnull = open('/dev/null', 'w')
        # Locally tracked pause flag; it is merged into status() results.
        self._is_paused = True

    def run(self):
        """Start a fresh pianobar process.

        Any pianobar that is already running is killed first, and a
        stale status file is removed so status() does not report data
        from a previous session.
        """
        if is_pianobar_running():
            kill_other_pianobars()
        try:
            os.remove(PIANOBAR_STATUS)
        except OSError:
            # Best effort: the status file usually just does not exist yet.
            pass
        # Point logfh at a real file instead to capture pianobar's output
        # when debugging.
        logfh = self.devnull
        self.proc = subprocess.Popen(PIANOBAR_BIN,
                                     stdin=subprocess.PIPE,
                                     stdout=logfh,
                                     stderr=logfh,
                                     close_fds=True,
                                     env=PIANOBAR_ENV)
        # select no station to start
        self._write_pianobar('')

    def quit(self):
        """Stop the child process, gracefully if possible.

        Sends ``q`` so pianobar can save its state file, then waits for
        it to exit. A timer force-kills the process if it has not exited
        after QUIT_TIMEOUT seconds. Does nothing when no process is
        attached.
        """
        proc = self.proc
        if not proc:
            return
        QUIT_TIMEOUT = 1  # seconds
        # Bind the kill to the local handle so the timer thread never
        # dereferences self.proc after it has been reset to None.
        timer = Timer(QUIT_TIMEOUT, proc.kill)
        try:
            # Try to quit gracefully to allow pianobar to save its state file
            self._write_pianobar('q')
            # Timeout and force kill if necessary
            timer.start()
            proc.wait()
        except IOError:
            # in case of broken pipe
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait()
        finally:
            # Safe even if the timer was never started.
            timer.cancel()
        self.proc = None

    def is_running(self):
        """Return True while a child process handle is attached."""
        return self.proc is not None

    def status(self):
        """Return the parsed status file plus a ``'paused'`` entry.

        Returns None when the status file cannot be opened or does not
        contain valid JSON (for example while it is being rewritten).
        """
        try:
            with open(PIANOBAR_STATUS, 'r') as fh:
                status = json.load(fh)
        except (IOError, ValueError):
            return None
        status.update({'paused': self._is_paused})
        return status

    def select_station(self, station_id):
        """Send the station-select command for ``station_id``.

        ``station_id`` must be a string made up of digits only.

        Raises:
            AssertionError: if ``station_id`` contains anything else.
                The check is anchored at both ends so extra characters
                (including newlines) cannot be smuggled into the command
                stream, and it is an explicit raise so it still runs
                under ``python -O``.
        """
        if not re.match(r'\d+\Z', station_id):
            raise AssertionError(
                'station_id must be a string of digits: %r' % (station_id,))
        self._write_pianobar('s' + station_id)

    def play(self):
        """Send the play command and clear the pause flag.

        The flag is only cleared when status() reports a non-empty
        ``stationName``.
        """
        self._write_pianobar('P')
        status = self.status()
        if status and status.get('stationName'):
            self._is_paused = False

    def pause(self):
        """Send the pause command and set the pause flag."""
        self._write_pianobar('S')
        self._is_paused = True

    def skip(self):
        """Send the skip command (``n``)."""
        self._write_pianobar('n')

    def love(self):
        """Send the love command (``+``)."""
        self._write_pianobar('+')

    def ban(self):
        """Send the ban command (``-``)."""
        self._write_pianobar('-')

    def tired(self):
        """Send the tired command (``t``)."""
        self._write_pianobar('t')

    def _write_pianobar(self, command):
        """Write ``command`` plus a newline to the child's stdin.

        When no process is attached the command is only logged.
        """
        if self.proc:
            # The pipe is opened in binary mode, so encode the text, and
            # flush so the command reaches pianobar immediately instead
            # of waiting in the pipe's write buffer.
            self.proc.stdin.write((command + '\n').encode('utf-8'))
            self.proc.stdin.flush()
            print("Pianobar>>:", command + '\n')
        else:
            print("Pianobar NOT RUNNING>>:", command + '\n')