NCCL源码解析⑦:机器间Channel连接

679次阅读
没有评论

NCCL源码解析⑦:机器间Channel连接
作者|KIDGINBROOK


上节中完成了单机内部的channel搜索,仍然以ringGraph为例的话,相当于在单台机器内部搜索出来了一系列的环,接下来需要将机器之间的环连接起来。


为了方便理解,假设两机十六卡的情况下第一台机器的一个ring为:


graph->intra: GPU/0 GPU/7 GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1
graph->inter: NET/0 NET/0

第二个机器对应的ring为:


graph->intra: GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11
graph->inter: NET/0 NET/0

allGather3Data用于在rank间聚合channel的信息,ncclGraphInfo记录了环的信息,比如speed和type。


struct ncclGraphInfo { int sameC++hannels; float speedIntra; float speedInter; int typeIntra; }; struct { int cudaCompCap; int fullCudaCompCap; int nChannels; struct ncclGraphInfo tree; struct ncclGraphInfo ring; struct ncclGraphInfo collNet; struct ncclTopoRanks topoRanks; } *allGather3Data; NCCLCHECK(ncclCalloc(&allGather3Data, nranks)); allGather3Data[rank].cudaCompCap = ncclCudaCompCap(); allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels = std::min(treeGraph.nChannels, ringGraph.nChannels); ... allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels; allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra; allGather3Data[rank].ring.speedInter = ringGraph.speedInter; allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra; ...

然后开始设置ncclTopoRanks,获取当前rank在ring中的prev和next,其中第一个rank的prev和最后一个rank的next为-1,如rank6的prev为7,next为3;获取当前ring的ringRecv和ringSend,即ring的第一个节点和最后一个节点,最后将搜索到的环复制了一遍,这里在官方issue中看到相关解释是为了进一步的并行以充分利用带宽。


