Skip to content

Commit

Permalink
More code cleanup in Restream
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevinjil committed Jan 8, 2025
1 parent f1f020d commit 82ab237
Showing 1 changed file with 47 additions and 55 deletions.
102 changes: 47 additions & 55 deletions Jellyfin.Xtream/Service/Restream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,22 @@ public class Restream : ILiveStream, IDirectStreamProvider, IDisposable
/// </summary>
public const string TunerHost = "Xtream-Restream";

private static readonly HttpStatusCode[] Redirects =
private static readonly HttpStatusCode[] _redirects =
[
HttpStatusCode.Moved,
HttpStatusCode.MovedPermanently,
HttpStatusCode.PermanentRedirect,
HttpStatusCode.Redirect,
];

private readonly IServerApplicationHost appHost;
private readonly WrappedBufferStream buffer;
private readonly IHttpClientFactory httpClientFactory;
private readonly ILogger logger;
private readonly CancellationTokenSource tokenSource;
private readonly bool enableStreamSharing;
private readonly string uniqueId;
private readonly string uri;
private readonly WrappedBufferStream _buffer;
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger _logger;
private readonly CancellationTokenSource _tokenSource;
private readonly string _url;

private Task? copyTask;
private Stream? inputStream;

private string originalStreamId;
private MediaSourceInfo mediaSource;
private Task? _copyTask;
private Stream? _inputStream;

