Commit 9183fce0 authored by Thomas Koopman's avatar Thomas Koopman
Browse files

All examples and OpenMPI use callbacks, but I do not see why this is...

All examples and OpenMPI use callbacks, but I do not see why this is necessary. Remove for the time being
No related merge requests found
Showing with 61 additions and 67 deletions
+61 -67
......@@ -5,7 +5,7 @@
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=16
#SBATCH --mem=28GB
#SBATCH --time=00:01:30
#SBATCH --time=00:00:30
#SBATCH --output=bench_unblocking.out
module load 2022
......
......@@ -22,22 +22,20 @@ int main(int argc, char **argv)
memset(buf, (char)ShrayRank(), total_size);
struct timeval tv1, tv2;
gettimeofday(&tv1, NULL);
double start = Shray_Wtime();
if (ShrayRank() == 0) {
for (size_t i = 0; i < total_size; i += msg_size) {
Shray_Send(1, buf + i, msg_size, i);
SHRAY_SAFE(Shray_Send(1, buf + i, msg_size, i));
}
} else {
for (size_t i = 0; i < total_size; i += msg_size) {
Shray_Recv(buf + i, msg_size, i);
SHRAY_SAFE(Shray_Recv(buf + i, msg_size, i));
}
}
gettimeofday(&tv2, NULL);
double duration = (double) (tv2.tv_usec - tv1.tv_usec) / 1e6 +
(double) (tv2.tv_sec - tv1.tv_sec);
double end = Shray_Wtime();
double duration = end - start;
printf("BW with msg size %lu, total size %lu: %lf MB/s\n",
msg_size, total_size, (double)total_size / duration / 1e6);
......
......@@ -7,6 +7,7 @@
#include <string.h>
#include <sys/time.h>
int main(int argc, char **argv)
{
if (argc != 3) {
......@@ -15,6 +16,9 @@ int main(int argc, char **argv)
}
ShrayInit(&argc, &argv);
printf("Rank %d has started\n", ShrayRank());
fflush(stdout);
size_t msg_size = atol(argv[1]);
size_t total_size = (atol(argv[2]) + msg_size - 1) / msg_size * msg_size;
......@@ -29,24 +33,28 @@ int main(int argc, char **argv)
for (size_t i = 0; i < total_size; i += msg_size) {
req[i / msg_size] = Shray_Isend(1, buf + i, msg_size, i);
}
Shray_Waitall(total_size / msg_size, req);
} else {
for (size_t i = 0; i < total_size; i += msg_size) {
req[i / msg_size] = Shray_Irecv(buf + i, msg_size, i);
}
Shray_Waitall(total_size / msg_size, req);
}
SHRAY_SAFE(Shray_Waitall(total_size / msg_size, req));
double end = Shray_Wtime();
double duration = end - start;
printf("BW with msg size %lu, total size %lu: %lf MB/s\n",
msg_size, total_size, (double)total_size / duration / 1e6);
printf("Rank %d: BW with msg size %lu, total size %lu: %lf MB/s\n",
ShrayRank(), msg_size, total_size,
(double)total_size / duration / 1e6);
fflush(stdout);
for (size_t i = 0; i < total_size / msg_size; i++) {
Shray_Request_free(req[i]);
}
printf("Rank %d, succesfully freed the requests\n", ShrayRank());
fflush(stdout);
if (ShrayRank() == 1) {
bool success = true;
for (size_t i = 0; i < total_size; i++) {
......
......@@ -3,6 +3,7 @@
#include <stdint.h>
#include <ucp/api/ucp_def.h>
#include <ucp/api/ucp.h>
/**
* Core Shray API
......@@ -18,8 +19,9 @@ double Shray_Wtime(void);
**/
typedef struct ucx_context {
int completed;
ucs_status_t status;
/* Currently, no callback functions are used, so this struct is not used,
* except for calling ucp_request_check_status */
int placeholder;
} *Shray_request;
ucs_status_t Shray_Send(uint32_t t, void *buf, size_t count, ucp_tag_t tag);
......@@ -30,4 +32,14 @@ ucs_status_t Shray_Wait(Shray_request request);
ucs_status_t Shray_Waitall(size_t count, Shray_request *requests);
void Shray_Request_free(Shray_request request);
#define SHRAY_SAFE(fncall) \
do { \
ucs_status_t status = (fncall); \
if (status != UCS_OK) { \
fprintf(stderr, "%s:%d Call %s failed with %s\n", \
__FILE__, __LINE__, #fncall, ucs_status_string(status)); \
} \
} while (0)
#endif /* _SHRAY_H_HLDSJKFDLDSF */
......@@ -5,6 +5,7 @@
**/
#include <errno.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -47,7 +48,7 @@ struct shray_regions regions;
static void request_init(void *request)
{
Shray_request req = (Shray_request)request;
req->completed = 0;
req->placeholder = 0;
}
/**
......@@ -196,54 +197,34 @@ void ShrayInit(int *argc, char ***argv)
free(remote_addr_len);
}
static void send_handler(void *request, ucs_status_t status, void *ctx)
static ucs_status_t ucx_wait(ucp_worker_h ucp_worker, Shray_request request)
{
(void)ctx;
Shray_request req = (Shray_request)request;
req->completed = 1;
req->status = status;
}
static void recv_handler(void *request, ucs_status_t status,
const ucp_tag_recv_info_t *info, void *user_data)
{
(void)user_data;
(void)info;
Shray_request req = (Shray_request)request;
req->completed = 1;
req->status = status;
}
static ucs_status_t ucx_wait(ucp_worker_h ucp_worker,
Shray_request request)
{
ucs_status_t status;
ucs_status_t stat;
if (UCS_PTR_IS_ERR(request)) {
status = UCS_PTR_STATUS(request);
stat = UCS_PTR_STATUS(request);
} else if (UCS_PTR_IS_PTR(request)) {
while (!request->completed) {
while ((stat = ucp_request_check_status(request)) == UCS_INPROGRESS) {
ucp_worker_progress(ucp_worker);
}
request->completed = 0;
status = ucp_request_check_status(request);
stat = UCS_OK;
} else {
status = UCS_OK;
stat = UCS_OK;
}
request->status = status;
return status;
return stat;
}
ucs_status_t Shray_Send(uint32_t t, void *buf, size_t count, ucp_tag_t tag)
{
ucp_request_param_t send_param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_USER_DATA,
.cb.send = send_handler,
.user_data = NULL};
/* request can be preallocated using UCP_OP_ATTR_FIELD_REQUEST,
* then it would not have to be allocated and freed every send.
* Likewise, may want to set ucp_mem_h */
ucp_request_param_t send_param = {.op_attr_mask = 0};
/* TODO there seems to be a maximum number of these that can be pending,
* after which my tests timeout. Perhaps because ucp_progress_worker
* becomes prohibitively expensive. */
Shray_request request = ucp_tag_send_nbx(endpoints[t], buf, count,
tag, &send_param);
......@@ -258,11 +239,9 @@ ucs_status_t Shray_Recv(void *buf, size_t count, ucp_tag_t tag)
ucp_tag_t tag_mask = 0xFFFFFFFFFFFFFFFFu;
ucp_request_param_t recv_param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_DATATYPE |
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FLAG_NO_IMM_CMPL,
.datatype = ucp_dt_make_contig(1),
.cb.recv = recv_handler};
.datatype = ucp_dt_make_contig(1)};
Shray_request request = ucp_tag_recv_nbx(ucp_worker, buf, count,
tag, tag_mask, &recv_param);
......@@ -275,6 +254,9 @@ ucs_status_t Shray_Recv(void *buf, size_t count, ucp_tag_t tag)
void ShrayFinalize(int error_code)
{
printf("Exiting with error code %d\n", error_code);
/* This can be done through info PMIX_EMBED_BARRIER as well, but
* I am not sure how to construct this argument properly */
PMIx_Fence(NULL, 0, NULL, 0);
PMIx_Finalize(NULL, 0);
}
......@@ -290,11 +272,7 @@ uint32_t ShraySize(void)
Shray_request Shray_Isend(uint32_t t, void *buf, size_t count, ucp_tag_t tag)
{
ucp_request_param_t send_param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_USER_DATA,
.cb.send = send_handler,
.user_data = NULL};
ucp_request_param_t send_param = {.op_attr_mask = 0};
return ucp_tag_send_nbx(endpoints[t], buf, count, tag, &send_param);
}
......@@ -303,11 +281,9 @@ Shray_request Shray_Irecv(void *buf, size_t count, ucp_tag_t tag)
/* Don't ignore any bits */
ucp_tag_t tag_mask = 0xFFFFFFFFFFFFFFFFu;
ucp_request_param_t recv_param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FLAG_NO_IMM_CMPL,
.datatype = ucp_dt_make_contig(1),
.cb.recv = recv_handler};
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FLAG_NO_IMM_CMPL,
.datatype = ucp_dt_make_contig(1)};
return ucp_tag_recv_nbx(ucp_worker, buf, count, tag, tag_mask, &recv_param);
}
......@@ -319,9 +295,9 @@ ucs_status_t Shray_Wait(Shray_request request)
ucs_status_t Shray_Waitall(size_t count, Shray_request *requests)
{
for (size_t i = 0; i < count; i++) {
ucx_wait(ucp_worker, requests[i]);
if (requests[i]->status != UCS_OK) {
return requests[i]->status;
ucs_status_t stat = ucx_wait(ucp_worker, requests[i]);
if (stat != UCS_OK) {
return stat;
}
}
return UCS_OK;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment