Poopy buffer is back ^_^ Music lag fixes...

This commit is contained in:
Master Kwoth 2017-07-01 17:16:03 +02:00
parent f8ad6dda50
commit 9889baf8bd
5 changed files with 277 additions and 60 deletions

View File

@ -0,0 +1,145 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace NadekoBot.DataStructures
{
public class PoopyRingBuffer : IDisposable
{
// readpos == writepos means empty
// writepos == readpos - 1 means full
private readonly byte[] buffer;
private readonly object posLock = new object();
public int Capacity { get; }
private volatile int _readPos = 0;
private int ReadPos
{
get => _readPos;
set { lock (posLock) _readPos = value; }
}
private volatile int _writePos = 0;
private int WritePos
{
get => _writePos;
set { lock (posLock) _writePos = value; }
}
private int Length
{
get
{
lock (posLock)
{
return ReadPos <= WritePos ?
WritePos - ReadPos :
Capacity - (ReadPos - WritePos);
}
}
}
public int RemainingCapacity
{
get { lock (posLock) return Capacity - Length - 1; }
}
private readonly SemaphoreSlim _locker = new SemaphoreSlim(1, 1);
public PoopyRingBuffer(int capacity = 38400)
{
this.Capacity = capacity + 1;
this.buffer = new byte[this.Capacity];
}
public Task<int> ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken) => Task.Run(async () =>
{
await _locker.WaitAsync(cancelToken);
try
{
Console.WriteLine("Reading {0}", toRead);
if (WritePos == ReadPos)
return 0;
if (toRead > Length)
toRead = Length;
if (WritePos > ReadPos)
{
Buffer.BlockCopy(buffer, ReadPos, b, offset, toRead);
ReadPos += toRead;
}
else
{
var toEnd = Capacity - ReadPos;
var firstRead = toRead > toEnd ?
toEnd :
toRead;
Buffer.BlockCopy(buffer, ReadPos, b, offset, firstRead);
ReadPos += firstRead;
var secondRead = toRead - firstRead;
if (secondRead > 0)
{
Buffer.BlockCopy(buffer, 0, b, offset + firstRead, secondRead);
ReadPos = secondRead;
}
}
Console.WriteLine("Readpos: {0} WritePos: {1}", ReadPos, WritePos);
return toRead;
}
finally
{
_locker.Release();
}
});
public Task WriteAsync(byte[] b, int offset, int toWrite, CancellationToken cancelToken) => Task.Run(async () =>
{
while (toWrite > RemainingCapacity)
await Task.Delay(100, cancelToken);
await _locker.WaitAsync(cancelToken);
try
{
Console.WriteLine("Writing {0}", toWrite);
if (WritePos < ReadPos)
{
Buffer.BlockCopy(b, offset, buffer, WritePos, toWrite);
WritePos += toWrite;
}
else
{
var toEnd = Capacity - WritePos;
var firstWrite = toWrite > toEnd ?
toEnd :
toWrite;
Buffer.BlockCopy(b, offset, buffer, WritePos, firstWrite);
var secondWrite = toWrite - firstWrite;
if (secondWrite > 0)
{
Buffer.BlockCopy(b, offset + firstWrite, buffer, 0, secondWrite);
WritePos = secondWrite;
}
else
{
WritePos += firstWrite;
if (WritePos == Capacity)
WritePos = 0;
}
}
Console.WriteLine("Readpos: {0} WritePos: {1}", ReadPos, WritePos);
return toWrite;
}
finally
{
_locker.Release();
}
});
public void Dispose()
{
}
}
}

View File

