Cleanup and fixes

This commit is contained in:
Master Kwoth 2017-07-03 20:26:17 +02:00
parent 421431d01d
commit 44859529d5
5 changed files with 134 additions and 130 deletions

View File

@ -10,42 +10,27 @@ namespace NadekoBot.DataStructures
// writepos == readpos - 1 means full // writepos == readpos - 1 means full
private readonly byte[] buffer; private readonly byte[] buffer;
private readonly object posLock = new object();
public int Capacity { get; } public int Capacity { get; }
private volatile int _readPos = 0; private int _readPos = 0;
private int ReadPos private int ReadPos
{ {
get => _readPos; get => _readPos;
set { lock (posLock) _readPos = value; } set => _readPos = value;
} }
private volatile int _writePos = 0; private int _writePos = 0;
private int WritePos private int WritePos
{ {
get => _writePos; get => _writePos;
set { lock (posLock) _writePos = value; } set => _writePos = value;
} }
private int Length public int Length => ReadPos <= WritePos
{ ? WritePos - ReadPos
get : Capacity - (ReadPos - WritePos);
{
lock (posLock)
{
return ReadPos <= WritePos ?
WritePos - ReadPos :
Capacity - (ReadPos - WritePos);
}
}
}
public int LightLength =>
_readPos <= _writePos?
_writePos - _readPos :
Capacity - (_readPos - _writePos);
public int RemainingCapacity public int RemainingCapacity
{ {
get { lock (posLock) return Capacity - Length - 1; } get => Capacity - Length - 1;
} }
private readonly SemaphoreSlim _locker = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _locker = new SemaphoreSlim(1, 1);
@ -56,90 +41,76 @@ namespace NadekoBot.DataStructures
this.buffer = new byte[this.Capacity]; this.buffer = new byte[this.Capacity];
} }
public Task<int> ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) => Task.Run(async () => public int Read(byte[] b, int offset, int toRead)
{ {
await _locker.WaitAsync(cancelToken); if (WritePos == ReadPos)
try return 0;
if (toRead > Length)
toRead = Length;
if (WritePos > ReadPos)
{ {
if (WritePos == ReadPos) Array.Copy(buffer, ReadPos, b, offset, toRead);
return 0; ReadPos += toRead;
if (toRead > Length)
toRead = Length;
if (WritePos > ReadPos)
{
Buffer.BlockCopy(buffer, ReadPos, b, offset, toRead);
ReadPos += toRead;
}
else
{
var toEnd = Capacity - ReadPos;
var firstRead = toRead > toEnd ?
toEnd :
toRead;
Buffer.BlockCopy(buffer, ReadPos, b, offset, firstRead);
ReadPos += firstRead;
var secondRead = toRead - firstRead;
if (secondRead > 0)
{
Buffer.BlockCopy(buffer, 0, b, offset + firstRead, secondRead);
ReadPos = secondRead;
}
}
return toRead;
} }
finally else
{ {
_locker.Release(); var toEnd = Capacity - ReadPos;
var firstRead = toRead > toEnd ?
toEnd :
toRead;
Array.Copy(buffer, ReadPos, b, offset, firstRead);
ReadPos += firstRead;
var secondRead = toRead - firstRead;
if (secondRead > 0)
{
Array.Copy(buffer, 0, b, offset + firstRead, secondRead);
ReadPos = secondRead;
}
} }
}); return toRead;
}
public Task WriteAsync(byte[] b, int offset, int toWrite, CancellationToken cancelToken) => Task.Run(async () => public bool Write(byte[] b, int offset, int toWrite)
{ {
while (toWrite > RemainingCapacity) while (toWrite > RemainingCapacity)
await Task.Delay(1000, cancelToken); // wait a lot, buffer should be large anyway return false;
if (toWrite == 0) if (toWrite == 0)
return; return true;
await _locker.WaitAsync(cancelToken); if (WritePos < ReadPos)
try
{ {
if (WritePos < ReadPos) Array.Copy(b, offset, buffer, WritePos, toWrite);
WritePos += toWrite;
}
else
{
var toEnd = Capacity - WritePos;
var firstWrite = toWrite > toEnd ?
toEnd :
toWrite;
Array.Copy(b, offset, buffer, WritePos, firstWrite);
var secondWrite = toWrite - firstWrite;
if (secondWrite > 0)
{ {
Buffer.BlockCopy(b, offset, buffer, WritePos, toWrite); Array.Copy(b, offset + firstWrite, buffer, 0, secondWrite);
WritePos += toWrite; WritePos = secondWrite;
} }
else else
{ {
var toEnd = Capacity - WritePos; WritePos += firstWrite;
var firstWrite = toWrite > toEnd ? if (WritePos == Capacity)
toEnd : WritePos = 0;
toWrite;
Buffer.BlockCopy(b, offset, buffer, WritePos, firstWrite);
var secondWrite = toWrite - firstWrite;
if (secondWrite > 0)
{
Buffer.BlockCopy(b, offset + firstWrite, buffer, 0, secondWrite);
WritePos = secondWrite;
}
else
{
WritePos += firstWrite;
if (WritePos == Capacity)
WritePos = 0;
}
} }
} }
finally return true;
{ }
_locker.Release();
}
});
public void Dispose() public void Dispose()
{ {
} }
} }
} }

