本系列會(huì)直接引用前一篇博客概述 .NET 6 ThreadPool 實(shí)現(xiàn) 里的結(jié)論,所以請(qǐng)沒看過的同學(xué)先麻煩看下。
文中所有例子均出于解釋目的,并非具有實(shí)際意義的代碼。有返回值的 Task 和無返回值的 Task 實(shí)際區(qū)別不是很大,下文大多數(shù)舉例不做特別區(qū)分。不糾結(jié) api 的使用細(xì)節(jié),只講 Task 的整體設(shè)計(jì)思路。
代碼運(yùn)行截圖是在 .NET 6 中的,其他版本的設(shè)計(jì)沒有大的改動(dòng),不影響學(xué)習(xí)。
筆者解讀并非權(quán)威解讀,只是希望能給大家一個(gè)理解 Task 的方法。
從表象講起
Task 從何而來
以下僅做典型舉例,并非全部
new Task(_ =>
{
Console.WriteLine("Hello World!");
}, null).Start();
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
});
Task.Run(() =>
{
Console.WriteLine("Hello World!");
});
- Task.FromResult 等直接創(chuàng)建一個(gè)已完成的 Task
Task.FromResult("Hello World!");
var task = Task.CompletedTask;
- 某個(gè)不知道其內(nèi)部實(shí)現(xiàn)的 async 方法
async Task<Bar> FooAsync();
Task 常見用法
- 注冊(cè)一個(gè)回調(diào),等待 Task 執(zhí)行完成時(shí)獲取結(jié)果并執(zhí)行回調(diào)
var task = Task.Run<string>(() => "Hello World!");
task.ContinueWith(t => Console.WriteLine(t.Result));
- await 一個(gè) Task 并得到結(jié)果
var task = Task.Run<string>(() => "Hello World!");
var result = await task;
Console.WriteLine(result);
var task = Task.Run<string>(() => "Hello World!");
// 等效于 task.Result
var result = task.GetAwaiter().GetResult();
Console.WriteLine(result);
Task 的分類
按是否包含 Result 分,也就是是否是泛型 Task
按得到 Task 的方式,可以分為
- 我知道這個(gè) Task 是怎么來的,這種情況下,我們自己參與了 Task 的創(chuàng)建過程,知道這個(gè) Task 是在干啥。比如:
Task task = Task.Run<int>(() => 1 + 2);
計(jì)算 1 + 2,并將結(jié)果作為 Task 的結(jié)果。
Task task = new HttpClient().GetStringAsync("http://localhost:5000/api/values");
而這兩種獲取方式的不同對(duì)應(yīng)的是兩種完全不同的側(cè)重點(diǎn):
- Task 是一個(gè)白盒,關(guān)注 Task 里干了什么,在哪執(zhí)行里面這些代碼。
- Task 是一個(gè)黑盒,關(guān)注 Task 能給到我什么,Task 完成執(zhí)行之后,我該干什么。
對(duì) Task 進(jìn)行分解
按功能點(diǎn)可以將 Task 分為三個(gè)部分
- 任務(wù)執(zhí)行:通過 Task.Run 等方式執(zhí)行一段我們自定義的邏輯。
- 回調(diào)通知及回調(diào)執(zhí)行:注冊(cè)一個(gè)回調(diào),等待 Task 完成時(shí)執(zhí)行。
- await 語法支持:脫離了 await,task 的上述兩個(gè)功能依舊可以完整執(zhí)行。但卻會(huì)喪失代碼的簡潔性。
Task 在哪執(zhí)行?
線程池
Task 可以作為 ThreadPool 隊(duì)列系統(tǒng)的基本單元被 ThreadPool 調(diào)度執(zhí)行。
下面這些常見的創(chuàng)建 Task 的方式,默認(rèn)情況都是在 ThreadPool 中被調(diào)度執(zhí)行的,這幾個(gè)本質(zhì)上是一樣的,只是使用方式上和可支持傳入的自定義選項(xiàng)上的區(qū)別。
new Task(_ =>
{
Console.WriteLine("Hello World!");
}, null).Start();
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
});
// 可以看做簡化版的 TaskFactory.StartNew
Task.Run(() =>
{
Console.WriteLine("Hello World!");
});
以 Task.Run
為例來看下里面到底做了些什么。
在 PortableThreadPool.TryCreateWorkerThread
和實(shí)際要要執(zhí)行的 lambda 表達(dá)式中打上斷點(diǎn),我們便可以清晰的看到整個(gè)執(zhí)行過程。


