diff --git a/mindspore/ccsrc/backend/common/mem_reuse/mem_dynamic_allocator.cc b/mindspore/ccsrc/backend/common/mem_reuse/mem_dynamic_allocator.cc
index d4b327f55092fe5446aef45262983e5319b66dac..ed96d0a9ea6a2517e7d8531a4901b205fa63eed6 100644
--- a/mindspore/ccsrc/backend/common/mem_reuse/mem_dynamic_allocator.cc
+++ b/mindspore/ccsrc/backend/common/mem_reuse/mem_dynamic_allocator.cc
@@ -106,19 +106,8 @@ DeviceMemPtr DynamicMemPoolBestFit::AllocTensorMem(size_t size, bool from_persis
   }
 
   if (device_addr == nullptr) {
-    MS_LOG(WARNING) << "Alloc tensor mem failed and try to wait events to release more memory.";
-    // Since address may be duplicate, use set.
-    std::set<DynamicMemBufPtr> carry_event_addresses;
-    for (const auto &stream_pair_address : stream_pair_addresses_) {
-      for (const auto &address : stream_pair_address.second) {
-        (void)carry_event_addresses.emplace(address);
-      }
-    }
-    for (auto &address : carry_event_addresses) {
-      if (address->WaitAllEvents() && address->status_ == DynamicMemBufStatus::kMemBufUsedByEvent) {
-        FreeTensorMemInner(address->device_addr_);
-      }
-    }
+    MS_LOG(WARNING) << "Alloc tensor mem failed and try to sync all events to release memory.";
+    SyncAllEventsInner();
     device_addr = FindAvailableMemBuf(align_size, from_persistent_mem, stream_id);
   }
 
@@ -1060,24 +1049,30 @@ bool DynamicMemPoolBestFit::WaitEvent(int64_t task_id_on_stream, uint32_t memory
   return true;
 }
 