/// <summary>
/// Initializes a new instance of the <see cref="Restream"/> class.
Expand All @@ -71,20 +65,18 @@ public class Restream : ILiveStream, IDirectStreamProvider, IDisposable
/// <param name="mediaSource">The media which must be restreamed.</param>
public Restream(IServerApplicationHost appHost, IHttpClientFactory httpClientFactory, ILogger logger, MediaSourceInfo mediaSource)
{
this.appHost = appHost;
this.httpClientFactory = httpClientFactory;
this.logger = logger;
this.mediaSource = mediaSource;
_httpClientFactory = httpClientFactory;
_logger = logger;
MediaSource = mediaSource;

buffer = new WrappedBufferStream(16777216); // 16MiB
tokenSource = new CancellationTokenSource();
_buffer = new WrappedBufferStream(16 * 1024 * 1024); // 16MiB
_tokenSource = new CancellationTokenSource();

originalStreamId = mediaSource.Id;
enableStreamSharing = true;
uniqueId = Guid.NewGuid().ToString();
OriginalStreamId = MediaSource.Id;
UniqueId = Guid.NewGuid().ToString();

uri = mediaSource.Path;
string path = "/LiveTv/LiveStreamFiles/" + UniqueId + "/stream.ts";
_url = MediaSource.Path;
string path = $"/LiveTv/LiveStreamFiles/{UniqueId}/stream.ts";
MediaSource.Path = appHost.GetSmartApiUrl(IPAddress.Any) + path;
MediaSource.EncoderPath = appHost.GetApiUrlForLocalAccess() + path;
MediaSource.Protocol = MediaProtocol.Http;
Expand All @@ -94,55 +86,55 @@ public Restream(IServerApplicationHost appHost, IHttpClientFactory httpClientFac
public int ConsumerCount { get; set; }

/// <inheritdoc />
public string OriginalStreamId { get => originalStreamId; set => originalStreamId = value; }
public string OriginalStreamId { get; set; }

/// <inheritdoc />
public string TunerHostId { get => TunerHost; }
public string TunerHostId => TunerHost;

/// <inheritdoc />
public bool EnableStreamSharing { get => enableStreamSharing; }
public bool EnableStreamSharing => true;

/// <inheritdoc />
public MediaSourceInfo MediaSource { get => mediaSource; set => mediaSource = value; }
public MediaSourceInfo MediaSource { get; set; }

/// <inheritdoc />
public string UniqueId { get => uniqueId; }
public string UniqueId { get; init; }

/// <inheritdoc />
public async Task Open(CancellationToken openCancellationToken)
{
if (inputStream != null)
if (_inputStream != null)
{
logger.LogWarning("Restream for channel {ChannelId} is already open.", MediaSource.Id);
_logger.LogWarning("Restream for channel {ChannelId} is already open.", MediaSource.Id);
return;
}

string channelId = mediaSource.Id;
logger.LogInformation("Starting restream for channel {ChannelId}.", channelId);
string channelId = MediaSource.Id;
_logger.LogInformation("Starting restream for channel {ChannelId}.", channelId);

// Response stream is disposed manually.
HttpResponseMessage response = await httpClientFactory.CreateClient(NamedClient.Default)
.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead, openCancellationToken)
HttpResponseMessage response = await _httpClientFactory.CreateClient(NamedClient.Default)
.GetAsync(_url, HttpCompletionOption.ResponseHeadersRead, openCancellationToken)
.ConfigureAwait(true);
logger.LogDebug("Stream for channel {ChannelId} using url {Url}", channelId, uri);
_logger.LogDebug("Stream for channel {ChannelId} using url {Url}", channelId, _url);

// Handle a manual redirect in the case of a HTTPS to HTTP downgrade.
if (Redirects.Contains(response.StatusCode))
if (_redirects.Contains(response.StatusCode))
{
logger.LogDebug("Stream for channel {ChannelId} redirected to url {Url}", channelId, response.Headers.Location);
response = await httpClientFactory.CreateClient(NamedClient.Default)
_logger.LogDebug("Stream for channel {ChannelId} redirected to url {Url}", channelId, response.Headers.Location);
response = await _httpClientFactory.CreateClient(NamedClient.Default)
.GetAsync(response.Headers.Location, HttpCompletionOption.ResponseHeadersRead, openCancellationToken)
.ConfigureAwait(true);
}

inputStream = await response.Content.ReadAsStreamAsync(CancellationToken.None).ConfigureAwait(false);
copyTask = inputStream.CopyToAsync(buffer, tokenSource.Token)
_inputStream = await response.Content.ReadAsStreamAsync(CancellationToken.None).ConfigureAwait(false);
_copyTask = _inputStream.CopyToAsync(_buffer, _tokenSource.Token)
.ContinueWith(
(Task t) =>
{
logger.LogInformation("Restream for channel {ChannelId} finished with state {Status}", mediaSource.Id, t.Status);
inputStream.Close();
inputStream = null;
_logger.LogInformation("Restream for channel {ChannelId} finished with state {Status}", MediaSource.Id, t.Status);
_inputStream.Close();
_inputStream = null;
},
CancellationToken.None,
TaskContinuationOptions.None,
Expand All @@ -152,26 +144,26 @@ public async Task Open(CancellationToken openCancellationToken)
/// <inheritdoc />
public async Task Close()
{
if (copyTask == null)
if (_copyTask == null)
{
throw new ArgumentNullException("copyTask");
}

await tokenSource.CancelAsync().ConfigureAwait(false);
await copyTask.ConfigureAwait(false);
await _tokenSource.CancelAsync().ConfigureAwait(false);
await _copyTask.ConfigureAwait(false);
}

/// <inheritdoc />
public Stream GetStream()
{
if (inputStream == null)
if (_inputStream == null)
{
logger.LogWarning("Restream for channel {ChannelId} was not opened.", mediaSource.Id);
_logger.LogWarning("Restream for channel {ChannelId} was not opened.", MediaSource.Id);
_ = Open(CancellationToken.None);
}

return new WrappedBufferReadStream(buffer);
logger.LogInformation("Opening restream {Count} for channel {ChannelId}.", ConsumerCount, mediaSource.Id);
_logger.LogInformation("Opening restream {Count} for channel {ChannelId}.", ConsumerCount, MediaSource.Id);
return new WrappedBufferReadStream(_buffer);
}

/// <summary>
Expand All @@ -182,9 +174,9 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
inputStream?.Dispose();
buffer.Dispose();
tokenSource.Dispose();
_inputStream?.Dispose();
_buffer.Dispose();
_tokenSource.Dispose();
}
}

Expand Down

0 comments on commit 82ab237

Please sign in to comment.