Commit 2735f01a authored by Thomas Koopman's avatar Thomas Koopman
Browse files

Allgather and start of RDMA support

No related merge requests found
Showing with 296 additions and 30 deletions
+296 -30
......@@ -7,8 +7,11 @@ all: $(patsubst examples/%.c, bin/%, $(wildcard examples/*.c))
bin/shray.o: src/shray.c include/shray.h
$(CC) $(FLAGS) -c $< -o $@
bin/%: examples/%.c bin/shray.o
bin/%: examples/%.c bin/shray.o bin/rma_hashmap.o
$(CC) $(FLAGS) $^ -o $@ $(LFLAGS)
bin/rma_hashmap.o: src/rma_hashmap.c src/rma_hashmap.h
$(CC) $(FLAGS) -c $< -o $@
clean:
$(RM) bin/*
#!/bin/sh
#SBATCH --partition=rome
#SBATCH --nodes=4
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=16
#SBATCH --mem=28GB
#SBATCH --time=00:00:30
#SBATCH --output=allgather.out
module load 2022
module load GCC/11.3.0
module load UCX/1.12.1-GCCcore-11.3.0
module load UCC/1.0.0-GCCcore-11.3.0
module load PMIx/4.1.2-GCCcore-11.3.0
make bin/allgather
srun --mpi=pmix bin/allgather
#include <stdlib.h>
#include <stdbool.h>
#include <stdio.h>
#include <shray.h>
int main(int argc, char **argv)
{
ShrayInit(&argc, &argv);
uint32_t me = ShrayRank();
uint32_t *recvbuf = malloc(ShraySize() * sizeof(uint32_t));
Shray_Allgather(&me, recvbuf, sizeof(uint32_t));
bool success = true;
for (uint32_t i = 0; i < ShraySize(); i++) {
if (recvbuf[i] != i) {
success = false;
printf("Rank %u, %u != %u\n", ShrayRank(), recvbuf[i], i);
}
}
printf("Rank %x: Allgather was %ssuccesfull\n",
ShrayRank(), (success) ? "" : "un");
free(recvbuf);
ShrayFinalize(EXIT_SUCCESS);
}
......@@ -6,12 +6,12 @@ int main(int argc, char **argv)
{
ShrayInit(&argc, &argv);
int buf = ShrayRank();
int buf = ShrayRank() + 42;
Shray_Broadcast(&buf, sizeof(int), 0);
printf("Rank %x: Broadcast was %ssuccesfull\n",
ShrayRank(), (buf == 0) ? "" : "un");
ShrayRank(), (buf == 42) ? "" : "un");
ShrayFinalize(EXIT_SUCCESS);
}
......@@ -20,14 +20,11 @@ double Shray_Wtime(void);
/**
* Explicit communication routines and datatypes
* TODO (necessary for Shray, ideally wrap everything):
* Reduce (for microbenchmark)
* Broadcast
* Split-phase barrier? Unblocking collectives?
**/
typedef void *Shray_request;
/* MPI-like operations */
ucs_status_t Shray_Send(uint32_t t, void *buf, size_t count, ucp_tag_t tag);
ucs_status_t Shray_Recv(void *buf, size_t count, ucp_tag_t tag);
Shray_request Shray_Irecv(void *buf, size_t count, ucp_tag_t tag);
......@@ -35,11 +32,24 @@ Shray_request Shray_Isend(uint32_t t, void *buf, size_t count, ucp_tag_t tag);
ucs_status_t Shray_Wait(Shray_request request);
ucs_status_t Shray_Waitall(size_t count, Shray_request *requests);
void Shray_Request_free(Shray_request request);
/* Collective operations */
void Shray_Barrier(void);
void Shray_Broadcast(void *buf, size_t count, uint32_t root);
void Shray_Reduce(void *buf, ucc_datatype_t type, ucc_reduction_op_t op,
uint32_t root);
void Shray_Allreduce(void *buf, ucc_datatype_t type, ucc_reduction_op_t op);
void Shray_Allgather(void *sendbuf, void *recvbuf, size_t count);
/* BSP operations */
/* buf does NOT need to be equal on all nodes */
void Shray_Push_reg(void *buf, size_t len);
//void Shray_Pop_reg(void *buf, size_t len);
//void Shray_Get(uint32_t t, void *buf, size_t size, size_t offset);
/**
* Error checking macro
**/
#define SHRAY_SAFE(fncall) \
do { \
......
#include <stddef.h>
#include <stdlib.h>
#include <stdint.h>
#include <ucp/api/ucp.h>
#include "rma_hashmap.h"
/**
* Global variables
**/
static uint32_t num_nodes;
struct value {
void *key;
rma_region_t region;
struct value *next;
};
struct rma_hashmap {
size_t size;
size_t occupancy;
struct value **values;
};
size_t hash(void *address, size_t size)
{
/* Most addresses are 8-byte aligned, so kill the least
* significant bits. */
return ((uintptr_t)address >> 4) % size;
}
void rma_hashmap_init(uint32_t Shray_size)
{
num_nodes = Shray_size;
}
rma_hashmap_t *rma_hashmap_alloc(size_t size)
{
rma_hashmap_t *hashmap = malloc(sizeof(rma_hashmap_t));
hashmap->size = size;
hashmap->occupancy = 0;
hashmap->values = malloc(size * sizeof(struct value *));
for (size_t i = 0; i < size; i++) {
hashmap->values[i] = NULL;
}
return hashmap;
}
void rma_hashmap_free(rma_hashmap_t *hashmap)
{
for (size_t i = 0; i < hashmap->size; i++) {
struct value *val = hashmap->values[i];
while (val != NULL) {
struct value *next = val->next;
free(val);
val = next;
}
}
free(hashmap->values);
free(hashmap);
}
/* Currently we do not resize */
void rma_hashmap_insert(rma_hashmap_t *map, void *key, void **addresses,
ucp_rkey_h *rkeys)
{
size_t index = hash(key, map->size);
struct value *val = malloc(sizeof(struct value));
val->key = key;
val->region.rkeys = rkeys;
val->region.addresses = addresses;
val->next = map->values[index];
map->values[index] = val;
map->occupancy++;
}
rma_region_t rma_hashmap_get(rma_hashmap_t *map, void *key)
{
size_t index = hash(key, map->size);
struct value *valp = map->values[index];
while (valp->key != key) {
valp = valp->next;
}
return valp->region;
}
void rma_hashmap_remove(rma_hashmap_t *map, void *key)
{
size_t index = hash(key, map->size);
struct value **valp = &(map->values[index]);
while ((*valp)->key != key) {
valp = &(*valp)->next;
}
struct value *to_delete = *valp;
*valp = (*valp)->next;
for (size_t i = 0; i < num_nodes; i++) {
ucp_rkey_destroy(to_delete->region.rkeys[i]);
free(to_delete->region.addresses[i]);
}
free(to_delete);
}
/**
* This data structure is used for the RDMA bookkeeping.
* Each entry corresponds to an array that is globally available for RDMA,
* as an MPI_Win or bsp_push_reg. The number of entries is small, but the
* hashmap is queried very often.
*
* TODO: write some tests
* TODO: maye use tombstones for better cache-locality?
**/
typedef struct rma_hashmap rma_hashmap_t;
typedef struct rma_region {
/* Must be allocated with ucp_ep_rkey_unpack() */
ucp_rkey_h *rkeys;
void **addresses;
} rma_region_t;
void rma_hashmap_init(uint32_t Shray_size);
rma_hashmap_t *rma_hashmap_alloc(size_t size);
/* Frees the hashmap, but not the rkeys and addresses. That is the
* responsibility of Shray_pop_reg */
void rma_hashmap_free(rma_hashmap_t *hashmap);
/* addresses and rkeys are not copied */
void rma_hashmap_insert(rma_hashmap_t *map, void *key, void **addresses,
ucp_rkey_h *rkeys);
/* key must be present. The corresponding rkeys and addresses are freed. */
void rma_hashmap_remove(rma_hashmap_t *map, void *key);
/* key must be present */
rma_region_t rma_hashmap_get(rma_hashmap_t *map, void *key);
......@@ -18,19 +18,9 @@
#include <shray.h>
#include "shray_internal.h"
#include "rma_hashmap.h"
#include "util.h"
/**
* Data types
**/
struct shray_regions {
size_t n;
size_t space;
void **addr;
ucp_mem_h *handle;
};
/**
* Global variables
**/
......@@ -41,10 +31,10 @@ static ucp_context_h ucp_context;
static ucp_ep_h *endpoints;
static ucp_ep_params_t *ep_params;
static ucp_worker_h ucp_worker;
static struct shray_regions regions;
static ucc_context_h ucc_context;
static ucc_lib_h ucc_lib_p;
static ucc_team_h ucc_team;
static rma_hashmap_t *rma_hashmap;
/**
* Helper functions
......@@ -281,10 +271,8 @@ void ShrayInit(int *argc, char ***argv)
* Prepare handles for RDMA memory regions
**/
regions.n = 0;
regions.space = 10;
regions.addr = malloc(regions.space * sizeof(void *));
regions.handle = malloc(regions.space * sizeof(ucp_mem_h));
rma_hashmap_init(Shray_size);
rma_hashmap = rma_hashmap_alloc(10);
/**
* UCC initalisation
......@@ -462,6 +450,7 @@ void ShrayFinalize(int error_code)
UCC_SAFE(ucc_team_destroy(ucc_team));
UCC_SAFE(ucc_context_destroy(ucc_context));
UCC_SAFE(ucc_finalize(ucc_lib_p));
rma_hashmap_free(rma_hashmap);
/* This can be done through info PMIX_EMBED_BARRIER as well, but
* I am not sure how to construct this argument properly */
PMIx_Fence(NULL, 0, NULL, 0);
......@@ -484,6 +473,10 @@ uint32_t ShraySize(void)
return Shray_size;
}
/**
* Message Passing
**/
Shray_request Shray_Isend(uint32_t t, void *buf, size_t count, ucp_tag_t tag)
{
ucp_request_param_t send_param = {.op_attr_mask = 0};
......@@ -517,13 +510,6 @@ ucs_status_t Shray_Waitall(size_t count, Shray_request *requests)
return UCS_OK;
}
double Shray_Wtime(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)(tv.tv_usec) / 1e6 + (double)tv.tv_sec;
}
void Shray_Request_free(Shray_request request)
{
if (request != UCS_OK && !UCS_PTR_IS_ERR(request)) {
......@@ -531,6 +517,10 @@ void Shray_Request_free(Shray_request request)
}
}
/**
* Collectives
**/
static void ucc_blocking_common(ucc_coll_args_t coll_args)
{
ucc_coll_req_h request;
......@@ -624,3 +614,80 @@ void Shray_Allreduce(void *buf, ucc_datatype_t type, ucc_reduction_op_t op)
ucc_blocking_common(coll_args);
}
void Shray_Allgather(void *sendbuf, void *recvbuf, size_t count)
{
ucc_coll_args_t coll_args = {
.mask = 0,
.coll_type = UCC_COLL_TYPE_ALLGATHER,
.src.info = {
.buffer = sendbuf,
.count = count,
.datatype = UCC_DT_INT8,
.mem_type = UCC_MEMORY_TYPE_HOST
},
.dst.info = {
.buffer = recvbuf,
.count = Shray_size * count,
.datatype = UCC_DT_INT8,
.mem_type = UCC_MEMORY_TYPE_HOST
}
};
ucc_blocking_common(coll_args);
}
/**
* RDMA
**/
void Shray_Push_reg(void *buf, size_t len)
{
(void)buf; (void)len;
// ucp_mem_map_params_t params = {
// .field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
// UCP_MEM_MAP_PARAM_FIELD_LENGTH |
// UCP_MEM_MAP_PARAM_FIELD_PROT |
// UCP_MEM_MAP_PARAM_FIELD_MEMORY_TYPE,
// .address = buf,
// .length = len,
// .prot = UCP_MEM_MAP_PROT_LOCAL_READ |
// UCP_MEM_MAP_PROT_LOCAL_WRITE |
// UCP_MEM_MAP_PROT_REMOTE_READ |
// UCP_MEM_MAP_PROT_REMOTE_WRITE,
// .type = UCS_MEMORY_TYPE_HOST
// /* TODO I don't understand what .exported_memh_buffer does */
// };
// ucp_mem_h memh;
// UCS_SAFE(ucp_mem_map(ucp_context, &params, &memh));
//
// void * rkey_buffer;
// size_t rkey_size;
// UCS_SAFE(ucp_rkey_pack(ucp_context, memh, &rkey_buffer, &rkey_size));
// ucp_rkey_h *rkeys = malloc(Shray_size * sizeof(ucp_rkey_h));
// void *rkeys_buf = malloc(Shray_size * rkey_size);
// Shray_Allgather(rkeys_buf, rkeys, rkey_size);
// for (uint32_t i = 0; i < Shray_size; i++) {
//
// }
//
//
// void **addresses = malloc(Shray_size * sizeof(void *));
// Shray_Allgather(&address, addresses, sizeof(void *));
// rma_hashmap_insert(rma_hashmap, buf, addresses, rkeys);
//
// /* allgather both the rkey and remote addresses */
// /* We need a hash map that has key buf, and has an array of rkeys and
// * remote addresses as value */
}
/**
* Utilities
**/
double Shray_Wtime(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)(tv.tv_usec) / 1e6 + (double)tv.tv_sec;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment