Selaa lähdekoodia

Add starpu_task_notify_ready_soon_register

to be notified when it is determined when a task will be ready an estimated
amount of time from now.
Samuel Thibault 7 vuotta sitten
vanhempi
commit
af9b8aa005

+ 2 - 0
ChangeLog

@@ -39,6 +39,8 @@ New features:
   * Add implicit support for asynchronous partition planning. This means one
   * Add implicit support for asynchronous partition planning. This means one
     does not need to call starpu_data_partition_submit etc. explicitly any
     does not need to call starpu_data_partition_submit etc. explicitly any
     more, StarPU will make the appropriate calls as needed.
     more, StarPU will make the appropriate calls as needed.
+  * Add starpu_task_notify_ready_soon_register to be notified when it is
+    determined when a task will be ready an estimated amount of time from now.
 
 
 Small features:
 Small features:
   * Scheduling contexts may now be associated a user data pointer at creation
   * Scheduling contexts may now be associated a user data pointer at creation

+ 7 - 1
doc/doxygen/chapters/api/scheduling_policy.doxy

@@ -2,7 +2,7 @@
  *
  *
  * Copyright (C) 2011-2013                                Inria
  * Copyright (C) 2011-2013                                Inria
  * Copyright (C) 2010-2017                                CNRS
  * Copyright (C) 2010-2017                                CNRS
- * Copyright (C) 2009-2011,2014-2017                      Université de Bordeaux
+ * Copyright (C) 2009-2011,2014-2018                      Université de Bordeaux
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
  * it under the terms of the GNU Lesser General Public License as published by
@@ -255,6 +255,12 @@ Prefetch data for a given task on a given node
 \ingroup API_Scheduling_Policy
 \ingroup API_Scheduling_Policy
 Prefetch data for a given task on a given node when the bus is idle
 Prefetch data for a given task on a given node when the bus is idle
 
 
+\fn void starpu_task_notify_ready_soon_register(starpu_notify_ready_soon_func f, void *data);
+\ingroup API_Scheduling_Policy
+Register a callback to be called when it is determined when a task will be ready
+an estimated amount of time from now, because its last dependency has just
+started and we know how long it will take.
+
 \fn void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
 \fn void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
 \ingroup API_Scheduling_Policy
 \ingroup API_Scheduling_Policy
 The scheduling policies indicates if the worker may pop tasks from the list of other workers
 The scheduling policies indicates if the worker may pop tasks from the list of other workers

+ 3 - 2
include/starpu_data.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2011-2013,2016-2017                      Inria
  * Copyright (C) 2011-2013,2016-2017                      Inria
- * Copyright (C) 2009-2017                                Université de Bordeaux
+ * Copyright (C) 2009-2018                                Université de Bordeaux
  * Copyright (C) 2010-2015,2017                           CNRS
  * Copyright (C) 2010-2015,2017                           CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -73,7 +73,8 @@ int starpu_data_acquire_cb(starpu_data_handle_t handle, enum starpu_data_access_
 int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg);
 int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg);
 int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
-int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency, long *pre_sync_jobid, long *post_sync_jobid);
+int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency, int quick);
+int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency, int quick, long *pre_sync_jobid, long *post_sync_jobid);
 
 
 int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode);
 int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode);

+ 5 - 1
include/starpu_scheduler.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2011-2013,2015-2017                      Inria
  * Copyright (C) 2011-2013,2015-2017                      Inria
- * Copyright (C) 2010-2017                                Université de Bordeaux
+ * Copyright (C) 2010-2018                                Université de Bordeaux
  * Copyright (C) 2011-2013,2015,2017                      CNRS
  * Copyright (C) 2011-2013,2015,2017                      CNRS
  * Copyright (C) 2011                                     Télécom-SudParis
  * Copyright (C) 2011                                     Télécom-SudParis
  * Copyright (C) 2016                                     Uppsala University
  * Copyright (C) 2016                                     Uppsala University
