Skip to content

Commit

Permalink
Deprecate acquisition_delay=0 in favor of None (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Scheltienne authored Nov 29, 2024
1 parent a59c84e commit 39d1b51
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 53 deletions.
1 change: 1 addition & 0 deletions doc/development/changes/latest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ Version 1.8
===========

- Fix epoching with :class:`~mne_lsl.stream.EpochsStream` which was erroneously resetting the property ``n_new_samples`` from the attached :class:`~mne_lsl.stream.StreamLSL` objects (:pr:`371` by `Mathieu Scheltienne`_)
- Deprecate ``acquisition_delay=0`` in favor of ``acquisition_delay=None`` in the connection methods :meth:`~mne_lsl.stream.StreamLSL.connect` and :meth:`~mne_lsl.stream.EpochsStream.connect` (:pr:`372` by `Mathieu Scheltienne`_)
4 changes: 2 additions & 2 deletions examples/10_peak_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def __init__(
# create stream
self._stream = StreamLSL(
bufsize, name=stream_name, source_id=stream_source_id
).connect(acquisition_delay=0, processing_flags="all")
).connect(acquisition_delay=None, processing_flags="all")
self._stream.pick(ch_name)
self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
self._stream.notch_filter(50, picks=ch_name)
Expand Down Expand Up @@ -295,7 +295,7 @@ def __init__(
# create stream
self._stream = StreamLSL(
bufsize, name=stream_name, source_id=stream_source_id
).connect(acquisition_delay=0, processing_flags="all")
).connect(acquisition_delay=None, processing_flags="all")
self._stream.pick(ch_name)
self._stream.set_channel_types({ch_name: "misc"}, on_unit_change="ignore")
self._stream.notch_filter(50, picks=ch_name)
Expand Down
50 changes: 28 additions & 22 deletions src/mne_lsl/stream/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ def acquire(self) -> None:
"""
self._check_connected("acquire")
if (
self._executor is not None and self._acquisition_delay == 0
self._executor is not None and self._acquisition_delay is None
): # pragma: no cover
raise RuntimeError(
"The executor is not None despite the acquisition delay set to "
f"{self._acquisition_delay} seconds. This should not happen, please "
"contact the developers on GitHub."
"The executor is not None despite the acquisition delay set to None. "
"This should not happen, please contact the developers on GitHub."
)
elif self._executor is not None and self._acquisition_delay != 0:
elif self._executor is not None and self._acquisition_delay is not None:
raise RuntimeError(
"Acquisition is done automatically in a background thread. The method "
"stream.acquire() should not be called."
Expand Down Expand Up @@ -270,16 +269,16 @@ def anonymize(
@abstractmethod
def connect(
self,
acquisition_delay: float,
acquisition_delay: float | None,
) -> BaseStream:
"""Connect to the stream and initiate data collection in the buffer.
Parameters
----------
acquisition_delay : float
acquisition_delay : float | None
Delay in seconds between 2 acquisition during which chunks of data are
pulled from the connected device. If ``0``, the automatic acquisition in a
background thread is disabled and the user must manually call the
pulled from the connected device. If ``None``, the automatic acquisition in
a background thread is disabled and the user must manually call the
acquisition method ``Stream.acquire()`` to pull new samples.
Returns
Expand All @@ -290,20 +289,30 @@ def connect(
if self.connected:
warn("The stream is already connected. Skipping.")
return self
check_type(acquisition_delay, ("numeric",), "acquisition_delay")
if acquisition_delay < 0:
raise ValueError(
"The acquisition delay must be a positive number "
"defining the delay at which new samples are acquired in seconds. For "
"instance, 0.2 corresponds to a pull every 200 ms. 0 corresponds to "
f"manual acquisition. The provided {acquisition_delay} is invalid."
)
if acquisition_delay is not None:
check_type(acquisition_delay, ("numeric",), "acquisition_delay")
if acquisition_delay < 0:
raise ValueError(
"The acquisition delay must be a positive number defining the "
"delay at which new samples are acquired in seconds. For instance, "
"0.2 corresponds to a pull every 200 ms. None corresponds to "
f"manual acquisition. The provided {acquisition_delay} is invalid."
)
if acquisition_delay == 0:
warn(
"Argument acquisition_delay=0 is deprecated in favor of "
"acquisition_delay=None.",
DeprecationWarning,
)
acquisition_delay = None
self._acquisition_delay = acquisition_delay
self._n_new_samples = 0
self._executor = (
ThreadPoolExecutor(max_workers=1) if self._acquisition_delay != 0 else None
None
if self._acquisition_delay is None
else ThreadPoolExecutor(max_workers=1)
)
if self._acquisition_delay != 0:
if self._executor is not None:
logger.debug("%s: ThreadPoolExecutor started.", self)
# This method needs to connect to a stream, retrieve the stream information and
# create the ringbuffer. By the end of this method, the following variables
Expand Down Expand Up @@ -1182,13 +1191,10 @@ def connected(self) -> bool:
"""
attributes = [
"_info",
"_acquisition_delay",
"_buffer",
"_picks_inlet",
"_timestamps",
]
if hasattr(self, "_acquisition_delay") and self._acquisition_delay != 0:
attributes.append("_executor")
if all(getattr(self, attr, None) is None for attr in attributes):
return False
else:
Expand Down
46 changes: 27 additions & 19 deletions src/mne_lsl/stream/epochs.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,28 +258,27 @@ def acquire(self) -> None:
"""
self._check_connected("acquire")
if (
self._executor is not None and self._acquisition_delay == 0
self._executor is not None and self._acquisition_delay is None
): # pragma: no cover
raise RuntimeError(
"The executor is not None despite the acquisition delay set to "
f"{self._acquisition_delay} seconds. This should not happen, please "
"contact the developers on GitHub."
"The executor is not None despite the acquisition delay set to None. "
"This should not happen, please contact the developers on GitHub."
)
elif self._executor is not None and self._acquisition_delay != 0:
elif self._executor is not None and self._acquisition_delay is not None:
raise RuntimeError(
"Acquisition is done automatically in a background thread. The method "
"epochs.acquire() should not be called."
)
self._acquire()

def connect(self, acquisition_delay: float = 0.001) -> EpochsStream:
def connect(self, acquisition_delay: float | None = 0.001) -> EpochsStream:
"""Start acquisition of epochs from the connected Stream.
Parameters
----------
acquisition_delay : float
acquisition_delay : float | None
Delay in seconds between 2 updates at which the event stream is queried for
new events, and thus at which the epochs are updated. If ``0``, the
new events, and thus at which the epochs are updated. If ``None``, the
automatic acquisition in a background thread is disabled and the user must
manually call the acquisition method
:meth:~`mne_lsl.stream.EpochsStream.acquire` to pull new samples.
Expand Down Expand Up @@ -308,15 +307,23 @@ def connect(self, acquisition_delay: float = 0.001) -> EpochsStream:
"The event stream was disconnected between initialization and "
"connection of the EpochsStream object."
)
check_type(acquisition_delay, ("numeric",), "acquisition_delay")
if acquisition_delay < 0:
raise ValueError(
"The acquisition delay must be a positive number defining the delay at "
"which the epochs might be updated in seconds. For instance, 0.2 "
"corresponds to a query to the event source every 200 ms. 0 "
f"corresponds to manual acquisition. The provided {acquisition_delay} "
"is invalid."
)
if acquisition_delay is not None:
check_type(acquisition_delay, ("numeric",), "acquisition_delay")
if acquisition_delay < 0:
raise ValueError(
"The acquisition delay must be a positive number defining the "
"delay at which the epochs might be updated in seconds. For "
"instance, 0.2 corresponds to a query to the event source every "
"200 ms. None corresponds to manual acquisition. The provided "
f"{acquisition_delay} is invalid."
)
if acquisition_delay == 0:
warn(
"Argument acquisition_delay=0 is deprecated in favor of "
"acquisition_delay=None.",
DeprecationWarning,
)
acquisition_delay = None
self._acquisition_delay = acquisition_delay
assert self._n_new_epochs == 0 # sanity-check
# create the buffer and start acquisition in a separate thread
Expand All @@ -336,7 +343,9 @@ def connect(self, acquisition_delay: float = 0.001) -> EpochsStream:
)
self._buffer_events = np.zeros(self._bufsize, dtype=np.int16)
self._executor = (
ThreadPoolExecutor(max_workers=1) if self._acquisition_delay != 0 else None
None
if self._acquisition_delay is None
else ThreadPoolExecutor(max_workers=1)
)
# submit the first acquisition job
if self._executor is not None:
Expand Down Expand Up @@ -615,7 +624,6 @@ def connected(self) -> bool:
:type: :class:`bool`
"""
attributes = (
"_acquisition_delay",
"_buffer",
"_ch_idx_by_type",
"_info",
Expand Down
10 changes: 5 additions & 5 deletions src/mne_lsl/stream/stream_lsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def acquire(self) -> None:

def connect(
self,
acquisition_delay: float = 0.001,
acquisition_delay: float | None = 0.001,
*,
processing_flags: str | Sequence[str] | None = None,
timeout: float | None = 2,
Expand All @@ -107,11 +107,11 @@ def connect(
Parameters
----------
acquisition_delay : float
acquisition_delay : float | None
Delay in seconds between 2 acquisition during which chunks of data are
pulled from the :class:`~mne_lsl.lsl.StreamInlet`. If ``0``, the automatic
acquisition in a background thread is disabled and the user must manually
call :meth:`~mne_lsl.stream.StreamLSL.acquire` to pull new samples.
pulled from the :class:`~mne_lsl.lsl.StreamInlet`. If ``None``, the
automatic acquisition in a background thread is disabled and the user must
manually call :meth:`~mne_lsl.stream.StreamLSL.acquire` to pull new samples.
processing_flags : list of str | ``'all'`` | None
Set the post-processing options. By default, post-processing is disabled.
Any combination of the processing flags is valid. The available flags are:
Expand Down
24 changes: 21 additions & 3 deletions tests/stream/test_epochs.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def test_epochs_without_event_stream_manual_acquisition(mock_lsl_stream):
tmin=-0.05,
tmax=0.15,
baseline=(None, 0),
).connect(acquisition_delay=0)
).connect(acquisition_delay=None)
assert epochs.n_new_epochs == 0
time.sleep(0.5)
assert epochs.n_new_epochs == 0
Expand Down Expand Up @@ -900,7 +900,7 @@ def test_epochs_invalid(mock_lsl_stream):
baseline=None,
)

stream.connect(acquisition_delay=0)
stream.connect(acquisition_delay=None)
with pytest.raises(ValueError, match="must be greater than 'tmin'"):
EpochsStream(
stream,
Expand Down Expand Up @@ -1051,7 +1051,7 @@ def test_epochs_single_event(mock_lsl_stream, outlet_marker: StreamOutlet):
tmax=1.5, # large tmax to ensure that the event can't be consumed right away
baseline=None,
picks="eeg",
).connect(acquisition_delay=0)
).connect(acquisition_delay=None)
assert epochs.n_new_epochs == 0
epochs.acquire()
time.sleep(0.5)
Expand All @@ -1072,3 +1072,21 @@ def test_epochs_single_event(mock_lsl_stream, outlet_marker: StreamOutlet):
time.sleep(0.2)
assert epochs.n_new_epochs == 1
assert event_stream.n_new_samples == 1


def test_manual_acquisition_deprecation(mock_lsl_stream):
"""Test deprecation of acquisition_delay=0."""
stream = StreamLSL(
0.5, name=mock_lsl_stream.name, source_id=mock_lsl_stream.source_id
).connect(acquisition_delay=0.1)
epochs = EpochsStream(
stream,
10,
event_channels="trg",
event_id=dict(a=1),
tmin=-0.05,
tmax=0.15,
baseline=(None, 0),
)
with pytest.warns(DeprecationWarning, match="acquisition_delay"):
epochs.connect(acquisition_delay=0)
11 changes: 10 additions & 1 deletion tests/stream/test_stream_lsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ def test_manual_acquisition(mock_lsl_stream):
"""Test manual acquisition."""
stream = Stream(
bufsize=2.0, name=mock_lsl_stream.name, source_id=mock_lsl_stream.source_id
).connect(acquisition_delay=0)
).connect(acquisition_delay=None)
_sleep_until_new_data(1e-6, mock_lsl_stream)
assert stream.n_new_samples == 0
stream.acquire()
Expand Down Expand Up @@ -1214,3 +1214,12 @@ def test_manual_acquisition_errors(mock_lsl_stream):
with pytest.raises(RuntimeError, match="Acquisition is done automatically"):
stream.acquire()
stream.disconnect()


def test_manual_acquisition_deprecation(mock_lsl_stream):
"""Test deprecation of acquisition_delay=0."""
stream = Stream(
bufsize=2.0, name=mock_lsl_stream.name, source_id=mock_lsl_stream.source_id
)
with pytest.warns(DeprecationWarning, match="acquisition_delay"):
stream.connect(acquisition_delay=0)
2 changes: 1 addition & 1 deletion tutorials/30_stream_manual.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
# mode, all operation happens in the main thread and the user has full control over when
# to acquire new samples.

stream = StreamLSL(bufsize=2, source_id=source_id).connect(acquisition_delay=0)
stream = StreamLSL(bufsize=2, source_id=source_id).connect(acquisition_delay=None)
sleep(2) # wait for new samples
print(f"New samples acquired (before stream.acquire()): {stream.n_new_samples}")
stream.acquire()
Expand Down

0 comments on commit 39d1b51

Please sign in to comment.