diff --git a/src/NadekoBot/DataStructures/PoopyRingBuffer.cs b/src/NadekoBot/DataStructures/PoopyRingBuffer.cs index a09af647..6c1e1118 100644 --- a/src/NadekoBot/DataStructures/PoopyRingBuffer.cs +++ b/src/NadekoBot/DataStructures/PoopyRingBuffer.cs @@ -10,42 +10,27 @@ namespace NadekoBot.DataStructures // writepos == readpos - 1 means full private readonly byte[] buffer; - private readonly object posLock = new object(); public int Capacity { get; } - private volatile int _readPos = 0; + private int _readPos = 0; private int ReadPos { get => _readPos; - set { lock (posLock) _readPos = value; } + set => _readPos = value; } - private volatile int _writePos = 0; + private int _writePos = 0; private int WritePos { get => _writePos; - set { lock (posLock) _writePos = value; } + set => _writePos = value; } - private int Length - { - get - { - lock (posLock) - { - return ReadPos <= WritePos ? - WritePos - ReadPos : - Capacity - (ReadPos - WritePos); - } - } - } - - public int LightLength => - _readPos <= _writePos? - _writePos - _readPos : - Capacity - (_readPos - _writePos); + public int Length => ReadPos <= WritePos + ? WritePos - ReadPos + : Capacity - (ReadPos - WritePos); public int RemainingCapacity { - get { lock (posLock) return Capacity - Length - 1; } + get => Capacity - Length - 1; } private readonly SemaphoreSlim _locker = new SemaphoreSlim(1, 1); @@ -56,90 +41,76 @@ namespace NadekoBot.DataStructures this.buffer = new byte[this.Capacity]; } - public Task ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) => Task.Run(async () => + public int Read(byte[] b, int offset, int toRead) { - await _locker.WaitAsync(cancelToken); - try + if (WritePos == ReadPos) + return 0; + + if (toRead > Length) + toRead = Length; + + if (WritePos > ReadPos) { - if (WritePos == ReadPos) - return 0; - - if (toRead > Length) - toRead = Length; - - if (WritePos > ReadPos) - { - Buffer.BlockCopy(buffer, ReadPos, b, offset, toRead); - ReadPos += toRead; - } - else - { - var toEnd = Capacity - ReadPos; - var firstRead = toRead > toEnd ? - toEnd : - toRead; - Buffer.BlockCopy(buffer, ReadPos, b, offset, firstRead); - ReadPos += firstRead; - var secondRead = toRead - firstRead; - if (secondRead > 0) - { - Buffer.BlockCopy(buffer, 0, b, offset + firstRead, secondRead); - ReadPos = secondRead; - } - } - return toRead; + Array.Copy(buffer, ReadPos, b, offset, toRead); + ReadPos += toRead; } - finally + else { - _locker.Release(); + var toEnd = Capacity - ReadPos; + var firstRead = toRead > toEnd ? + toEnd : + toRead; + Array.Copy(buffer, ReadPos, b, offset, firstRead); + ReadPos += firstRead; + var secondRead = toRead - firstRead; + if (secondRead > 0) + { + Array.Copy(buffer, 0, b, offset + firstRead, secondRead); + ReadPos = secondRead; + } } - }); + return toRead; + } - public Task WriteAsync(byte[] b, int offset, int toWrite, CancellationToken cancelToken) => Task.Run(async () => + public bool Write(byte[] b, int offset, int toWrite) { while (toWrite > RemainingCapacity) - await Task.Delay(1000, cancelToken); // wait a lot, buffer should be large anyway + return false; if (toWrite == 0) - return; + return true; - await _locker.WaitAsync(cancelToken); - try + if (WritePos < ReadPos) { - if (WritePos < ReadPos) + Array.Copy(b, offset, buffer, WritePos, toWrite); + WritePos += toWrite; + } + else + { + var toEnd = Capacity - WritePos; + var firstWrite = toWrite > toEnd ? + toEnd : + toWrite; + Array.Copy(b, offset, buffer, WritePos, firstWrite); + var secondWrite = toWrite - firstWrite; + if (secondWrite > 0) { - Buffer.BlockCopy(b, offset, buffer, WritePos, toWrite); - WritePos += toWrite; + Array.Copy(b, offset + firstWrite, buffer, 0, secondWrite); + WritePos = secondWrite; } else { - var toEnd = Capacity - WritePos; - var firstWrite = toWrite > toEnd ? - toEnd : - toWrite; - Buffer.BlockCopy(b, offset, buffer, WritePos, firstWrite); - var secondWrite = toWrite - firstWrite; - if (secondWrite > 0) - { - Buffer.BlockCopy(b, offset + firstWrite, buffer, 0, secondWrite); - WritePos = secondWrite; - } - else - { - WritePos += firstWrite; - if (WritePos == Capacity) - WritePos = 0; - } + WritePos += firstWrite; + if (WritePos == Capacity) + WritePos = 0; } } - finally - { - _locker.Release(); - } - }); + return true; + } public void Dispose() { + } } } diff --git a/src/NadekoBot/Modules/Music/Music.cs b/src/NadekoBot/Modules/Music/Music.cs index 6cb3ebe8..bc88ff2d 100644 --- a/src/NadekoBot/Modules/Music/Music.cs +++ b/src/NadekoBot/Modules/Music/Music.cs @@ -45,7 +45,7 @@ namespace NadekoBot.Modules.Music var t = _music.DestroyPlayer(arg.Id); return Task.CompletedTask; } - + //todo changing server region is bugged again private Task Client_UserVoiceStateUpdated(SocketUser iusr, SocketVoiceState oldState, SocketVoiceState newState) { var t = Task.Run(() => diff --git a/src/NadekoBot/NadekoBot.cs b/src/NadekoBot/NadekoBot.cs index 855c5652..5560437f 100644 --- a/src/NadekoBot/NadekoBot.cs +++ b/src/NadekoBot/NadekoBot.cs @@ -31,6 +31,7 @@ using NadekoBot.Extensions; namespace NadekoBot { + //todo log when joining or leaving the server public class NadekoBot { private Logger _log; diff --git a/src/NadekoBot/Services/Music/MusicPlayer.cs b/src/NadekoBot/Services/Music/MusicPlayer.cs index c42d56da..a9a12f25 100644 --- a/src/NadekoBot/Services/Music/MusicPlayer.cs +++ b/src/NadekoBot/Services/Music/MusicPlayer.cs @@ -131,38 +131,40 @@ namespace NadekoBot.Services.Music _log.Info("Starting"); using (var b = new SongBuffer(data.Song.Uri, "")) { - var bufferTask = b.StartBuffering(cancelToken); - var timeout = Task.Delay(10000); - if (Task.WhenAny(bufferTask, timeout) == timeout) - { - _log.Info("Buffering failed due to a timeout."); - continue; - } - else if (!bufferTask.Result) - { - _log.Info("Buffering failed due to a cancel or error."); - continue; - } - - var ac = await GetAudioClient(); - if (ac == null) - { - await Task.Delay(900, cancelToken); - // just wait some time, maybe bot doesn't even have perms to join that voice channel, - // i don't want to spam connection attempts - continue; - } - var pcm = ac.CreatePCMStream(AudioApplication.Music, bufferMillis: 500); - OnStarted?.Invoke(this, data); - - byte[] buffer = new byte[3840]; - int bytesRead = 0; + AudioOutStream pcm = null; try { - while ((bytesRead = await b.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false)) > 0 + var bufferTask = b.StartBuffering(cancelToken); + var timeout = Task.Delay(10000); + if (Task.WhenAny(bufferTask, timeout) == timeout) + { + _log.Info("Buffering failed due to a timeout."); + continue; + } + else if (!bufferTask.Result) + { + _log.Info("Buffering failed due to a cancel or error."); + continue; + } + + var ac = await GetAudioClient(); + if (ac == null) + { + await Task.Delay(900, cancelToken); + // just wait some time, maybe bot doesn't even have perms to join that voice channel, + // i don't want to spam connection attempts + continue; + } + pcm = ac.CreatePCMStream(AudioApplication.Music, bufferMillis: 500); + OnStarted?.Invoke(this, data); + + byte[] buffer = new byte[3840]; + int bytesRead = 0; + + while ((bytesRead = b.Read(buffer, 0, buffer.Length)) > 0 && (MaxPlaytimeSeconds <= 0 || MaxPlaytimeSeconds >= CurrentTime.TotalSeconds)) { - AdjustVolume(buffer, Volume); + //AdjustVolume(buffer, Volume); await pcm.WriteAsync(buffer, 0, bytesRead, cancelToken).ConfigureAwait(false); unchecked { _bytesSent += bytesRead; } @@ -179,12 +181,16 @@ namespace NadekoBot.Services.Music } finally { - //flush is known to get stuck from time to time, just cancel it if it takes more than 1 second - var flushCancel = new CancellationTokenSource(); - var flushToken = flushCancel.Token; - var flushDelay = Task.Delay(1000, flushToken); - await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken)); - flushCancel.Cancel(); + if (pcm != null) + { + // flush is known to get stuck from time to time, + // just skip flushing if it takes more than 1 second + var flushCancel = new CancellationTokenSource(); + var flushToken = flushCancel.Token; + var flushDelay = Task.Delay(1000, flushToken); + await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken)); + flushCancel.Cancel(); + } OnCompleted?.Invoke(this, data.Song); } @@ -309,8 +315,12 @@ namespace NadekoBot.Services.Music { try { - await _audioClient?.StopAsync(); - _audioClient?.Dispose(); + var t = _audioClient?.StopAsync(); + if (t != null) + { + await t; + _audioClient?.Dispose(); + } } catch { diff --git a/src/NadekoBot/Services/Music/SongBuffer.cs b/src/NadekoBot/Services/Music/SongBuffer.cs index 08737c49..527c65f3 100644 --- a/src/NadekoBot/Services/Music/SongBuffer.cs +++ b/src/NadekoBot/Services/Music/SongBuffer.cs @@ -36,10 +36,17 @@ namespace NadekoBot.Services.Music var t = Task.Run(() => { this.p.BeginErrorReadLine(); + this.p.ErrorDataReceived += P_ErrorDataReceived; this.p.WaitForExit(); }); } + private void P_ErrorDataReceived(object sender, DataReceivedEventArgs e) + { + _log.Error(">>> " + e.Data); + } + + private readonly object locker = new object(); public Task StartBuffering(CancellationToken cancelToken) { var toReturn = new TaskCompletionSource(); @@ -49,14 +56,26 @@ namespace NadekoBot.Services.Music { byte[] buffer = new byte[readSize]; int bytesRead = 1; - while (!cancelToken.IsCancellationRequested && !this.p.HasExited && bytesRead > 0) + while (!cancelToken.IsCancellationRequested && !this.p.HasExited) { bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, readSize, cancelToken).ConfigureAwait(false); - await _outStream.WriteAsync(buffer, 0, bytesRead, cancelToken); + if (bytesRead == 0) + break; + bool written; + do + { + lock (locker) + written = _outStream.Write(buffer, 0, bytesRead); + if (!written) + await Task.Delay(32, cancelToken); + } + while (!written); + lock (locker) + if (_outStream.Length > 200_000 || bytesRead == 0) + if (toReturn.TrySetResult(true)) + _log.Info("Prebuffering finished"); - if (_outStream.LightLength > 200_000 || bytesRead == 0) - if (toReturn.TrySetResult(true)) - _log.Info("Prebuffering finished"); + await Task.Delay(5); // @.@ } _log.Info("FFMPEG killed, song canceled, or song fully downloaded"); } @@ -84,9 +103,10 @@ Check the guides for your platform on how to setup ffmpeg correctly: return toReturn.Task; } - public Task ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) + public int Read(byte[] b, int offset, int toRead) { - return _outStream.ReadAsync(b, offset, toRead, cancelToken); + lock (locker) + return _outStream.Read(b, offset, toRead); } public void Dispose() @@ -94,6 +114,8 @@ Check the guides for your platform on how to setup ffmpeg correctly: try { this.p.Kill(); } catch { } _outStream.Dispose(); + this.p.StandardError.Dispose(); + this.p.StandardOutput.Dispose(); this.p.Dispose(); } }