@@ -95,6 +95,10 @@ double starpu_task_bundle_expected_length(starpu_task_bundle_t bundle, struct st
 double starpu_task_bundle_expected_data_transfer_time(starpu_task_bundle_t bundle, unsigned memory_node);
 double starpu_task_bundle_expected_data_transfer_time(starpu_task_bundle_t bundle, unsigned memory_node);
 double starpu_task_bundle_expected_energy(starpu_task_bundle_t bundle, struct starpu_perfmodel_arch *arch, unsigned nimpl);
 double starpu_task_bundle_expected_energy(starpu_task_bundle_t bundle, struct starpu_perfmodel_arch *arch, unsigned nimpl);
 
 
+typedef void (*starpu_notify_ready_soon_func)(void *data, struct starpu_task *task, double delay);
+void starpu_task_notify_ready_soon_register(starpu_notify_ready_soon_func f, void *data);
+
+
 void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id);
 void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id);
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus

+ 1 - 1
mpi/src/starpu_mpi.c

@@ -46,7 +46,7 @@ static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum sta
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
 	 * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
 	 * the request is actually submitted */
 	 * the request is actually submitted */
-	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
+	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency, 1, &req->pre_sync_jobid, &req->post_sync_jobid);
 }
 }
 
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,

+ 1 - 1
mpi/src/starpu_mpi_coop_sends.c

@@ -263,7 +263,7 @@ void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_
 	if (first)
 	if (first)
 	{
 	{
 		/* We were first, we are responsible for acquiring the data for everybody */
 		/* We were first, we are responsible for acquiring the data for everybody */
-		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, &req->pre_sync_jobid, NULL);
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_coop_sends_data_ready, coop_sends, sequential_consistency, 0, &req->pre_sync_jobid, NULL);
 	}
 	}
 }
 }
 
 

+ 94 - 0
src/core/dependencies/cg.c

@@ -288,6 +288,64 @@ void _starpu_notify_cg(void *pred STARPU_ATTRIBUTE_UNUSED, struct _starpu_cg *cg
 	}
 	}
 }
 }
 
 
+/* Note: in case of a tag, it must be already locked */
+void _starpu_notify_job_ready_soon_cg(void *pred STARPU_ATTRIBUTE_UNUSED, struct _starpu_cg *cg, _starpu_notify_job_start_data *data)
+{
+	STARPU_ASSERT(cg);
+
+	if (cg->remaining == 1)
+	{
+		/* the group is to be completed */
+		switch (cg->cg_type)
+		{
+			case STARPU_CG_APPS:
+				/* Not a task */
+				break;
+
+			case STARPU_CG_TAG:
+			{
+				struct _starpu_cg_list *tag_successors;
+				struct _starpu_tag *tag;
+
+				tag = cg->succ.tag;
+				tag_successors = &tag->tag_successors;
+
+				/* Note: the tag is already locked by the
+				 * caller. */
+				if ((tag->state == STARPU_BLOCKED) &&
+					(tag_successors->ndeps == tag_successors->ndeps_completed + 1))
+				{
+					/* This is to be ready */
+					_starpu_enforce_deps_notify_job_ready_soon(tag->job, data, 1);
+				}
+				break;
+			}
+
+ 		        case STARPU_CG_TASK:
+			{
+				struct _starpu_cg_list *job_successors;
+				struct _starpu_job *j;
+
+				j = cg->succ.job;
+				job_successors = &j->job_successors;
+
+				if (job_successors->ndeps == job_successors->ndeps_completed + 1 &&
+					j->task->status == STARPU_TASK_BLOCKED_ON_TASK)
+				{
+					/* This is to be ready */
+					_starpu_enforce_deps_notify_job_ready_soon(j, data, 0);
+				}
+
+				break;
+			}
+
+			default:
+				STARPU_ABORT();
+		}
+	}
+}
+
+
 /* Caller just has to promise that the list will not disappear.
 /* Caller just has to promise that the list will not disappear.
  * _starpu_notify_cg_list protects the list itself.
  * _starpu_notify_cg_list protects the list itself.
  * No job lock should be held, since we might want to immediately call the callback of an empty task.
  * No job lock should be held, since we might want to immediately call the callback of an empty task.
@@ -332,3 +390,39 @@ void _starpu_notify_cg_list(void *pred, struct _starpu_cg_list *successors)
 	successors->terminated = 1;
 	successors->terminated = 1;
 	_starpu_spin_unlock(&successors->lock);
 	_starpu_spin_unlock(&successors->lock);
 }
 }
+
+/* Caller just has to promise that the list will not disappear.
+ * _starpu_notify_cg_list protects the list itself.
+ * No job lock should be held, since we might want to immediately call the callback of an empty task.
+ */
+void _starpu_notify_job_start_cg_list(void *pred, struct _starpu_cg_list *successors, _starpu_notify_job_start_data *data)
+{
+	unsigned succ;
+
+	_starpu_spin_lock(&successors->lock);
+	/* Note: some thread might be concurrently adding other items */
+	for (succ = 0; succ < successors->nsuccs; succ++)
+	{
+		struct _starpu_cg *cg = successors->succ[succ];
+		_starpu_spin_unlock(&successors->lock);
+		STARPU_ASSERT(cg);
+		unsigned cg_type = cg->cg_type;
+
+		struct _starpu_tag *cgtag = NULL;
+
+		if (cg_type == STARPU_CG_TAG)
+		{
+			cgtag = cg->succ.tag;
+			STARPU_ASSERT(cgtag);
+			_starpu_spin_lock(&cgtag->lock);
+		}
+
+		_starpu_notify_job_ready_soon_cg(pred, cg, data);
+
+		if (cg_type == STARPU_CG_TAG)
+			_starpu_spin_unlock(&cgtag->lock);
+
+		_starpu_spin_lock(&successors->lock);
+	}
+	_starpu_spin_unlock(&successors->lock);
+}

