Skip to content

Commit

Permalink
Move ipc logic out of main.py, use PyXRLinuxDriverIPC module (#15)
Browse files Browse the repository at this point in the history
Should fix some unreliability with backend server requests, moves to shared logic with the Breezy Desktop UI
  • Loading branch information
wheaney authored May 25, 2024
1 parent 7883512 commit 0a4ccec
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 266 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "defaults/PyXRLinuxDriverIPC"]
path = defaults/PyXRLinuxDriverIPC
url = https://github.com/wheaney/PyXRLinuxDriverIPC.git
1 change: 1 addition & 0 deletions PyXRLinuxDriverIPC
2 changes: 1 addition & 1 deletion bin/package
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tmpdir=$(mktemp -d)
# Create plugin directory structure
mkdir -p "$tmpdir/$pluginname/$pluginname"
cp -r dist "$tmpdir/$pluginname/$pluginname"
cp package.json plugin.json main.py README.md LICENSE* "$tmpdir/$pluginname/$pluginname"
cp -r package.json plugin.json main.py README.md LICENSE defaults/* "$tmpdir/$pluginname/$pluginname"

# Copy binaries if provided
if [ ${#binaryPaths[@]} -gt 0 ]; then
Expand Down
1 change: 1 addition & 0 deletions defaults/PyXRLinuxDriverIPC
Submodule PyXRLinuxDriverIPC added at 50ce02
271 changes: 15 additions & 256 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import os
import stat
import subprocess
import sys
import time

# The decky plugin module is located at decky-loader/plugin
Expand All @@ -10,256 +9,35 @@
import decky_plugin
from settings import SettingsManager


CONFIG_FILE_PATH = os.path.join(decky_plugin.DECKY_USER_HOME, ".xreal_driver_config")

# write-only file that the driver reads (but never writes) to get user-specified control flags
CONTROL_FLAGS_FILE_PATH = '/dev/shm/xr_driver_control'

# read-only file that the driver writes (but never reads) to with its current state
DRIVER_STATE_FILE_PATH = '/dev/shm/xr_driver_state'
sys.path.insert(1, decky_plugin.DECKY_PLUGIN_DIR)
from PyXRLinuxDriverIPC.xrdriveripc import XRDriverIPC

INSTALLED_VERSION_SETTING_KEY = "installed_from_plugin_version"
DONT_SHOW_AGAIN_SETTING_KEY = "dont_show_again"
MANIFEST_CHECKSUM_KEY = "manifest_checksum"
CONTROL_FLAGS = ['recenter_screen', 'recalibrate', 'sbs_mode', 'refresh_device_license']
SBS_MODE_VALUES = ['unset', 'enable', 'disable']
MANAGED_EXTERNAL_MODES = ['virtual_display', 'sideview', 'none']
VR_LITE_OUTPUT_MODES = ['mouse', 'joystick']

settings = SettingsManager(name="settings", settings_directory=decky_plugin.DECKY_PLUGIN_SETTINGS_DIR)
settings.read()


def parse_boolean(value, default):
if not value:
return default

return value.lower() == 'true'


def parse_int(value, default):
return int(value) if value.isdigit() else default

def parse_float(value, default):
try:
return float(value)
except ValueError:
return default

def parse_string(value, default):
return value if value else default

def parse_array(value, default):
return value.split(",") if value else default

CONFIG_PARSER_INDEX = 0
CONFIG_DEFAULT_VALUE_INDEX = 1
CONFIG_ENTRIES = {
'disabled': [parse_boolean, True],
'output_mode': [parse_string, 'mouse'],
'external_mode': [parse_array, ['none']],
'mouse_sensitivity': [parse_int, 30],
'display_zoom': [parse_float, 1.0],
'look_ahead': [parse_int, 0],
'sbs_display_size': [parse_float, 1.0],
'sbs_display_distance': [parse_float, 1.0],
'sbs_content': [parse_boolean, False],
'sbs_mode_stretched': [parse_boolean, False],
'sideview_position': [parse_string, 'center'],
'sideview_display_size': [parse_float, 1.0],
'virtual_display_smooth_follow_enabled': [parse_boolean, False],
'sideview_smooth_follow_enabled': [parse_boolean, False]
}
ipc = XRDriverIPC(logger = decky_plugin.logger,
user_home = decky_plugin.DECKY_USER_HOME)

class Plugin:
def __init__(self):
self.breezy_installed = False
self.breezy_installing = False

async def retrieve_config(self):
return self._retrieve_config(self)

def _retrieve_config(self):
config = {}
for key, value in CONFIG_ENTRIES.items():
config[key] = value[CONFIG_DEFAULT_VALUE_INDEX]

try:
with open(CONFIG_FILE_PATH, 'r') as f:
for line in f:
try:
if not line.strip():
continue

key, value = line.strip().split('=')
if key in CONFIG_ENTRIES:
parser = CONFIG_ENTRIES[key][CONFIG_PARSER_INDEX]
default_val = CONFIG_ENTRIES[key][CONFIG_DEFAULT_VALUE_INDEX]
config[key] = parser(value, default_val)
except Exception as e:
decky_plugin.logger.error(f"Error parsing line {line}: {e}")
except FileNotFoundError as e:
decky_plugin.logger.error(f"Config file not found {e}")
return config

config['ui_view'] = self.build_ui_view(self, config)

return config
return ipc.retrieve_config()

async def write_config(self, config):
try:
output = ""

# Since the UI doesn't refresh the config before it updates, the external_mode can get out of sync with
# what's on disk. To avoid losing external_mode values, we retrieve the previous configs to preserve
# any non-managed external modes.
old_config = self._retrieve_config(self)

# remove the UI's "view" data, translate back to config values, and merge them in
view = config.pop('ui_view', None)
config.update(self.headset_mode_to_config(self, view['headset_mode'], view['is_joystick_mode'], old_config['external_mode']))

for key, value in config.items():
if key != "updated":
if isinstance(value, bool):
output += f'{key}={str(value).lower()}\n'
elif isinstance(value, int):
output += f'{key}={value}\n'
elif isinstance(value, list):
output += f'{key}={",".join(value)}\n'
else:
output += f'{key}={value}\n'

temp_file = "temp.txt"

# Write to a temporary file
with open(temp_file, 'w') as f:
f.write(output)

# Atomically replace the old config file with the new one
os.replace(temp_file, CONFIG_FILE_PATH)
os.chmod(CONFIG_FILE_PATH, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH)

config['ui_view'] = self.build_ui_view(self, config)

return config
except Exception as e:
decky_plugin.logger.error(f"Error writing config {e}")
raise e

# like a SQL "view," these are computed values that are commonly used in the UI
def build_ui_view(self, config):
view = {}
view['headset_mode'] = self.config_to_headset_mode(self, config)
view['is_joystick_mode'] = config['output_mode'] == 'joystick'
return view

def filter_to_other_external_modes(self, external_modes):
return [mode for mode in external_modes if mode not in MANAGED_EXTERNAL_MODES]

def headset_mode_to_config(self, headset_mode, joystick_mode, old_external_modes):
new_external_modes = self.filter_to_other_external_modes(self, old_external_modes)

config = {}
if headset_mode == "virtual_display":
# TODO - only allow when mode when managed until the driver can support multiple external_mode values
# new_external_modes.append("virtual_display")
new_external_modes = ["virtual_display"]
config['output_mode'] = "external_only"
config['disabled'] = False
elif headset_mode == "vr_lite":
config['output_mode'] = "joystick" if joystick_mode else "mouse"
config['disabled'] = False
elif headset_mode == "sideview":
# TODO - only allow when mode when managed until the driver can support multiple external_mode values
# new_external_modes.append("sideview")
new_external_modes = ["sideview"]
config['output_mode'] = "external_only"
config['disabled'] = False
else:
config['output_mode'] = "external_only"

has_external_mode = len(new_external_modes) > 0
if not has_external_mode:
new_external_modes.append("none")
config['external_mode'] = new_external_modes

return config

def config_to_headset_mode(self, config):
if not config or config['disabled']:
return "disabled"

if config['output_mode'] in VR_LITE_OUTPUT_MODES:
return "vr_lite"

managed_mode = next((mode for mode in MANAGED_EXTERNAL_MODES if mode in config['external_mode']), None)
if managed_mode and managed_mode != "none":
return managed_mode

return "disabled"
return ipc.write_config(config)

async def write_control_flags(self, control_flags):
try:
output = ""
for key, value in control_flags.items():
if key in CONTROL_FLAGS:
if key == 'sbs_mode':
if value not in SBS_MODE_VALUES:
decky_plugin.logger.error(f"Invalid value {value} for sbs_mode flag")
continue
elif not isinstance(value, bool):
decky_plugin.logger.error(f"Invalid value {value} for {key} flag")
continue
output += f'{key}={str(value).lower()}\n'

with open(CONTROL_FLAGS_FILE_PATH, 'w') as f:
f.write(output)
except Exception as e:
decky_plugin.logger.error(f"Error writing control flags {e}")
ipc.write_control_flags(control_flags)

async def retrieve_driver_state(self):
state = {}
state['heartbeat'] = 0
state['connected_device_brand'] = None
state['connected_device_model'] = None
state['calibration_setup'] = "AUTOMATIC"
state['calibration_state'] = "NOT_CALIBRATED"
state['sbs_mode_enabled'] = False
state['sbs_mode_supported'] = False
state['firmware_update_recommended'] = False
state['device_license'] = {}

try:
with open(DRIVER_STATE_FILE_PATH, 'r') as f:
output = f.read()
for line in output.splitlines():
try:
if not line.strip():
continue

key, value = line.strip().split('=')
if key == 'heartbeat':
state[key] = parse_int(value, 0)
elif key in ['calibration_setup', 'calibration_state', 'connected_device_brand', 'connected_device_model']:
state[key] = value
elif key in ['sbs_mode_enabled', 'sbs_mode_supported', 'firmware_update_recommended']:
state[key] = parse_boolean(value, False)
elif key == 'device_license':
state[key] = json.loads(value)
except Exception as e:
decky_plugin.logger.error(f"Error parsing key-value pair {key}={value}: {e}")
except FileNotFoundError:
pass

# state is stale, just send the license
if state['heartbeat'] == 0 or (time.time() - state['heartbeat']) > 5:
return {
'device_license': state['device_license']
}

return state
return ipc.retrieve_driver_state()

async def retrieve_dont_show_again_keys(self):
return [key for key in settings.getSetting(DONT_SHOW_AGAIN_SETTING_KEY, "").split(",") if key]
Expand Down Expand Up @@ -312,7 +90,10 @@ async def is_breezy_installed(self):
return False

output = subprocess.check_output([decky_plugin.DECKY_USER_HOME + "/.local/bin/breezy_vulkan/verify_installation"], stderr=subprocess.STDOUT)
return output.strip() == b"Verification succeeded"
success = output.strip() == b"Verification succeeded"
if not success:
decky_plugin.logger.error(f"Error verifying breezy installation {output}")
return success
except subprocess.CalledProcessError as exc:
decky_plugin.logger.error(f"Error checking driver installation {exc.output}")
return False
Expand Down Expand Up @@ -358,32 +139,10 @@ async def install_breezy(self):
return False

async def request_token(self, email):
decky_plugin.logger.info(f"Requesting a new token for {email}")

# Set the USER environment variable for this command
env_copy = os.environ.copy()
env_copy["USER"] = decky_plugin.DECKY_USER

try:
output = subprocess.check_output([decky_plugin.DECKY_USER_HOME + "/bin/xreal_driver_config", "--request-token", email], stderr=subprocess.STDOUT, env=env_copy)
return output.strip() == b"Token request sent"
except subprocess.CalledProcessError as exc:
decky_plugin.logger.error(f"Error running config script {exc.output}")
return False
return ipc.request_token(email)

async def verify_token(self, token):
decky_plugin.logger.info(f"Verifying token {token}")

# Set the USER environment variable for this command
env_copy = os.environ.copy()
env_copy["USER"] = decky_plugin.DECKY_USER

try:
output = subprocess.check_output([decky_plugin.DECKY_USER_HOME + "/bin/xreal_driver_config", "--verify-token", token], stderr=subprocess.STDOUT, env=env_copy)
return output.strip() == b"Token verified"
except subprocess.CalledProcessError as exc:
decky_plugin.logger.error(f"Error running config script {exc.output}")
return False
return ipc.verify_token(token)

# Asyncio-compatible long-running code, executed in a task when the plugin is loaded
async def _main(self):
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "decky-XRGaming",
"version": "0.8.10",
"version": "0.8.11",
"description": "Virtual display and head-tracking modes for the XREAL Air glasses",
"scripts": {
"build": "shx rm -rf dist && rollup -c",
Expand Down
Loading

0 comments on commit 0a4ccec

Please sign in to comment.