struct ncclTopoRanks { int ringRecv[MAXCHANNELS]; int ringSend[MAXCHANNELS]; int ringPrev[MAXCHANNELS]; int ringNext[MAXCHANNELS]; int treeUpRecv[MAXCHANNELS]; int treeUpSend[MAXCHANNELS]; int treeDnRecv[MAXCHANNELS]; int treeDnSend[MAXCHANNELS];}; ncclResult_t ncclTopoPreset(struct ncclComm* comm, struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph, struct ncclTopoRanks* topoRanks) { int rank = comm->rank; int localRanks = comm->localRanks; int nChannels = comm->nChannels; for (int c=0; c<nChannels; c++) { struct ncclChannel* channel = comm->channels+c; channel->ring.prev = channel->ring.next = -1; ... int* ringIntra = ringGraph->intra+c*localRanks; int* treeIntra = treeGraph->intra+c*localRanks; int* collNetIntra = collNetGraph->intra+c*localRanks; for (int i=0; i<localRanks; i++) { if (ringIntra[i] == rank) { topoRanks->ringRecv[c] = ringIntra[0]; topoRanks->ringSend[c] = ringIntra[localRanks-1]; channel->ring.prev = (i == 0) ? -1 : ringIntra[i-1]; channel->ring.next = (i == localRanks-1) ? -1 : ringIntra[i+1]; } ... } topoRanks->ringPrev[c] = channel->ring.prev; topoRanks->ringNext[c] = channel->ring.next; } // Duplicate channels rings/trees struct ncclChannel* channel0 = comm->channels; struct ncclChannel* channel1 = channel0+nChannels; memcpy(channel1, channel0, nChannels*sizeof(struct ncclChannel)); return ncclSuccess;}

然后通过bootstrapAllGather获取全局的allGather3Data信息,计算出当前rank所在的node保存在comm->node,以及每个node的第一个rank保存在nodesFirstRank,因此例子中:


nodesFirstRank[0]: 0
nodesFirstRank[1]: 10


然后开始将每个机器的环首尾相连组成大环。


// Joins the per-node rings/trees of all ranks into global ones.
//   comm         : communicator; comm->nChannels and comm->channels are updated
//   firstRanks   : one rank per node, forwarded to connectRings/connectTrees
//   allTopoRanks : per-rank ncclTopoRanks gathered from every rank
//   rings        : output, filled by ncclBuildRings
// The per-rank data is flattened into channel-major arrays, i.e. element
// [c*nranks+i] holds the value of channel c for rank i.
ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, struct ncclTopoRanks** allTopoRanks, int* rings) {
  // Gather data from all ranks
  int *ringRecv, *ringSend, *ringPrev, *ringNext, *treeUpRecv, *treeUpSend, *treeDnRecv,*treeDnSend;
  int nranks = comm->nRanks;
  int nChannels = comm->nChannels;
  NCCLCHECK(ncclCalloc(&ringRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringSend, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringPrev, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringNext, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeUpRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeUpSend, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeDnRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeDnSend, nranks*MAXCHANNELS));
  for (int i=0; i<nranks; i++) {
    for (int c=0; c<nChannels;c++) {
      ringRecv[c*nranks+i] = allTopoRanks[i]->ringRecv[c];
      ringSend[c*nranks+i] = allTopoRanks[i]->ringSend[c];
      ringPrev[c*nranks+i] = allTopoRanks[i]->ringPrev[c];
      ringNext[c*nranks+i] = allTopoRanks[i]->ringNext[c];
      treeUpRecv[c*nranks+i] = allTopoRanks[i]->treeUpRecv[c];
      treeUpSend[c*nranks+i] = allTopoRanks[i]->treeUpSend[c];
      treeDnRecv[c*nranks+i] = allTopoRanks[i]->treeDnRecv[c];
      treeDnSend[c*nranks+i] = allTopoRanks[i]->treeDnSend[c];
    }
  }

  // Connect rings and trees. This should also duplicate the channels.
  NCCLCHECK(connectRings(comm, ringRecv, ringSend, ringPrev, ringNext, firstRanks));
  NCCLCHECK(connectTrees(comm, treeUpRecv, treeUpSend, treeDnRecv, treeDnSend, firstRanks));

  // Duplicate ringPrev/ringNext for ncclBuildRing
  memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
  memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int));

  // Duplication should be complete now
  nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2);

  // Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS.
  // We permit combining max, then min, to only use the first channels, then duplicate them.
  nChannels = comm->nChannels = std::min((int)ncclMaxNchannels(), nChannels);
  int c;
  // Pad up to the minimum channel count by copying channel (c-nChannels) into
  // slot c, for both the prev/next tables and the channel structs.
  for (c=nChannels; c<ncclMinNchannels(); c++) {
    memcpy(ringPrev+c*nranks, ringPrev+(c-nChannels)*nranks, nranks*sizeof(int));
    memcpy(ringNext+c*nranks, ringNext+(c-nChannels)*nranks, nranks*sizeof(int));
    memcpy(comm->channels+c, comm->channels+c-nChannels, sizeof(struct ncclChannel));
  }
  nChannels = comm->nChannels = c;

  // Create rings array and check all is fine
  NCCLCHECK(ncclBuildRings(nChannels, rings, comm->rank, comm->nRanks, ringPrev, ringNext));

  free(ringRecv);
  free(ringSend);
  free(ringPrev);
  free(ringNext);
  free(treeUpRecv);
  free(treeUpSend);
  free(treeDnRecv);
  free(treeDnSend);

  return ncclSuccess;
}

这里将所有channel的prev,next,send,recv信息打平到数组中,例如recv[0]表示第一个ring中rank0的recv是哪个rank,然后开始计算当前机器第一个rank的prev和最后一个rank的next。