整理一下的話,主要就是這個(gè)樣子,為簡化理解,ThreadPool 中的調(diào)用細(xì)節(jié)已省略。

Task 關(guān)鍵代碼摘錄:
class Task
{
// 任務(wù)的主體,我們要執(zhí)行的實(shí)際邏輯
// 可能有返回值,可能沒有
internal Delegate m_action;
// 任務(wù)的狀態(tài)
internal volatile int m_stateFlags;
// ThreadPool 調(diào)用入口,由于 JIT 的內(nèi)聯(lián)優(yōu)化,調(diào)用棧里只能看到 ExecuteEntryUnsafe,看不到這個(gè)方法
internal virtual void ExecuteFromThreadPool(Thread threadPoolThread) => ExecuteEntryUnsafe(threadPoolThread);
internal void ExecuteEntryUnsafe(Thread? threadPoolThread)
{
// 設(shè)置 Task 狀態(tài)為已經(jīng)執(zhí)行
m_stateFlags |= (int)TaskStateFlags.DelegateInvoked;
if (!IsCancellationRequested & !IsCanceled)
{
ExecuteWithThreadLocal(ref t_currentTask, threadPoolThread);
}
else
{
ExecuteEntryCancellationRequestedOrCanceled();
}
}
// 創(chuàng)建 Task 的時(shí)候可傳入的數(shù)據(jù),用于執(zhí)行時(shí)使用
// new Task(state => Console.WriteLine(state), "Hello World").Start();
internal object? m_stateObject;
private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null)
{
// 執(zhí)行上下文維護(hù)著代碼執(zhí)行邏輯上下文的一些數(shù)據(jù),如 AsyncLocal
// 具體請(qǐng)看我的 AsyncLocal 博客 https://www.cnblogs.com/eventhorizon/p/12240767.html
ExecutionContext? ec = CapturedContext;
if (ec == null)
{
// 沒有執(zhí)行上下文,直接執(zhí)行
InnerInvoke();
}
else
{
// 是否是在 ThreadPool 線程上執(zhí)行
if (threadPoolThread is null)
{
ExecutionContext.RunInternal(ec, s_ecCallback, this);
}
else
{
ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, ec, s_ecCallback, this);
}
}
}
// 不管 ExecuteWithThreadLocal 分支如何,最后會(huì)走到 InnerInvoke
internal virtual void InnerInvoke()
{
if (m_action is Action action)
{
action();
return;
}
if (m_action is Action<object?> actionWithState)
{
actionWithState(m_stateObject);
}
}
}
可以看到 Task 以 ThreadPoolTaskScheduler 為媒介,進(jìn)入了 ThreadPool。ThreadPool 調(diào)用 Task.ExecuteFromThreadPool 方法最終觸發(fā) Task 所封裝的 action 的執(zhí)行。
與 ThreadPool 中另一種基本單元 IThreadPoolWorkItem 一樣,Task 在進(jìn)入 ThreadPoolWorkQueue 時(shí)會(huì)有兩種可能,進(jìn)入全局隊(duì)列或者本地隊(duì)列。
理解這個(gè)問題,我們需要看一下 ThreadPoolTaskScheduler.QueueTask 里做了些什么。
internal sealed class ThreadPoolTaskScheduler : TaskScheduler
{
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
{
// 創(chuàng)建獨(dú)立線程,和線程池?zé)o關(guān)
new Thread(s_longRunningThreadWork)
{
IsBackground = true,
Name = ".NET Long Running Task"
}.UnsafeStart(task);
}
else
{
// 第二個(gè)參數(shù)是 preferLocal
// options & TaskCreationOptions.PreferFairness 這個(gè)位標(biāo)志的枚舉用法可查看官方資料
// https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/builtin-types/enum#enumeration-types-as-bit-flags
ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
}
}
}
上面代碼里的 TaskCreationOptions 是我們?cè)趧?chuàng)建 Task 的時(shí)候可以指定的一個(gè)選項(xiàng),默認(rèn)是 None。
Task.Run 不支持傳入該選項(xiàng),可使用 TaskFactory.StartNew
的重載進(jìn)行指定:
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
}, TaskCreationOptions.PreferFairness);
根據(jù) TaskCreationOptions 的不同,出現(xiàn)了三個(gè)分支
- LongRunning:獨(dú)立線程,和線程池?zé)o關(guān)
- 包含 PreferFairness時(shí):preferLocal=false,進(jìn)入全局隊(duì)列
- 不包含 PreferFairness時(shí):preferLocal=ture,進(jìn)入本地隊(duì)列
進(jìn)入全局隊(duì)列的任務(wù)能夠公平地被各個(gè)線程池中的線程領(lǐng)取執(zhí)行,也是就是 prefer fairness
這個(gè)詞組的字面意思了。
下圖中 Task666 先進(jìn)入全局隊(duì)列,隨后被 Thread1 領(lǐng)走。Thread3 通過 WorkStealing 機(jī)制竊取了 Thread2 中的 Task2。