+ 9 - 1
src/core/dependencies/cg.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010-2017                                Université de Bordeaux
+ * Copyright (C) 2010-2018                                Université de Bordeaux
  * Copyright (C) 2010-2011,2013,2015,2017                 CNRS
  * Copyright (C) 2010-2011,2013,2015,2017                 CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -108,6 +108,12 @@ struct _starpu_cg
 	} succ;
 	} succ;
 };
 };
 
 
+typedef struct _starpu_notify_job_start_data _starpu_notify_job_start_data;
+
+void _starpu_notify_dependencies(struct _starpu_job *j);
+void _starpu_job_notify_start(struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch);
+void _starpu_job_notify_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data);
+
 void _starpu_cg_list_init(struct _starpu_cg_list *list);
 void _starpu_cg_list_init(struct _starpu_cg_list *list);
 void _starpu_cg_list_deinit(struct _starpu_cg_list *list);
 void _starpu_cg_list_deinit(struct _starpu_cg_list *list);
 int _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg);
 int _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg);
@@ -116,6 +122,8 @@ int _starpu_list_task_scheduled_successors_in_cg_list(struct _starpu_cg_list *su
 int _starpu_list_tag_successors_in_cg_list(struct _starpu_cg_list *successors, unsigned ndeps, starpu_tag_t tag_array[]);
 int _starpu_list_tag_successors_in_cg_list(struct _starpu_cg_list *successors, unsigned ndeps, starpu_tag_t tag_array[]);
 void _starpu_notify_cg(void *pred, struct _starpu_cg *cg);
 void _starpu_notify_cg(void *pred, struct _starpu_cg *cg);
 void _starpu_notify_cg_list(void *pred, struct _starpu_cg_list *successors);
 void _starpu_notify_cg_list(void *pred, struct _starpu_cg_list *successors);
+void _starpu_notify_job_start_cg_list(void *pred, struct _starpu_cg_list *successors, _starpu_notify_job_start_data *data);
 void _starpu_notify_task_dependencies(struct _starpu_job *j);
 void _starpu_notify_task_dependencies(struct _starpu_job *j);
+void _starpu_notify_job_start_tasks(struct _starpu_job *j, _starpu_notify_job_start_data *data);
 
 
 #endif // __CG_H__
 #endif // __CG_H__

+ 38 - 1
src/core/dependencies/data_concurrency.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2013,2015,2017                           Inria
  * Copyright (C) 2013,2015,2017                           Inria
- * Copyright (C) 2009-2015,2017                           Université de Bordeaux
+ * Copyright (C) 2009-2015,2017-2018                      Université de Bordeaux
  * Copyright (C) 2010-2013,2015,2017                      CNRS
  * Copyright (C) 2010-2013,2015,2017                      CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -298,6 +298,43 @@ static unsigned _submit_job_enforce_data_deps(struct _starpu_job *j, unsigned st
 	return 0;
 	return 0;
 }
 }
 
 