// Stitches the per-node rings into one ring per channel.
// ringRecv/ringSend/ringPrev/ringNext are channel-major tables: the slice for
// channel c starts at offset c*comm->nRanks and is indexed by rank.
// For each node n (looked up through firstRanks[n]):
//   - the node's recv rank gets, as prev, the send rank of node n-1 (wrapping)
//   - the node's send rank gets, as next, the recv rank of node n+1 (wrapping)
// When the local rank is one of those endpoints, the same value is also stored
// in channel c and in channel c+nChannels.
static ncclResult_t connectRings(struct ncclComm* comm, int* ringRecv, int* ringSend, int* ringPrev, int* ringNext, int* firstRanks) {
  int nChannels = comm->nChannels;
  int nNodes = comm->nNodes;
  for (int c=0; c<nChannels; c++) {
    int* recv = ringRecv+c*comm->nRanks;
    int* send = ringSend+c*comm->nRanks;
    int* prev = ringPrev+c*comm->nRanks;
    int* next = ringNext+c*comm->nRanks;
    struct ncclChannel* channel0 = comm->channels+c;
    struct ncclChannel* channel1 = channel0+nChannels;
    for (int n=0; n<nNodes; n++) {
      // Entry point of node n is fed by the exit point of the previous node.
      int recvRank = recv[firstRanks[n]];
      int prevSendRank = send[firstRanks[(n-1+nNodes)%nNodes]];
      prev[recvRank] = prevSendRank;
      if (comm->rank == recvRank) {
        channel0->ring.prev = prevSendRank;
        channel1->ring.prev = prevSendRank;
      }
      // Exit point of node n feeds the entry point of the next node.
      int sendRank = send[firstRanks[n]];
      int nextRecvRank = recv[firstRanks[(n+1)%nNodes]];
      next[sendRank] = nextRecvRank;
      if (comm->rank == sendRank) {
        channel0->ring.next = nextRecvRank;
        channel1->ring.next = nextRecvRank;
      }
    }
    TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c, channel0->ring.prev, comm->rank, channel0->ring.next);
    TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c+nChannels, channel1->ring.prev, comm->rank, channel1->ring.next);
  }
  return ncclSuccess;
}

如上所示,当前机器recv rank的prev就是前一个机器的send rank,当前机器send rank的next就是下一个机器的recv rank。然后执行ncclBuildRings按照大环的顺序依次记录rank到rings。


// Materialises each ring as an ordered list of ranks.
//   nrings : number of rings to build
//   rings  : output; ring r occupies rings[r*nranks .. r*nranks+nranks-1]
//   rank   : starting rank (each ring is written starting from this rank)
//   nranks : number of ranks per ring
//   prev   : not used in the portion shown here
//   next   : next[r*nranks+x] is the successor of rank x in ring r
// Returns ncclInternalError if some rank in [0, nranks) is missing from a ring.
ncclResult_t ncclBuildRings(int nrings, int* rings, int rank, int nranks, int* prev, int* next) {
  for (int r=0; r<nrings; r++) {
    char prefix[30];

    // Walk the ring by following `next`, starting at `rank`.
    int current = rank;
    for (int i=0; i<nranks; i++) {
      rings[r*nranks+i] = current;
      current = next[r*nranks+current];
    }
    ...
    // Check that all ranks are there
    for (int i=0; i<nranks; i++) {
      int found = 0;
      for (int j=0; j<nranks; j++) {
        if (rings[r*nranks+j] == i) {
          found = 1;
          break;
        }
      }
      if (found == 0) {
        WARN("Error : ring %d does not contain rank %d", r, i);
        return ncclInternalError;
      }
    }
  }
  return ncclSuccess;
}


还是以上述两机十六卡的情况为例,其中rank6记录的rings的第一个大环为:

 

GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1 GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11 GPU/0 GPU/7


到这里就完成了机器之间大环建立,每个rank都知道自己的上一个和下一个rank是谁,那么就可以建立实际的通信链路了。


接下来每个rank都要为通信分配一些内存,为了提高性能,这里会在分配buffer之前设置cpu亲和性,使得分配的内存尽量是当前numa本地的。


// Call site: remember the current thread affinity in affinitySave, then
// switch to the affinity chosen for this rank.
cpu_set_t affinitySave;
sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
NCCLCHECK(ncclTopoSetAffinity(comm->topo, comm->rank));