一個(gè)獨(dú)立的后臺(tái)線程中
也就是上文提到的創(chuàng)建 Task 時(shí)使用 TaskCreationOptions.LongRunning
,如果你需要一個(gè)執(zhí)行一個(gè)長時(shí)間的任務(wù),比如一段耗時(shí)很久的同步代碼,就可以使用這個(gè)。執(zhí)行異步代碼(指 await xxx)時(shí)不推薦使用,后面會(huì)講原因。
new TaskFactory().StartNew(() =>
{
// 耗時(shí)較長的同步代碼
}, TaskCreationOptions.LongRunning);
ThreadPool 管理的線程是出于可復(fù)用的目的設(shè)計(jì)的,不停地從隊(duì)列系統(tǒng)中領(lǐng)取任務(wù)執(zhí)行。如果一個(gè) WorkThread 阻塞在一個(gè)耗時(shí)較長的任務(wù)上,它就沒辦法處理其他任務(wù),ThreadPool 的吞吐率會(huì)受影響。
當(dāng)然并不意味著 ThreadPool 不能處理這樣的任務(wù)。舉個(gè)極端的例子,如果線程池目前的 WorkThread 全在處理 LongRunning Task。在 Starvation Avoidance 機(jī)制(每隔500ms)創(chuàng)建新的 WorkThread 之前,ThreadPool 沒法執(zhí)行新的任務(wù)。
LongRunning 的 Task 生命周期與 ThreadPool 設(shè)計(jì)目的不符合,因此需獨(dú)立開來。
自定義的TaskScheduler里
除了 ThreadPoolTaskScheduler
外,我們還可以定義自己的 TaskScheduler
。
首先需要繼承 TaskScheduler
這個(gè)抽象類,有三個(gè)抽象方法需要我們實(shí)現(xiàn)。
public abstract class TaskScheduler
{
// 入口,待調(diào)度執(zhí)行的 Task 會(huì)通過該方法傳入
protected internal abstract void QueueTask(Task task);
// 這個(gè)是在執(zhí)行 Task 回調(diào)的時(shí)候才會(huì)被執(zhí)行到的方法,放到后面再講
protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
// 獲取所有調(diào)度到該 TaskScheduler 的 Task
protected abstract IEnumerable<Task>? GetScheduledTasks();
}
在我們自定義的 TaskScheduler 里,在 QueueTask 被執(zhí)行時(shí)會(huì)拿到 Task,但是 Task 要怎么去觸發(fā)里面的 action 呢。
Task 針對(duì) ThreadPool 的調(diào)用場(chǎng)景暴露了一個(gè) ExecuteFromThreadPool 的 internal 方法,同時(shí)也提供了一個(gè) ExecuteEntry 方法供其他場(chǎng)景調(diào)用,但是這個(gè)方法也是 internal 的。只能通過 TaskScheduler 的 protect 方法進(jìn)行間接調(diào)用。
public abstract class TaskScheduler
{
protected bool TryExecuteTask(Task task)
{
if (task.ExecutingTaskScheduler != this)
{
throw new InvalidOperationException(SR.TaskScheduler_ExecuteTask_WrongTaskScheduler);
}
return task.ExecuteEntry();
}
}
下面是一個(gè)自定義的 TaskScheduler,在一個(gè)固定的線程上順序執(zhí)行 Task。
```C#
class CustomTaskScheduler : TaskScheduler
{
private readonly BlockingCollection<Task> _queue = new();
public CustomTaskScheduler()
{
new Thread(() =>
{
while (true)
{
var task = _queue.Take();
Console.WriteLine($"task {task.Id} is going to be executed");
TryExecuteTask(task);
Console.WriteLine($"task {task.Id} has been executed");
}
})
{
IsBackground = true
}.Start();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _queue.ToArray();
}
protected override void QueueTask(Task task)
{
_queue.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}
在 TaskFactory 的構(gòu)造函數(shù)中可以傳入我們自定義的 TaskScheduler
var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();
輸出結(jié)果如下:
task 1 is going to be executed
task 1 threadId: 10
task 1 has been executed
task 2 is going to be executed
task 2 threadId: 10
task 2 has been executed
所有的 Task 都會(huì)在一個(gè)線程里被調(diào)度執(zhí)行。
Task 可以封裝任何類型的別的任務(wù)
上面兩種情況,Task 都存在明確的執(zhí)行實(shí)體,但有時(shí)候,可能是沒有的。看下面這樣的例子。
var task = FooAsync();
var action = typeof(Task).GetField("m_action", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(task);
Console.WriteLine($"Task action is null: {action == null}");
task.ContinueWith(t => Console.WriteLine(t.Result));
// 回調(diào)可以注冊(cè)多個(gè)
task.ContinueWith(t => Console.WriteLine(t.Result));
Task<string> FooAsync()
{
var tsc = new TaskCompletionSource<string>();
new Thread(() =>
{
Thread.Sleep(1000);
tsc.SetResult("Hello World");
})
{
IsBackground = true
}.Start();
return tsc.Task;
}
輸出:
Task action is null: True
Hello World
Hello World
從 FooAsync 外部和內(nèi)部兩個(gè)角度來看這個(gè)問題
- FooAsync 外:拿到了一個(gè) Task 并注冊(cè)了回調(diào)
- FooAsync 內(nèi):相當(dāng)于間接的持有了這個(gè)回調(diào),并通過 tsc.SetResult 間接地調(diào)用了這個(gè)回調(diào)。
下面是關(guān)鍵代碼的摘錄
class Task
{
// 保存一個(gè)或一組回調(diào)
private volatile object? m_continuationObject;
internal void FinishContinuations()
{
// 處理回調(diào)的執(zhí)行
}
}
class Task<T> : Task
{
internal bool TrySetResult(TResult result)
{
// ...
this.m_result = result;
// 復(fù)用父類的邏輯
FinishContinuations();
// ...
}
}
public class TaskCompletionSource<TResult>
{
public TaskCompletionSource() => _task = new Task<TResult>();
public Task<TResult> Task => _task;
public void SetResult(TResult result)
{
TrySetResult(result);
}
public bool TrySetResult(TResult result)
{
_task.TrySetResult(result);
// ...
}
}
有時(shí)候 Task.TrySetResult() 的觸發(fā)源可能是一個(gè)異步IO完成事件導(dǎo)致的,也就是我們常說的異步IO,硬件有自己的處理芯片,在異步IO完成通知CPU(硬件中斷 hardware interrupt)之前,CPU并不需要參與,這也是異步IO的價(jià)值所在。
小結(jié)
Task 是個(gè)已經(jīng)完成或者將在未來某個(gè)時(shí)間點(diǎn)完成的任務(wù),可以向其注冊(cè)一個(gè)回調(diào)等待任務(wù)完成時(shí)被執(zhí)行。
轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/15824541.html
該文章在 2025/8/8 9:51:08 編輯過