+/* This is called when the tag+task dependencies are to be finished releasing.  */
+void _starpu_enforce_data_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data)
+{
+	unsigned buf;
+
+	if (j->task->cl) {
+		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
+
+		for (buf = 0; buf < nbuffers; buf++)
+		{
+			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, buf);
+			if (handle->arbiter)
+				/* Oops, it's the arbiter's decision */
+				return;
+		}
+
+		/* We need to check data availability only if sequential consistency
+		 * dependencies have not been used */
+		if (!j->sequential_consistency) {
+			for (buf = 0; buf < nbuffers; buf++)
+			{
+				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, buf);
+				enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(j->task, buf) & ~STARPU_COMMUTE;
+
+				if (handle->reduction_refcnt)
+					/* Reduction pending, don't bother trying */
+					return;
+				if (handle->refcnt != 0 && (mode == STARPU_W || handle->current_mode != mode))
+					/* Incompatible modes, not ready immediately */
+					return;
+			}
+		}
+	}
+	/* Ok, it really looks like this job will be ready soon */
+	_starpu_job_notify_ready_soon(j, data);
+}
+
 void _starpu_job_set_ordered_buffers(struct _starpu_job *j)
 void _starpu_job_set_ordered_buffers(struct _starpu_job *j)
 {
 {
 	/* Compute an ordered list of the different pieces of data so that we
 	/* Compute an ordered list of the different pieces of data so that we

+ 2 - 1
src/core/dependencies/data_concurrency.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2015                                     Inria
  * Copyright (C) 2015                                     Inria
- * Copyright (C) 2009-2012,2014-2015                      Université de Bordeaux
+ * Copyright (C) 2009-2012,2014-2015,2018                 Université de Bordeaux
  * Copyright (C) 2010-2011,2013,2015,2017                 CNRS
  * Copyright (C) 2010-2011,2013,2015,2017                 CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -25,6 +25,7 @@ void _starpu_job_set_ordered_buffers(struct _starpu_job *j);
 
 
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j);
 unsigned _starpu_submit_job_enforce_data_deps(struct _starpu_job *j);
 void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
 void _starpu_submit_job_enforce_arbitered_deps(struct _starpu_job *j, unsigned buf, unsigned nbuffers);
+void _starpu_enforce_data_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data);
 
 
 int _starpu_notify_data_dependencies(starpu_data_handle_t handle);
 int _starpu_notify_data_dependencies(starpu_data_handle_t handle);
 void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);
 void _starpu_notify_arbitered_dependencies(starpu_data_handle_t handle);

+ 57 - 1
src/core/dependencies/dependencies.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010-2012,2014                           Université de Bordeaux
+ * Copyright (C) 2010-2012,2014, 2018                     Université de Bordeaux
  * Copyright (C) 2010-2012,2015,2017                      CNRS
  * Copyright (C) 2010-2012,2015,2017                      CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -37,3 +37,59 @@ void _starpu_notify_dependencies(struct _starpu_job *j)
 		_starpu_notify_tag_dependencies(j->tag);
 		_starpu_notify_tag_dependencies(j->tag);
 
 
 }
 }
+
+/* TODO: make this a hashtable indexed by func+data and pass that through data. */
+static starpu_notify_ready_soon_func notify_ready_soon_func;
+static void *notify_ready_soon_func_data;
+
+struct _starpu_notify_job_start_data {
+	double delay;
+};
+
+void starpu_task_notify_ready_soon_register(starpu_notify_ready_soon_func f, void *data)
+{
+	STARPU_ASSERT(!notify_ready_soon_func);
+	notify_ready_soon_func = f;
+	notify_ready_soon_func_data = data;
+}
+
+/* Called when a job has just started, so we can notify tasks which were waiting
+ * only for this one when they can expect to start */
+static void __starpu_job_notify_start(struct _starpu_job *j, double delay);
+void _starpu_job_notify_start(struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch)
+{
+	double delay;
+
+	if (!notify_ready_soon_func)
+		return;
+
+	delay = starpu_task_expected_length(j->task, perf_arch, j->nimpl);
+	if (isnan(delay) || _STARPU_IS_ZERO(delay))
+		return;
+
+	__starpu_job_notify_start(j, delay);
+}
+
+static void __starpu_job_notify_start(struct _starpu_job *j, double delay)
+{
+	_starpu_notify_job_start_data data = { .delay = delay };
+
+	_starpu_notify_job_start_tasks(j, &data);
+
+	if (j->task->use_tag)
+		_starpu_notify_job_start_tag_dependencies(j->tag, &data);
+
+	/* TODO: check data notification */
+}
+
+void _starpu_job_notify_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data)
+{
+	struct starpu_task *task = j->task;
+	notify_ready_soon_func(notify_ready_soon_func_data, task, data->delay);
+	if (!task->cl || task->cl->where == STARPU_NOWHERE || task->where == STARPU_NOWHERE)
+		/* This task will immediately terminate, so transition this */
+		__starpu_job_notify_start(_starpu_get_job_associated_to_task(task), data->delay);
+	if (j->quick_next)
+		/* This job is actually a pre_sync job with a post_sync job to be released right after */
+		_starpu_job_notify_ready_soon(j->quick_next, data);
+}

+ 4 - 0
src/core/dependencies/implicit_data_deps.c

@@ -383,6 +383,8 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 	if (j->reduction_task)
 	if (j->reduction_task)
 		return;
 		return;
 
 
+	j->sequential_consistency = 1;
+
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
 	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
 	struct _starpu_task_wrapper_dlist *dep_slots = _STARPU_JOB_GET_DEP_SLOTS(j);
 	struct _starpu_task_wrapper_dlist *dep_slots = _STARPU_JOB_GET_DEP_SLOTS(j);
 
 
@@ -398,6 +400,8 @@ void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 			continue;
 			continue;
 
 
 		STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
 		STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+		if (!handle->sequential_consistency)
+			j->sequential_consistency = 0;
 		new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, &dep_slots[buffer], handle, mode);
 		new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, &dep_slots[buffer], handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		if (new_task)
 		if (new_task)

