在我們業(yè)務(wù)操作時(shí),難免會有多次操作,比如:界面搜索框,輸入內(nèi)容時(shí)時(shí)查詢數(shù)據(jù)庫/后臺數(shù)據(jù)。多次觸發(fā)搜索,我們期望什么結(jié)果呢?絕大部分情況,應(yīng)該是只需要最后一次操作的結(jié)果,其它操作應(yīng)該無效。
自定義等待的任務(wù)類
1. 可等待的任務(wù)類 AwaitableTask:
/// <summary>
/// 可等待的任務(wù)
/// </summary>
public class AwaitableTask
{
/// <summary>
/// 獲取任務(wù)是否為不可執(zhí)行狀態(tài)
/// </summary>
public bool NotExecutable { get; private set; }
/// <summary>
/// 設(shè)置任務(wù)不可執(zhí)行
/// </summary>
public void SetNotExecutable()
{
NotExecutable = true;
}
/// <summary>
/// 獲取任務(wù)是否有效
/// 注:對無效任務(wù),可以不做處理。減少并發(fā)操作導(dǎo)致的干擾
/// </summary>
public bool IsInvalid { get; private set; } = true;
/// <summary>
/// 標(biāo)記任務(wù)無效
/// </summary>
public void MarkTaskValid()
{
IsInvalid = false;
}
#region Task
private readonly Task _task;
/// <summary>
/// 初始化可等待的任務(wù)。
/// </summary>
/// <param name="task"></param>
public AwaitableTask(Task task) => _task = task;
/// <summary>
/// 獲取任務(wù)是否已完成
/// </summary>
public bool IsCompleted => _task.IsCompleted;
/// <summary>
/// 任務(wù)的Id
/// </summary>
public int TaskId => _task.Id;
/// <summary>
/// 開始任務(wù)
/// </summary>
public void Start() => _task.Start();
/// <summary>
/// 同步執(zhí)行開始任務(wù)
/// </summary>
public void RunSynchronously() => _task.RunSynchronously();
#endregion
#region TaskAwaiter
/// <summary>
/// 獲取任務(wù)等待器
/// </summary>
/// <returns></returns>
public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
/// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
public struct TaskAwaiter : INotifyCompletion
{
private readonly AwaitableTask _task;
/// <summary>
/// 任務(wù)等待器
/// </summary>
/// <param name="awaitableTask"></param>
public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
/// <summary>
/// 任務(wù)是否完成.
/// </summary>
public bool IsCompleted => _task._task.IsCompleted;
/// <inheritdoc />
public void OnCompleted(Action continuation)
{
var This = this;
_task._task.ContinueWith(t =>
{
if (!This._task.NotExecutable) continuation?.Invoke();
});
}
/// <summary>
/// 獲取任務(wù)結(jié)果
/// </summary>
public void GetResult() => _task._task.Wait();
}
#endregion
}
無效的操作可以分為以下倆種:
已經(jīng)進(jìn)行中的操作,后續(xù)結(jié)果應(yīng)標(biāo)記為無效
還沒開始的操作,后續(xù)不執(zhí)行
自定義任務(wù)類型 AwaitableTask中,添加倆個(gè)字段NotExecutable、IsInvalid:
/// <summary>
/// 獲取任務(wù)是否為不可執(zhí)行狀態(tài)
/// </summary>
public bool NotExecutable { get; private set; }
/// <summary>
/// 獲取任務(wù)是否有效
/// 注:對無效任務(wù),可以不做處理。減少并發(fā)操作導(dǎo)致的干擾
/// </summary>
public bool IsInvalid { get; private set; } = true;
2. 有返回結(jié)果的可等待任務(wù)類 AwaitableTask<TResult>:
/// <summary>
/// 可等待的任務(wù)
/// </summary>
/// <typeparam name="TResult"></typeparam>
public class AwaitableTask<TResult> : AwaitableTask
{
private readonly Task<TResult> _task;
/// <summary>
/// 初始化可等待的任務(wù)
/// </summary>
/// <param name="task">需要執(zhí)行的任務(wù)</param>
public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
#region TaskAwaiter
/// <summary>
/// 獲取任務(wù)等待器
/// </summary>
/// <returns></returns>
public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
/// <summary>
/// 任務(wù)等待器
/// </summary>
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
public new struct TaskAwaiter : INotifyCompletion
{
private readonly AwaitableTask<TResult> _task;
/// <summary>
/// 初始化任務(wù)等待器
/// </summary>
/// <param name="awaitableTask"></param>
public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
/// <summary>
/// 任務(wù)是否已完成。
/// </summary>
public bool IsCompleted => _task._task.IsCompleted;
/// <inheritdoc />
public void OnCompleted(Action continuation)
{
var This = this;
_task._task.ContinueWith(t =>
{
if (!This._task.NotExecutable) continuation?.Invoke();
});
}
/// <summary>
/// 獲取任務(wù)結(jié)果。
/// </summary>
/// <returns></returns>
public TResult GetResult() => _task._task.Result;
}
#endregion
}
添加任務(wù)等待器,同步等待結(jié)果返回:
/// <summary>
/// 獲取任務(wù)等待器
/// </summary>
/// <returns></returns>
public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
/// <summary>
/// 任務(wù)等待器
/// </summary>
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
public new struct TaskAwaiter : INotifyCompletion
{
private readonly AwaitableTask<TResult> _task;
/// <summary>
/// 初始化任務(wù)等待器
/// </summary>
/// <param name="awaitableTask"></param>
public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
/// <summary>
/// 任務(wù)是否已完成。
/// </summary>
public bool IsCompleted => _task._task.IsCompleted;
/// <inheritdoc />
public void OnCompleted(Action continuation)
{
var This = this;
_task._task.ContinueWith(t =>
{
if (!This._task.NotExecutable) continuation?.Invoke();
});
}
/// <summary>
/// 獲取任務(wù)結(jié)果。
/// </summary>
/// <returns></returns>
public TResult GetResult() => _task._task.Result;
}
異步任務(wù)隊(duì)列
/// <summary>
/// 異步任務(wù)隊(duì)列
/// </summary>
public class AsyncTaskQueue : IDisposable
{
/// <summary>
/// 異步任務(wù)隊(duì)列
/// </summary>
public AsyncTaskQueue()
{
_autoResetEvent = new AutoResetEvent(false);
_thread = new Thread(InternalRunning) { IsBackground = true };
_thread.Start();
}
#region 執(zhí)行
/// <summary>
/// 執(zhí)行異步操作
/// </summary>
/// <typeparam name="T">返回結(jié)果類型</typeparam>
/// <param name="func">異步操作</param>
/// <returns>isInvalid:異步操作是否有效;result:異步操作結(jié)果</returns>
public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
{
var task = GetExecutableTask(func);
var result = await await task;
if (!task.IsInvalid)
{
result = default(T);
}
return (task.IsInvalid, result);
}
/// <summary>
/// 執(zhí)行異步操作
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="func"></param>
/// <returns></returns>
public async Task<bool> ExecuteAsync<T>(Func<Task> func)
{
var task = GetExecutableTask(func);
await await task;
return task.IsInvalid;
}
#endregion
#region 添加任務(wù)
/// <summary>
/// 獲取待執(zhí)行任務(wù)
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
private AwaitableTask GetExecutableTask(Action action)
{
var awaitableTask = new AwaitableTask(new Task(action));
AddPenddingTaskToQueue(awaitableTask);
return awaitableTask;
}
/// <summary>
/// 獲取待執(zhí)行任務(wù)
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="function"></param>
/// <returns></returns>
private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function)
{
var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function));
AddPenddingTaskToQueue(awaitableTask);
return awaitableTask;
}
/// <summary>
/// 添加待執(zhí)行任務(wù)到隊(duì)列
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
private void AddPenddingTaskToQueue(AwaitableTask task)
{
//添加隊(duì)列,加鎖。
lock (_queue)
{
_queue.Enqueue(task);
//開始執(zhí)行任務(wù)
_autoResetEvent.Set();
}
}
#endregion
#region 內(nèi)部運(yùn)行
private void InternalRunning()
{
while (!_isDisposed)
{
if (_queue.Count == 0)
{
//等待后續(xù)任務(wù)
_autoResetEvent.WaitOne();
}
while (TryGetNextTask(out var task))
{
//如已從隊(duì)列中刪除
if (task.NotExecutable) continue;
if (UseSingleThread)
{
task.RunSynchronously();
}
else
{
task.Start();
}
}
}
}
/// <summary>
/// 上一次異步操作
/// </summary>
private AwaitableTask _lastDoingTask;
private bool TryGetNextTask(out AwaitableTask task)
{
task = null;
while (_queue.Count > 0)
{
//獲取并從隊(duì)列中移除任務(wù)
if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
{
//設(shè)置進(jìn)行中的異步操作無效
_lastDoingTask?.MarkTaskValid();
_lastDoingTask = task;
return true;
}
//并發(fā)操作,設(shè)置任務(wù)不可執(zhí)行
task.SetNotExecutable();
}
return false;
}
#endregion
#region dispose
/// <inheritdoc />
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 析構(gòu)任務(wù)隊(duì)列
/// </summary>
~AsyncTaskQueue() => Dispose(false);
private void Dispose(bool disposing)
{
if (_isDisposed) return;
if (disposing)
{
_autoResetEvent.Dispose();
}
_thread = null;
_autoResetEvent = null;
_isDisposed = true;
}
#endregion
#region 屬性及字段
/// <summary>
/// 是否使用單線程完成任務(wù).
/// </summary>
public bool UseSingleThread { get; set; } = true;
/// <summary>
/// 自動(dòng)取消以前的任務(wù)。
/// </summary>
public bool AutoCancelPreviousTask { get; set; } = false;
private bool _isDisposed;
private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
private Thread _thread;
private AutoResetEvent _autoResetEvent;
#endregion
添加異步任務(wù)隊(duì)列類,用于任務(wù)的管理,如添加、執(zhí)行、篩選等:
1. 自動(dòng)取消之前的任務(wù) AutoCancelPreviousTask
內(nèi)部使用線程,循環(huán)獲取當(dāng)前任務(wù)列表,如果當(dāng)前任務(wù)被標(biāo)記NotExecutable不可執(zhí)行,則跳過。
NotExecutable是何時(shí)標(biāo)記的?
獲取任務(wù)時(shí),標(biāo)記所有獲取的任務(wù)為NotExecutable。直到任務(wù)列表中為空,那么只執(zhí)行最后獲取的一個(gè)任務(wù)。
2. 標(biāo)記已經(jīng)進(jìn)行的任務(wù)無效 MarkTaskValid
當(dāng)前進(jìn)行的任務(wù),無法中止,那么標(biāo)記為無效即可。
/// <summary>
/// 上一次異步操作
/// </summary>
private AwaitableTask _lastDoingTask;
private bool TryGetNextTask(out AwaitableTask task)
{
task = null;
while (_queue.Count > 0)
{
//獲取并從隊(duì)列中移除任務(wù)
if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
{
//設(shè)置進(jìn)行中的異步操作無效
_lastDoingTask?.MarkTaskValid();
_lastDoingTask = task;
return true;
}
//并發(fā)操作,設(shè)置任務(wù)不可執(zhí)行
task.SetNotExecutable();
}
return false;
}
后續(xù)執(zhí)行完后,根據(jù)此標(biāo)記,設(shè)置操作結(jié)果為空。
/// <summary>
/// 執(zhí)行異步操作
/// </summary>
/// <typeparam name="T">返回結(jié)果類型</typeparam>
/// <param name="func">異步操作</param>
/// <returns>isInvalid:異步操作是否有效;result:異步操作結(jié)果</returns>
public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
{
var task = GetExecutableTask(func);
var result = await await task;
if (!task.IsInvalid)
{
result = default(T);
}
return (task.IsInvalid, result);
}
實(shí)踐測試
啟動(dòng)9個(gè)并發(fā)任務(wù),測試實(shí)際的任務(wù)隊(duì)列并發(fā)操作管理:
public MainWindow()
{
InitializeComponent();
_asyncTaskQueue = new AsyncTaskQueue
{
AutoCancelPreviousTask = true,
UseSingleThread = true
};
}
private AsyncTaskQueue _asyncTaskQueue;
private void ButtonBase_OnClick(object sender, RoutedEventArgs e)
{
// 快速啟動(dòng)9個(gè)任務(wù)
for (var i = 1; i < 10; i++)
{
Test(_asyncTaskQueue, i);
}
}
public static async void Test(AsyncTaskQueue taskQueue, int num)
{
var result = await taskQueue.ExecuteAsync(async () =>
{
Debug.WriteLine("輸入:" + num);
// 長時(shí)間耗時(shí)任務(wù)
await Task.Delay(TimeSpan.FromSeconds(5));
return num * 100;
});
Debug.WriteLine($"{num}輸出的:" + result);
}
測試結(jié)果如下:
一共9次操作,只有最后一次操作結(jié)果,才是有效的。其它8次操作,一次是無效的,7次操作被取消不執(zhí)行。
固定時(shí)間間隔只保留最后操作
實(shí)際業(yè)務(wù)過程中,大部分場景并不是瞬間丟過來N個(gè)任務(wù),而是比如100ms內(nèi)有20個(gè)操作觸發(fā)。
這類高并發(fā)操作,不止是上方做一個(gè)隊(duì)列獲取首尾任務(wù),還需要加個(gè)延時(shí)(時(shí)間間隔),減少并發(fā)操作的執(zhí)行,可以基于上方TryGetNextTask方法操作:
/// <summary>
/// 獲取下一任務(wù)
/// </summary>
/// <returns></returns>
private async Task<AwaitableTask> TryGetNextTask()
{
//獲取并從隊(duì)列中移除任務(wù)
if (_queue.TryDequeue(out var task))
{
//設(shè)置進(jìn)行中的異步操作無效
foreach (var lastDoingTask in _lastDoingTasks)
{
lastDoingTask.MarkTaskInvalid();
}
_lastDoingTasks.Add(task);
//如果有中間任務(wù),則立即取消
if (_queue.Count != 0)
{
task.SetNotExecutable();
}
else
{
//如果沒有中間任務(wù),則設(shè)置延遲時(shí)間再確認(rèn)是否有中間任務(wù)
await Task.Delay(_delay).ConfigureAwait(false);
if (_queue.Count != 0)
{
task.SetNotExecutable();
}
}
return task;
}
設(shè)置延時(shí)取任務(wù)后,就能實(shí)現(xiàn)在固定的時(shí)間內(nèi)最多只會觸發(fā)一次,并且是最后一次操作。
作者:唐宋元明清2188
出處:http://www.cnblogs.com/kybs0/
本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須在文章頁面給出原文連接,否則保留追究法律責(zé)任的權(quán)利。
該文章在 2024/9/5 11:36:52 編輯過