@ -451,6 +451,7 @@ namespace NadekoBot.Modules.Music
{
try
{
await Task.Yield();
//todo fix for all
if (item.ProviderType == MusicType.Normal)
await Task.WhenAll(Task.Delay(1000), InternalQueue(mp, await _music.ResolveSong(item.Query, Context.User.ToString(), item.ProviderType), true)).ConfigureAwait(false);

View File

@ -81,59 +81,58 @@ namespace NadekoBot.Services.Music
_log.Info("Starting");
var p = Process.Start(new ProcessStartInfo
using (var b = new SongBuffer(data.Song.Uri, ""))
{
FileName = "ffmpeg",
Arguments = $"-i {data.Song.Uri} -f s16le -ar 48000 -vn -ac 2 pipe:1 -loglevel quiet",
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = false,
CreateNoWindow = true,
});
var ac = await GetAudioClient();
if (ac == null)
{
await Task.Delay(900);
// just wait some time, maybe bot doesn't even have perms to join that voice channel,
// i don't want to spam connection attempts
continue;
}
var pcm = ac.CreatePCMStream(AudioApplication.Music);
var bufferSuccess = await b.StartBuffering(cancelToken);
OnStarted?.Invoke(this, data.Song);
if (bufferSuccess == false)
continue;
byte[] buffer = new byte[3840];
int bytesRead = 0;
try
{
while ((bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false)) > 0)
var ac = await GetAudioClient();
if (ac == null)
{
var vol = Volume;
if (vol != 1)
AdjustVolume(buffer, vol);
await pcm.WriteAsync(buffer, 0, bytesRead, cancelToken);
await (pauseTaskSource?.Task ?? Task.CompletedTask);
await Task.Delay(900);
// just wait some time, maybe bot doesn't even have perms to join that voice channel,
// i don't want to spam connection attempts
continue;
}
}
catch (OperationCanceledException)
{
_log.Info("Song Canceled");
}
catch (Exception ex)
{
_log.Warn(ex);
}
finally
{
//flush is known to get stuck from time to time, just cancel it if it takes more than 1 second
var flushCancel = new CancellationTokenSource();
var flushToken = flushCancel.Token;
var flushDelay = Task.Delay(1000, flushToken);
await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken));
flushCancel.Cancel();
var pcm = ac.CreatePCMStream(AudioApplication.Music);
OnCompleted?.Invoke(this, data.Song);
OnStarted?.Invoke(this, data.Song);
byte[] buffer = new byte[3840];
int bytesRead = 0;
try
{
while ((bytesRead = await b.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false)) > 0)
{
var vol = Volume;
if (vol != 1)
AdjustVolume(buffer, vol);
await Task.WhenAll(Task.Delay(10), pcm.WriteAsync(buffer, 0, bytesRead, cancelToken)).ConfigureAwait(false);
await (pauseTaskSource?.Task ?? Task.CompletedTask);
}
}
catch (OperationCanceledException)
{
_log.Info("Song Canceled");
}
catch (Exception ex)
{
_log.Warn(ex);
}
finally
{
//flush is known to get stuck from time to time, just cancel it if it takes more than 1 second
var flushCancel = new CancellationTokenSource();
var flushToken = flushCancel.Token;
var flushDelay = Task.Delay(1000, flushToken);
await Task.WhenAny(flushDelay, pcm.FlushAsync(flushToken));
flushCancel.Cancel();
OnCompleted?.Invoke(this, data.Song);
}
}
}
finally
@ -141,7 +140,7 @@ namespace NadekoBot.Services.Music
_log.Info("Next song");
do
{
await Task.Delay(100);
await Task.Delay(500);
}
while (Stopped && !Exited);
if(!RepeatCurrentSong)
@ -158,6 +157,14 @@ namespace NadekoBot.Services.Music
reconnect)
try
{
try
{
await _audioClient?.StopAsync();
_audioClient?.Dispose();
}
catch
{
}
_audioClient = await VoiceChannel.ConnectAsync();
}
catch

View File

@ -47,11 +47,6 @@ namespace NadekoBot.Services.Music
Directory.CreateDirectory(MusicDataPath);
}
// public MusicPlayer GetPlayer(ulong guildId)
// {
// MusicPlayers.TryGetValue(guildId, out var player);
// return player;
// }
public float GetDefaultVolume(ulong guildId)
{
return _defaultVolumes.GetOrAdd(guildId, (id) =>

View File

@ -1,10 +1,79 @@
//using NadekoBot.Extensions;
//using NLog;
//using System;
//using System.Diagnostics;
//using System.IO;
//using System.Threading;
//using System.Threading.Tasks;
using NadekoBot.DataStructures;
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace NadekoBot.Services.Music
{
public class SongBuffer : IDisposable
{
const int maxReadSize = 3840;
private Process p;
private PoopyRingBuffer _outStream = new PoopyRingBuffer();
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
public string SongUri { get; private set; }
public SongBuffer(string songUri, string skipTo)
{
this.SongUri = songUri;
this.p = Process.Start(new ProcessStartInfo
{
FileName = "ffmpeg",
Arguments = $"-i {songUri} -f s16le -ar 48000 -vn -ac 2 pipe:1 -loglevel quiet",
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = false,
CreateNoWindow = true,
});
}
public Task<bool> StartBuffering(CancellationToken cancelToken)
{
var toReturn = new TaskCompletionSource<bool>();
var _ = Task.Run(async () =>
{
try
{
byte[] buffer = new byte[3840];
while (!this.p.HasExited || cancelToken.IsCancellationRequested)
{
int bytesRead = await p.StandardOutput.BaseStream.ReadAsync(buffer, 0, buffer.Length, cancelToken).ConfigureAwait(false);
await _outStream.WriteAsync(buffer, 0, bytesRead, cancelToken);
if (_outStream.RemainingCapacity < _outStream.Capacity * 0.9f)
toReturn.TrySetResult(true);
}
}
catch
{
toReturn.TrySetResult(false);
//ignored
}
}, cancelToken);
return toReturn.Task;
}
public Task<int> ReadAsync(byte[] b, int offset, int toRead, CancellationToken cancelToken)
{
return _outStream.ReadAsync(b, offset, toRead, cancelToken);
}
public void Dispose()
{
try { this.p.Kill(); }
catch { }
_outStream.Dispose();
this.p.Dispose();
}
}
}
//namespace NadekoBot.Services.Music
//{