+ 6 - 1
src/core/dependencies/tags.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2011-2012,2016                           Inria
  * Copyright (C) 2011-2012,2016                           Inria
- * Copyright (C) 2008-2014,2016-2017                      Université de Bordeaux
+ * Copyright (C) 2008-2014,2016-2018                      Université de Bordeaux
  * Copyright (C) 2010-2013,2015-2017                      CNRS
  * Copyright (C) 2010-2013,2015-2017                      CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -268,6 +268,11 @@ void _starpu_notify_tag_dependencies(struct _starpu_tag *tag)
 	_starpu_spin_unlock(&tag->lock);
 	_starpu_spin_unlock(&tag->lock);
 }
 }
 
 
+void _starpu_notify_job_start_tag_dependencies(struct _starpu_tag *tag, _starpu_notify_job_start_data *data)
+{
+	_starpu_notify_job_start_cg_list(tag, &tag->tag_successors, data);
+}
+
 void starpu_tag_restart(starpu_tag_t id)
 void starpu_tag_restart(starpu_tag_t id)
 {
 {
 	struct _starpu_tag *tag = gettag_struct(id);
 	struct _starpu_tag *tag = gettag_struct(id);

+ 2 - 2
src/core/dependencies/tags.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2012                                     Inria
  * Copyright (C) 2012                                     Inria
- * Copyright (C) 2008-2011,2014                           Université de Bordeaux
+ * Copyright (C) 2008-2011,2014, 2018                           Université de Bordeaux
  * Copyright (C) 2010-2011,2015,2017                      CNRS
  * Copyright (C) 2010-2011,2015,2017                      CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -64,8 +64,8 @@ struct _starpu_tag
 
 
 void _starpu_init_tags(void);
 void _starpu_init_tags(void);
 
 
-void _starpu_notify_dependencies(struct _starpu_job *j);
 void _starpu_notify_tag_dependencies(struct _starpu_tag *tag);
 void _starpu_notify_tag_dependencies(struct _starpu_tag *tag);
+void _starpu_notify_job_start_tag_dependencies(struct _starpu_tag *tag, _starpu_notify_job_start_data *data);
 
 
 void _starpu_tag_declare(starpu_tag_t id, struct _starpu_job *job);
 void _starpu_tag_declare(starpu_tag_t id, struct _starpu_job *job);
 void _starpu_tag_set_ready(struct _starpu_tag *tag);
 void _starpu_tag_set_ready(struct _starpu_tag *tag);

+ 6 - 1
src/core/dependencies/task_deps.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010-2017                                Université de Bordeaux
+ * Copyright (C) 2010-2018                                Université de Bordeaux
  * Copyright (C) 2010-2017                                CNRS
  * Copyright (C) 2010-2017                                CNRS
  * Copyright (C) 2014,2016                                Inria
  * Copyright (C) 2014,2016                                Inria
  *
  *
@@ -68,6 +68,11 @@ void _starpu_notify_task_dependencies(struct _starpu_job *j)
 	_starpu_notify_cg_list(j, &j->job_successors);
 	_starpu_notify_cg_list(j, &j->job_successors);
 }
 }
 
 
