diff --git a/src/NadekoBot/DataStructures/PoopyRingBuffer.cs b/src/NadekoBot/DataStructures/PoopyRingBuffer.cs new file mode 100644 index 00000000..4f9db9be --- /dev/null +++ b/src/NadekoBot/DataStructures/PoopyRingBuffer.cs @@ -0,0 +1,145 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace NadekoBot.DataStructures +{ + public class PoopyRingBuffer : IDisposable + { + // readpos == writepos means empty + // writepos == readpos - 1 means full + + private readonly byte[] buffer; + private readonly object posLock = new object(); + public int Capacity { get; } + + private volatile int _readPos = 0; + private int ReadPos + { + get => _readPos; + set { lock (posLock) _readPos = value; } + } + private volatile int _writePos = 0; + private int WritePos + { + get => _writePos; + set { lock (posLock) _writePos = value; } + } + private int Length + { + get + { + lock (posLock) + { + return ReadPos <= WritePos ? + WritePos - ReadPos : + Capacity - (ReadPos - WritePos); + } + } + } + + public int RemainingCapacity + { + get { lock (posLock) return Capacity - Length - 1; } + } + + private readonly SemaphoreSlim _locker = new SemaphoreSlim(1, 1); + + public PoopyRingBuffer(int capacity = 38400) + { + this.Capacity = capacity + 1; + this.buffer = new byte[this.Capacity]; + } + + public Task ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) => Task.Run(async () => + { + await _locker.WaitAsync(cancelToken); + try + { + Console.WriteLine("Reading {0}", toRead); + if (WritePos == ReadPos) + return 0; + + if (toRead > Length) + toRead = Length; + + if (WritePos > ReadPos) + { + Buffer.BlockCopy(buffer, ReadPos, b, offset, toRead); + ReadPos += toRead; + } + else + { + var toEnd = Capacity - ReadPos; + var firstRead = toRead > toEnd ? + toEnd : + toRead; + Buffer.BlockCopy(buffer, ReadPos, b, offset, firstRead); + ReadPos += firstRead; + var secondRead = toRead - firstRead; + if (secondRead > 0) + { + Buffer.BlockCopy(buffer, 0, b, offset + firstRead, secondRead); + ReadPos = secondRead; + } + } + Console.WriteLine("Readpos: {0} WritePos: {1}", ReadPos, WritePos); + return toRead; + } + finally + { + _locker.Release(); + } + }); + + public Task WriteAsync(byte[] b, int offset, int toWrite, CancellationToken cancelToken) => Task.Run(async () => + { + while (toWrite > RemainingCapacity) + await Task.Delay(100, cancelToken); + + await _locker.WaitAsync(cancelToken); + try + { + Console.WriteLine("Writing {0}", toWrite); + if (WritePos < ReadPos) + { + Buffer.BlockCopy(b, offset, buffer, WritePos, toWrite); + WritePos += toWrite; + } + else + { + var toEnd = Capacity - WritePos; + var firstWrite = toWrite > toEnd ? + toEnd : + toWrite; + Buffer.BlockCopy(b, offset, buffer, WritePos, firstWrite); + var secondWrite = toWrite - firstWrite; + if (secondWrite > 0) + { + Buffer.BlockCopy(b, offset + firstWrite, buffer, 0, secondWrite); + WritePos = secondWrite; + } + else + { + WritePos += firstWrite; + if (WritePos == Capacity) + WritePos = 0; + } + } + Console.WriteLine("Readpos: {0} WritePos: {1}", ReadPos, WritePos); + return toWrite; + } + finally + { + _locker.Release(); + } + }); + + public void Dispose() + { + } + } +} diff --git a/src/NadekoBot/Modules/Music/Music.cs b/src/NadekoBot/Modules/Music/Music.cs index a8b7ae0a..3681d631 100644 --- a/src/NadekoBot/Modules/Music/Music.cs +++ b/src/NadekoBot/Modules/Music/Music.cs @@ -451,6 +451,7 @@ namespace NadekoBot.Modules.Music { try { + await Task.Yield(); //todo fix for all if (item.ProviderType == MusicType.Normal) await Task.WhenAll(Task.Delay(1000), InternalQueue(mp, await _music.ResolveSong(item.Query, Context.User.ToString(), item.ProviderType), true)).ConfigureAwait(false); diff --git a/src/NadekoBot/Services/Music/MusicPlayer.cs b/src/NadekoBot/Services/Music/MusicPlayer.cs index b4bc4009..97581b2f 100644 --- a/src/NadekoBot/Services/Music/MusicPlayer.cs +++ b/src/NadekoBot/Services/Music/MusicPlayer.cs @@ -81,59 +81,58 @@ namespace NadekoBot.Services.Music _log.Info("Starting"); - var p = Process.Start(new ProcessStartInfo + using (var b = new SongBuffer(data.Song.Uri, "")) { - FileName = "ffmpeg", - Arguments = $"-i {data.Song.Uri} -f s16le -ar 48000 -vn -ac 2 pipe:1 -loglevel quiet", - UseShellExecute = false, - RedirectStandardOutput = true, - RedirectStandardError = false, - CreateNoWindow = true, - }); - var ac = await GetAudioClient(); - if (ac == null) - { - await Task.Delay(900); - // just wait some time, maybe bot doesn't even have perms to join that voice channel, - // i don't want to spam connection attempts - continue; - } - var pcm = ac.CreatePCMStream(AudioApplication.Music); + var bufferSuccess = await b.StartBuffering(cancelToken); - OnStarted?.Invoke(this, data.Song); + if (bufferSuccess == false) + continue; - byte[] buffer = new byte[3840]; - int bytesRead = 0; - try - { - while ((bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false)) > 0) + var ac = await GetAudioClient(); + if (ac == null) { - var vol = Volume; - if (vol != 1) - AdjustVolume(buffer, vol); - await pcm.WriteAsync(buffer, 0, bytesRead, cancelToken); - - await (pauseTaskSource?.Task ?? Task.CompletedTask); + await Task.Delay(900); + // just wait some time, maybe bot doesn't even have perms to join that voice channel, + // i don't want to spam connection attempts + continue; } - } - catch (OperationCanceledException) - { - _log.Info("Song Canceled"); - } - catch (Exception ex) - { - _log.Warn(ex); - } - finally - { - //flush is known to get stuck from time to time, just cancel it if it takes more than 1 second - var flushCancel = new CancellationTokenSource(); - var flushToken = flushCancel.Token; - var flushDelay = Task.Delay(1000, flushToken); - await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken)); - flushCancel.Cancel(); + var pcm = ac.CreatePCMStream(AudioApplication.Music); - OnCompleted?.Invoke(this, data.Song); + OnStarted?.Invoke(this, data.Song); + + byte[] buffer = new byte[3840]; + int bytesRead = 0; + try + { + while ((bytesRead = await b.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false)) > 0) + { + var vol = Volume; + if (vol != 1) + AdjustVolume(buffer, vol); + await Task.WhenAll(Task.Delay(10), pcm.WriteAsync(buffer, 0, bytesRead, cancelToken)).ConfigureAwait(false); + + await (pauseTaskSource?.Task ?? Task.CompletedTask); + } + } + catch (OperationCanceledException) + { + _log.Info("Song Canceled"); + } + catch (Exception ex) + { + _log.Warn(ex); + } + finally + { + //flush is known to get stuck from time to time, just cancel it if it takes more than 1 second + var flushCancel = new CancellationTokenSource(); + var flushToken = flushCancel.Token; + var flushDelay = Task.Delay(1000, flushToken); + await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken)); + flushCancel.Cancel(); + + OnCompleted?.Invoke(this, data.Song); + } } } finally @@ -141,7 +140,7 @@ namespace NadekoBot.Services.Music _log.Info("Next song"); do { - await Task.Delay(100); + await Task.Delay(500); } while (Stopped && !Exited); if(!RepeatCurrentSong) @@ -158,6 +157,14 @@ namespace NadekoBot.Services.Music reconnect) try { + try + { + await _audioClient?.StopAsync(); + _audioClient?.Dispose(); + } + catch + { + } _audioClient = await VoiceChannel.ConnectAsync(); } catch diff --git a/src/NadekoBot/Services/Music/MusicService.cs b/src/NadekoBot/Services/Music/MusicService.cs index 79d787fc..986fd0fd 100644 --- a/src/NadekoBot/Services/Music/MusicService.cs +++ b/src/NadekoBot/Services/Music/MusicService.cs @@ -47,11 +47,6 @@ namespace NadekoBot.Services.Music Directory.CreateDirectory(MusicDataPath); } - // public MusicPlayer GetPlayer(ulong guildId) - // { - // MusicPlayers.TryGetValue(guildId, out var player); - // return player; - // } public float GetDefaultVolume(ulong guildId) { return _defaultVolumes.GetOrAdd(guildId, (id) => diff --git a/src/NadekoBot/Services/Music/SongBuffer.cs b/src/NadekoBot/Services/Music/SongBuffer.cs index f11c2cce..c5c08175 100644 --- a/src/NadekoBot/Services/Music/SongBuffer.cs +++ b/src/NadekoBot/Services/Music/SongBuffer.cs @@ -1,10 +1,79 @@ -//using NadekoBot.Extensions; -//using NLog; -//using System; -//using System.Diagnostics; -//using System.IO; -//using System.Threading; -//using System.Threading.Tasks; +using NadekoBot.DataStructures; +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace NadekoBot.Services.Music +{ + public class SongBuffer : IDisposable + { + const int maxReadSize = 3840; + private Process p; + private PoopyRingBuffer _outStream = new PoopyRingBuffer(); + + private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1); + + public string SongUri { get; private set; } + + public SongBuffer(string songUri, string skipTo) + { + this.SongUri = songUri; + + this.p = Process.Start(new ProcessStartInfo + { + FileName = "ffmpeg", + Arguments = $"-i {songUri} -f s16le -ar 48000 -vn -ac 2 pipe:1 -loglevel quiet", + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = false, + CreateNoWindow = true, + }); + } + + public Task StartBuffering(CancellationToken cancelToken) + { + var toReturn = new TaskCompletionSource(); + var _ = Task.Run(async () => + { + try + { + byte[] buffer = new byte[3840]; + while (!this.p.HasExited || cancelToken.IsCancellationRequested) + { + int bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false); + + await _outStream.WriteAsync(buffer, 0, bytesRead, cancelToken); + + if (_outStream.RemainingCapacity < _outStream.Capacity * 0.9f) + toReturn.TrySetResult(true); + } + } + catch + { + toReturn.TrySetResult(false); + //ignored + } + }, cancelToken); + + return toReturn.Task; + } + + public Task ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) + { + return _outStream.ReadAsync(b, offset, toRead, cancelToken); + } + + public void Dispose() + { + try { this.p.Kill(); } + catch { } + _outStream.Dispose(); + this.p.Dispose(); + } + } +} //namespace NadekoBot.Services.Music //{