什么是 long-running thread
long-running task 是指那些長(zhǎng)時(shí)間運(yùn)行的任務(wù),比如在一個(gè) while True 中執(zhí)行耗時(shí)較長(zhǎng)的同步處理。
下面的例子中,我們不斷從隊(duì)列中嘗試取出數(shù)據(jù),并對(duì)這些數(shù)據(jù)進(jìn)行處理,這樣的任務(wù)就適合交給一個(gè) long-running task 來(lái)處理。
var queue = new BlockingCollection<string>();
Task.Factory.StartNew(() =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine($"You entered: {input}");
}
}, TaskCreationOptions.LongRunning);
while (true)
{
var input = Console.ReadLine();
queue.Add(input);
}
在 .NET 中,我們可以使用 Task.Factory.StartNew 方法并傳入 TaskCreationOptions.LongRunning 來(lái)創(chuàng)建一個(gè) long-running task。
雖然這種方式創(chuàng)建的 long-running task 和默認(rèn)創(chuàng)建的 task 一樣,都是分配給 ThreadPoolTaskScheduler 來(lái)調(diào)度的, 但 long-running task 會(huì)被分配到一個(gè)新的 Background 線(xiàn)程上執(zhí)行,而不是交給 ThreadPool 中的線(xiàn)程來(lái)執(zhí)行。
class ThreadPoolTaskScheduler : TaskScheduler
{
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
{
new Thread(s_longRunningThreadWork)
{
IsBackground = true,
Name = ".NET Long Running Task"
}.UnsafeStart(task);
}
else
{
ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
}
}
}
為什么long-running task要和普通的task分開(kāi)調(diào)度
如果一個(gè)task持續(xù)占用一個(gè)線(xiàn)程,那么這個(gè)線(xiàn)程就不能被其他的task使用,這和 ThreadPool 的設(shè)計(jì)初衷是相違背的。
如果在 ThreadPool 中創(chuàng)建了大量的 long-running task,那么就會(huì)導(dǎo)致
ThreadPool 中的線(xiàn)程不夠用,從而影響到其他的 task 的執(zhí)行。
在 long-running task await 一個(gè) async 方法后會(huì)發(fā)生什么
有時(shí)候,我們需要在 long-running task 中調(diào)用一個(gè) async 方法。比如下面的例子中,我們需要在 long-running task 中調(diào)用一個(gè) async
的方法來(lái)處理數(shù)據(jù)。
var queue = new BlockingCollection<string>();
Task.Factory.StartNew(async () =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
await ProcessAsync(input);
Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
}, TaskCreationOptions.LongRunning);
async Task ProcessAsync(string input)
{
await Task.Delay(100);
Console.WriteLine($"You entered: {input}, thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
while (true)
{
var input = Console.ReadLine();
queue.Add(input);
}
TaskScheduler InternalCurrentTaskScheduler()
{
var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic);
return (TaskScheduler)propertyInfo.GetValue(null);
}
連續(xù)輸入 1、2、3、4,輸出如下:
1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
2
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 2, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
3
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 3, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
4
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 4, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
從執(zhí)行結(jié)果中可以看出,第一次 await 之前,當(dāng)前線(xiàn)程是 long-running task 所在的線(xiàn)程(thread id: 9),此后就變成了 ThreadPool
中的線(xiàn)程(thread id: 4)。
至于為什么之后一直是 ThreadPool 中的線(xiàn)程(thread id: 4),這邊做一下簡(jiǎn)單的解釋。在我以前一篇介紹 await 的文章中介紹了 await 的執(zhí)行過(guò)程,以及 await 之后的代碼會(huì)在哪個(gè)線(xiàn)程上執(zhí)行。

?
https://www.cnblogs.com/eventhorizon/p/15912383.html
- 第一次 await 前,當(dāng)前線(xiàn)程是 long-running task 所在的線(xiàn)程(thread id: 9),綁定了 TaskScheduler(ThreadPoolTaskScheduler),也就是說(shuō) await 之后的代碼會(huì)被調(diào)度到 ThreadPool 中執(zhí)行。
- 第一次 await 之后的代碼被調(diào)度到 ThreadPool 中的線(xiàn)程(thread id: 4)上執(zhí)行。
- ThreadPool 中的線(xiàn)程不會(huì)綁定 TaskScheduler,也就意味著之后的代碼還是會(huì)在 ThreadPool 中的線(xiàn)程上執(zhí)行,并且是本地隊(duì)列優(yōu)先,所以一直是 thread id: 4 這個(gè)線(xiàn)程在從本地隊(duì)列中取出任務(wù)在執(zhí)行。
線(xiàn)程池的介紹請(qǐng)參考我另一篇博客
https://www.cnblogs.com/eventhorizon/p/15316955.html
回到本文的主題,如果在 long-running task 使用了 await 調(diào)用一個(gè) async 方法,就會(huì)導(dǎo)致為 long-running task 分配的獨(dú)立線(xiàn)程提前退出,和我們的預(yù)期不符。
long-running task 中 調(diào)用 一個(gè) async 方法的可能姿勢(shì)
使用 Task.Wait
在 long-running task 中調(diào)用一個(gè) async 方法,可以使用 Task.Wait 來(lái)阻塞當(dāng)前線(xiàn)程,直到 async 方法執(zhí)行完畢。
對(duì)于 Task.Factory.StartNew 創(chuàng)建出來(lái)的 long-running task 來(lái)說(shuō),因?yàn)槠浣壎?ThreadPoolTaskScheduler,就算是使用 Task.Wait
阻塞了當(dāng)前線(xiàn)程,也不會(huì)導(dǎo)致死鎖。
并且 Task.Wait 會(huì)把異常拋出來(lái),所以我們可以在 catch 中處理異常。
Task.Factory.StartNew( () =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
ProcessAsync(input).Wait();
Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}");
}
}, TaskCreationOptions.LongRunning);
輸出如下:
1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 2, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 3, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 4, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
Task.Wait 并不會(huì)對(duì) async 方法內(nèi)部產(chǎn)生影響,所以 async 方法內(nèi)部的代碼還是按照正常的邏輯執(zhí)行。這邊 ProcessAsync 方法內(nèi)部打印的
thread id 沒(méi)變純粹是因?yàn)?ThreadPool 目前就只創(chuàng)建了一個(gè)線(xiàn)程,你可以瘋狂輸入看看結(jié)果。
關(guān)于 Task.Wait 的使用,可以參考我另一篇博客
https://www.cnblogs.com/eventhorizon/p/17481757.html
使用自定義的 TaskScheduler 來(lái)創(chuàng)建 long-running task
Task.Factory.StartNew(async () =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine(
$"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
await ProcessAsync(input);
Console.WriteLine(
$"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
class CustomerTaskScheduler : TaskScheduler
{
private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
public CustomerTaskScheduler()
{
var thread = new Thread(() =>
{
foreach (var task in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(task);
}
})
{
IsBackground = true
};
thread.Start();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}
輸出如下:
1
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 1, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 2, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 3, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 4, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
因?yàn)樾薷牧松舷挛慕壎ǖ?TaskScheduler,會(huì)影響到 async 方法內(nèi)部 await 回調(diào)的執(zhí)行。
這種做法不推薦使用,因?yàn)榭赡軙?huì)導(dǎo)致死鎖。
如果我將 await 改成 Task.Wait,就會(huì)導(dǎo)致死鎖。
Task.Factory.StartNew(() =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine(
$"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
ProcessAsync(input).Wait();
Console.WriteLine(
$"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
輸出如下:
1
Before process: thread id: 7, task scheduler: CustomerTaskScheduler, thread pool: False
后面就沒(méi)有輸出了,因?yàn)樗梨i了,除非我們?cè)?ProcessAsync 方法內(nèi)部每個(gè) await 的 Task 后加上ConfigureAwait(false)。
同理,同學(xué)們也可以嘗試用 SynchronizationContext 來(lái)實(shí)現(xiàn)類(lèi)似的效果,同樣有死鎖的風(fēng)險(xiǎn)。
總結(jié)
如果你想要在一個(gè) long-running task 中執(zhí)行 async 方法,使用 await 關(guān)鍵字會(huì)導(dǎo)致 long-running task 的獨(dú)立線(xiàn)程提前退出。
比較推薦的做法是使用 Task.Wait。如果連續(xù)執(zhí)行多個(gè) async 方法,建議將這些 async 方法封裝成一個(gè)新方法,然后只 Wait 這個(gè)新方法的 Task。
轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/17497359.html