+void _starpu_notify_job_start_tasks(struct _starpu_job *j, _starpu_notify_job_start_data *data)
+{
+	_starpu_notify_job_start_cg_list(j, &j->job_successors, data);
+}
+
 /* task depends on the tasks in task array */
 /* task depends on the tasks in task array */
 void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[], int check)
 void _starpu_task_declare_deps_array(struct starpu_task *task, unsigned ndeps, struct starpu_task *task_array[], int check)
 {
 {

+ 18 - 0
src/core/jobs.c

@@ -674,6 +674,24 @@ unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j)
 }
 }
 #endif
 #endif
 
 
+/* This is called when a tag or task dependency is to be released.  */
+void _starpu_enforce_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data, int tag)
+{
+	if (!j->submitted)
+		/* It's not even submitted actually */
+		return;
+	struct _starpu_cg_list *job_successors = &j->job_successors;
+        /* tag is 1 when we got woken up by a tag dependency about to be
+         * released, and thus we have to check the exact numbner of
+         * dependencies.  Otherwise it's a task dependency which is about to be
+         * released.  */
+	if (job_successors->ndeps != job_successors->ndeps_completed + 1 - tag)
+		/* There are still other dependencies */
+		return;
+
+	_starpu_enforce_data_deps_notify_job_ready_soon(j, data);
+}
+
 /* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
 /* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
  * ring buffer, indexed by order, and pulled from its head. */
  * ring buffer, indexed by order, and pulled from its head. */
 /* TODO: replace with perhaps a heap */
 /* TODO: replace with perhaps a heap */

+ 10 - 1
src/core/jobs.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
  * Copyright (C) 2011,2014                                Inria
  * Copyright (C) 2011,2014                                Inria
- * Copyright (C) 2008-2017                                Université de Bordeaux
+ * Copyright (C) 2008-2018                                Université de Bordeaux
  * Copyright (C) 2010-2011,2013-2015,2017,2018            CNRS
  * Copyright (C) 2010-2011,2013-2015,2017,2018            CNRS
  * Copyright (C) 2011                                     Télécom-SudParis
  * Copyright (C) 2011                                     Télécom-SudParis
  *
  *
@@ -76,6 +76,12 @@ struct _starpu_job
 	/* The task associated to that job */
 	/* The task associated to that job */
 	struct starpu_task *task;
 	struct starpu_task *task;
 
 
+        /* A task that this will unlock quickly, e.g. we are the pre_sync part
+         * of a data acquisition, and the caller promised that data release will
+	 * happen immediately, so that the post_sync task will be started
+         * immediately after. */
+	struct _starpu_job *quick_next;
+
 	/* These synchronization structures are used to wait for the job to be
 	/* These synchronization structures are used to wait for the job to be
 	 * available or terminated for instance. */
 	 * available or terminated for instance. */
 	starpu_pthread_mutex_t sync_mutex;
 	starpu_pthread_mutex_t sync_mutex;