-bool DynamicMemPoolBestFit::WaitAllEvents() {
+bool DynamicMemPoolBestFit::SyncAllEvents() {
 #ifdef __APPLE__
   std::lock_guard<SpinLock> spin_lock(spin_lock_);
 #else
   std::lock_guard<std::mutex> locker(mutex_);
 #endif
-  MS_LOG(DEBUG) << "Wait events, stream_pair_addresses_ size : " << stream_pair_addresses_.size();
-  for (auto &stream_pair_addresses : stream_pair_addresses_) {
-    auto addresses = stream_pair_addresses.second;
-    MS_LOG(DEBUG) << "addresses size : " << addresses.size();
-    for (const auto &address : addresses) {
-      if (!address->WaitAllEvents()) {
-        continue;
-      }
-      stream_pair_addresses.second.erase(address);
-      if (address->status_ == DynamicMemBufStatus::kMemBufUsedByEvent) {
-        FreeTensorMemInner(address->device_addr_);
-      }
+  return SyncAllEventsInner();
+}
+
+bool DynamicMemPoolBestFit::SyncAllEventsInner() {
+  MS_LOG(INFO) << "Sync all events, stream_pair_addresses_ size : " << stream_pair_addresses_.size() << ".";
+  if (stream_pair_addresses_.empty()) {
+    return false;
+  }
+
+  std::set<DynamicMemBufPtr> carry_event_addresses;
+  for (const auto &stream_pair_address : stream_pair_addresses_) {
+    for (const auto &address : stream_pair_address.second) {
+      (void)carry_event_addresses.emplace(address);
+    }
+  }
+  for (auto &address : carry_event_addresses) {
+    if (address->SyncAllEvents() && address->status_ == DynamicMemBufStatus::kMemBufUsedByEvent) {
+      FreeTensorMemInner(address->device_addr_);
     }
   }
 
@@ -1169,8 +1164,8 @@ bool DynamicMemBuf::WaitEvent(uint32_t task_id_on_stream, uint32_t user_stream_i
 
 bool DynamicMemBuf::IsEventNotUsed() { return events_ == nullptr ? true : events_->empty(); }
 
-bool DynamicMemBuf::WaitAllEvents() {
-  MS_LOG(DEBUG) << "Wait all events for address : " << device_addr_ << ".";
+bool DynamicMemBuf::SyncAllEvents() {
+  MS_LOG(INFO) << "Sync all events for address : " << device_addr_ << ".";
   if (IsEventNotUsed()) {
     return false;
   }
@@ -1180,18 +1175,19 @@ bool DynamicMemBuf::WaitAllEvents() {
     MS_EXCEPTION_IF_NULL(event_list);
     for (auto list_iter = event_list->begin(); list_iter != event_list->end();) {
       auto &event = list_iter->second;
-      if (event->QueryEvent()) {
-        // event is completed, erase event in list.
-        list_iter = event_list->erase(list_iter);
-      } else {
-        list_iter++;
+      MS_LOG(DEBUG) << "Query event : " << event << ".";
+      if (!event->QueryEvent()) {
+        // Sync event if it has not arrived yet.
+        MS_LOG(DEBUG) << "Sync event : " << event << ".";
+        event->SyncEvent();
       }
+      list_iter = event_list->erase(list_iter);
     }
     if (event_list->empty()) {
       // list is empty, erase list in map.
       iter = events_->erase(iter);
     } else {
-      iter++;
+      MS_LOG(INTERNAL_EXCEPTION) << "Event list is not empty.";
     }
   }
   return events_->empty();
 }
diff --git a/mindspore/ccsrc/include/backend/mem_reuse/mem_dynamic_allocator.h b/mindspore/ccsrc/include/backend/mem_reuse/mem_dynamic_allocator.h
index a90c4f9ac82002c3d2d551ef685f9b81d5004a29..1b300969493ce67336c9dcb7062ab923824b5340 100644
--- a/mindspore/ccsrc/include/backend/mem_reuse/mem_dynamic_allocator.h
+++ b/mindspore/ccsrc/include/backend/mem_reuse/mem_dynamic_allocator.h
@@ -142,7 +142,7 @@ class BACKEND_EXPORT DynamicMemPoolBestFit {
                  const DeviceEventPtr &event);
   bool WaitEvent(int64_t task_id_on_stream, uint32_t user_stream_id, uint32_t memory_stream_id);
   bool WaitEvent(int64_t task_id_on_stream, uint32_t memory_stream_id);
-  bool WaitAllEvents();
+  bool SyncAllEvents();
 
 #ifdef WITH_BACKEND
  protected:
@@ -208,6 +208,8 @@ class BACKEND_EXPORT DynamicMemPoolBestFit {
   std::tuple FindByKeepAddr(
     const DeviceMemPtr &device_addr) const;
   DynamicMemBufPtr FindMemBufByKeepAddr(const DeviceMemPtr &device_addr, const DynamicMemBlockPtr &mem_block) const;
+  // Inner implementation of SyncAllEvents; the caller must hold the lock.
+  bool SyncAllEventsInner();
 
 #ifdef __APPLE__
   // There are some problems with using mutex on Mac, use spinlocks instead.
@@ -280,8 +282,8 @@ struct DynamicMemBuf {
   // Indicates if the mem buf is used by an event; returns true when no event is bound on the mem buf.
   bool IsEventNotUsed();
 
-  // Wait all events that bound on mem buf.
+  // Sync all events that are bound on the mem buf.
+  bool SyncAllEvents();
 
   DeviceMemPtr device_addr_;
   DynamicMemBufStatus status_;
diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.cc b/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.cc
index b3d4d5e2352ddce41dd64a01c212cdb4cf065d9b..48f82cc9df25fa2ab2e12c9d8dfaab201e136cf0 100644
--- a/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.cc
+++ b/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.cc
@@ -16,6 +16,7 @@
 #include
 #include "plugin/device/ascend/hal/device/ascend_memory_manager.h"
 #include "plugin/device/ascend/hal/device/ascend_memory_adapter.h"
+#include "plugin/device/ascend/hal/device/ascend_stream_manager.h"
 #include "utils/ms_context.h"
 #ifndef ENABLE_SECURITY
 #include "plugin/device/ascend/hal/profiler/memory_profiling.h"
@@ -49,13 +50,23 @@ uint64_t AscendMemoryManager::GetMsUsedHbmSize() const { return AscendMemAdapter
 
 void *AscendMemoryManager::MallocMemFromMemPool(size_t size, bool from_persistent_mem, bool need_recycle,
                                                 uint32_t stream_id) {
   auto align_size = GetCommonAlignSize(size);
-  const auto device_addr =
+  void *device_addr =
     AscendMemoryPool::GetInstance().AllocTensorMem(align_size, from_persistent_mem, need_recycle, stream_id);
-  return device_addr;
-}
-
-void *AscendMemoryManager::MallocOverflowMemFromMemFromMemPool(size_t size, bool from_persistent_mem) const {
-  const auto device_addr = AscendMemoryPool::GetInstance().AllocOverflowTensorMem(size, from_persistent_mem);
+  if (device_addr != nullptr) {
+    return device_addr;
+  }
+  MS_LOG(INFO) << "Alloc tensor mem failed, will sync streams and try to alloc again later.";
+  AscendStreamMng::GetInstance().SyncAllStreams();
+  const int32_t max_alloc_retries = 2;
+  for (int32_t i = 0; i < max_alloc_retries; i++) {
+    MS_LOG(INFO) << "Try to alloc tensor mem again, count : " << i + 1 << ".";
+    std::this_thread::yield();
+    device_addr =
+      AscendMemoryPool::GetInstance().AllocTensorMem(align_size, from_persistent_mem, need_recycle, stream_id);
+    if (device_addr != nullptr) {
+      break;
+    }
+  }
   return device_addr;
 }
 
@@ -110,19 +121,6 @@ uint8_t *AscendMemoryManager::MallocDynamicMem(size_t size, bool communication_m
   return communication_mem ? alloc_address + kMemAlignSize : alloc_address;
 }
 
-// communication memory: [512align_size + data + 512align_size]
-// return the pointer to the start of data address.
-uint8_t *AscendMemoryManager::MallocCommunicationMemFromMemPool(size_t size, uint32_t stream_id) {
-  auto align_size = GetCommunicationAlignSize(size);
-  uint8_t *base_ptr =
-    reinterpret_cast<uint8_t *>(AscendMemoryPool::GetInstance().AllocTensorMem(align_size, false, false, stream_id));
-  if (base_ptr != nullptr) {
-    return base_ptr + kMemAlignSize;
-  }
-  MS_LOG(EXCEPTION) << "#umsg#Framework Error Message:#umsg#Fail to alloc memory, size: " << align_size
-                    << "B, memory statistics:" << AscendMemAdapter::GetInstance().DevMemStatistics();
-}
-
 bool AscendMemoryManager::MallocContinuousMemFromMemPool(const DeviceAddressPtrList &addr_list,
                                                          size_t /* total_size */, std::vector<size_t> size_list,
                                                          uint32_t stream_id) {
   auto device_ptr_list = MallocContinuousMemFromMemPool(size_list, stream_id);
diff --git a/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.h b/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.h
index 7dda92a289365d384ec345c430a082371905a2f7..b3e6539264fcb0119c7d07d8103ff4a9f4627701 100644
--- a/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.h
+++ b/mindspore/ccsrc/plugin/device/ascend/hal/device/ascend_memory_manager.h
@@ -35,11 +35,9 @@ class AscendMemoryManager : public MemoryManager {
   void ClearGlobalIdleMem() override;
   void *MallocMemFromMemPool(size_t size, bool from_persistent_mem, bool need_recycle = false,
                              uint32_t stream_id = kDefaultStreamIndex) override;
-  void *MallocOverflowMemFromMemFromMemPool(size_t size, bool from_persistent_mem) const;
   void FreeMemFromMemPool(void *device_ptr) override;
   size_t GetMaxUsedMemorySize() const override;
   uint64_t GetMsMaxMemSize() const;
-  uint8_t *MallocCommunicationMemFromMemPool(size_t size, uint32_t stream_id = kDefaultStreamIndex) override;
   bool MallocContinuousMemFromMemPool(const DeviceAddressPtrList &addr_list, size_t total_size,
                                       std::vector<size_t> size_list, uint32_t stream_id = kDefaultStreamIndex) override;
   std::vector<void *> MallocContinuousMemFromMemPool(const std::vector<size_t> &size_list,
diff --git a/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_event.h b/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_event.h
index 866d9c2cb2e46edb9b716368f3777e88a4374dd8..3f0c8860851de362605faeb23364b2546fb3ce0e 100644
--- a/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_event.h
+++ b/mindspore/ccsrc/plugin/device/gpu/hal/device/gpu_event.h
@@ -18,6 +18,7 @@
 #define MINDSPORE_MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_GPU_EVENT_H_
 
 #include
+#include
 #include "ir/device_event.h"
 
 namespace mindspore::device::gpu {
diff --git a/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.cc b/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.cc
index 63e990a526a88fcb24459416ff44d12b0b3a10ab..feafc64658d14dc2a5fc246b9ac5d40cbc0ba6c9 100644
--- a/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.cc
+++ b/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.cc
@@ -984,6 +984,21 @@ uint32_t GPUKernelExecutor::GetRankID() const {
   return rank_id;
 }
 
+DeviceEventPtr GPUDeviceResManager::CreateRuntimeEvent(bool enable_blocking, bool enable_record_wait) {
+  if (!enable_blocking && !enable_record_wait) {
+    MS_LOG(INTERNAL_EXCEPTION) << "Bad parameters, enable_blocking is false and enable_record_wait is false.";
+  }
+  uint32_t flag = cudaEventDefault;
+  flag |= cudaEventDisableTiming;
+  if (enable_blocking) {
+    flag |= cudaEventBlockingSync;
+  }
+  if (enable_record_wait) {
+    flag |= (cudaEventRecordDefault | cudaEventRecordExternal | cudaEventWaitExternal);
+  }
+  return std::make_shared<GpuEvent>(flag);
+}
+
 DeviceEventPtr GPUDeviceResManager::CreateEventWithFlag(bool enable_timing, bool blocking) {
   uint32_t flag = (blocking ? cudaEventBlockingSync : cudaEventDefault) |
                   (enable_timing ? cudaEventDefault : cudaEventDisableTiming);
diff --git a/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.h b/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.h
index d5f89c25f7be6e1174bd39d157d203b72a2e8422..76f8c32408acefc91708551e047614d80655d86a 100644
--- a/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.h
+++ b/mindspore/ccsrc/plugin/device/gpu/hal/hardware/gpu_device_context.h
@@ -66,6 +66,9 @@ class GPUDeviceResManager : public DeviceResManager {
   bool SyncNotDefaultStreams() const override;
   size_t DefaultStream() const override;
 
+  // Create device event for runtime.
+  DeviceEventPtr CreateRuntimeEvent(bool enable_blocking, bool enable_record_wait) override;
+
   DeviceEventPtr CreateEventWithFlag(bool enable_timing, bool blocking) override;
 
   bool LoadCollectiveCommLib() override;
diff --git a/mindspore/ccsrc/runtime/device/kernel_runtime.h b/mindspore/ccsrc/runtime/device/kernel_runtime.h
index 32da51643990d5f406288edf2ac3eb9c8da31d21..7a585a03d7adf810c62d40ce76e737315eb567ad 100644
--- a/mindspore/ccsrc/runtime/device/kernel_runtime.h
+++ b/mindspore/ccsrc/runtime/device/kernel_runtime.h
@@ -88,9 +88,6 @@ class BACKEND_EXPORT KernelRuntime {
   uint8_t *MallocMem(MemType type, size_t size, const DeviceAddressPtr &address) {
     return mem_manager_->MallocMem(type, size, address);
   }
-  uint8_t *MallocCommunicationMemFromMemPool(size_t size, uint32_t stream_id = kDefaultStreamIndex) {
-    return mem_manager_->MallocCommunicationMemFromMemPool(size, stream_id);
-  }
   bool MallocContinuousMemFromMemPool(const DeviceAddressPtrList &addr_list, size_t total_size,
                                       const std::vector<size_t> &size_list, uint32_t stream_id = kDefaultStreamIndex) {
     return mem_manager_->MallocContinuousMemFromMemPool(addr_list, total_size, size_list, stream_id);
diff --git a/mindspore/ccsrc/runtime/device/memory_manager.h b/mindspore/ccsrc/runtime/device/memory_manager.h
index 05f1d2e157ec5dd690fc77cc7776b5ab5938ccee..4cfbad8dc66358e176696706866c32ca1daac88e 100644
--- a/mindspore/ccsrc/runtime/device/memory_manager.h
+++ b/mindspore/ccsrc/runtime/device/memory_manager.h
@@ -58,9 +58,6 @@ class BACKEND_EXPORT MemoryManager {
   virtual void *MallocMemFromMemPool(size_t size, bool from_persistent_mem, bool need_recycle = false,
                                      uint32_t stream_id = kDefaultStreamIndex);
   virtual size_t GetMaxUsedMemorySize() const { return 0; }
-  virtual uint8_t *MallocCommunicationMemFromMemPool(size_t size, uint32_t stream_id = kDefaultStreamIndex) {
-    return nullptr;
-  }
   virtual void FreeMemFromMemPool(const DeviceAddressPtr address);
   virtual void FreeMemFromMemPool(void *device_ptr);
   virtual bool MallocContinuousMemFromMemPool(const DeviceAddressPtrList &addr_list, size_t total_size,
@@ -105,12 +102,12 @@ class BACKEND_EXPORT MemoryManager {
     }
     return memory_pool_->WaitEvent(task_id_on_stream, memory_stream_id);
   }
-  bool WaitAllEvents() {
+  bool SyncAllEvents() {
     if (memory_pool_ == nullptr) {
       MS_LOG(WARNING) << "memory_pool_ is nullptr.";
       return false;
     }
-    return memory_pool_->WaitAllEvents();
+    return memory_pool_->SyncAllEvents();
   }
 
  protected:
diff --git a/mindspore/ccsrc/runtime/device/multi_stream_controller.cc b/mindspore/ccsrc/runtime/device/multi_stream_controller.cc
index 36851d029eb6615d970799dd82bd989acc5494e5..01dfb14683200063f2c479a43431fde6db1e4d53 100644
--- a/mindspore/ccsrc/runtime/device/multi_stream_controller.cc
+++ b/mindspore/ccsrc/runtime/device/multi_stream_controller.cc
@@ -82,9 +82,7 @@ bool MultiStreamController::RecordEvent(const DeviceContext *device_context, int
   }
 
   auto event = device_context->device_res_manager_->CreateRuntimeEvent(false, true);
-  if (event == nullptr) {
-    return true;
-  }
+  MS_EXCEPTION_IF_NULL(event);
   event->RecordEvent(user_stream_id);
   // Record event on mem buf.
   return mem_manager->RecordEvent(task_id_on_stream, user_stream_id, memory_stream_addresses, event);
@@ -144,7 +142,7 @@ bool MultiStreamController::SyncAllStreams(const DeviceContext *device_context)
   bool ret = device_res_manager->SyncAllStreams();
   auto mem_manager = device_res_manager->mem_manager();
   if (mem_manager != nullptr) {
-    mem_manager->WaitAllEvents();
+    mem_manager->SyncAllEvents();
   }
   return ret;
 }
@@ -214,7 +212,7 @@ DeviceEventPtr EventPool::Get() {
   // Try to create an event first, before the core size is reached.
   if (size_ < core_size_) {
     auto created_event = event_creator_();
-    if (created_event->IsReady()) {
+    if (created_event != nullptr && created_event->IsReady()) {
       cached_events_.push_back(created_event);
       size_++;
       event = created_event.get();
@@ -239,7 +237,7 @@ DeviceEventPtr EventPool::Get() {
   // Reuse failed, try to create more events.
   if (event == nullptr) {
     auto created_event = event_creator_();
-    if (created_event->IsReady()) {
+    if (created_event != nullptr && created_event->IsReady()) {
       cached_events_.push_back(created_event);
       event = created_event.get();
       size_++;
diff --git a/mindspore/ccsrc/runtime/device/stream_synchronizer.cc b/mindspore/ccsrc/runtime/device/stream_synchronizer.cc
index 3f6631eec1d3e069b02cfe6dd2bf37b8836ccfd0..c1ae49b2778b2ba35fc700f7638cfe9f9952de96 100644
--- a/mindspore/ccsrc/runtime/device/stream_synchronizer.cc
+++ b/mindspore/ccsrc/runtime/device/stream_synchronizer.cc
@@ -15,9 +15,11 @@
  */
 
 #include "runtime/device/stream_synchronizer.h"
-#include "utils/ms_context.h"
+
 #include "include/backend/distributed/collective/collective_manager.h"
 #include "include/backend/distributed/recovery/recovery_context.h"
+#include "runtime/device/multi_stream_controller.h"
+#include "utils/ms_context.h"
 
 namespace mindspore {
 namespace device {
@@ -61,7 +63,8 @@ bool StreamSynchronizer::SyncStream(const std::string &device_name, uint32_t tim
   // If recovery is disabled or timeout == 0, sync streams directly to improve performance.
   if (!RecoveryContext::GetInstance()->enable_recovery() || timeout == 0) {
     device_context->Initialize();
-    return device_context->device_res_manager_->SyncAllStreams();
+    auto multi_stream_controller = MultiStreamController::GetInstance();
+    return multi_stream_controller->SyncAllStreams(device_context);
   }
 
   std::unique_lock<std::mutex> lock(task_mutex_);
diff --git a/mindspore/ccsrc/runtime/graph_scheduler/actor/kernel_actor.cc b/mindspore/ccsrc/runtime/graph_scheduler/actor/kernel_actor.cc
index abc22aae1fda760a4296cf5d0c8a060e1ffe8f70..90ed7d15bd16c06b9d2cd9a0d66c47b308785d62 100644
--- a/mindspore/ccsrc/runtime/graph_scheduler/actor/kernel_actor.cc
+++ b/mindspore/ccsrc/runtime/graph_scheduler/actor/kernel_actor.cc
@@ -939,8 +939,9 @@ void KernelActor::ProcessMultiStream(OpContext<DeviceTensor> *const context) {
     auto user_stream_id = stream_id;
     auto memory_stream_id = cross_stream_kernel_tensor->stream_id();
     if (cross_stream_kernel_tensor->task_id_on_stream() == nullptr) {
-      MS_LOG(INTERNAL_EXCEPTION) << "Cross_stream_kernel_tensor : " << cross_stream_kernel_tensor
-                                 << " task id on stream is nullptr.";
+      MS_LOG(WARNING) << "Cross_stream_kernel_tensor : " << cross_stream_kernel_tensor
+                      << " task id on stream is nullptr, will skip multi stream process.";
+      continue;
     }
     auto memory_task_id_on_stream = *cross_stream_kernel_tensor->task_id_on_stream();
     auto safe_task_id_on_stream =
diff --git a/mindspore/core/utils/ms_utils.h b/mindspore/core/utils/ms_utils.h
index 874ad08a40e8261a2c2240e32012626b938b5e2e..8cc80766cb9ded94d24296547079f51311be202a 100644
--- a/mindspore/core/utils/ms_utils.h
+++ b/mindspore/core/utils/ms_utils.h
@@ -99,6 +99,7 @@ static inline void SetOMPThreadNum() {
   std::string OMP_env = std::to_string(OMP_thread_num);
   (void)SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0);
+  (void)SetEnv("GRAPH_OP_RUN", "1", 0);
 }
 
 static inline bool IsLittleByteOrder() {