// Sets the calling thread's CPU affinity based on the CPU node that has the
// fewest hops (paths[CPU][c].count) to the GPU owning `rank`.
// Returns ncclInternalError when no matching GPU/CPU is found.
ncclResult_t ncclTopoSetAffinity(struct ncclTopoSystem* system, int rank) {
  struct ncclTopoNode* cpu = NULL, *gpu = NULL;
  for (int g=0; g<system->nodes[GPU].count; g++) {
    if (system->nodes[GPU].nodes[g].gpu.rank == rank) {
      gpu = system->nodes[GPU].nodes+g;
      // Find closer CPU
      int cpuIndex = -1, minHops = 0;
      for (int c=0; c<system->nodes[CPU].count; c++) {
        int nHops = system->nodes[GPU].nodes[g].paths[CPU][c].count;
        // The first CPU is always taken; later ones only if strictly closer.
        if (cpuIndex == -1 || nHops < minHops) {
          cpuIndex = c;
          minHops = nHops;
        }
      }
      cpu = system->nodes[CPU].nodes+cpuIndex;
    }
  }
  if (cpu == NULL) {
    WARN("Set CPU affinity : unable to find GPU/CPU for rank %d", rank);
    return ncclInternalError;
  }

  // Query the CPU affinity set we were provided
  cpu_set_t mask;
  SYSCHECK(sched_getaffinity(0, sizeof(cpu_set_t), &mask), "sched_getaffinity");

  // Get the affinity of the CPU close to our GPU.
  cpu_set_t cpuMask = cpu->cpu.affinity;

  cpu_set_t finalMask;
  if (ncclParamIgnoreCpuAffinity())
    // Ignore the CPU affinity set and use the GPU one instead
    finalMask = cpuMask;
  else
    // Use a subset of the GPU affinity set
    CPU_AND(&finalMask, &mask, &cpuMask);

  // If there is a non empty set, use it to set affinity
  if (CPU_COUNT(&finalMask)) {
    char affinityStr[sizeof(cpu_set_t)*2];
    NCCLCHECK(ncclCpusetToStr(&finalMask, affinityStr));
    INFO(NCCL_INIT, "Setting affinity for GPU %d to %s", gpu->gpu.dev, affinityStr);
    SYSCHECK(sched_setaffinity(0, sizeof(cpu_set_t), &finalMask), "sched_setaffinity");
  }
  return ncclSuccess;
}


首先获取当前线程的cpu亲和性保存到affinitySave,分配好buffer之后会用affinitySave来恢复亲和性。


然后通过ncclTopoSetAffinity设置cpu亲和性,找到当前rank对应的cpu节点之后,可以获取到该cpu对应的core,即cpuMask,然后获取当前线程对应的亲和性,即mask,默认会取cpuMask和mask的交集finalMask,如果交集不为空的话,会将finalMask设置给当前线程。


// Fixed-size byte blob of CONNECT_SIZE bytes.
struct ncclConnect {
  char data[CONNECT_SIZE];
};

// Two ncclConnect entries are allocated here.
struct ncclConnect *connect;
NCCLCHECKGOTO(ncclCalloc(&connect, 2), ret, affinity_restore);
for (int c=0; c<comm->nChannels; c++) {
  struct ncclChannel* channel = comm->channels+c;
  // rings+c*nranks is the rank ordering of ring c.
  NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore);
  // With a single rank, the peer setup below is skipped.
  if (comm->nRanks == 1) continue;
  // One recv peer (ring.prev) and one send peer (ring.next) per channel.
  NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore);
  ...
}


然后简单看下ncclChannel数据结构,其中collectives保存了用户向nccl提交的通信操作,比如ncclSend,ncclRecv等都会向collectives里加一项,ncclColl则保存了这些操作对应的参数;collectives是一个环形队列,所以collStart指向了开始位置,collCount表示队列中操作数量;FifoHead和FifoTail用于协调kernel产出数据和NET发送数据,其实就是生产者消费者,ncclPeer保存了通信相关的信息,后续再具体介绍。