@@ -159,6 +165,8 @@ struct _starpu_job
 
 
 	/* Is that task internal to StarPU? */
 	/* Is that task internal to StarPU? */
 	unsigned internal:1;
 	unsigned internal:1;
+	/* Did that task use sequential consistency for its data? */
+	unsigned sequential_consistency:1;
 
 
 	/* During the reduction of a handle, StarPU may have to submit tasks to
 	/* During the reduction of a handle, StarPU may have to submit tasks to
 	 * perform the reduction itself: those task should not be stalled while
 	 * perform the reduction itself: those task should not be stalled while
@@ -237,6 +245,7 @@ unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j);
 /* When waking up a continuation, we only enforce new task dependencies */
 /* When waking up a continuation, we only enforce new task dependencies */
 unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j);
 unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j);
 #endif
 #endif
+void _starpu_enforce_deps_notify_job_ready_soon(struct _starpu_job *j, _starpu_notify_job_start_data *data, int tag);
 
 
 /* Called at the submission of the job */
 /* Called at the submission of the job */
 void _starpu_handle_job_submission(struct _starpu_job *j);
 void _starpu_handle_job_submission(struct _starpu_job *j);

+ 18 - 5
src/datawizard/user_interactions.c

@@ -189,7 +189,7 @@ static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
 /* The data must be released by calling starpu_data_release later on */
 /* The data must be released by calling starpu_data_release later on */
 int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node,
 int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node,
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
-							  int sequential_consistency,
+							  int sequential_consistency, int quick,
 							  long *pre_sync_jobid, long *post_sync_jobid)
 							  long *pre_sync_jobid, long *post_sync_jobid)
 {
 {
 	STARPU_ASSERT(handle);
 	STARPU_ASSERT(handle);
@@ -215,21 +215,27 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	if (handle_sequential_consistency && sequential_consistency)
 	if (handle_sequential_consistency && sequential_consistency)
 	{
 	{
 		struct starpu_task *new_task;
 		struct starpu_task *new_task;
+		struct _starpu_job *pre_sync_job, *post_sync_job;
 		wrapper->pre_sync_task = starpu_task_create();
 		wrapper->pre_sync_task = starpu_task_create();
 		wrapper->pre_sync_task->name = "_starpu_data_acquire_cb_pre";
 		wrapper->pre_sync_task->name = "_starpu_data_acquire_cb_pre";
 		wrapper->pre_sync_task->detach = 1;
 		wrapper->pre_sync_task->detach = 1;
 		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
 		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
 		wrapper->pre_sync_task->callback_arg = wrapper;
 		wrapper->pre_sync_task->callback_arg = wrapper;
 		wrapper->pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
 		wrapper->pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
+		pre_sync_job = _starpu_get_job_associated_to_task(wrapper->pre_sync_task);
 		if (pre_sync_jobid)
 		if (pre_sync_jobid)
-			*pre_sync_jobid = _starpu_get_job_associated_to_task(wrapper->pre_sync_task)->job_id;
+			*pre_sync_jobid = pre_sync_job->job_id;
 
 
 		wrapper->post_sync_task = starpu_task_create();
 		wrapper->post_sync_task = starpu_task_create();
 		wrapper->post_sync_task->name = "_starpu_data_acquire_cb_post";
 		wrapper->post_sync_task->name = "_starpu_data_acquire_cb_post";
 		wrapper->post_sync_task->detach = 1;
 		wrapper->post_sync_task->detach = 1;
 		wrapper->post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
 		wrapper->post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
+		post_sync_job = _starpu_get_job_associated_to_task(wrapper->post_sync_task);
 		if (post_sync_jobid)
 		if (post_sync_jobid)
-			*post_sync_jobid = _starpu_get_job_associated_to_task(wrapper->post_sync_task)->job_id;
+			*post_sync_jobid = post_sync_job->job_id;
+
+		if (quick)
+			pre_sync_job->quick_next = post_sync_job;
 
 
 		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode);
 		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
@@ -259,11 +265,18 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	return 0;
 	return 0;
 }
 }
 
 
+int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_handle_t handle, int node,
+							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
+							  int sequential_consistency, int quick)
+{
+	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, callback, arg, sequential_consistency, quick, NULL, NULL);
+}
+
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
 							  enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
 							  int sequential_consistency)
 							  int sequential_consistency)
 {
 {
-	return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, callback, arg, sequential_consistency, NULL, NULL);
+	return starpu_data_acquire_on_node_cb_sequential_consistency_quick(handle, node, mode, callback, arg, sequential_consistency, 0);
 }
 }
 
 
 
 
