Compare commits
3 Commits
write-asyn
...
master
Author | SHA1 | Date | |
---|---|---|---|
309db7e5f1 | |||
fb71ce64cf | |||
d87e3830a5 |
|
@ -1,884 +0,0 @@
|
|||
---
|
||||
title: async/await究竟是如何工作的?
|
||||
tags:
|
||||
- dotnet
|
||||
- 技术笔记
|
||||
- 译文
|
||||
---
|
||||
|
||||
### 译者按
|
||||
|
||||
如何正确而快速的编写异步运行的代码一直是软件工程界的难题,而C#提出的`async/await`范式无疑是探索道路上的先行者。本篇文章便是翻译自.NET开发者博客上一篇名为“How async/await really works in C#”的文章,希望能够让读者在阅读之后明白`async/await`编程范式的前世今生和`.NET`实现方式。另外,.Net开发者中文博客也翻译了[这篇文章](https://devblogs.microsoft.com/dotnet-ch/async-await%e5%9c%a8-c%e8%af%ad%e8%a8%80%e4%b8%ad%e6%98%af%e5%a6%82%e4%bd%95%e5%b7%a5%e4%bd%9c%e7%9a%84/),一并供读者参考。
|
||||
|
||||
---
|
||||
|
||||
数周前,[.NET开发者博客](https://devblogs.microsoft.com/dotnet/)发布了一篇题为[什么是.NET,为什么你应该选择.NET](https://devblogs.microsoft.com/dotnet/why-dotnet/)的文章。文章中从宏观上概览了整个`dotnet`生态系统,总结了系统中的各个部分和其中的设计决定;文章还承诺在未来推出一系列的深度文章介绍涉及到的方方面面。这篇文章便是这系列文章中的第一篇,深入介绍C#和.NET中`async/await`的历史、设计决定和实现细节。
|
||||
|
||||
对于`async/await`的支持大约在十年前就提供了。在这段时间里,`async/await`语法大幅改变了编写可扩展.NET代码的方式,同时该语法使得在不了解`async/await`工作原理的情况下使用它提供的功能编写异步代码也是十分容易和常见的。以下面的**同步**方法为例:(因为这个方法的调用者在整个操作完成之前、将控制权返回给它之前都不能进行任何操作,所以这个方法被称为**同步**)
|
||||
|
||||
```csharp
|
||||
// 将数据同步地从源复制到目的地
|
||||
public void CopyStreamToStream(Stream source, Stream destination)
|
||||
{
|
||||
var buffer = new byte[0x1000];
|
||||
int numRead;
|
||||
while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
|
||||
{
|
||||
destination.Write(buffer, 0, numRead);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
在这个方法的基础上,你只需要修改几个关键词、改变几个方法的名称,就可以得到一个**异步**的方法(因为这个方法将很快,往往实在所有的工作完成之前,就会将控制权返回给它的调用者,所以被称作异步方法)。
|
||||
|
||||
```csharp
|
||||
// 将数据异步地从源复制到目的地
|
||||
public async Task CopyStreamToStreamAsync(Stream source, Stream destination)
|
||||
{
|
||||
var buffer = new byte[0x1000];
|
||||
int numRead;
|
||||
while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0)
|
||||
{
|
||||
await destination.WriteAsync(buffer, 0, numRead);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
有着几乎相同的语法,类似的控制流结构,但是现在这个方法在执行过程中不会阻塞,有着完全不同的底层执行模型,而且C#编译器和核心库帮你完成所有这些复杂的工作。
|
||||
|
||||
尽管在不了解底层原理的基础上使用这类技术是十分普遍的,但是我们坚持认为了解这些事务的运行原理将会帮助我们更好的利用它们。之于`async/await`,了解这些原理将在你需要深入探究时十分有用,例如当你需要调试一段错误的代码或者优化某段正确运行代码的运行效率时。在这篇文章中,我们将深入了解`async/await`具体如何在语言、编译器和库层面运行,然后你将更好地利用这些优秀的设计。
|
||||
|
||||
为了更好的理解这一切,我们将回到没有`async/await`的时代,看看在没有它们的情况下最优秀的异步代码是如何编写的。平心而论,这些代码看上去并不好。
|
||||
|
||||
### 原初的历史
|
||||
|
||||
回到.NET框架1.0时代,当时流行的异步编程范式是**异步编程模型**,“Asynchronous Programming Model”,也被称作`APM`范式、`Being/End`范式或者`IAsyncResult`范式。从宏观上来看,这种范式是相当简单的。例如对于一个同步操作`DoStuff`:
|
||||
|
||||
```csharp
|
||||
class Handler
|
||||
{
|
||||
public int DoStuff(string arg);
|
||||
}
|
||||
```
|
||||
|
||||
在这种编程模型下会存在两个相关的方法:一个`BeginStuff`方法和一个`EndStuff`方法:
|
||||
|
||||
```csharp
|
||||
class Handler
|
||||
{
|
||||
public int DoStuff(string arg);
|
||||
|
||||
public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state);
|
||||
public int EndDoStuff(IAsyncResult asyncResult);
|
||||
}
|
||||
```
|
||||
|
||||
`BeginStuff`方法首先会接受所有`DoStuff`方法会接受的参数,同时其会接受一个`AsyncCallback`回调和一个**不透明**的状态对象`state`,而且这两个参数都可以为空。这个“开始”方法将负责异步操作的初始化,而且如果提供了回调函数,这个函数还会负责在异步操作完成之后调用这个回调函数,因此这个回调函数也常常被称为初始化操作的“下一步”。开始方法还会负责构建一个实现了`IAsyncResult`接口的对象,这个对象中的`AsyncState`属性由可选的`state`参数提供:
|
||||
|
||||
```csharp
|
||||
namespace System
|
||||
{
|
||||
public interface IAsyncResult
|
||||
{
|
||||
object? AsyncState { get; }
|
||||
WaitHandle AsyncWaitHandle { get; }
|
||||
bool IsCompleted { get; }
|
||||
bool CompletedSynchronously { get; }
|
||||
}
|
||||
|
||||
public delegate void AsyncCallback(IAsyncResult ar);
|
||||
}
|
||||
```
|
||||
|
||||
这个`IAsynResult`实例将会被开始方法返回,在调用`AsyncCallback`时这个实例也会被传递过去。当准备好使用该异步操作的结果时,调用者也会将这个`IAsyncResult`实例传递给结束方法,同时结束方法也会负责保证这个异步操作完成,如果没有完成该方法就会阻塞代码的运行直到完成。结束方法会返回异步操作的结果,异步操作过程中引发的各种错误和异常也会通过该方法传递出来。因此,对于下面这种同步的操作:
|
||||
|
||||
```csharp
|
||||
try
|
||||
{
|
||||
int i = handler.DoStuff(arg);
|
||||
Use(i);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
... // 在这里处理DoStuff方法和Use方法中引发的各种异常
|
||||
}
|
||||
```
|
||||
|
||||
可以使用开始/结束方法改写为异步运行的形式:
|
||||
|
||||
```csharp
|
||||
try
|
||||
{
|
||||
handler.BeginDoStuff(arg, iar =>
|
||||
{
|
||||
try
|
||||
{
|
||||
Handler handler = (Handler)iar.AsyncState!;
|
||||
int i = handler.EndDoStuff(iar);
|
||||
Use(i);
|
||||
}
|
||||
catch (Exception e2)
|
||||
{
|
||||
... // 处理从EndDoStuff方法和Use方法中引发的各种异常
|
||||
}
|
||||
}, handler);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
... // 处理从同步调用BeginDoStuff方法引发的各种异常
|
||||
}
|
||||
```
|
||||
|
||||
对于熟悉使用含有回调`API`语言的开发者来说,这样的代码应该会显得相当眼熟。
|
||||
|
||||
但是事情在这里变得更加复杂了。例如,这段代码存在“栈堆积”`stack dive`的问题。栈堆积就是代码在重复的调用方法中使得栈越来越深,直到发生栈溢出的现象。如果“异步”操作同步完成,开始方法将会使同步的调用回调方法,这就意味着对于开始方法的调用就会直接调用回调方法。同时考虑到“异步”方法同步完成却是一种非常常见的现象,它们只是承诺会异步的完成操作而不是只被允许异步的完成。例如一个对于某个网络操作的异步操作,比如读取一个套接字,如果你只需要从一次操作中读取少量的数据,例如在一次回答中只需要读取少量响应头的数据,你可能会直接读取大量数据存储在缓冲区中。相比于每次使用都使用系统调用但是只读取少量的数据,你一次读取了大量数据在缓冲区中,并在缓冲区失效之前都是从缓冲区中读取,这样就减少了需要调用昂贵的系统调用来和套接字交互的次数。像这样的缓冲区可能在你进行任何异步调用之后存在,例如第一次操作异步的完成对于缓冲区的填充,之后的若干次“异步”操作都不需要同I/O进行任何交互而直接通过与缓冲区的同步交互完成,直到缓冲区失效之后再次异步的填充缓冲区。因此当开始方法进行上述的一次调用时,开始方法会发现操作同步地完成了,因此开始方法同步地调用回调方法。此时,你有一个调用了开始方法的栈帧和一个调用了回调方法的栈帧。想想看如果回调方法再次调用了开始方法会发生什么?如果开始方法和回调方法都是被同步调用的,现在你就会在站上得到多个重复的栈帧,如此重复下去直到将栈上的空间耗尽。
|
||||
|
||||
这并不是杞人忧天,使用下面这段代码就可以很容易的复现这个问题:
|
||||
|
||||
```csharp
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
listener.Listen();
|
||||
|
||||
using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
client.Connect(listener.LocalEndPoint!);
|
||||
|
||||
using Socket server = listener.Accept();
|
||||
_ = server.SendAsync(new byte[100_000]);
|
||||
|
||||
var mres = new ManualResetEventSlim();
|
||||
byte[] buffer = new byte[1];
|
||||
|
||||
var stream = new NetworkStream(client);
|
||||
|
||||
void ReadAgain()
|
||||
{
|
||||
stream.BeginRead(buffer, 0, 1, iar =>
|
||||
{
|
||||
if (stream.EndRead(iar) != 0)
|
||||
{
|
||||
ReadAgain(); // uh oh!
|
||||
}
|
||||
else
|
||||
{
|
||||
mres.Set();
|
||||
}
|
||||
}, null);
|
||||
};
|
||||
ReadAgain();
|
||||
|
||||
mres.Wait();
|
||||
```
|
||||
|
||||
在代码中我们建立一个简单的客户端套接字和一个简单的服务端套接字并让它们连接。服务端会向客户端发送十万字节的信息,而客户端会使用开始/结束方法尝试去“异步的”接收这些信息(需要注意这样做是十分低效的,在教学实例之外的地方都不应该这样编写代码)。传递给`BeingRead`的回调函数通过调用`EndRead`方法停止读取,如果在读取过程中读取到数据(意味着还没有读取完成),就通过对于本地方法`ReadAgain`的递归调用来再次调用`BeingRead`方法继续读取。值得指出的是,在.NET Core中套接字操作比原来在.NET Framework中的版本快上许多,同时如果操作系统可以同步的完成这些操作,那么.NET Core中的操作也会同步完成(需要注意操作系统内核也有一个缓冲区来完成套接字接收操作)。因此,运行这段代码就会出现栈溢出。
|
||||
|
||||
鉴于这个问题非常容易出现,因此`APM`模型中内建了缓解这个问题的方法。容易想到有两种方法可以缓解这个问题:
|
||||
|
||||
1. 不允许`AsyncCallback`被同步调用。如果该回调方法始终都是被异步调用的,即使操作是异步完成的,栈堆叠的方法也就不存在了。但是这样做会降低性能,因为同步完成的操作(或者快到难以注意到的操作)是相当的常见的,强制这些操作的回调排队完成会增加相当可观的开销。
|
||||
2. 引入一个机制让调用者而不是回调函数在工作异步完成时完成剩余的工作。在这种情况下,我们就避免了引入额外的栈帧,在不增加栈深度的情况下完成了余下的工作。
|
||||
|
||||
`APM`模型使用了第二种方法。为了实现这个方法,`IAsyncResult`接口提供了另外两个成员:`IsCompleted`和`CompletedSynchronusly`。`IsCompeleted`成员告诉我们操作是否完成,在程序中可以反复检查这个成员直到它从`false`变成`true`。相对的,`CompletedSynchronously`在运行过程中不会变化,(或者它存在一个还未被发现的`bug`会导致这个值变化,笑),这个值的主要作用是判断后续的工作是应该由开始方法的调用者还是`AsyncCallback`来进行。如果`CompletedSynchronously`的值是`false`,说明这个操作是异步进行的,所有后续的工作应该由回调函数来进行处理;毕竟,如果工作是异步完成的,开始方法的调用者不能知道工作是何时完成的(如果开始方法的调用者调用了结束方法,那么结束方法就会阻塞直到工作完成)。反之,如果`CompletedSynchronously`的值是`true`,如果此时使用回调方法处理后续的工作就会引发栈堆叠问题,因为此时回调方法会在栈上比开始它更低的位置上进行后续的操作。因此任何在意栈堆叠问题的实现需要关注`CompletedSynchronously`的值,当为真的时候,让开始方法的调用者处理后续的工作,而回调方法在此时不应处理任何工作。这也是为什么`CompletedSynchronously`的值不能改变——开始方法的调用者和回调方法需要相同的值来保证后续工作在任何情况下都进行且只进行一次。
|
||||
|
||||
因此我们之前的`DoStuff`实例就需要被修改为:
|
||||
|
||||
```csharp
|
||||
try
|
||||
{
|
||||
IAsyncResult ar = handler.BeginDoStuff(arg, iar =>
|
||||
{
|
||||
if (!iar.CompletedSynchronously)
|
||||
{
|
||||
try
|
||||
{
|
||||
Handler handler = (Handler)iar.AsyncState!;
|
||||
int i = handler.EndDoStuff(iar);
|
||||
Use(i);
|
||||
}
|
||||
catch (Exception e2)
|
||||
{
|
||||
... // handle exceptions from EndDoStuff and Use
|
||||
}
|
||||
}
|
||||
}, handler);
|
||||
if (ar.CompletedSynchronously)
|
||||
{
|
||||
int i = handler.EndDoStuff(ar);
|
||||
Use(i);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
... // handle exceptions that emerge synchronously from BeginDoStuff and possibly EndDoStuff/Use
|
||||
}
|
||||
```
|
||||
|
||||
这里的代码已经~~显得冗长~~,而且我们还只研究了如何使用这种范式,还没有涉及如何实现这种范式。尽管大部分的开发者并不需要在这些子调用(例如实现`Socket.BeginReceive/EndReceive`这些方法去和操作系统交互),但是很多开发者需要组合这些操作(从一个“较大的”的异步操作调用多个异步操作),而这不仅需要使用其他的开始/结束方法,还需要自行实现你自己的开始/结束方法,这样你才能在其他的地方使用这个操作。同时,你还会注意到在上述的`DoStuff`范例中没有任何的控制流代码。如果需要引入一些控制流代码——即使是一个简单的循环——这也会立刻变成~~抖M才会编写的代码~~,同时也给无数的博客作者提供水`CSDN`的好题材。
|
||||
|
||||
所以让我们现在就来写一篇`CSDN`,给出一个完成的实例。在文章的开头我展示了一个`CopyStreamToStream`方法,这个方式会将一个流中的数据复制到另外一个流中(就是`Stream.CopyTo`方法所完成的工作,但是为了说明,让我们假设这个方法并不存在):
|
||||
|
||||
```csharp
|
||||
public void CopyStreamToStream(Stream source, Stream destination)
|
||||
{
|
||||
var buffer = new byte[0x1000];
|
||||
int numRead;
|
||||
while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
|
||||
{
|
||||
destination.Write(buffer, 0, numRead);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
直白的说,我们只需要不停的从一个流中读取数据然后写入到另外一个流中,直到我们没法从第一个流中读取到任何数据。现在让我们使用`APM`模型使用这个操作的异步模式吧:
|
||||
|
||||
```csharp
|
||||
public IAsyncResult BeginCopyStreamToStream(
|
||||
Stream source, Stream destination,
|
||||
AsyncCallback callback, object state)
|
||||
{
|
||||
var ar = new MyAsyncResult(state);
|
||||
var buffer = new byte[0x1000];
|
||||
|
||||
Action<IAsyncResult?> readWriteLoop = null!;
|
||||
readWriteLoop = iar =>
|
||||
{
|
||||
try
|
||||
{
|
||||
for (bool isRead = iar == null; ; isRead = !isRead)
|
||||
{
|
||||
if (isRead)
|
||||
{
|
||||
iar = source.BeginRead(buffer, 0, buffer.Length, static readResult =>
|
||||
{
|
||||
if (!readResult.CompletedSynchronously)
|
||||
{
|
||||
((Action<IAsyncResult?>)readResult.AsyncState!)(readResult);
|
||||
}
|
||||
}, readWriteLoop);
|
||||
|
||||
if (!iar.CompletedSynchronously)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int numRead = source.EndRead(iar!);
|
||||
if (numRead == 0)
|
||||
{
|
||||
ar.Complete(null);
|
||||
callback?.Invoke(ar);
|
||||
return;
|
||||
}
|
||||
|
||||
iar = destination.BeginWrite(buffer, 0, numRead, writeResult =>
|
||||
{
|
||||
if (!writeResult.CompletedSynchronously)
|
||||
{
|
||||
try
|
||||
{
|
||||
destination.EndWrite(writeResult);
|
||||
readWriteLoop(null);
|
||||
}
|
||||
catch (Exception e2)
|
||||
{
|
||||
ar.Complete(e);
|
||||
callback?.Invoke(ar);
|
||||
}
|
||||
}
|
||||
}, null);
|
||||
|
||||
if (!iar.CompletedSynchronously)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
destination.EndWrite(iar);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
ar.Complete(e);
|
||||
callback?.Invoke(ar);
|
||||
}
|
||||
};
|
||||
|
||||
readWriteLoop(null);
|
||||
|
||||
return ar;
|
||||
}
|
||||
|
||||
public void EndCopyStreamToStream(IAsyncResult asyncResult)
|
||||
{
|
||||
if (asyncResult is not MyAsyncResult ar)
|
||||
{
|
||||
throw new ArgumentException(null, nameof(asyncResult));
|
||||
}
|
||||
|
||||
ar.Wait();
|
||||
}
|
||||
|
||||
private sealed class MyAsyncResult : IAsyncResult
|
||||
{
|
||||
private bool _completed;
|
||||
private int _completedSynchronously;
|
||||
private ManualResetEvent? _event;
|
||||
private Exception? _error;
|
||||
|
||||
public MyAsyncResult(object? state) => AsyncState = state;
|
||||
|
||||
public object? AsyncState { get; }
|
||||
|
||||
public void Complete(Exception? error)
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
_completed = true;
|
||||
_error = error;
|
||||
_event?.Set();
|
||||
}
|
||||
}
|
||||
|
||||
public void Wait()
|
||||
{
|
||||
WaitHandle? h = null;
|
||||
lock (this)
|
||||
{
|
||||
if (_completed)
|
||||
{
|
||||
if (_error is not null)
|
||||
{
|
||||
throw _error;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
h = _event ??= new ManualResetEvent(false);
|
||||
}
|
||||
|
||||
h.WaitOne();
|
||||
if (_error is not null)
|
||||
{
|
||||
throw _error;
|
||||
}
|
||||
}
|
||||
|
||||
public WaitHandle AsyncWaitHandle
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
return _event ??= new ManualResetEvent(_completed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public bool CompletedSynchronously
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
if (_completedSynchronously == 0)
|
||||
{
|
||||
_completedSynchronously = _completed ? 1 : -1;
|
||||
}
|
||||
|
||||
return _completedSynchronously == 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public bool IsCompleted
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
return _completed;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
~~Yowsers~~。即使写完了这些繁文缛节,这实际上仍然不是一个完美的实现。例如,`IAsyncResult`的实现会在每次操作时上锁,而不是在任何可能的时候都使用无锁的实现;异常也是以原始的模型存储,如果使用`ExceptionDispatchInfo`可以让异常在传播的过程中含有调用栈的信息,在每次操作中都分配了大量的空间来存储变量(例如在每次`BeingWrite`调用时都会分配一片空间来存储委托),如此等等。现在想象这就是你每次编写方法时需要做的工作,每次当你需要编写一个可重用的异步方法来使用另外一个异步方法时,你需要自己完成上述所有的工作。而且如果你需要编写使用多个不同的`IAsyncResult`的可重用代码——就像在`async/await`范式中`Task.WhenAll`所完成的那样,难度又上升了一个等级;每个不同操作都会实现并暴露针对相关的`API`,这让编写一套逻辑代码并简单的复用它们也变得不可能(尽管一些库作者可能会通过提供一层针对回调方法的新抽象来方便开发者编写需要访问暴露`API`的回调方法)。
|
||||
|
||||
上述这些复杂性也说明只有很少的一部分人尝试过这样编写代码,而且对于这些人来说,`bug`也往往如影随形。而且这并不是一个`APM`范式的黑点,这是所有使用基于回调的异步方法都具有的缺点。我们已经十分习惯现代语言都有的控制流结构所带来的强大和便利,因此使用会破坏这种结构的基于回调的异步方式会带来大量的复杂性也是可以理解的。同时,也没有任何主流的语言提供了更好的替代。
|
||||
|
||||
我们需要一种更好的办法,一个既继承了我们在`APM`范式中所学习到所有经验也规避了其所有的各种缺点的方式。一个有趣的点是,`APM`范式只是一种编程范式,运行时、核心库和编译器在使用或者实现这种范式的过程中没有提供任何协助。
|
||||
|
||||
### 基于事件的异步范式
|
||||
|
||||
在.NET Framework 2.0中提供了一系列的`API`来实现一种不同的异步编程范式,当时设想这种范式的主要应用场景是客户端应用程序。这种基于事件的异步范式,也被称作`EAP`范式,也是以提供一系列成员的方式提供的,包含一个用于初始化异步操作的方式和一个监听异步操作是否完成的事件。因此上述的`DoStuff`示例可能会暴露如下的一系列成员:
|
||||
|
||||
```csharp
|
||||
class Handler
|
||||
{
|
||||
public int DoStuff(string arg);
|
||||
|
||||
public void DoStuffAsync(string arg, object? userToken);
|
||||
public event DoStuffEventHandler? DoStuffCompleted;
|
||||
}
|
||||
|
||||
public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e);
|
||||
|
||||
public class DoStuffEventArgs : AsyncCompletedEventArgs
|
||||
{
|
||||
public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) :
|
||||
base(error, canceled, usertoken) => Result = result;
|
||||
|
||||
public int Result { get; }
|
||||
}
|
||||
```
|
||||
|
||||
首先通过`DoStuffCompleted`事件注册需要在完成异步操作时进行的工作然后调用`DoStuff`方法,这个方法将初始化异步操作,一旦异步操作完成,`DoStuffCompleted`事件将会被调用者引发。已经注册的回调方法可以运行剩余的工作,例如验证提供的`userToken`是否是期望的`userToken`,同时我们可以注册多个回调方法在异步操作完成的时候运行。
|
||||
|
||||
这个范式确实让一系列用例的编写更好编写,同时也让一系列用例变得更加复杂(例如上述的`CopyStreamToStream`例子)。这种范式的影响范围并不大,只在一次.NET Framework的更新中引入便匆匆地消失了,除了留下了一系列为了支持这种范式而实现的`API`,例如:
|
||||
|
||||
```csharp
|
||||
class Handler
|
||||
{
|
||||
public int DoStuff(string arg);
|
||||
|
||||
public void DoStuffAsync(string arg, object? userToken);
|
||||
public event DoStuffEventHandler? DoStuffCompleted;
|
||||
}
|
||||
|
||||
public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e);
|
||||
|
||||
public class DoStuffEventArgs : AsyncCompletedEventArgs
|
||||
{
|
||||
public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) :
|
||||
base(error, canceled, usertoken) => Result = result;
|
||||
|
||||
public int Result { get; }
|
||||
}
|
||||
```
|
||||
|
||||
但是这种编程范式确实在`APM`范式所没有注意到的地方前进了一大步,并且这一点还保留到了我们今天所介绍的模型中:[同步上下文](https://github.com/dotnet/runtime/blob/967a59712996c2cdb8ce2f65fb3167afbd8b01f3/src/libraries/System.Private.CoreLib/src/System/Threading/SynchronizationContext.cs#L6) (`SynchronizationContext`)。
|
||||
|
||||
同步上下文作为一个对于通用调度器的实现,也是在.NET Framework中引入的。在实践中,同步上下文最常用的方法是`Post`,这个方法将一个工作实现传递给上下文所代表的一种调度器。举例来说,一个基础的同步上下文实现是一个线程池`ThreadPool`,因此`Post`方法的典型实现就是`ThreadPool.QueueUserWorkItem`方法,这个方法将让线程池在池中任意的线程上以指定的状态调用指定的委托。然而,同步上下文的巧妙之处不仅在于提供了对于不同调度器的支持,而是提供了一种针对不同的应用模型使用不同调度方法的抽象能力。
|
||||
|
||||
考虑像Windows Forms之类的`UI`框架。对于大多数工作在Windows上的`UI`框架来说,控件往往关联到一个特定的线程,这个线程负责运行一个消息管理中心,这个中心用来运行那些需要同控件交互的工作:只有这个控件有能力来修改控件,任何其他试图同控件进行交互的线程都需要发送消息到这个消息控制中心。Windows Forms通过一系列方法来实现这一点,例如`Control.BeingInvoke`,这类方法将会把提供的委托和参数传递给同这个控件相关联的线程来运行。你可以写出如下的代码:
|
||||
|
||||
```csharp
|
||||
private void button1_Click(object sender, EventArgs e)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_ =>
|
||||
{
|
||||
string message = ComputeMessage();
|
||||
button1.BeginInvoke(() =>
|
||||
{
|
||||
button1.Text = message;
|
||||
});
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
这段代码首先将`ComputeMessage`方法交给线程池中的一个线程运行(这样可以保证该方法在运行时`UI`界面不会卡死),当上述工作完成之后,再将一个更新`button1`标签的委托传递给关联到`button1`的线程运行。简单而易于理解。在`WPF`框架中也是类似的逻辑,使用一个被称为`Dispatcher`的类型:
|
||||
|
||||
```csharp
|
||||
private void button1_Click(object sender, RoutedEventArgs e)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_ =>
|
||||
{
|
||||
string message = ComputeMessage();
|
||||
button1.Dispatcher.InvokeAsync(() =>
|
||||
{
|
||||
button1.Content = message;
|
||||
});
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
`.NET MAUI`亦然。但是如果我想将这部分的逻辑封装到一个独立的辅助函数中,例如下面这种:
|
||||
|
||||
```csharp
|
||||
// 调用ComputeMessage然后触发更新逻辑
|
||||
internal static void ComputeMessageAndInvokeUpdate(Action<string> update) { ... }
|
||||
```
|
||||
|
||||
这样我就可以直接:
|
||||
|
||||
```csharp
|
||||
private void button1_Click(object sender, EventArgs e)
|
||||
{
|
||||
ComputeMessageAndInvokeUpdate(message => button1.Text = message);
|
||||
}
|
||||
```
|
||||
|
||||
但是`ComputerMessageAndInvokeUpdate`应该如何实现才能适配各种类型的应用程序呢?难道需要硬编码所有可能涉及的`UI`框架吗?这就是`SynchronizationContext`大显神威的地方,我们可以这样实现这个方法:
|
||||
|
||||
```csharp
|
||||
internal static void ComputeMessageAndInvokeUpdate(Action<string> update)
|
||||
{
|
||||
SynchronizationContext? sc = SynchronizationContext.Current;
|
||||
ThreadPool.QueueUserWorkItem(_ =>
|
||||
{
|
||||
string message = ComputeMessage();
|
||||
if (sc is not null)
|
||||
{
|
||||
sc.Post(_ => update(message), null);
|
||||
}
|
||||
else
|
||||
{
|
||||
update(message);
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
在这个实现中将`SynchronizationContext`作为同`UI`进行交互的调度器之抽象。任何应用程序模型都需要保证在`SynchronizationContext.Current`属性上注册一个继承了`SynchronizationContext`的类,这个就会完成调度相关的工作。例如在`Windows Forms`中:
|
||||
|
||||
```csharp
|
||||
public sealed class WindowsFormsSynchronizationContext : SynchronizationContext, IDisposable
|
||||
{
|
||||
public override void Post(SendOrPostCallback d, object? state) =>
|
||||
_controlToSendTo?.BeginInvoke(d, new object?[] { state });
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
在`WPF`中有:
|
||||
|
||||
```
|
||||
public sealed class DispatcherSynchronizationContext : SynchronizationContext
|
||||
{
|
||||
public override void Post(SendOrPostCallback d, Object state) =>
|
||||
_dispatcher.BeginInvoke(_priority, d, state);
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
`ASP.NET`*曾经*也有过一个实现,尽管Web框架实际上并不关心是哪个线程在运行指定的工作,但是非常关心指定工作和那个请求相关,因此该实现主要负责保证多个线程不会在同时访问同一个`HttpContext`。
|
||||
|
||||
```csharp
|
||||
internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase
|
||||
{
|
||||
public override void Post(SendOrPostCallback callback, Object state) =>
|
||||
_state.Helper.QueueAsynchronous(() => callback(state));
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
这个概念也并不局限于像上面的主流应用程序模型。例如在[xunit](https://github.com/xunit/xunit),一个流行的单元测试框架(`.NET`核心代码仓库也使用了)中也实现了需要自定义的`SynchronizationContext`。例如限制同步运行单元测试时同时运行单元测试数量就可以用`SynchroniaztionContext`实现:
|
||||
|
||||
```
|
||||
public class MaxConcurrencySyncContext : SynchronizationContext, IDisposable
|
||||
{
|
||||
public override void Post(SendOrPostCallback d, object? state)
|
||||
{
|
||||
var context = ExecutionContext.Capture();
|
||||
workQueue.Enqueue((d, state, context));
|
||||
workReady.Set();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`MaxConcurrentSyncContext`中的`Post`方法只是将需要完成的工作压入其内部的工作队列中,这样就能够控制同时多少工作能够并行的运行。
|
||||
|
||||
那么同步上下文这个概念时如何同基于事件的异步范式关联起来的呢?`EAP`范式和同步上下文都是在同一时间引入的,而`EAP`范式要求当异步操作启动的时候,完成事件需要由当前`SynchronizationContext`进行调度。为了简化这个过程(可能反而引入多余的复杂性),在`System.ComponentModel`命名控件中引入了一些帮助程序,具体来说是`AsyncOperation`和`AsyncOperationManager`。其中前者是一个由用户提供的状态对象和捕获到的`SynchronizationContext`组成的元组,后者是一个捕获`SynchronizationContext`和创建`AsyncOperation`对象的工厂类。`EAP`范式会在实现中使用上述帮助类,例如`Ping.SendAsync`会首先调用`AsyncOperationManager.CreateOperationi`来捕获同步上下文,然后当异步操作完成的时候调用`AsyncOperation.PostOperationCompleted`方法来调用捕获到的`SynchronizationContext.Post`方法。
|
||||
|
||||
`SynchronizationContext`还提供了其他一些后面会用到的小工具。这个类暴露了`OperationStarted`和`OperationCompleted`两个方法。这个虚方法在基类中的实现都是空的,并不完成任何工作。但是继承其的实现可能会重载这些来了解运行中的操作。`EAP`的实现就会在每个操作开始和结束的时候调用`OperationStarted`和`OperationCompleted`,来方便可能存在的同步上下文跟踪工作的进度。鉴于在`EAP`范式中启动异步操作的方法往往不会返回任何东西,不能指望可以获得任何帮助你跟踪工作进度的东西,因而可能获得工作进度的同步上下文就显得很有价值了。
|
||||
|
||||
综上所说,我们需要一些比`APM`编程范式更好的东西,而`EAP`范式引入了一些新的东西,但是没有解决我们面对的核心问题,我们仍然需要一些更好的东西。
|
||||
|
||||
### 进入Task时代
|
||||
|
||||
在.NET Framework 4.0中引入了`System.Threading.Tasks.Task`类型。当时`Task`类型还只代表某些异步操作的最终完成(在其他编程框架中可能成称为`promise`或者`future`)。当一个操作开始时,创建一个`Task`来表示这个操作,当这个操作完成之后,操作的结果就会被保存在这个`Task`中。简单而明确。但是`Task`相较于`IAsyncResult`提供的重要特点是其蕴含了一个任务在持续运行的状态。这个特点让你能够随意找到一个`Task`,让它在异步操作完成的时候异步的通知你,而不用你关注任务当前是处在已经完成、没有完成、正在完成等各种状态。为什么这点非常重要?首先想想`APM`范式中存在的两个主要问题:
|
||||
|
||||
1. 你需要对每个操作实现一个自定义的`IAsycResult`实现:库中没有任何内置开箱即用的`IAsycResult`实现。
|
||||
2. 你需要在调用开始方法之前就知道在操作结束的时候需要做什么。这让编写使用任意异步操作的组合代码或者通用运行时非常困难。
|
||||
|
||||
相对的,`Task`提供了一个通用的接口让你在启动一个异步操作之后“接触”这个操作,还提供了针对“持续”的抽象,这样你就不需要为启动异步操作的方法提供一个持续性。任何需要进行异步操作的人都可以产生一个`Task`,任何人需要使用异步操作的人都可以使用一个`Task`,在这个过程中不用自定义任何东西,`Task`成为了沟通异步操作的生产者和消费者之间最重要的桥梁。这一点大大改变了.NET框架。
|
||||
|
||||
现在让我们深入理解`Task`所带来的重要意义。与其直接去研究错综复杂的`Task`源代码,我们将尝试去实现一个`Task`的简单版本。这不会是一个完善的实现,只会完成基础的功能来让我们更好的理解什么是`Task`,即一个负责协调设置和存储完成信号的数据结构。
|
||||
|
||||
开始时`Task`中只有很少的字段:
|
||||
|
||||
```csharp
|
||||
class MyTask
|
||||
{
|
||||
private bool _completed;
|
||||
private Exception? _error;
|
||||
private Action<MyTask>? _continuation;
|
||||
private ExecutionContext? _ec;
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
我们首先需要一个字段告诉我们任务是否完成`_completed`,一个字段存储造成任务执行失败的错误`_error`;如果我们需要实现一个泛型的`MyTask<TResult>`,还需要一个`private TResult _result`字段来存储操作运行完成之后的结果。到目前为止的实现和`IAsyncResult`相关的实现非常类似(当然这不是一个巧合)。`_continuation`字段时实现中最重要的字段。在这个简单的实现中,我们只支持一个简单的后续过程,在真正的`Task`实现中是一个`object`类型的字段,这样既可以是一个独立的后续过程,也可以是一个后续过程的列表。这个委托会在任务完成的时候调用。
|
||||
|
||||
让我们继续深入。如上所述,`Task`相较于之前的异步执行模型一个基础的优势是在异步操作开始之后再提供后续需要完成的工作。因此我们需要一个方法来实现这个功能:
|
||||
|
||||
```csharp
|
||||
public void ContinueWith(Action<MyTask> action)
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
if (_completed)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_ => action(this));
|
||||
}
|
||||
else if (_continuation is not null)
|
||||
{
|
||||
throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation.");
|
||||
}
|
||||
else
|
||||
{
|
||||
_continuation = action;
|
||||
_ec = ExecutionContext.Capture();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
如果在调用`ContinueWith`的时候异步操作已经完成,那么就直接将该委托的执行加入执行队列。反之,这个方法就会将存储这个委托,当异步任务完成的时候进行执行(这个方法同时也存储一个被称为`ExecutionContext`的对象,会在后续调用委托的涉及到,我们后续会继续介绍)。
|
||||
|
||||
然后我们需要能够在异步过程完成的时候标记任务已经完成。我们将添加两个方法,一个负责标记任务成功完成,一个负责标记任务报错退出。
|
||||
|
||||
```csharp
|
||||
public void SetResult() => Complete(null);
|
||||
|
||||
public void SetException(Exception error) => Complete(error);
|
||||
|
||||
private void Complete(Exception? error)
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
if (_completed)
|
||||
{
|
||||
throw new InvalidOperationException("Already completed");
|
||||
}
|
||||
|
||||
_error = error;
|
||||
_completed = true;
|
||||
|
||||
if (_continuation is not null)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(_ =>
|
||||
{
|
||||
if (_ec is not null)
|
||||
{
|
||||
ExecutionContext.Run(_ec, _ => _continuation(this), null);
|
||||
}
|
||||
else
|
||||
{
|
||||
_continuation(this);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
我们会存储任何的错误、标记任务已经完成,如果已经注册的任何的后续过程,我们也会引发其进行执行。
|
||||
|
||||
最后我们还需要一个方法将在工作中发生的任何传递出来,(如果是泛型类型,还需要将执行结果返回),为了方便某些特定的场景,我们将允许这个方法阻塞直到异步操作完成(通过调用`ContinueWith`注册一个`ManualResetEventSlim`实现)。
|
||||
|
||||
```csharp
|
||||
public void Wait()
|
||||
{
|
||||
ManualResetEventSlim? mres = null;
|
||||
lock (this)
|
||||
{
|
||||
if (!_completed)
|
||||
{
|
||||
mres = new ManualResetEventSlim();
|
||||
ContinueWith(_ => mres.Set());
|
||||
}
|
||||
}
|
||||
|
||||
mres?.Wait();
|
||||
if (_error is not null)
|
||||
{
|
||||
ExceptionDispatchInfo.Throw(_error);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
这就是一个基础的`Task`实现。当然需要指出的是实际的`Task`会复杂很多:
|
||||
|
||||
- 支持设置任意数量的后续工作;
|
||||
- 支持配置其的工作行为(例如配置后续工作是应该进入工作队列等待执行还是作为任务完成的一部分同步被调用);
|
||||
- 支持存储多个错误;
|
||||
- 支持取消异步操作;
|
||||
- 一系列的帮助函数(例如`Task.Run`创建一个代表在线程池上运行委托的`Task`)。
|
||||
|
||||
但是这些内容中没有什么奥秘,核心工作原理和我们自行实现的是一样的。
|
||||
|
||||
你可以会注意到我们自行实现的`MyTask`直接公开了`SetResult/SetException`方法,而`Task`没有;这是因为`Task`是以`internal`声明了上述两个方法,同时`System.Threading.Tasks.TaskCompletionSource`类型负责作为一个独立的`Task`生产者和管理任务的完成。这样做的目的并不是出于技术目的,只是将负责控制完成的方法从消费`Task`的方法中分离出来。这样你就可以通过保留`TaskCompletionSource`对象来控制`Task`的完成,不必担心你创建的`Task`在你不知道的地方被完成。(`CancellationToken`和`CanellationTokenSource`也是处于同样的设计考虑,`CancellationToken`是一个包装`CancellationTokenSource`的结构,只暴露了和接受消费信号相关的结构而缺少产生一个取消信号的能力,这样就限制只有`CancellationToeknSource`可以产生取消信号。)
|
||||
|
||||
当前我们也可以像`Task`一样为我们自己的`MyTask`添加各种工具函数。例如我们添加一个`MyTask.WhenAll`:
|
||||
|
||||
```csharp
|
||||
public static MyTask WhenAll(MyTask t1, MyTask t2)
|
||||
{
|
||||
var t = new MyTask();
|
||||
|
||||
int remaining = 2;
|
||||
Exception? e = null;
|
||||
|
||||
Action<MyTask> continuation = completed =>
|
||||
{
|
||||
e ??= completed._error; // just store a single exception for simplicity
|
||||
if (Interlocked.Decrement(ref remaining) == 0)
|
||||
{
|
||||
if (e is not null) t.SetException(e);
|
||||
else t.SetResult();
|
||||
}
|
||||
};
|
||||
|
||||
t1.ContinueWith(continuation);
|
||||
t2.ContinueWith(continuation);
|
||||
|
||||
return t;
|
||||
}
|
||||
```
|
||||
|
||||
然后是一个`MyTask.Run`的示例:
|
||||
|
||||
```csharp
|
||||
public static MyTask Run(Action action)
|
||||
{
|
||||
var t = new MyTask();
|
||||
|
||||
ThreadPool.QueueUserWorkItem(_ =>
|
||||
{
|
||||
try
|
||||
{
|
||||
action();
|
||||
t.SetResult();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
t.SetException(e);
|
||||
}
|
||||
});
|
||||
|
||||
return t;
|
||||
}
|
||||
```
|
||||
|
||||
还有一个简单的`MyTask.Delay`:
|
||||
|
||||
```csharp
|
||||
public static MyTask Delay(TimeSpan delay)
|
||||
{
|
||||
var t = new MyTask();
|
||||
|
||||
var timer = new Timer(_ => t.SetResult());
|
||||
timer.Change(delay, Timeout.InfiniteTimeSpan);
|
||||
|
||||
return t;
|
||||
}
|
||||
```
|
||||
|
||||
在`Task`横空出世之后,之前的所有异步编程范式都成为了过去式。任何使用过去的编程范式暴露的异步`API`,现在都提供了返回`Task`的方法。
|
||||
|
||||
### 添加Value Task
|
||||
|
||||
直到现在,`Task`都是.NET异步编程中的主力军,在每次新版本发布或者社区发布的新`API`都会返回`Task`或者`Task<TResult>`。但是,`Task`是一个类,而每次创建一个类是都需要分配一次内存。在大多数情况下,为一个会长期存在的异步操作进行一次内存分配时无关紧要的,并不会操作明显的性能影响。但是正如之前所说的,同步完成的异步操作十分创建。例如,`Stream.ReadAsync`会返回一个`Task<int>`,但是如果是在一个类似与`BufferedStream`的实现上调用该方法,那么你的调用由很大概率就会是同步完成的,因为大多数读取只需要从内存中的缓冲区中读取数据而不需要通过系统调用访问`I/O`。在这种情况下还需要分配一个额外的对象显然是不划算的(而且在`APM`范式中也存在这个问题)。对于返回非泛型类型的方法来说,还可以通过返回一个预先分配的已完成单例来缓解这个问题,而且`Task`也提供了一个`Task.CompletedTask`。但是对于泛型的`Task<TResult>`则不行,因为不可能针对每个不同的`TResult`都创建一个对应的单例。那么我们可以如何让这个同步操作更快呢?
|
||||
|
||||
我们可以试图缓存一个常见的`Task<TResult>`。例如`Task<bool>`就非常的常见,而且也只存在两种需要缓存的情况:当结果为真时的一个对象和结果为假时的一个对象。同样的,尽管我们可能不想尝试(也不太可能)去缓存数亿个`Task<int>`对象以覆盖所有可能出现的值,但是鉴于很小的`Int32`值时非常常见的,我们可以尝试去缓存给一些较小的结果,例如从-1到8的结果。 而且对于其他任意的类型来说,`default`就是一个常常出现的值,因此缓存一个结果是`default(TResult)`的`Task`。而且 在最近的.NET版本中添加了一个称作`Task.FromResult`辅助函数,该函数就会完成与上述类似的工作,如果存在可以重复使用的`Task<Result>`单例就返回该单例,反之再创建一个新的`Task`对象。对于其他常常出现的值也也可以设计方法进行缓存。还是以`Stream.ReadAsync`为例子,这个方法常常会在同一个流上调用多次,而且每次读取的值都是允许读取的字节数量`count`。再考虑到使用者往往只需要读取到这个`count`值,因此`Stream.ReadAsync`操作常常会重复返回有着相同`int`值的`Task`对象。为了避免在这种情况下重复的内存分配,许多`Stream`的实现(例如`MemoryStream`)会缓存上一次成功缓存的`Task<int>`对象,如果下一次读取仍然是同步返回的且返回了相同的数值,该方法就会返回上一次读取创建的`Task<int>`对象。但是仍然会存在许多无法覆盖的其他情况,能不能找到一种更加优雅的解决方案来来避免在异步操作同步完成的时候避免创建新的对象,尤其是在性能非常重要的场景下。
|
||||
|
||||
这就是`ValueTask<TResult>`诞生的背景([这篇博客](https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/)详细测试了`ValueTask<TResult>`的性能)。`ValueTask<TResult>`在诞生之初是`TResult`和`Task<TResult>`的歧视性联合。在这些争论尘埃落定之后,`ValueTask<TResult>`便不是一个立刻可以返回的结果就是一个对未来结果的承诺:
|
||||
|
||||
```csharp
|
||||
public readonly struct ValueTask<TResult>
|
||||
{
|
||||
private readonly Task<TResult>? _task;
|
||||
private readonly TResult _result;
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
一个方法可以通过返回`ValueTask<TResult>`来避免在`TResult`已知的情况下创建新的`Task<Result>`对象,当然返回的类型会更大、返回的结果更加不直接。
|
||||
|
||||
当然,实际应用中也存在对性能需求相当高的场合,甚至你会想在操作异步完成的时候也避免`Task<TResult>`对象的分配。例如`Socket`作为整个网络栈的最底层,对于网络中的大多数服务来说`SendAsync`和`ReceiveAsync`都是绝对的热点代码路径,不论是同步操作还是异步操作都是非常常见的(鉴于内核中的缓存,大多数发送请求都会同步完成,部分接受请求会同步完成)。因此对于像`Socket`这类的工具,如果我们可以在异步我弄成和同步完成的情况下都实现无内存分配的调用是十分有意义的。
|
||||
|
||||
这就是`System.Threading.Tasks.Sources.IValueTaskSource<TResult>`产生的背景:
|
||||
|
||||
```csharp
|
||||
public interface IValueTaskSource<out TResult>
|
||||
{
|
||||
ValueTaskSourceStatus GetStatus(short token);
|
||||
void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags);
|
||||
TResult GetResult(short token);
|
||||
}
|
||||
```
|
||||
|
||||
该接口允许自行为`ValueTask<TResult>`实现一个“背后“的对象,并且让这个对象提供了获得操作结构的`GetResult`方法和设置操作后续工作的`OnCompleted`。在这个接口出现之后,`ValueTask<TResult>`也小小修改了定义,`Task<TResult>? _task`字段被一个`object? _obj`字段替换了:
|
||||
|
||||
```csharp
|
||||
public readonly struct ValueTask<TResult>
|
||||
{
|
||||
private readonly object? _obj;
|
||||
private readonly TResult _result;
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
现在`_obj`字段就可以存储一个`IValueTaskSource<TReuslt>`对象了。而且相较于`Task<TResult>`在完成之后就只能保持完成的状态,不能变回未完成的状态,`IValueTaskSource<TResult>`的实现有着完全的控制权,可以在已完成和未完成的状态之间双向变化。但是`ValueTask<TResult>`要求一个特定的实例只能被使用一次,不能观察到这个实例在使用之后的任何变化,这也是分析规则[CA2012](https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2012)存在的意义。这就让让类似于`Socket`的工具为重复的调用建立一个`IValueTaskSource<TResult>`对象池。从实现上来说,`Socket`会至多缓存两个类似的实例,一个用于读取操作一个用于写入操作,因为在99.999%的情况下同时只会有一个发送请求和一个接受请求。
|
||||
|
||||
值得说明的是我只提到了`ValueTask<TResult>`却没有提到`ValueTask`。因为如果只是为了在操作同步完成的时候避免内存分配,非泛型类型的`ValueTask`指挥提供很少的性能提升,因为在同样的条件下可以使用`Task.CompletedTask`。但是如果要考虑在异步完成的时候通过缓存对象避免内存分配,非泛型类型也有作用。因而,在引入`IValueTaskSource<TResult>`的同时,`IValueTaskSource`和`ValueTask`也被引入了。
|
||||
|
||||
到目前我们,我们已经可以利用`Task`,`Task<TResult>`,`ValueTask`,`ValueTask<TResult>`表示各种各样的异步操作,并注册在操作完成之前和之后注册后续的操作。
|
||||
|
||||
但是这些后续操作仍然是回调方法,我们仍然陷入了基于回调的异步控制流程。该怎么办?
|
||||
|
||||
### 迭代器成为大救星
|
||||
|
||||
解决方案的先声实际上在`Task`诞生之前就出现了,在C# 2.0引入迭代器语法的时候。
|
||||
|
||||
你可能会问,迭代器就是`IEnumerable<T>`吗?这是其中的一个。迭代器是一个让编译器将你编写的方法自动实现`IEnumerable<T>`或者`IEnumertor<T>`的语法。例如我可以用迭代器语法编写一个产生斐波那契数列的可遍历对象:
|
||||
|
||||
```csharp
|
||||
public static IEnumerable<int> Fib()
|
||||
{
|
||||
int prev = 0, next = 1;
|
||||
yield return prev;
|
||||
yield return next;
|
||||
|
||||
while (true)
|
||||
{
|
||||
int sum = prev + next;
|
||||
yield return sum;
|
||||
prev = next;
|
||||
next = sum;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
这个方法可以直接用`foreach`遍历,也可以和`System.Linq.Enumerable`中提供的各种方法组合,也可以直接用一个`IEnumerator<T>`对象遍历。
|
||||
|
||||
```csharp
|
||||
foreach (int i in Fib())
|
||||
{
|
||||
if (i > 100) break;
|
||||
Console.Write($"{i} ");
|
||||
}
|
||||
```
|
||||
|
||||
```csharp
|
||||
foreach (int i in Fib().Take(12))
|
||||
{
|
||||
Console.Write($"{i} ");
|
||||
}
|
||||
```
|
||||
|
||||
```csharp
|
||||
using IEnumerator<int> e = Fib().GetEnumerator();
|
||||
while (e.MoveNext())
|
||||
{
|
||||
int i = e.Current;
|
||||
if (i > 100) break;
|
||||
Console.Write($"{i} ");
|
||||
}
|
||||
```
|
||||
|
67
YaeBlog/source/posts/cncc-2024.md
Normal file
67
YaeBlog/source/posts/cncc-2024.md
Normal file
|
@ -0,0 +1,67 @@
|
|||
---
|
||||
title: 2024中国计算机大会
|
||||
date: 2024-11-03T14:06:36.4212070+08:00
|
||||
tags:
|
||||
- 杂谈
|
||||
---
|
||||
|
||||
2024年的中国计算机大会于10月24日到10月26日在浙江省金华市东阳市横店镇举办,而鄙人在下不才我,有幸受到实验室资助前去参观学习。
|
||||
|
||||
<!--more-->
|
||||
|
||||
首先开幕式镇楼。
|
||||
|
||||
![image-20241102212738598](./cncc-2024/image-20241102212738598.png)
|
||||
|
||||
## 学术上
|
||||
|
||||
大会每天的日程是上午的大会特邀报告和大会论坛,下午的各个分论坛讨论。老实说,大会上午的报告和论坛我都没有特别感兴趣,因此这里将重点放在我参加的三个分论坛上。
|
||||
|
||||
### AI时代的异构融合操作系统:聚散终有时,融合亦有期
|
||||
|
||||
第一个报告是华为庞加莱实验室秦彬娟老师的《异构智算时代的操作系统演进》。报告高屋建瓴,从比较宏观的角度上介绍了当前异构融合操作系统诞生的背景、发展的方向。在报告中重点介绍了一种异构融合操作系统的设计思路:通过三层架构,基于互联池化技术,构建AI时代的融合算力系统。系统中的三层包括:(1)池化基础底层,包括多设备的融合和池化设备虚拟化;(2)异构融合核心子系统,例如异构融合调度系统、异构融合内存和异构融合存储系统;(3)异构核心服务。总的来说,这个报告在一定程度上勾勒出了未来一个异构融合操作系统应有的各项功能,但是显然这一操作系统的实现还存在着明显的困难。
|
||||
|
||||
![image-20241102211959206](./cncc-2024/image-20241102211959206.png)
|
||||
|
||||
下面一个报告是较为有干货的报告,北京航空航天大学刘瀚骋老师的《异构融合OS及多样性内存管理框架》。报告中介绍了一个称作`FMMU`的系统,是对于异构融合操作系统中内存管理系统的探索。报告中首先介绍了内存池化技术对于异构融合操作系统的重要性,指出分布式共享内存(Distributed Shared Memory)可能是实现内存池化技术的未来。然后介绍了将部分内存管理中的计算卸载到可编程网络硬件中来加速分布式内存访问的新思路。最后在报告中提到了内存管理技术如何解决错误预测和错误回复的问题。虽然在听的时候没太注意,但是现在总结的时候才发现这个报告的思路似乎有点混乱,尤其是最后一点和内存管理系统并没有什么直接的关系,而且这个内存管理系统似乎不是**异构系统**的内存管理,反而是分布式系统的内存管理。不过总的来说,这个报告还是非常实际的,介绍了不少当前异构融合操作系统中的内存管理面临的问题和解决问题的探索。
|
||||
|
||||
![image-20241102212355390](./cncc-2024/image-20241102212355390.png)
|
||||
|
||||
第三个报告是国防科技大学李东升老师的《异构计算环境下的分布式深度学习训练》。报告首先从李老师的主业——并行计算起手,介绍了深度学习训练过程中主要的各种并行方法,例如数据并行、模型并行和混合并行等,指出目前大模型的并行训练存在着计算/存储/通信难的问题。因此,提出了一个智能模型训练并行任务划分方法:(1)基于符号算子的计算图定义方法;(2)面向Transformer模型的流水线并行任务划分方法;(3)异构资源感知的流水线并行任务划分方法。然后针对分布式模型训练中通信调度存在的通信墙、数据依赖关系复杂等的问题,提出综合词嵌入表的稀疏通信调度技术、流水线并行的P2P通信调度技术、模型计算的统一操作执行引擎和网络链路感知的通信执行引擎的通信调度技术。最后提到了智能模型训练 的内存优化技术,针对现有重计算技术(re-computing)和存储交换(swapping)技术存在的问题,提出了一种面向大型智能模型训练的细粒度内存优化方法`DELTA`。
|
||||
|
||||
最后一个报告是上海交通大学杜冬冬老师的《软硬芯异构融合操作系统的多个维度》。报告伊始,杜老师就抛出一个问题:操作系统的演进应该是提供新的抽象还是兼容现有的抽象?在回答这个问题之前,杜老师首先介绍他们一个异构融合操作系统的设计思路:层OS架构的思路,通过设置两个层次——全局OS和本地OS,全局OS在本地OS的基础上提供一层跨`XPU`的能力。杜老师设计的这个系统称作`XPU-Shim`,在设计这个系统时就面对着前面的问题,是提供新的抽象还是兼容现有的抽象。`XPU-Shim`的回答是兼容现有的抽象,在底层的CXL、UB等内存语义总线的基础上实现了传统的Socket抽象,提供了低时延、高吞吐的协同能力。在操作系统的抽象问题之外,杜老师还就云上GPU应用的启动时延问题进行了讨论,深入解释了通过状态复用完全跳过初始化阶段从而加速应用冷启动过程的思路。
|
||||
|
||||
Plane讨论没有参加。
|
||||
|
||||
### 编译系统前沿技术与应用
|
||||
|
||||
第一个报告是清华大学陈文光老师的《神经网络全同态编译器》。这个报告可以说证明了“编译技术的人才活跃在各行各业”,报告中的主要内容就是编译技术如何助力机密计算中的全同态加密应用在神经网络的推理中。全同态加密算法实现了“数据可用不可见”的概念,允许程序直接在密文上进行乘法和加法运算,但是限制也是只能进行加法和乘法运算,而且过多的乘法操作会造成计算之后解密失败。该编译器成为`ANT-ACE`,首先通过设计新的五层中间表示(IR)实现了自动化全同态加密程序生成和面向性能的优化设计,在实现基本的编译工作之外,`ANT-ACE`提供了一定的调试支持,通过部分支持对于模型的部分加密支持和运行时校验为解决加密之后程序推理准确率下降的问题。
|
||||
|
||||
接下来三个报告都是关于如何将人工智能技术同编译技术解决起来。计算所冯晓兵老师的报告《人工智能编译领域的应用探索》,介绍了大模型同编译后端的两个结合方向:(1)使用大模型生成编译器的后端代码;(2)使用大模型替换编译器的后端,直接利用大模型生成汇编代码。华为毕昇编译器架构师魏伟的报告《AI for Compiler的技术探索和应用实践》则是介绍了毕昇编译器的自动调优器`Autotuner`,这个一个自动寻找最优化的编译参数组合工具。复旦大学张为华老师的报告《基于学习的编译优化技术》也是一个类似的工作,利用机器学习技术挖掘已有的编译系统中存在的相关知识来指导新的编译优化。
|
||||
|
||||
最后一个报告则是字节公司郑思泽研究员的《计算通信融合中的编译器设计》,该报告主要聚焦于如何实现在深度学习算子层的计算通信融合,这个报告主要由搞`MLIR`的同学听,我就摸鱼了。
|
||||
|
||||
### 智能终端操作系统OpenHarmony前沿研究
|
||||
|
||||
虽然名字叫作OpenHarmony,但是感觉内容实际上和鸿蒙系统没有什么太大的关系。
|
||||
|
||||
第一个报告是软件所武延军老师的《万物智联时代基础软件如何驯服碎片化》。报告的标题非常的高大上,但是实际上就讲了两件事情:(1)RISCV架构,或者说RISCV这个可扩展的思想,是解决架构碎片化的思路;(2)`openEular`系统可以作为系统软件适配的一个基线操作系统。总结一下,这其实就是一个广告,希望大家做基础软件的都来和大家一起做。
|
||||
|
||||
第二个报告是南京大学冯新宇老师的《基于仓颉语言的嵌入式DSL开发》,同时冯新宇老师也是仓颉语言的首席架构师。冯老师的这个报告主要聚焦于仓颉语言提供的嵌入式DSL能力,而嵌入式DSL这一设计范式已经在前端开发中展现了不俗的潜力。报告中介绍了嵌入式DSL出现的背景,仓颉中为了提供嵌入式DSL而引入的语法糖、仓颉提供的嵌入式DSL工具箱等。虽然仓颉语言是一个主要面向上层应用开发的语言,但是仓颉中丰富的DSL能力还是给异构编程模型的设计提供了不少的启发。而且目前在各种深度学习编译器中DSL的应用也非常广泛,例如`triton`。
|
||||
|
||||
![image-20241102212536635](./cncc-2024/image-20241102212536635.png)
|
||||
|
||||
第三个报告是在存算一体的芯片上做数据库的加速,第四个报告是OpenHarmony上`ArkTS`程序的静态分析,都没怎么听。
|
||||
|
||||
最后一个又是上交杜冬冬老师的报告,《面向下一代智能终端操作系统的渲染服务研究与挑战》。这是一个我感觉还挺有趣的报告,报告中介绍的主要背景是随着终端设备上屏幕刷新率的提高和操作系统动画变得更加精致复杂,用户会发现终端系统上的显示卡顿越来越多、越明显。这是因为目前的终端显示刷新机制是同步的,显示屏会按照当前刷新的频率从操纵系统中读取下一帧的画面,但是操作系统面对这越来越短的刷新时延和越来越复杂的动画常常不能按时把下一帧的画面渲染好。于是我们的杜冬冬老师就提出了一种动态、异步的渲染机制,考虑到系统中显示动画的时间还是占少部分的,于是就可以借用这些系统不繁忙的时间预先渲染(削峰填谷)。但是这种方式需要预知到系统后面会显示的内容,这使得这套技术只能在确定性的场景和部分简单交互场景下使用。
|
||||
|
||||
> 这里插入一个杜冬冬老师的八卦,杜老师改过一次名字,之前的名字是杜东(Dong Du),在查找论文的时候使用后面的名字会更好一些(在[IPADS](https://ipads.se.sjtu.edu.cn/zh/members/)和[dblp](https://dblp.org/pid/48/331-3.html)上面都还没有改过来)。
|
||||
|
||||
## 其他
|
||||
|
||||
首先我要锐评一下浙江省金华市东阳市横店镇。横店镇感觉完全没有为一个旅游目的地做过准备,虽然说镇子上面的酒店还是挺多的,但是不管是吃的还是玩的感觉都非常少。而且镇上的交通简直就是一坨,尤其是我们从酒店到会议举办地圆明新园的一段路,完全被大货车摧残的不成样子,在上面坐车堪比过山车。
|
||||
|
||||
然后我要锐评一下会议的举办地横店圆明新园。在去之前听说这里是1:1复刻了被八国联军烧毁的圆明园,结果去了才发现圆明新园分成春苑、夏苑和秋苑,其中春苑是复刻的圆明园,但是会议的举办地是在夏苑和秋苑,感觉有点的被诈骗了。夏苑里面只复刻了圆明园长春园的部分景观,比如海岳开襟、谐奇趣和大水法等,而且还增设了英、法、美、俄、日、德、意和奥等国的特色建筑,而会议就主要在这些特色建筑中进行,属实感觉有点奇怪了。
|
||||
|
||||
最后我要锐评一下CNCC会议。名义上看这个会议有涵盖数十个方向的130余场论坛,上万名注册参会者的大型会议,但是这个会议却选在了一个看上去基本上不适合召开大型会议的横店镇圆明新园。同时会议进行的非常寒酸,中午的午餐是横店提供给剧组的盒饭,在主会场发给我们之后只能自己端着吃,下午的茶歇更是少的可怜,除了第三天有好哥们分了我一块蛋挞,三天的茶歇我愣是一点都没见到(有可能是第三天的人最少,提高了我获得茶歇的概率)。
|
||||
|
BIN
YaeBlog/source/posts/cncc-2024/image-20241102211959206.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/cncc-2024/image-20241102211959206.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/cncc-2024/image-20241102212355390.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/cncc-2024/image-20241102212355390.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/cncc-2024/image-20241102212536635.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/cncc-2024/image-20241102212536635.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/cncc-2024/image-20241102212738598.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/cncc-2024/image-20241102212738598.png
(Stored with Git LFS)
Normal file
Binary file not shown.
875
YaeBlog/source/posts/heterogeneous-programming-model.md
Normal file
875
YaeBlog/source/posts/heterogeneous-programming-model.md
Normal file
|
@ -0,0 +1,875 @@
|
|||
---
|
||||
title: 异构编程模型的昨天、今天与明天
|
||||
date: 2024-11-04T22:20:41.2571467+08:00
|
||||
tags:
|
||||
- 编译原理
|
||||
- 组会汇报
|
||||
---
|
||||
|
||||
|
||||
随着摩尔定律的逐渐失效,将CPU和其他架构的计算设备集成在片上或者通过高速总线互联构建的异构系统成为了高性能计算的主流。但是在这种系统中,上层应用的设计与实现面临着异构系统中各个设备之间体系结构差异过大、缺乏良好的异构抽象以及统一的编程接口和应用程序的优化难度大等困难。
|
||||
|
||||
异构并行编程模型便是解决这些编程和执行效率问题的解决方案。
|
||||
|
||||
<!--more-->
|
||||
|
||||
## 异构并行编程模型概述
|
||||
|
||||
异构并行编程模型是沟通上层应用和下层异构系统之间的桥梁,其的设计需要处理好下面五个问题:任务划分、任务映射、数据分布、同步和通信。
|
||||
|
||||
### 异构并行编程模型面临的技术挑战
|
||||
|
||||
异构并行编程模型面临的技术挑战主要是由两方面带来的:首先异构架构本身为编程模型带来的挑战,其次是上层应用带来的挑战。
|
||||
|
||||
异构并行编程模型需要解决的一个重要问题就是为上层应用的程序员提供一个合理的硬件平台抽象,使得其在编程是可以充分释放异构资源带来的计算能力,同时不需要考虑复杂的硬件细节。但是异构系统中各个计算设备在内部体系结构、设备间互联架构上的复杂性和多样性使得异构并行编程模型在提供建立统一的平台抽象上遇到了巨大的困难。具体来说,主要体现下述三点。
|
||||
|
||||
首先是异构系统中各个设备之间的并行计算能力不同。在同构的并行计算系统中,比如多核CPU中,虽然同一CPU的不同核之间、同一核的不同SIMD部件之间可以承担不同粒度的并行计算任务,但是其并行计算的能力是完全相同的。但是在一个典型的异构计算系统,例如CPU、GPU和FPGA组成的异构系统,不同设备的微架构具有本质差异,其并行计算的模式和能力都完全不同,设备之间的特长也完全不同。这种设备之间并行计算能力的差异使得系统中的任务划分和任务映射不再是均一的,而是具有显著的特异性。这种特点虽然也有利于表达实际应用的特点,但是却给异构并行计算模型的设计带来了巨大的困难。
|
||||
|
||||
![9eb06d8be92ddef3db33e040163c67a7.png](./heterogeneous-programming-model/9eb06d8be92ddef3db33e040163c67a7.png)
|
||||
|
||||
其次是异构系统中加速设备数据分布可配置、设备间数据通信渠道多样性给数据分布和通信带来的困难。在同构并行系统中,CPU片内的存储是对于软件透明的缓存架构,在片外则是一个共享内存模型,因此在这类系统中,数据仅可能分布在片外的共享存储中,具有存储位置单一的特点,也不需要进行显式的通信操作。但是在异构系统中,不仅在单个加速设备内部可能有软件可分配的快速局部存储,设备之间的连接方式差异也很大。目前,大多个加速设备都是通过PCIe总线的方式同CPU进行连接,这使得加速设备无法通过和CPU相同的方式完成地址映射,存在某一设备无法访问另一设备片外存储的问题。这使得异构系统中数据可以分布在CPU、加速设备的片外存储和加速设备的片内多层次局部存储等多个位置,不仅使得编程模型的数据分布问题变得十分复杂,设备间的通信文件也可能需要显式进行。
|
||||
|
||||
![eab553f9e30d8d866a1ddd201b5e4c85.png](./heterogeneous-programming-model/eab553f9e30d8d866a1ddd201b5e4c85.png)
|
||||
|
||||
最后是异构系统中多层次数据共享和多范围同步操作带来的同步困难问题。这也可以认为是上个数据同步问题带来的后继问题:在异构系统中数据可能分布在不同位置的条件下,同步操作需要在众多的位置上保证共享数据的一致性,这使得同步操作的范围变得十分复杂。同时,在一些特定的加速设备中,例如GPU,可能还会有局部的硬件同步机制,这更加提高了在异构系统的同步操作的设计和实现难度。
|
||||
|
||||
上层应用带来的挑战主要集中在缺少良好的异构抽象和统一的编程接口上。例如在CPU上进行编程时通常使用Java、Python等高级语言,而在进行GPU编程时则使用各种C语言的变体,其中的核心计算函数(Kernel Function)则通常只支持一个C语言的子集,而FPGA这些硬件设备又需要使用硬件描述语言进行编程。
|
||||
|
||||
### 异构并行编程接口和编译/运行时支持机制
|
||||
|
||||
异构并行编程接口是编程模型暴露给程序员使用的界面,它既需要为程序员提供合理的异构架构抽象,使程序员可以对异构计算资源加以合理利用,又需要保证接口的易用性,避免程序员陷入复杂的硬件细节中。编译/运行时系统是异构并行编程模型的软件工具层,它将程序员编写的加速器代码编译为可执行文件,并通过运行时系统完成任务的加速执行。
|
||||
|
||||
在任务划分、任务映射、数据分布、通信和同步这五个关键任务中,程序员往往只需要关注所编写应用程序的特点,因此显示的任务划分机制对应程序员来说可能是必不可少的,而其他的数据分布、通信和同步等任务只会加剧程序员开发应用程序的负担,但是这些任务通过接口暴露出来也为后续进行深度优化提供了空间。异构编译/运行时支持机制的主要任务就是保障任务映射,即明确任务将具体在哪个设备或者计算单元上执行,以何种顺序执行,同时在当程序员没有显式处理数据分布、通信和同步问题时进行自动处理并进行全系统级别的优化。
|
||||
|
||||
## 异构并行编程接口的研究
|
||||
|
||||
异构并行编程接口一般可以划分成两类:新设计的异构编程语言和现有语言的异构并行扩展。对于现有语言进行的异构并行扩展一般通过库(Library)或者是制导(Directive)的方法进行。
|
||||
|
||||
从异构并行编程接口的功能角度上来说也可以分成两类:有些接口屏蔽了较多的异构并行编程细节,通常仅给程序员提供显式异构任务划分的机制,而数据分布和通信、同步等的工作由运行时系统负责完成,也有些接口将多数异构系统的硬件细节通过上述机制暴露给程序员使用,这在给编程带来更大自由度的同时带来了使用上的困难。
|
||||
|
||||
![83ee1d254d638536d0fb4197ff63e758.png](./heterogeneous-programming-model/83ee1d254d638536d0fb4197ff63e758.png)
|
||||
|
||||
### 异构任务划分机制研究
|
||||
|
||||
在同构的并行编程语言中,并行编程接口需要提供一种面向单一设备的并行任务划分机制,这种并行任务划分机制有**任务并行**和**数据并行**等。数据并行指的是对源集合或者数组的元素同时执行相同操作的场景,一个数据并行的典型例子如下面计算两个矩阵的乘积:
|
||||
|
||||
```csharp
|
||||
static void MultiplyMatricesParallel(double[,] matA, double[,] matB, double[,] result)
|
||||
{
|
||||
int matACols = matA.GetLength(1);
|
||||
int matBCols = matB.GetLength(1);
|
||||
int matARows = matA.GetLength(0);
|
||||
|
||||
// A basic matrix multiplication.
|
||||
// Parallelize the outer loop to partition the source array by rows.
|
||||
Parallel.For(0, matARows, i =>
|
||||
{
|
||||
for (int j = 0; j < matBCols; j++)
|
||||
{
|
||||
double temp = 0;
|
||||
for (int k = 0; k < matACols; k++)
|
||||
{
|
||||
temp += matA[i, k] * matB[k, j];
|
||||
}
|
||||
result[i, j] = temp;
|
||||
}
|
||||
}); // Parallel.For
|
||||
}
|
||||
```
|
||||
|
||||
任务并行的概念一般是指一个或者多个独立的任务同时运行,是一种比数据并行更高的抽象层级。
|
||||
|
||||
```csharp
|
||||
public class Result
|
||||
{
|
||||
public static void Main()
|
||||
{
|
||||
Task<Double>[] taskArray = { Task<Double>.Factory.StartNew(() => DoComputation(1.0)),
|
||||
Task<Double>.Factory.StartNew(() => DoComputation(100.0)),
|
||||
Task<Double>.Factory.StartNew(() => DoComputation(1000.0)) };
|
||||
|
||||
var results = new Double[taskArray.Length];
|
||||
Double sum = 0;
|
||||
|
||||
for (int i = 0; i < taskArray.Length; i++) {
|
||||
results[i] = taskArray[i].Result;
|
||||
Console.Write("{0:N1} {1}", results[i],
|
||||
i == taskArray.Length - 1 ? "= " : "+ ");
|
||||
sum += results[i];
|
||||
}
|
||||
Console.WriteLine("{0:N1}", sum);
|
||||
}
|
||||
|
||||
private static Double DoComputation(Double start)
|
||||
{
|
||||
Double sum = 0;
|
||||
for (var value = start; value <= start + 10; value += .1)
|
||||
sum += value;
|
||||
|
||||
return sum;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
不论是高级或者是低级的异构并行编程接口都需要提供一种异构并行任务的划分机制。同传统的同构并行编程接口只需要提供面向单一设备的并行任务划分机制不同,异构并行编程接口还需要提供描述任务在不同设备间分配的机制。因此,异构并行编程接口的任务划分机制需要包括两个维度:异构特征描述和并行性表达两个维度。
|
||||
|
||||
一种典型异构任务划分机制是由`BrookGPU`编程语言提出的。该编程语言采用特殊函数`kernel`标记需要在GPU上执行的代码段,`kernel`函数必须作用在流上。这个流(Stream)在并行性表达方面表达了细粒度的数据并行。后面的OpenCL和CUDA在C语言的基础上提供了异构扩展,这种扩展的任务划分机制和`BrookGPU`的十分类似。但是OpenCL和CUDA在并行行表达的层面上支持了SPMD计算模型,这个`BrookGPU`编程语言采用的流式编程模型不同。OpenCL在数据并行之外还提供了任务并行的机制。
|
||||
|
||||
`Lime`则是一门完全新的异构并行编程语言,通过语言结构为程序提供了丰富的操作符用于任务的划分。同时在异构特征描述方面,`Lime`也没有任何显式的接口,同`BrookGPU`等一系列需要手动指定设备代码段的编程模型完全不同,这也是因为`Lime`采用了基于任务的并行划分方式。同时在任务并行之外,`Lime`也通过`MapReduce`操作符提供了中粒度的数据并行机制。
|
||||
|
||||
`Merge`还是一门新的异构并行编程语言,基于Intel提出的异构多核多线程系统编程环境`EXOCHI`。在并行性表达上,`Merge`使用`MapReduce`思想。而在异构特征描述方面,`Merge`则提供了一种成为平台变体(Target Variant)的机制,程序员需要为异构系统中的不同设备提供不同版本的代码实现。
|
||||
|
||||
### 异构数据分布和通信机制
|
||||
|
||||
异构数据分布和通信机制主要分成显式和隐式两种,其中`OpenCL/CUDA`等使用了显式的数据分布的通信机制,为程序员提供了丰富的异构数据分布与通信接口。而`Lime`和`Merge`等语言则使用了隐式机制,运行时系统代为完成这部分的工作。
|
||||
|
||||
采用显示异步数据分布和通信机制的主要问题是普通程序员一般无法充分利用这些接口获得性能上的提升。这通常使用因为加速设备通常采用了大量的硬件加速机制,例如GPU的全局内存访存合并机制,这使得程序员如果没有为数据分配合理的存储位置或者设定足够多的线程,会使得加速的效果大打折扣。因此出现了针对这类显式控制语言的优化方法,例如`CUDA-lite`,这个运行时允许程序元在CUDA程序中加入简单的制导语句,数据分布的相关工作使用`CUDA-lite`的运行时系统完成,降低了CUDA程序的编写难度。
|
||||
|
||||
![628804b3fe95d39013ff55ae84516d14.png](./heterogeneous-programming-model/Screenshot_20241016_214139.png)
|
||||
|
||||
总结一下,为了解决异构系统带来的问题,异构并行编程接口具有如下三个特点:
|
||||
- 异构任务划分机制在传统并行编程模型的基础上增加了"异构特征描述"的维度,用于描述任务在不同设备上的分配情况;
|
||||
- 异构数据分布和通知机制在传统并行编程模型的基础上增加了"设备内数据多层分布"和"设备间显式通信"接口;
|
||||
- 异构同步机制在传统并行编程模型的基础上增加了"设备间同步"的机制。
|
||||
|
||||
## 异步编译/运行时的研究
|
||||
|
||||
### 异构任务映射机制
|
||||
|
||||
异构编程/运行时系统的任务映射机制主要有两种:一类是直接映射,即独立完成并行任务向异构平台映射的工作,另一种是间接映射,即需要借助其他异构编译和运行时系统协助来完成部分任务映射工作。直接映射系统一般在运行时系统中实现,而间接映射通过源到源变换和是运行时分析相结合的方式实现。
|
||||
|
||||
![](./heterogeneous-programming-model/Screenshot_20241016_214939.png)
|
||||
|
||||
### 异构编译/运行时优化
|
||||
|
||||
与同构平台类似,异构编译/运行时优化有两条路径:
|
||||
|
||||
- 平台相关的优化,其核心是挖掘系统的硬件优势;
|
||||
- 应用导向的优化,其核心是实施特定领域的优化并解决应用的输入敏感问题。
|
||||
|
||||
在平台优化上,异构系统通常具有复杂且多变的硬件结构, 因此程序员仅负责编写正确实现程序功能的代码、由编译/运行时系统完成面向加速设备结构特点的优化是比较合理的方式, 这样也有利于程序在不同异构系统中获得良好的性能。
|
||||
|
||||
## 异构并行编程模型的研究方向
|
||||
|
||||
- 面向普通用户的异构并行编程接口
|
||||
- 面向多种加速设备的异构编译/运行时优化
|
||||
- 面向异构集群的异构并行编程模型
|
||||
|
||||
## 异构并行编程模型调研
|
||||
|
||||
为了调研各个异构并行编程模型的不同,使用不同的编程模型实现一个通用矩阵乘法算法,并通过计算`2048*2048`大小的矩阵乘法时间来比较各个模型的加速效果。
|
||||
|
||||
辅助计算的`Calculator`类如下所示:
|
||||
|
||||
```cpp
|
||||
#define MATRIX_SIZE 2048
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
|
||||
class Calculator
|
||||
{
|
||||
public:
|
||||
static void validate_matrix(const std::vector<std::vector<int>>& a, const std::vector<std::vector<int>>& b)
|
||||
{
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
if (a[i][j] != b[i][j])
|
||||
{
|
||||
std::cout << "Two matrix must be the same." << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::vector<int>> calculate(const std::string& method,
|
||||
const std::function<std::vector<std::vector<int>>(
|
||||
const std::vector<std::vector<int>>&,
|
||||
const std::vector<std::vector<int>>&)>& calculator) const
|
||||
{
|
||||
std::cout << "Calculator '" << method << "' start." << std::endl;
|
||||
const auto start_time = std::chrono::high_resolution_clock::now();
|
||||
const auto result = calculator(a, b);
|
||||
const auto end_time = std::chrono::high_resolution_clock::now();
|
||||
const auto span = end_time - start_time;
|
||||
|
||||
std::cout << "Calculator '" << method << "' end, time is " << std::chrono::duration_cast<
|
||||
std::chrono::milliseconds>(span).count() << " ms." << std::endl;
|
||||
|
||||
return result;
|
||||
}
|
||||
private:
|
||||
std::vector<std::vector<int>> a = initialize_matrix();
|
||||
std::vector<std::vector<int>> b = initialize_matrix();
|
||||
|
||||
static std::vector<std::vector<int>> initialize_matrix()
|
||||
{
|
||||
std::vector<std::vector<int>> matrix;
|
||||
std::random_device seed;
|
||||
std::ranlux48 engine(seed());
|
||||
std::uniform_int_distribution distribute(0, 102400);
|
||||
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
std::vector row(MATRIX_SIZE, 0);
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
row[j] = distribute(engine);
|
||||
}
|
||||
|
||||
matrix.emplace_back(row);
|
||||
}
|
||||
|
||||
return matrix;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
作为对比,一个使用CPU单线程计算的例子如下:
|
||||
|
||||
```cpp
|
||||
inline std::vector<int> cpuMatrixMultiply(
|
||||
const std::vector<int>& a,
|
||||
const std::vector<int>& b)
|
||||
{
|
||||
std::vector result(MATRIX_SIZE * MATRIX_SIZE, 0);
|
||||
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
int temp = 0;
|
||||
for (int k = 0; k < MATRIX_SIZE; k++)
|
||||
{
|
||||
// a[i][j] = a[i][k] * b[k][j] where k in (0..MATRIX_SIZE)
|
||||
temp += a[i * MATRIX_SIZE + k] * b[k * MATRIX_SIZE + j];
|
||||
}
|
||||
result[i * MATRIX_SIZE + j] = temp;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
```
|
||||
|
||||
### OpenMP
|
||||
|
||||
OpenMP是`Open MultiProcessing`的缩写,是一个使用编译器制导(Directives)来进行共享内存平行计算的框架,在C、C++和Fortran语言的并行编程中得到的了广泛的应用。OpenMP提供了一个简单而灵活的接口,让程序员能够充分释放多核和多处理器系统性能。
|
||||
|
||||
OpenMP从上面的介绍来看似乎并不是一个严格的异步并行编程模型,但是第一,OpenMP作为一个经典的并行编程框架,研究价值还是非常高的,其次在一些较新的OpenMP版本中其宣称也能利用NVIDIA GPU进行加速,似乎也能算是一个异构并行编程模型。
|
||||
|
||||
使用OpenMP进行并行加速的代码如下:
|
||||
|
||||
```C++
|
||||
std::vector<std::vector<int>> omp_matrix_multiply(
|
||||
const std::vector<std::vector<int>>& a,
|
||||
const std::vector<std::vector<int>>& b)
|
||||
{
|
||||
std::vector result(MATRIX_SIZE, std::vector(MATRIX_SIZE, 0));
|
||||
|
||||
#pragma omp parallel for shared(a, b, result) default(none)
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
int temp = 0;
|
||||
for (int k = 0; k < MATRIX_SIZE; k++)
|
||||
{
|
||||
temp += a[i][k] * b[k][j];
|
||||
}
|
||||
result[i][j] = temp;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
```
|
||||
|
||||
加速的结果如下:
|
||||
|
||||
| 运行方法 | 运行时间 | 比率 |
|
||||
| ------------ | -------- | ---- |
|
||||
| SingleThread | 21685 ms | 1.00 |
|
||||
| OpenMP | 2268 ms | 0.10 |
|
||||
|
||||
### CUDA
|
||||
|
||||
CUDA是NVIDIA公司设计的一套GPU加速应用程序的编程框架,是将NVIDIA GPU作为GPGPU使用的官方解决方案。
|
||||
|
||||
CUDA的异构编程接口是经典的Device-Host两元结构,程序员需要编写两部分代码,Device代码是实际运行在GPU上的逻辑部分,而Host代码则负责将数据从内存中复制到GPU上的显存和复制回来等准备工作,并负责以特定的参数调用GPU上的Device代码。
|
||||
|
||||
一个使用GPU的矩阵乘法程序如下所示:
|
||||
|
||||
```c++
|
||||
template <typename T>
|
||||
void check(T result, char const* const func, const char* const file, int const line)
|
||||
{
|
||||
if (result)
|
||||
{
|
||||
std::cerr << "CUDA error at " << file << ":" << line << "code = " << result << "(" << cudaGetErrorString(result)
|
||||
<< ") '" << func << "'" << std::endl;
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
#define checkCudaErrors(val) check((val), #val, __FILE__, __LINE__)
|
||||
|
||||
__global__ void cudaMatrixMultiply(const int* a, const int* b, int* c)
|
||||
{
|
||||
const int totalSize = MATRIX_SIZE * MATRIX_SIZE;
|
||||
int threadId = threadIdx.x + blockIdx.x * blockDim.x;
|
||||
|
||||
while (threadId < totalSize)
|
||||
{
|
||||
const int x = threadId / MATRIX_SIZE;
|
||||
const int y = threadId % MATRIX_SIZE;
|
||||
|
||||
int result = 0;
|
||||
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
result += a[x * MATRIX_SIZE + i] * b[i * MATRIX_SIZE + y];
|
||||
}
|
||||
|
||||
c[MATRIX_SIZE * x + y] = result;
|
||||
threadId += gridDim.x * blockDim.x;
|
||||
}
|
||||
|
||||
__syncthreads();
|
||||
}
|
||||
|
||||
std::vector<std::vector<int>> cudaCalculateMatrix(const std::vector<std::vector<int>>& a,
|
||||
const std::vector<std::vector<int>>& b)
|
||||
{
|
||||
constexpr unsigned int matrixSize = sizeof(int) * MATRIX_SIZE * MATRIX_SIZE;
|
||||
|
||||
// 在host上为a, b, c分配空间
|
||||
int *hostA, *hostB, *hostC;
|
||||
checkCudaErrors(cudaMallocHost(&hostA, matrixSize));
|
||||
checkCudaErrors(cudaMallocHost(&hostB, matrixSize));
|
||||
checkCudaErrors(cudaMallocHost(&hostC, matrixSize));
|
||||
|
||||
// 将数据复制到host上
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
hostA[i * MATRIX_SIZE + j] = a[i][j];
|
||||
hostB[i * MATRIX_SIZE + j] = b[i][j];
|
||||
}
|
||||
}
|
||||
|
||||
// 在device上分配空间
|
||||
int *deviceA, *deviceB, *deviceC;
|
||||
checkCudaErrors(cudaMalloc(reinterpret_cast<void**>(&deviceA), matrixSize));
|
||||
checkCudaErrors(cudaMalloc(reinterpret_cast<void**>(&deviceB), matrixSize));
|
||||
checkCudaErrors(cudaMalloc(reinterpret_cast<void**>(&deviceC), matrixSize));
|
||||
|
||||
cudaStream_t stream;
|
||||
|
||||
checkCudaErrors(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
|
||||
|
||||
// 将数据从host复制到device
|
||||
checkCudaErrors(cudaMemcpyAsync(deviceA, hostA, matrixSize, cudaMemcpyHostToDevice, stream));
|
||||
checkCudaErrors(cudaMemcpyAsync(deviceB, hostB, matrixSize, cudaMemcpyHostToDevice, stream));
|
||||
|
||||
constexpr int threadSize = 32 * 32;
|
||||
constexpr int grid = MATRIX_SIZE * MATRIX_SIZE / threadSize;
|
||||
|
||||
cudaEvent_t start, stop;
|
||||
cudaEventCreate(&start);
|
||||
cudaEventCreate(&stop);
|
||||
|
||||
cudaStreamSynchronize(stream);
|
||||
cudaEventRecord(start, stream);
|
||||
|
||||
cudaMatrixMultiply<<<grid, threadSize, 0, stream>>>(deviceA, deviceB, deviceC);
|
||||
|
||||
cudaEventRecord(stop, stream);
|
||||
cudaEventSynchronize(stop);
|
||||
|
||||
float cudaRunTime = 0;
|
||||
cudaEventElapsedTime(&cudaRunTime, start, stop);
|
||||
std::cout << "CUDA actual run time is " << cudaRunTime << " ms" << std::endl;
|
||||
|
||||
// 将数据从device复制到host
|
||||
cudaMemcpyAsync(hostC, deviceC, matrixSize, cudaMemcpyDeviceToHost, stream);
|
||||
cudaStreamSynchronize(stream);
|
||||
|
||||
std::vector<std::vector<int>> result;
|
||||
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
std::vector<int> row;
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
row.emplace_back(hostC[i * MATRIX_SIZE + j]);
|
||||
}
|
||||
result.emplace_back(row);
|
||||
}
|
||||
|
||||
// 释放内存
|
||||
cudaFreeHost(hostA);
|
||||
cudaFreeHost(hostB);
|
||||
cudaFreeHost(hostC);
|
||||
cudaFree(deviceA);
|
||||
cudaFree(deviceB);
|
||||
cudaFree(deviceC);
|
||||
cudaEventDestroy(start);
|
||||
cudaEventDestroy(stop);
|
||||
cudaStreamDestroy(stream);
|
||||
|
||||
return result;
|
||||
}
|
||||
```
|
||||
|
||||
加速的结果如下所示:
|
||||
|
||||
| 类型 | 运行时间 | 比率 |
|
||||
| ---- | -------- | ----- |
|
||||
| CPU | 22059ms | 1.000 |
|
||||
| GPU | 32ms | 0.001 |
|
||||
|
||||
需要注意的是,上面编写的CUDA代码还没有完全利用GPU的并行计算能力。
|
||||
|
||||
> 这里我遇到的一个非常奇怪的问题是,相同的CPU计算代码,在运行完OpenMP测试之后再运行就会比在CUDA运行之后再运行慢上一倍,而且可复现性极高。这里我给出一个典型的运行时间比较:CUDA计算的时间是323毫秒,CUDA之后的CPU计算时间是38602毫秒,OpenMP的计算时间是8721毫秒,OpenMP之后的计算时间是76598毫秒。
|
||||
>
|
||||
> 针对这个比较奇怪的情况我觉得可以做出三个猜想:
|
||||
>
|
||||
> - 考虑到我使用的CPU是Intel的i7-13600K,这是一个有性能核和效率核组成的大小核异构系统,可能在两次计算的过程中调度到了不同的核上;
|
||||
> - 在进行CUDA计算的过程中提高了缓存的亲和性;
|
||||
> - 在测试中没有设计热身(Warm up)的过程,而在CUDA计算的过程中部分起到了这个作用。
|
||||
>
|
||||
> 针对上面三个猜测做个两个实验:
|
||||
>
|
||||
> - 首先是换了一台没有大小核异构设计的计算机进行实验,发现这下两次使用CPU计算的时间差异不大;
|
||||
> - 加上了热身的阶段之后,计算时间没有发生明显的变化。
|
||||
>
|
||||
> 综上所述,可以认为此现象和异构CPU之间存在着明显的关联,但是缺乏直接证据。
|
||||
>
|
||||
> 在我们调整了矩阵的数据布局之后,这里提到的实验结果又发生了变化。上面的实验结果是使用二维数据存储矩阵得到的,而在修改为使用一维数组(也就是现在提供的代码)之后,相同的CPU计算代码的计算时间又没有产生明显的变化了。看来这个问题可能和数据布局、CPU缓存等问题相关。
|
||||
|
||||
### OpenCL
|
||||
|
||||
OpenCL是目前最为典型、发展最好的异构并行编程模型,毕竟其在官网的第一句话就是“为异构系统中并行编程的开放标准“。
|
||||
|
||||
![image-20241020142938110](./heterogeneous-programming-model/image-20241020142938110.png)
|
||||
|
||||
从上图的OpenCL工作原理中可以看出,OpenCL和CUDA类似,也采用了Device-Host类型的编程接口。主机代码通常通过普通的C/C++代码进行编写,编译之后在CPU上执行,而设备代码使用一个特定的C语言方言OpenCL C进行编写,这个方言针对并行编程进行了扩展,并提供了一系列封装好的数学计算函数。
|
||||
|
||||
设备代码上的编译方法有两种:在线编译和离线编译。其中在线编译就是指在程序运行时由对应设备厂商开发的OpenCL驱动将设备代码编译为在对应设备上运行的可执行代码,离线编译则有两种表现形式,第一种是在线编译的扩展版,由驱动编译得到的可执行程序可以通过API获取并保存下来,当下一需要在同一设备上调用时可以直接使用而不是再次编译,第二种则是完全独立的编译过程,在OpenCL程序运行之前使用单独的编译工具编译得到可执行文件。
|
||||
|
||||
![image-20241020155656219](./heterogeneous-programming-model/image-20241020155656219.png)
|
||||
|
||||
在提出离线编译之后,为了让驱动编译好的二进制文件可以在不同的设备之间复用,同时也是支持更为丰富的编译器生态系统,OpenCL的提出者Khronos设计了一种跨设备的、可迁移的中间表示形式[SPIRV](https://www.khronos.org/spir/)。这种中间形式的提出使得编程语言的提出者、编译器的开发人员可以直接将语言编译为`SPIRV`内核,这样就可以在任何支持`SPIRV`的OpenCL驱动上运行。下面将会介绍的`SYCL`和`Julia`语言都是基于`SPIRV`的中间语言进行构建的。`SPIRV`中间语言的提出也扩展了可以支持`OpenCL`的设备范围,现在已经有开发者和公司在探索将`SPIRV`编译到`Vulkan`、`DirectX`和`Metal`等传统意义上的图形API。
|
||||
|
||||
下面是一个使用OpenCL进行矩阵计算的例子。
|
||||
|
||||
```cpp
|
||||
struct ComputationContext
|
||||
{
|
||||
cl_platform_id platform;
|
||||
cl_device_id device;
|
||||
};
|
||||
|
||||
static std::unique_ptr<ComputationContext> selectDevice()
|
||||
{
|
||||
cl_uint platformCount;
|
||||
checkOpenCLError(clGetPlatformIDs(0, nullptr, &platformCount));
|
||||
std::cout << "Platform count: " << platformCount << std::endl;
|
||||
|
||||
std::vector<cl_platform_id> platforms(platformCount);
|
||||
checkOpenCLError(clGetPlatformIDs(platformCount, platforms.data(), nullptr));
|
||||
|
||||
std::unique_ptr<ComputationContext> selectedDevice = nullptr;
|
||||
|
||||
for (const auto& platform : platforms)
|
||||
{
|
||||
cl_uint deviceCount = 0;
|
||||
checkOpenCLError(clGetDeviceIDs(platform, CL_DEVICE_TYPE_ALL, 0, nullptr, &deviceCount));
|
||||
|
||||
std::vector<cl_device_id> devices(deviceCount);
|
||||
checkOpenCLError(clGetDeviceIDs(platform, CL_DEVICE_TYPE_ALL, deviceCount, devices.data(), nullptr));
|
||||
|
||||
for (const auto& device : devices)
|
||||
{
|
||||
size_t deviceNameLength;
|
||||
checkOpenCLError(clGetDeviceInfo(device, CL_DEVICE_NAME, 0, nullptr, &deviceNameLength));
|
||||
|
||||
std::vector<char> deviceNameArray(deviceNameLength);
|
||||
checkOpenCLError(
|
||||
clGetDeviceInfo(device, CL_DEVICE_NAME, deviceNameLength, deviceNameArray.data(), nullptr));
|
||||
|
||||
std::string deviceName(deviceNameArray.data(), deviceNameArray.size() - 1);
|
||||
|
||||
std::cout << "Found device: " << deviceName << std::endl;
|
||||
|
||||
if (deviceName.find("4060") != std::string::npos)
|
||||
{
|
||||
std::cout << "Select device '" << deviceName << "' as runner." << std::endl;
|
||||
selectedDevice = std::make_unique<ComputationContext>();
|
||||
selectedDevice->platform = platform;
|
||||
selectedDevice->device = device;
|
||||
}
|
||||
else
|
||||
{
|
||||
clReleaseDevice(device);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (selectedDevice == nullptr)
|
||||
{
|
||||
std::cout << "Failed to find the target device." << std::endl;
|
||||
std::exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
return selectedDevice;
|
||||
}
|
||||
|
||||
std::vector<int> clCalculateMatrix(const std::vector<int>& a,
|
||||
const std::vector<int>& b)
|
||||
{
|
||||
cl_int error;
|
||||
|
||||
const std::unique_ptr<ComputationContext> computationContext = selectDevice();
|
||||
// A key-value list ends with 0
|
||||
// See also https://www.khronos.org/registry/OpenCL/specs/3.0-unified/html/OpenCL_API.html#context-properties-table
|
||||
std::array<cl_context_properties, 3> properties = {
|
||||
CL_CONTEXT_PLATFORM,
|
||||
reinterpret_cast<cl_context_properties>(computationContext->platform),
|
||||
0
|
||||
};
|
||||
|
||||
cl_context context = clCreateContext(properties.data(), 1, &computationContext->device, nullptr, nullptr,
|
||||
&error);
|
||||
checkOpenCLError(error);
|
||||
cl_command_queue queue = clCreateCommandQueueWithProperties(context, computationContext->device, nullptr,
|
||||
&error);
|
||||
checkOpenCLError(error);
|
||||
|
||||
std::vector result(MATRIX_SIZE * MATRIX_SIZE, 0);
|
||||
constexpr size_t matrixSize = MATRIX_SIZE * MATRIX_SIZE * sizeof(int);
|
||||
|
||||
cl_mem deviceA = clCreateBuffer(context, CL_MEM_READ_ONLY, matrixSize, nullptr, &error);
|
||||
checkOpenCLError(error);
|
||||
cl_mem deviceB = clCreateBuffer(context, CL_MEM_READ_ONLY, matrixSize, nullptr, &error);
|
||||
checkOpenCLError(error);
|
||||
cl_mem deviceC = clCreateBuffer(context, CL_MEM_READ_WRITE, matrixSize, nullptr, &error);
|
||||
checkOpenCLError(error);
|
||||
|
||||
checkOpenCLError(
|
||||
clEnqueueWriteBuffer(queue, deviceA, CL_TRUE, 0, matrixSize, a.data(), 0, nullptr,
|
||||
nullptr));
|
||||
checkOpenCLError(
|
||||
clEnqueueWriteBuffer(queue, deviceB, CL_TRUE, 0, matrixSize, b.data(), 0, nullptr,
|
||||
nullptr));
|
||||
// Copy result to erase the previous result
|
||||
checkOpenCLError(
|
||||
clEnqueueWriteBuffer(queue, deviceC, CL_TRUE, 0, matrixSize, result.data(), 0,
|
||||
nullptr, nullptr
|
||||
));
|
||||
|
||||
auto source = R"(
|
||||
#define MATRIX_SIZE 2048
|
||||
|
||||
__kernel void calculate(const __global int* a, const __global int* b, __global int* c)
|
||||
{
|
||||
const int x = get_global_id(0);
|
||||
const int y = get_global_id(1);
|
||||
|
||||
int result = 0;
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
result += a[x * MATRIX_SIZE + i] * b[i * MATRIX_SIZE + y];
|
||||
}
|
||||
|
||||
c[x * MATRIX_SIZE + y] = result;
|
||||
})";
|
||||
|
||||
cl_program program = clCreateProgramWithSource(context, 1, &source, nullptr, &error);
|
||||
checkOpenCLError(error);
|
||||
checkOpenCLError(clBuildProgram(program, 0, nullptr, "", nullptr, nullptr));
|
||||
|
||||
size_t messageSize;
|
||||
checkOpenCLError(
|
||||
clGetProgramBuildInfo(program, computationContext->device, CL_PROGRAM_BUILD_LOG, 0, nullptr, &messageSize));
|
||||
std::vector<char> messageArray(messageSize);
|
||||
checkOpenCLError(
|
||||
clGetProgramBuildInfo(program, computationContext->device, CL_PROGRAM_BUILD_LOG, messageSize, messageArray.data(
|
||||
), nullptr));
|
||||
std::string message(messageArray.data(), messageSize - 1);
|
||||
std::cout << "Build log: " << message << std::endl;
|
||||
|
||||
cl_kernel kernel = clCreateKernel(program, "calculate", &error);
|
||||
checkOpenCLError(error);
|
||||
|
||||
checkOpenCLError(clSetKernelArg(kernel, 0, sizeof(cl_mem), &deviceA));
|
||||
checkOpenCLError(clSetKernelArg(kernel, 1, sizeof(cl_mem), &deviceB));
|
||||
checkOpenCLError(clSetKernelArg(kernel, 2, sizeof(cl_mem), &deviceC));
|
||||
|
||||
cl_event event;
|
||||
constexpr std::size_t globalSize[2] = {MATRIX_SIZE, MATRIX_SIZE};
|
||||
checkOpenCLError(clEnqueueNDRangeKernel(queue, kernel, 2, nullptr,
|
||||
globalSize, nullptr, 0, nullptr, &event));
|
||||
|
||||
checkOpenCLError(clWaitForEvents(1, &event));
|
||||
|
||||
checkOpenCLError(
|
||||
clEnqueueReadBuffer(queue, deviceC, CL_TRUE, 0, matrixSize, result.data(), 0,
|
||||
nullptr, nullptr));
|
||||
|
||||
clReleaseMemObject(deviceA);
|
||||
clReleaseMemObject(deviceB);
|
||||
clReleaseMemObject(deviceC);
|
||||
|
||||
clReleaseKernel(kernel);
|
||||
clReleaseProgram(program);
|
||||
clReleaseCommandQueue(queue);
|
||||
clReleaseContext(context);
|
||||
clReleaseDevice(computationContext->device);
|
||||
return result;
|
||||
}
|
||||
```
|
||||
|
||||
从上面的代码中可以看出两点:
|
||||
|
||||
- OpenCL的编程比CUDA的更为繁琐,因为OpenCL支持的设备种类更多,在主机代码上还需要多出一块选择运行设备的代码;
|
||||
- OpenCL在主机代码和核函数的解耦更为彻底,核函数直接以字符串的形式存在于主机代码中,而各个厂商提供的驱动才是真正的编译器。
|
||||
|
||||
测试的运行结果如下:
|
||||
|
||||
| 类型 | 运行时间 | 比率 |
|
||||
| ----------------------------- | -------- | ---- |
|
||||
| NVIDIA 4060 Ti OpenCL | 173ms | 0.01 |
|
||||
| Intel UHD Graphics 770 OpenCL | 1020ms | 0.04 |
|
||||
| CPU | 21255ms | 1.00 |
|
||||
|
||||
### SYCL
|
||||
|
||||
SYCL是一个使用标准C++编写在各种异构计算设备上运行核函数的抽象层,并提供了一套新的API来查找各种设备并管理这些设备上的内存资源和代码执行。这个标准是开发、无版税、跨平台的抽象标准。同时也是因为这是一个**标准**,因此需要寻找支持这个标准的编译器才能使用这个标准。按照官网上的说明,我们选择了两个看上去还在活跃开发的项目,Intel的[oneAPI](https://www.intel.com/content/www/us/en/developer/tools/oneapi/overview.html)和开源的[AdaptiveCpp](https://github.com/AdaptiveCpp/AdaptiveCpp)进行调研,考虑到在后文中还将继续介绍oneAPI相关的工作,因此这里将重点放在AdaptiveCpp上。
|
||||
|
||||
AdaptiveCpp由四个部分组成,分别在不同的C++命名空间中提供。
|
||||
|
||||
- SYCL Interface:实现了SYCL标准中规定的各种类和函数,是实际上同用户交互的接口。这些接口实际上可以仍然可以分成主机API和核函数库两个部分。主机API是普通的C++代码,负责任务调度、任务管理和平台射别管理等。核函数库包括了这种在编写核函数时可以使用的类和函数,这些接口暴露一些后端特定的功能,其中的一些甚至需要使用后端特定的方言来编写,例如CUDA。
|
||||
|
||||
- AdaptiveCpp Runtime:运行时实际上实现了设备调度、任务图管理和执行、数据管理、后端管理、任务调度和同步等等功能,运行时负责同各种支持后端的运行时交互来实现上述的功能。
|
||||
|
||||
![image-20241029123308139](./heterogeneous-programming-model/image-20241029123308139.png)
|
||||
|
||||
- Compiler:考虑到在用户编写的代码中可能使用一些特定后端的方言,因此普通的C++编译器无法正常编译所有的用户代码。因此用户代码的编译是通过一个名为`acpp`的Python脚本驱动的,这个脚本将各个后端的不同编译器暴露为一个统一的编程接口。
|
||||
|
||||
- Glue:将上述的各个部分连接在一起的胶水代码。一种典型的胶水代码是内核函数的启动代码`kernel launcher`,由于启动器中往往涉及到一些后端特定的方言,例如CUDA中的`<<<>>>`或者OpenMP中的各种`pragma`,因此这些代码通常需要使用特定的编译器进行编译,所以这些胶水代码直接以头文件的方式提供,以方便在编译时被特定的编译器处理。这些胶水代码将会把核函数包裹为一个合法的C++函数对象,这样运行时就可以获得这个函数对象并控制代码在设备上的运行。
|
||||
|
||||
AdaptiveCpp同时支持多种不同的编译流程。
|
||||
|
||||
1. 一种通用的一遍编译流程,将核函数编译到一种统一的中间表示形式,这种中间表示形式将在运行时被编译到特定的后端架构上。这种编译流程提供了高度的可移植性和较快的编译速度。这种编译设施支持的后端有:通过`PTX`在NVIDIA的GPU上运行,通过`amdgcn`在AMD的GPU上运行,通过`SPIR-V`在Intel的GPU上运行,通过`SPIR-V`在任何支持OpenCL驱动的设备上运行,也可以通过LLVM直接在CPU上运行。
|
||||
2. 一种为互操作性优化的多遍编译流程,在这个流程中AdaptiveCpp将聚合现有的各种LLVM/Clang的编译工具链,使得用户可以在单个代码文件中混合编写SYCL和各种特定的编程模型,例如CUDA和HIP。使用这个编译流程的好处有亮点:(1)在这种编译流程中可以直接在SYCL代码使用各个特定编译模型中提供最新设备内部优化(Intrinsics),不用等待SYCL标准的支持;(2)在这种编译流程中可以使用各个厂商提供的优化模板库,例如`rocPRIM`和`CUB`。这种编译流程是提供聚合`CUDA`的clang前端和`ROCm`的clang前端来实现的。
|
||||
3. 一种只将AdaptiveCpp作为函数使用的编程流程。在这种情况AdaptiveCpp作为一个三方库被引入其他的编译器编译流程中。
|
||||
|
||||
第一种通用的编译流程显然是泛用性最广的一种编译流程,同时也是AdaptiveCpp推荐的编译流程。
|
||||
|
||||
![image-20241029163654675](./heterogeneous-programming-model/image-20241029163654675.png)
|
||||
|
||||
下面是一段使用SYCL进行矩阵乘法加速的代码:
|
||||
|
||||
```cpp
|
||||
struct CustomDeviceSelector
|
||||
{
|
||||
explicit CustomDeviceSelector(std::string vendorName) : _vendorName(std::move(vendorName))
|
||||
{
|
||||
}
|
||||
|
||||
int operator()(const sycl::device& d) const
|
||||
{
|
||||
int deviceRating = 0;
|
||||
|
||||
if (d.is_gpu() && d.get_info<sycl::info::device::name>().find(_vendorName) != std::string::npos)
|
||||
{
|
||||
deviceRating = 3;
|
||||
}
|
||||
else if (d.is_cpu())
|
||||
{
|
||||
deviceRating = 1;
|
||||
}
|
||||
|
||||
return deviceRating;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string _vendorName;
|
||||
};
|
||||
|
||||
static std::vector<int> syclCalculateMatrix(const std::vector<int>& a, const std::vector<int>& b,
|
||||
const std::string& hint)
|
||||
{
|
||||
const CustomDeviceSelector selector(hint);
|
||||
sycl::queue queue(selector);
|
||||
|
||||
const std::string deviceName = queue.get_device().get_info<sycl::info::device::name>();
|
||||
std::cout << "Select device: " << deviceName << std::endl;
|
||||
|
||||
std::vector result(MATRIX_SIZE * MATRIX_SIZE, 0);
|
||||
|
||||
sycl::buffer aBuffer(a);
|
||||
sycl::buffer bBuffer(b);
|
||||
sycl::buffer resultBuffer(result);
|
||||
|
||||
queue.submit([&](sycl::handler& h)
|
||||
{
|
||||
const sycl::accessor aBufferAccessor(aBuffer, h, sycl::read_only);
|
||||
const sycl::accessor bBufferAccessor(bBuffer, h, sycl::read_only);
|
||||
const sycl::accessor resultBufferAccessor(resultBuffer, h, sycl::write_only);
|
||||
|
||||
h.parallel_for(sycl::nd_range<2>({MATRIX_SIZE, MATRIX_SIZE}, {16, 16}), [=](const sycl::nd_item<2>& item)
|
||||
{
|
||||
const size_t x = item.get_global_id(0);
|
||||
const size_t y = item.get_global_id(1);
|
||||
|
||||
int temp = 0;
|
||||
for (size_t k = 0; k < MATRIX_SIZE; ++k)
|
||||
{
|
||||
temp += aBufferAccessor[x * MATRIX_SIZE + k] * bBufferAccessor[k * MATRIX_SIZE + y];
|
||||
}
|
||||
resultBufferAccessor[x * MATRIX_SIZE + y] = temp;
|
||||
});
|
||||
});
|
||||
|
||||
sycl::host_accessor resultHostAccessor(resultBuffer, sycl::read_only);
|
||||
|
||||
for (size_t i = 0; i < MATRIX_SIZE; ++i)
|
||||
{
|
||||
for (size_t j = 0; j < MATRIX_SIZE; ++j)
|
||||
{
|
||||
result[i * MATRIX_SIZE + j] = resultHostAccessor[i * MATRIX_SIZE + j];
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
```
|
||||
|
||||
测试之后的运行结果如下所示:
|
||||
|
||||
| 类型 | 运行时间 | 比率 |
|
||||
| --------------------------- | -------- | ----- |
|
||||
| Intel UHD Graphics 770 SYCL | 488ms | 0.023 |
|
||||
| NVIDIA 4060 Ti SYCL | 180ms | 0.008 |
|
||||
| OpenMP SYCL | 1591ms | 0.076 |
|
||||
| CPU | 20930ms | 1.000 |
|
||||
|
||||
### OpenACC
|
||||
|
||||
OpenACC是一个通过编译器制导来在代码中表达并行性并利用并行编译器为多个并行加速器生成代码的编程模型。为了保证OpenACC可以适配于各种计算架构的加速设备,OpenACC设计了一个各种并行层次和有着不同速度和寻址方式内存的编程模型。同时OpenACC主要的功能即是支持同时将计算和数据卸载到一个加速设备上,考虑到加速设备可能有着同宿主设备完全不同的内存架构,OpenACC编译器和运行时将会自动分析代码并负责加速器上内存的管理和加速器和主机之间的数据传输。
|
||||
|
||||
作为一个高等级、平台独立的加速器编程框架,使用OpenACC进行开发能够使开发人员将一个源代码编译到一系列设备上运行并实现一个相对较好的性能,但是这个简易性和移植性也在一定程度上造成使用OpenACC编程无法完全利用加速设备上的算力。
|
||||
|
||||
OpenACC是作为一个标准的形式提供的,实现了该标准的编译器有:
|
||||
|
||||
| 编译器名称 | 情况 |
|
||||
| ------------------------------------------------------------ | ------------------------------------------------------------ |
|
||||
| NVIDIA HPC SDK | 支持在NVIDIA GPU和多核CPU上的OpenACC并行编程 |
|
||||
| Sourcery CodeBench Lite | OpenACC官网上说支持针对AMD GPU的编译,但是官网页面似乎改版了,没有找到相关的内容 |
|
||||
| GCC 12 | 支持到OpenACC 2.6 |
|
||||
| [Omni Compiler Project](https://github.com/omni-compiler/omni-compiler) | 源到源编译器,将带有制导的源代码翻译到带有运行时调用的平台代码,近两年没有活跃开发 |
|
||||
| [OpenUH](https://github.com/uhhpctools/openuh) | 项目开发者在7年前的最后一次提交了中删除了README中有关OpenACC的内容 |
|
||||
| [OpenArc](https://csmd.ornl.gov/project/openarc-open-accelerator-research-compiler) | 是学术界出品的还在活跃开发的编译器,看上去还做了不少工作的样子,就是OpenACC官网上的链接已经失效了找起来比较麻烦,而且宣称是一个开源编译器,但是获取源代码和二进制文件需要联系他们(美国橡树岭国家实验室)创建账户,这看去对于我们这些Foreign Adversary有些抽象了。 |
|
||||
|
||||
在试验OpenACC时遇到了巨大的困难,不论是使用gcc还是NVIDIA HPC SDK都没有办法实现明显的并行编程加速,多次实验之后都没有找到的问题的所在。这里还是贴一下实验的代码和实验的数据。
|
||||
|
||||
实验中编写的OpenACC加速代码如下:
|
||||
|
||||
```cpp
|
||||
static std::vector<int> OpenACCCpuCalculateMatrix(const std::vector<int>& a, const std::vector<int>& b)
|
||||
{
|
||||
constexpr int length = MATRIX_SIZE * MATRIX_SIZE;
|
||||
|
||||
const auto aBuffer = new int[length];
|
||||
const auto bBuffer = new int[length];
|
||||
const auto cBuffer = new int[length];
|
||||
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
aBuffer[i] = a[i];
|
||||
bBuffer[i] = b[i];
|
||||
cBuffer[i] = 0;
|
||||
}
|
||||
|
||||
#pragma acc enter data copyin(aBuffer[0:length], bBuffer[0:length])
|
||||
#pragma acc enter data create(bBuffer[0:length])
|
||||
#pragma acc data present(aBuffer[0:length], bBuffer[0:length], cBuffer[0:length])
|
||||
{
|
||||
#pragma acc kernels loop independent
|
||||
for (int i = 0; i < MATRIX_SIZE; i++)
|
||||
{
|
||||
#pragma acc loop independent
|
||||
for (int j = 0; j < MATRIX_SIZE; j++)
|
||||
{
|
||||
int temp = 0;
|
||||
#pragma acc loop independent reduction(+:temp)
|
||||
for (int k = 0; k < MATRIX_SIZE; k++)
|
||||
{
|
||||
temp += aBuffer[i * MATRIX_SIZE + k] * bBuffer[k * MATRIX_SIZE + j];
|
||||
}
|
||||
cBuffer[i * MATRIX_SIZE + j] = temp;
|
||||
}
|
||||
}
|
||||
}
|
||||
#pragma acc exit data copyout(cBuffer[0:length])
|
||||
#pragma acc exit data delete(aBuffer[0:length], bBuffer[0:length])
|
||||
|
||||
std::vector result(MATRIX_SIZE * MATRIX_SIZE, 0);
|
||||
|
||||
for (int i = 0; i < length; ++i)
|
||||
{
|
||||
result[i] = cBuffer[i];
|
||||
}
|
||||
|
||||
delete[] aBuffer;
|
||||
delete[] bBuffer;
|
||||
delete[] cBuffer;
|
||||
|
||||
return result;
|
||||
}
|
||||
```
|
||||
|
||||
实验中使用分别使用`NVIDIA HPC SDK`和`GCC`编译运行的结果如下:
|
||||
|
||||
| 编译器 | 类型 | 运行时间 |
|
||||
| -------------- | ------- | -------- |
|
||||
| NVIDIA HPC SDK | OpenACC | 19315ms |
|
||||
| NVIDIA HPC SDK | CPU | 22942ms |
|
||||
| GCC | OpenACC | 19999ms |
|
||||
| GCC | CPU | 22623ms |
|
||||
|
||||
### oneAPI
|
||||
|
||||
oneAPI是Intel公司提出的一套异构并行编程框架,该框架致力于达成如下几个目标:(1)定义一个跨架构、跨制造商的统一开放软件平台;(2)允许同一套代码可以在不同硬件制造商和加速技术的硬件上运行;(3)提供一套覆盖多个编程领域的库API。为了实现这些目标,oneAPI同上文中已经提到过的开放编程标准SYCL紧密合作,oneAPI也提供了一个SYCL的编译器和运行时;同时oneAPI也提供了一系列API库,包括`oneDPL`、`oneDNN`、`oneTBB`和`oneMKL`等。
|
||||
|
||||
![image-20241103162259981](./heterogeneous-programming-model/image-20241103162259981.png)
|
||||
|
||||
我对于oneAPI的理解就是Intel用来对标NVIDIA的CUDA的一套高性能编程工具箱。首先为了和NVIDIA完全闭源的CUDA形成鲜明的对比,Intel选择了OpenCL合作同时开发SYCL,当时也有可能是Intel知道自己的显卡技不如人,如果不兼容市面上其他的部件是没有出路的,同时为了和CUDA丰富的生态竞争,Intel再开发并开源了一系列的`oneXXX`。
|
||||
|
||||
这里我就把上面SYCL写的例子用Intel提供的`DPC++`编译运行一下,看看在效率上会不会有所变化。
|
||||
|
||||
| 类型 | 运行时间 | 比率 |
|
||||
| ----------------------------- | -------- | ----- |
|
||||
| Intel UHD Graphics 770 oneAPI | 429ms | 0.023 |
|
||||
| NVIDIA 4060 Ti oneAPI | 191ms | 0.010 |
|
||||
| Intel i5-13600K oneAPI | 198ms | 0.011 |
|
||||
| CPU | 18643ms | 1.000 |
|
||||
|
||||
在显卡上的计算时间没有明显的变化,但是我们Intel的编译器却在选择到使用Intel CPU进行计算时展现了不俗的实力。
|
||||
|
||||
|
||||
## 参考文献
|
||||
|
||||
1. 刘颖,吕方,王蕾,陈莉,崔慧敏,冯晓兵.异构并行编程模型研究与进展.软件学报,2014,25(7):1459-1475. [http://www.jos.org.cn/1000-9825/4608.htm](http://www.jos.org.cn/1000-9825/4608.htm)
|
||||
2. AdaptiveCpp官方文档. [https://adaptivecpp.github.io/AdaptiveCpp/](https://adaptivecpp.github.io/AdaptiveCpp/)
|
||||
3. Exploring the performance of SGEMM in OpenCL on NVIDIA GPUs. [https://github.com/CNugteren/myGEMM](https://github.com/CNugteren/myGEMM)
|
||||
4. OpenACC Programming and Best Practices Guide. [https://openacc-best-practices-guide.readthedocs.io/en/latest/01-Introduction.html](https://openacc-best-practices-guide.readthedocs.io/en/latest/01-Introduction.html)
|
||||
5. oneAPI What is it?. [https://www.intel.com/content/www/us/en/developer/articles/technical/oneapi-what-is-it.html](https://www.intel.com/content/www/us/en/developer/articles/technical/oneapi-what-is-it.html)
|
||||
|
BIN
YaeBlog/source/posts/heterogeneous-programming-model/83ee1d254d638536d0fb4197ff63e758.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/83ee1d254d638536d0fb4197ff63e758.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/9eb06d8be92ddef3db33e040163c67a7.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/9eb06d8be92ddef3db33e040163c67a7.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/Screenshot_20241016_214139.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/Screenshot_20241016_214139.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/Screenshot_20241016_214939.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/Screenshot_20241016_214939.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/eab553f9e30d8d866a1ddd201b5e4c85.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/eab553f9e30d8d866a1ddd201b5e4c85.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241020142938110.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241020142938110.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241020155656219.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241020155656219.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241029123308139.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241029123308139.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241029163654675.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241029163654675.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241103162259981.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/heterogeneous-programming-model/image-20241103162259981.png
(Stored with Git LFS)
Normal file
Binary file not shown.
65
YaeBlog/source/posts/rust-drop-stack-overflow.md
Normal file
65
YaeBlog/source/posts/rust-drop-stack-overflow.md
Normal file
|
@ -0,0 +1,65 @@
|
|||
---
|
||||
title: 内存栈被Rust自动生成的Drop函数塞满了
|
||||
date: 2024-11-05T20:36:07.3930374+08:00
|
||||
tags:
|
||||
- Rust
|
||||
- 技术笔记
|
||||
---
|
||||
|
||||
这辈子就是被Rust编译器害了.jpg
|
||||
|
||||
<!--more-->
|
||||
|
||||
最近在用Rust写一个[Sysy](https://gitlab.eduxiji.net/csc1/nscscc/compiler2022/-/blob/master/SysY2022%E8%AF%AD%E8%A8%80%E5%AE%9A%E4%B9%89-V1.pdf)语言的编译器,但是在实现完语法分析之后针对官方提供的测试用例进行测试时遇到的一个抽象的栈溢出报错。
|
||||
|
||||
事情是这样的,当我实现完`Sysy`语言的语法分析器并编写了一些白盒测试用例之后,我便打算将官方提供的100个测试用例作为输入运行看看能不能**正常**的解析成抽象语法树(显然不可能手动检查生成的抽象语法树是否正确)。我首先在`main.rs`里面实现了读取所有的`.sy`文件,进行词法分析和语法分析的逻辑,程序在这里这正常的识别了大多数的输入文件,在一些浮点数的输入上还存在一些问题。于是我便打算将这些逻辑重构到一个Rust的集成测试中,方便在CI中使用`cargo test`进行运行测试。但是在重构完成之后使用`cargo test`进行运行时我去遇到了如下的运行时错误。
|
||||
|
||||
![image-20241105181144993](./rust-drop-stack-overflow/image-20241105181144993.png)
|
||||
|
||||
看到这个报错的第一瞬间,我怀疑是因为`cargo test`和`cargo run`的运行环境不同,导致测试程序读取到了其他其实不是`sysy`程序但是以`.sy`结尾的文件,而恰好这个文件又能被解析,使得解析器组合子工作的过程中调用链太长而导致栈溢出,于是我在`RustRover`中打断点调试运行,却发现程序正确的读取到输入文件。这就奇怪了,我于是让程序继续运行到报错,看看报错时候程序的调用栈是被什么东西填满了,然后发现程序的调用栈长这样:
|
||||
|
||||
![image-20241105181612954](./rust-drop-stack-overflow/image-20241105181612954.png)
|
||||
|
||||
并不是我程序中代码的调用太深导致的,而是Rust编译器自动生成的`drop`函数导致的。于是尝试看看调用栈的底部,看看是在读取什么输入数据,`drop`什么神仙数据结构的时候发生的。调试器很快告诉我们,`drop`的数据结构是抽象语法树中的二元表达式,而此时的输入代码则如下图所示,而且图中的代码重复了400行。
|
||||
|
||||
![image-20241105182036975](./rust-drop-stack-overflow/image-20241105182036975.png)
|
||||
|
||||
我已经能想象到那棵高耸如云的抽象语法树了。
|
||||
|
||||
虽然找到了问题的根源,但是还有一个问题没有解决:为什么在`main.rs`上运行的时候程序并不会出现问题,但是在`cargo test`上运行时却会遇到栈溢出的问题?
|
||||
|
||||
这个问题其实在[Rust语言圣经](https://course.rs/compiler/pitfalls/stack-overflow.html)中就有记载,不过问题的背景略有不同。Rust语言圣经中导致栈溢出的问题是尝试在栈上分配一个4MB的超大数组,但是出现问题的原因是一致的。在`main.rs`中运行程序时,如果不使用多线程,那么程序的所有逻辑将运行在`main`线程上,这个线程在Linux下的栈大小是8MB,而当使用Rust提供的集成测试时,Rust为了实现测试的并行运行,会把所有的测试都运行在新线程上,这就导致在使用`cargo test`时程序会出现问题。
|
||||
|
||||
解决这个问题的方案可以是设置环境变量设置创建新线程的栈大小:`RUST_MIN_STACK=8388608 cargo test`,但是这种方法总是不太优雅。合理的解决方案是重写造成问题数据结构的`drop`方法,避免使用编译器自动生成的`drop`方法。这里我提供的抽象语法树`drop`方法如下所示。通过广度优先搜索的方式遍历语法树,手动释放一些可能子节点可能较多的语法树节点(其中释放内存的方式来自于[reddit](https://www.reddit.com/r/rust/comments/x97a4a/stack_overflow_during_drop_of_huge_abstract/))。
|
||||
|
||||
```rust
|
||||
fn collect_node_rubbishes(
|
||||
rubbish: &mut Vec<Rc<RefCell<SyntaxNode>>>,
|
||||
node_type: &mut SyntaxNodeType,
|
||||
) {
|
||||
match node_type {
|
||||
SyntaxNodeType::BinaryExpression(node) => {
|
||||
rubbish.push(std::mem::replace(&mut node.left, SyntaxNode::unit()));
|
||||
rubbish.push(std::mem::replace(&mut node.right, SyntaxNode::unit()));
|
||||
}
|
||||
SyntaxNodeType::Block(nodes) => {
|
||||
while let Some(child) = nodes.pop() {
|
||||
rubbish.push(child);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SyntaxNode {
|
||||
fn drop(&mut self) {
|
||||
let mut rubbish = Vec::new();
|
||||
collect_node_rubbishes(&mut rubbish, &mut self.node_type);
|
||||
|
||||
while let Some(node) = rubbish.pop() {
|
||||
collect_node_rubbishes(&mut rubbish, &mut node.borrow_mut().node_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
BIN
YaeBlog/source/posts/rust-drop-stack-overflow/image-20241105181144993.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/rust-drop-stack-overflow/image-20241105181144993.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/rust-drop-stack-overflow/image-20241105181612954.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/rust-drop-stack-overflow/image-20241105181612954.png
(Stored with Git LFS)
Normal file
Binary file not shown.
BIN
YaeBlog/source/posts/rust-drop-stack-overflow/image-20241105182036975.png
(Stored with Git LFS)
Normal file
BIN
YaeBlog/source/posts/rust-drop-stack-overflow/image-20241105182036975.png
(Stored with Git LFS)
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user