struct ncclRing { // Shortcuts for userRanks[1] and userRanks[n-1] int prev; // 记录环中当前rank的上一个rank int next; // 记录环中当前rank的下一个rank // Maps an internal nccl index to user-specified rank order. This is necessary // since we need to know how the user expects data to be ordered across // devices. Ordered from current device. int* userRanks; // 以当前rank为起点记录整个环 int* devUserRanks; // device断的userRanks}; struct ncclChannel { union { struct { struct ncclRing ring; struct ncclTree treeUp; struct ncclTree treeDn; struct ncclTree collTreeUp; struct ncclTree collTreeDn; int id; // Communication structures struct ncclPeer* peers; struct ncclPeer* devPeers; // Operation list for aggregation struct ncclColl* collectives; int collStart; int collCount; int collFifoHead; // Only used by GPU int collFifoTail; // Only used by CPU }; int data[0x80]; }; };


然后开始初始化channel,initChannel主要是buffer的分配,分配userRanks和devUserRanks,设置ncclPeer,分配collectives,因为host和device都会访问collectives这个数据结构,所以需要通过cudaHostAlloc分配host端的锁页内存,并通过flag cudaHostAllocMapped将其映射到cuda的地址空间。不过在uva系统上,cudaMallocHost,cudaHostAlloc + cudaHostAllocDefault以及cudaHostAlloc + cudaHostAllocMapped这三种方式没啥区别,host和device都可以访问。


// Allocates the per-channel buffers for channel `channelid` of `comm`.
// Does nothing if the channel has already been initialised (id != -1).
ncclResult_t initChannel(struct ncclComm* comm, int channelid) {
  struct ncclChannel* channel = comm->channels+channelid;
  if (channel->id != -1) return ncclSuccess;
  channel->id = channelid;

  // Ring index to user rank table.
  NCCLCHECK(ncclCudaCalloc(&channel->ring.devUserRanks, comm->nRanks));
  NCCLCHECK(ncclCalloc(&channel->ring.userRanks, comm->nRanks));

  // Communication structures with peers.
  NCCLCHECK(ncclCudaCalloc(&channel->devPeers, comm->nRanks+1)); // The extra one rank is for collnet root (i.e. network)
  NCCLCHECK(ncclCalloc(&channel->peers, comm->nRanks+1));
  // Point every peer's send and recv side back at the communicator.
  for (size_t i=0; i<comm->nRanks+1; ++i) {
    channel->peers[i].send.comm = comm;
    channel->peers[i].recv.comm = comm;
  }

  // Per-channel operation list.
  NCCLCHECK(ncclCudaHostCalloc(&channel->collectives, NCCL_MAX_OPS));

  return ncclSuccess;
}

// Allocates nelem elements of T with cudaHostAlloc(cudaHostAllocMapped) and
// zero-fills them.
template <typename T>
static ncclResult_t ncclCudaHostCalloc(T** ptr, size_t nelem) {
  CUDACHECK(cudaHostAlloc(ptr, nelem*sizeof(T), cudaHostAllocMapped));
  memset(*ptr, 0, nelem*sizeof(T));
  return ncclSuccess;
}


然后以当前rank为起点,将环写到userRanks。


// Initialises channel `channelId` and writes its ring into ring->userRanks,
// rotated so that entry 0 is `rank`.
//   ringRanks : the ring as an ordered list of nranks ranks
static ncclResult_t setupChannel(struct ncclComm* comm, int channelId, int rank, int nranks, int* ringRanks) {
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
  NCCLCHECK(initChannel(comm, channelId));

  struct ncclRing* ring = &comm->channels[channelId].ring;
  // Reorganize ranks to start with rank.
  int shift;
  for (shift = 0; shift<nranks; shift++) {
    if (ringRanks[shift] == rank) {
      break;
    }
  }
  // Copy the ring rotated by `shift` positions.
  for (int i=0; i<nranks; i++) {
    ring->userRanks[i] = ringRanks[(i+shift)%nranks];
  }
  return ncclSuccess;
}


然后执行ncclTransportP2pSetup建立当前rank和prev,next的通信链路。


到这里就完成了机器之间channel的连接,下节会了解到通信链路的建立过程。


(本文经授权后由OneFlow发布。原文:https://blog.csdn.net/KIDGIN7439/article/details/128144057)


其他人都在看

试用OneFlow: github.com/Oneflow-Inc/oneflow/

NCCL源码解析⑦:机器间Channel连接

 

Read More 

正文完
可以使用微信扫码关注公众号(ID:xzluomor)
post-qrcode
 
评论(没有评论)
Generated by Feedzy