View File

@ -45,7 +45,7 @@ namespace NadekoBot.Modules.Music
var t = _music.DestroyPlayer(arg.Id); var t = _music.DestroyPlayer(arg.Id);
return Task.CompletedTask; return Task.CompletedTask;
} }
//todo changing server region is bugged again
private Task Client_UserVoiceStateUpdated(SocketUser iusr, SocketVoiceState oldState, SocketVoiceState newState) private Task Client_UserVoiceStateUpdated(SocketUser iusr, SocketVoiceState oldState, SocketVoiceState newState)
{ {
var t = Task.Run(() => var t = Task.Run(() =>

View File

@ -31,6 +31,7 @@ using NadekoBot.Extensions;
namespace NadekoBot namespace NadekoBot
{ {
//todo log when joining or leaving the server
public class NadekoBot public class NadekoBot
{ {
private Logger _log; private Logger _log;

View File

@ -131,38 +131,40 @@ namespace NadekoBot.Services.Music
_log.Info("Starting"); _log.Info("Starting");
using (var b = new SongBuffer(data.Song.Uri, "")) using (var b = new SongBuffer(data.Song.Uri, ""))
{ {
var bufferTask = b.StartBuffering(cancelToken); AudioOutStream pcm = null;
var timeout = Task.Delay(10000);
if (Task.WhenAny(bufferTask, timeout) == timeout)
{
_log.Info("Buffering failed due to a timeout.");
continue;
}
else if (!bufferTask.Result)
{
_log.Info("Buffering failed due to a cancel or error.");
continue;
}
var ac = await GetAudioClient();
if (ac == null)
{
await Task.Delay(900, cancelToken);
// just wait some time, maybe bot doesn't even have perms to join that voice channel,
// i don't want to spam connection attempts
continue;
}
var pcm = ac.CreatePCMStream(AudioApplication.Music, bufferMillis: 500);
OnStarted?.Invoke(this, data);
byte[] buffer = new byte[3840];
int bytesRead = 0;
try try
{ {
while ((bytesRead = await b.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false)) > 0 var bufferTask = b.StartBuffering(cancelToken);
var timeout = Task.Delay(10000);
if (Task.WhenAny(bufferTask, timeout) == timeout)
{
_log.Info("Buffering failed due to a timeout.");
continue;
}
else if (!bufferTask.Result)
{
_log.Info("Buffering failed due to a cancel or error.");
continue;
}
var ac = await GetAudioClient();
if (ac == null)
{
await Task.Delay(900, cancelToken);
// just wait some time, maybe bot doesn't even have perms to join that voice channel,
// i don't want to spam connection attempts
continue;
}
pcm = ac.CreatePCMStream(AudioApplication.Music, bufferMillis: 500);
OnStarted?.Invoke(this, data);
byte[] buffer = new byte[3840];
int bytesRead = 0;
while ((bytesRead = b.Read(buffer, 0, buffer.Length)) > 0
&& (MaxPlaytimeSeconds <= 0 || MaxPlaytimeSeconds >= CurrentTime.TotalSeconds)) && (MaxPlaytimeSeconds <= 0 || MaxPlaytimeSeconds >= CurrentTime.TotalSeconds))
{ {
AdjustVolume(buffer, Volume); //AdjustVolume(buffer, Volume);
await pcm.WriteAsync(buffer, 0, bytesRead, cancelToken).ConfigureAwait(false); await pcm.WriteAsync(buffer, 0, bytesRead, cancelToken).ConfigureAwait(false);
unchecked { _bytesSent += bytesRead; } unchecked { _bytesSent += bytesRead; }
@ -179,12 +181,16 @@ namespace NadekoBot.Services.Music
} }
finally finally
{ {
//flush is known to get stuck from time to time, just cancel it if it takes more than 1 second if (pcm != null)
var flushCancel = new CancellationTokenSource(); {
var flushToken = flushCancel.Token; // flush is known to get stuck from time to time,
var flushDelay = Task.Delay(1000, flushToken); // just skip flushing if it takes more than 1 second
await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken)); var flushCancel = new CancellationTokenSource();
flushCancel.Cancel(); var flushToken = flushCancel.Token;
var flushDelay = Task.Delay(1000, flushToken);
await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken));
flushCancel.Cancel();
}
OnCompleted?.Invoke(this, data.Song); OnCompleted?.Invoke(this, data.Song);
} }
@ -309,8 +315,12 @@ namespace NadekoBot.Services.Music
{ {
try try
{ {
await _audioClient?.StopAsync(); var t = _audioClient?.StopAsync();
_audioClient?.Dispose(); if (t != null)
{
await t;
_audioClient?.Dispose();
}
} }
catch catch
{ {

View File

@ -36,10 +36,17 @@ namespace NadekoBot.Services.Music
var t = Task.Run(() => var t = Task.Run(() =>
{ {
this.p.BeginErrorReadLine(); this.p.BeginErrorReadLine();
this.p.ErrorDataReceived += P_ErrorDataReceived;
this.p.WaitForExit(); this.p.WaitForExit();
}); });
} }
private void P_ErrorDataReceived(object sender, DataReceivedEventArgs e)
{
_log.Error(">>> " + e.Data);
}
private readonly object locker = new object();
public Task<bool> StartBuffering(CancellationToken cancelToken) public Task<bool> StartBuffering(CancellationToken cancelToken)
{ {
var toReturn = new TaskCompletionSource<bool>(); var toReturn = new TaskCompletionSource<bool>();
@ -49,14 +56,26 @@ namespace NadekoBot.Services.Music
{ {
byte[] buffer = new byte[readSize]; byte[] buffer = new byte[readSize];
int bytesRead = 1; int bytesRead = 1;
while (!cancelToken.IsCancellationRequested && !this.p.HasExited && bytesRead > 0) while (!cancelToken.IsCancellationRequested && !this.p.HasExited)
{ {
bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, readSize, cancelToken).ConfigureAwait(false); bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, readSize, cancelToken).ConfigureAwait(false);
await _outStream.WriteAsync(buffer, 0, bytesRead, cancelToken); if (bytesRead == 0)
break;
bool written;
do
{
lock (locker)
written = _outStream.Write(buffer, 0, bytesRead);
if (!written)
await Task.Delay(32, cancelToken);
}
while (!written);
lock (locker)
if (_outStream.Length > 200_000 || bytesRead == 0)
if (toReturn.TrySetResult(true))
_log.Info("Prebuffering finished");
if (_outStream.LightLength > 200_000 || bytesRead == 0) await Task.Delay(5); // @.@
if (toReturn.TrySetResult(true))
_log.Info("Prebuffering finished");
} }
_log.Info("FFMPEG killed, song canceled, or song fully downloaded"); _log.Info("FFMPEG killed, song canceled, or song fully downloaded");
} }
@ -84,9 +103,10 @@ Check the guides for your platform on how to setup ffmpeg correctly:
return toReturn.Task; return toReturn.Task;
} }
public Task<int> ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) public int Read(byte[] b, int offset, int toRead)
{ {
return _outStream.ReadAsync(b, offset, toRead, cancelToken); lock (locker)
return _outStream.Read(b, offset, toRead);
} }
public void Dispose() public void Dispose()
@ -94,6 +114,8 @@ Check the guides for your platform on how to setup ffmpeg correctly:
try { this.p.Kill(); } try { this.p.Kill(); }
catch { } catch { }
_outStream.Dispose(); _outStream.Dispose();
this.p.StandardError.Dispose();
this.p.StandardOutput.Dispose();
this.p.Dispose(); this.p.Dispose();
} }
} }