@@ -621,7 +634,7 @@ static void _starpu_data_wont_use(void *data)
 void starpu_data_wont_use(starpu_data_handle_t handle)
 void starpu_data_wont_use(starpu_data_handle_t handle)
 {
 {
 	_STARPU_TRACE_DATA_WONT_USE(handle);
 	_STARPU_TRACE_DATA_WONT_USE(handle);
-	starpu_data_acquire_on_node_cb(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL, STARPU_R, _starpu_data_wont_use, handle);
+	starpu_data_acquire_on_node_cb_sequential_consistency_quick(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL, STARPU_R, _starpu_data_wont_use, handle, 1, 1);
 }
 }
 
 
 /*
 /*

+ 2 - 1
src/drivers/driver_common/driver_common.c

@@ -35,7 +35,7 @@
 #define BACKOFF_MAX 32  /* TODO : use parameter to define them */
 #define BACKOFF_MAX 32  /* TODO : use parameter to define them */
 #define BACKOFF_MIN 1
 #define BACKOFF_MIN 1
 
 
-void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch STARPU_ATTRIBUTE_UNUSED, int rank, int profiling)
+void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, int rank, int profiling)
 {
 {
 	struct starpu_task *task = j->task;
 	struct starpu_task *task = j->task;
 	struct starpu_codelet *cl = task->cl;
 	struct starpu_codelet *cl = task->cl;
@@ -67,6 +67,7 @@ void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job
 			_starpu_clock_gettime(&worker->cl_start);
 			_starpu_clock_gettime(&worker->cl_start);
 			_starpu_worker_register_executing_start_date(workerid, &worker->cl_start);
 			_starpu_worker_register_executing_start_date(workerid, &worker->cl_start);
 		}
 		}
+		_starpu_job_notify_start(j, perf_arch);
 	}
 	}
 
 
 	if (starpu_top)
 	if (starpu_top)

+ 16 - 0
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -43,6 +43,8 @@
 #define DBL_MAX __DBL_MAX__
 #define DBL_MAX __DBL_MAX__
 #endif
 #endif
 
 
+#define NOTIFY_READY_SOON
+
 struct _starpu_dmda_data
 struct _starpu_dmda_data
 {
 {
 	double alpha;
 	double alpha;
@@ -990,6 +992,16 @@ static double dmda_simulate_push_sorted_decision_task(struct starpu_task *task)
 	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1);
 	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1);
 }
 }
 
 
+#ifdef NOTIFY_READY_SOON
+static void dmda_notify_ready_soon(void *data STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task, double delay)
+{
+	if (!task->cl)
+		return;
+	/* fprintf(stderr, "task %lu %p %p %s %s will be ready within %f\n", starpu_task_get_job_id(task), task, task->cl, task->cl->name, task->cl->model?task->cl->model->symbol : NULL, delay); */
+	/* TODO: do something with it */
+}
+#endif
+
 static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
@@ -1082,6 +1094,10 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 	starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
 	starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
 					    idle_power_minimum, idle_power_maximum, param_modified);
 					    idle_power_minimum, idle_power_maximum, param_modified);
 #endif /* !STARPU_USE_TOP */
 #endif /* !STARPU_USE_TOP */
+
+#ifdef NOTIFY_READY_SOON
+	starpu_task_notify_ready_soon_register(dmda_notify_ready_soon, dt);
+#endif
 }
 }
 
 
 static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
 static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)