티스토리 뷰
[C++] 멀티스레드 경쟁과 생상자-소비자 문제: 스핀락, 슬립락 구현 / 이벤트, condition_variable
jiggyjiggy 2024. 10. 4. 15:49개요
멀티 스레드 환경에서는 공유 변수(데이터) 에 대한 경쟁에 대한 처리 매커니즘이 필요하다
그를 위해, lock 을 이용한다
lock 을 이용하는데 사용되는 3가지 개념을 알아두자
Lock 구현 이론
(임계구역을 화장실이라 생각하자)
- 존버 메타 (화장실 앞에서 대기)
스핀락이 그 예시이다.- busy waiting 이지만, user level 에서만 동작하므로, 다른 스레드가 lock 점유 시간이 짧을 것이라는 확신이 있다면, 좋은 선택이 될 것이다
- busy waiting 이지만, user level 에서만 동작하므로, 다른 스레드가 lock 점유 시간이 짧을 것이라는 확신이 있다면, 좋은 선택이 될 것이다
- 랜덤 메타 (일단 자리로 돌아갔다올게)
잠시 슬립하여 CPU 타임 슬라이스를 포기하는 것이다.
이 메타는, 불확실성을 갖고있다. 슬립 사이에 다른 스레드가 들어갈 확률 존재한다 - 갑질 메타 (화장실이 비면 종업원에게 부탁하는거)
이벤트 사용으로 구현한다. (C++ 11 에서는 조건변수(condition variable)가 표준 스펙으로 채택되었다)- 2.번 3.번 은 컨텍스트 스위칭이 발생한다. 즉, kernel level 까지 전환되기때문에 시간이 더 걸린다. 따라서, 대기상황이 길어질 것이라면 좋은 선택이 될 것이다.
보통 3가지의 개념을 섞어서 쓴다.
mutex 기반의 lock test
#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>
int sum = 0;
std::mutex mtx;
void Add()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<std::mutex> guard(mtx);
++sum;
}
}
void Sub()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<std::mutex> guard(mtx);
--sum;
}
}
int main()
{
std::thread t1(Add);
std::thread t2(Sub);
t1.join();
t2.join();
std::cout << sum << std::endl;
assert(sum == 0);
return 0;
}
존버 메타 (Spinlock 구현)
테스트코드
#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>
int sum = 0;
std::mutex mtx;
SpinLock spinLock;
void Add()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<SpinLock> guard(spinLock);
++sum;
}
}
void Sub()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<SpinLock> guard(spinLock);
--sum;
}
}
int main()
{
std::thread t1(Add);
std::thread t2(Sub);
t1.join();
t2.join();
std::cout << sum << std::endl;
assert(sum == 0);
return 0;
}
ver 1. 🔴
class SpinLock
{
public:
void lock()
{
while (_locked)
{
}
_locked = true;
}
void unlock()
{
_locked = false;
}
private:
bool _locked = false;
};
while (_locked)
{
}
이 부분에서 컴파일러가 최적화해버린다. _locked를 false로
ver 2. 🔴
C++ 에서는 volatile 은 “(상수 폴딩)최적화하지 말아주세요” 의 의미를 갖는다
class SpinLock
{
public:
void lock()
{
while (_locked)
{
}
_locked = true;
}
void unlock()
{
_locked = false;
}
private:
volatile bool _locked = false;
};
- 그러나 정상동작하지 않는다
- “lock 상태 체크”와 “lock 상태 업데이트”가 원자적 연산이 아님
- void lock() { // lock 상태 체크 while (_locked) { } // 여러 스레드에서 상태 체크를 동시에 통과할 수 있음 // lock 상태 업데이트 _locked = true; }
- 여러 스레드에서 상태 체크를 동시에 통과할 수 있음
CAS(Compare-And-Swap) 연산 적용. atomic 클래스에서 메서드로 제공되고 있음
ver3. 🟢
atomic 적용 (해당 클래스 내부에서 CAS 지원)
코드로 보는 CAS
bool expected = false;
bool desired = true;
_locked.compare_exchange_strong(expected, desired);
- _locked 가 false(expected)면 true(desire)로 바꿔주세요~
CAS 연산을 의사코드로 나타내면 아래와 같다
if (_locked == expected)
{
expected = _locked;
_locked = desired;
return true;
}
else
{
expected = _locked;
return false;
}
따라서, lock 상태 체크 및 획득을 아래와 같이, while 문으로 사용하게된다
while (_locked.compare_exchange_strong(expected, desired) == false)
{
}
🚨 다만 주의할 점이 있는데, expected가 _locked 값으로 대입된다 (의사코드를 보라)
따라서, 다시 할당해줘야한다
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
}
CAS 를 적용한 SpinLock의 구현코드
class SpinLock
{
public:
void lock()
{
bool expected = false;
bool desired = true;
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
}
}
void unlock()
{
_locked = false;
}
private:
std::atomic<bool> _locked = false;
};
여기서 unlock() 을 약간 리팩토링(store 메서드 적용)하면, 아래와 같다
class SpinLock
{
public:
void lock()
{
bool expected = false;
bool desired = true;
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
}
}
void unlock()
{
_locked.store(false);
}
private:
std::atomic<bool> _locked = false;
};
전체 코드
#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>
class SpinLock
{
public:
void lock()
{
bool expected = false;
bool desired = true;
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
}
}
void unlock()
{
_locked.store(false);
}
private:
std::atomic<bool> _locked = false;
};
int sum = 0;
std::mutex mtx;
SpinLock spinLock;
void Add()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<SpinLock> guard(spinLock);
++sum;
}
}
void Sub()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<SpinLock> guard(spinLock);
--sum;
}
}
int main()
{
std::thread t1(Add);
std::thread t2(Sub);
t1.join();
t2.join();
std::cout << sum << std::endl;
assert(sum == 0);
return 0;
}
랜덤 메타 (Sleep)
- 슬립을 이용해서 , CPU 스케줄링의 타임 슬라이스 를 포기함.. 혹은 소유권을 포기했다고 표현할수도 있겠다
- spinlock의 경우는 타임슬라이스 내에서 무한루프로 계속 시도하는 것이다
- 타임 슬라이스를 포기하는 방법에는 sleep, yield 가 존재한다
- 개념상으로는 sleep_for(0ms) == yield() 인 것이다
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
이것을 spinlock 구현에 더하면, 구현하고자한 개념이다
- 하나 기억하고 넘어갈 것이 있다. 커널모드와 유저모드 전환은 비용이 꽤 된다. 따라서 시스템콜 호출은 최대한 자제하는 것이 좋다. 특히나, cout 을 박아버리면 환장할 노릇 일 것이다
전체코드
#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>
class SpinLock
{
public:
void lock()
{
bool expected = false;
bool desired = true;
while (_locked.compare_exchange_strong(expected, desired) == false)
{
expected = false;
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::yield();
}
}
void unlock()
{
_locked.store(false);
}
private:
std::atomic<bool> _locked = false;
};
int sum = 0;
std::mutex mtx;
SpinLock spinLock;
void Add()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<SpinLock> guard(spinLock);
++sum;
}
}
void Sub()
{
for (int i = 0; i < 100'000; ++i)
{
std::lock_guard<SpinLock> guard(spinLock);
--sum;
}
}
int main()
{
std::thread t1(Add);
std::thread t2(Sub);
t1.join();
t2.join();
std::cout << sum << std::endl;
assert(sum == 0);
return 0;
}
갑질 메타 (event, condition variable)
- 내 차례(lock release 시) 커널을 이용하여 event를 받는 것
- 유저레벨에서는 스레드 순서에 대한 결론을 내릴 수 없으므로, 상위 개념에서 관리하는 것
event 개념으로
- auto reset event
- manual reset event
를 생각할 수 있다.
event 자체는 true/false 값이다
release 시 커널의 event 를
커널을 이용(컨텍스트 스위칭, 커널 리소스 소요)하므로, 어느정도 대기가 있다고 하면 좋은 선택이 될것이다
event는 락을 대상하지 않고, 일반적인 상황으로 실습해보자
spinlock 과 차이를 숙지하자
상황
#include <mutex>
std::mutex mtx;
void Producer()
{
}
void Consumer()
{
}
int main()
{
return 0;
}
- 한쪽 스레드에서는 데이터 생산
- 한쪽 스레드에서는 데이터 소비
ex)
한쪽 (클라이언트에서 데이터를 수신받는) 스레드에서는 데이터를 큐에 밀어넣고
다른쪽 (게임 컨텐츠를 관리하는) 스레드에서 관련된 내용들을 추출
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
std::mutex mtx;
std::queue<int> q;
void Producer()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void Consumer()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
if (q.empty() == false)
{
int data = q.front();
q.pop();
std::cout << data << std::endl;
}
}
}
int main()
{
std::thread t1(Producer);
std::thread t2(Consumer);
t1.join();
t2.join();
return 0;
}
- 참고로 unique_lock == lock_guard + 제어옵션(defer 등) 이라고 생각하면된다
- 위 동작에서 아쉬운 점이 존재한다.
- 지금은 0.1초마다 데이터를 넣고있는데, 그게 아니라 정말 어쩌다가 한번 넣는 상황이라고 가정해보자
- 100ms → 10000ms
- consumer 스레드들은 그걸 모르기 때문에 while 문을 돌면서 계속 체크할 수 밖에 없다
- 또한, 공유데이터에 접근하므로 lock 까지 걸고 있다
- 의미없는 CPU 점유
- 지금은 0.1초마다 데이터를 넣고있는데, 그게 아니라 정말 어쩌다가 한번 넣는 상황이라고 가정해보자
event !
event 세팅
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
std::mutex mtx;
std::queue<int> q;
HANDLE handle;
void Producer()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void Consumer()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
if (q.empty() == false)
{
int data = q.front();
q.pop();
std::cout << data << std::endl;
}
}
}
int main()
{
// 커널 오브젝트
// Usage Count
// Signal (파란불) / Non-Signal (빨간불) << bool
// event이므로
// Auto / Manual << bool
handle = ::CreateEvent(NULL/*보안속성*/, FALSE/*bManualReset*/, FALSE/*bInitialState*/, NULL);
std::thread t1(Producer);
std::thread t2(Consumer);
t1.join();
t2.join();
::CloseHandle(handle);
return 0;
}
producer 에서 데이터 밀어넣고, Signal 상태로 업데이트
void Producer()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
}
::SetEvent(handle);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
consumer 에서는 signal 이 없다면 재워지게됨.
void Consumer()
{
while (true)
{
::WaitForSingleObject(handle, INFINITY);
std::unique_lock<std::mutex> lock(mtx);
if (q.empty() == false)
{
int data = q.front();
q.pop();
std::cout << data << std::endl;
}
}
}
- signal 상태가 된다면 깨워짐
- 또한, createEvenet때, 3번째 인자(bManualReset) 을 FALSE, 즉 autoReset 이므로 , 깨워진 후에는 signal 상태가 Non-signal 이 된다
전체코드
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <windows.h>
std::mutex mtx;
std::queue<int> q;
HANDLE handle;
void Producer()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
}
::SetEvent(handle);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void Consumer()
{
while (true)
{
::WaitForSingleObject(handle, INFINITY);
std::unique_lock<std::mutex> lock(mtx);
if (q.empty() == false)
{
int data = q.front();
q.pop();
std::cout << data << std::endl;
}
}
}
int main()
{
// 커널 오브젝트
// Usage Count
// Signal (파란불) / Non-Signal (빨간불) << bool
// event이므로
// Auto / Manual << bool
handle = ::CreateEvent(NULL/*보안속성*/, FALSE/*bManualReset*/, FALSE/*bInitialState*/, NULL);
std::thread t1(Producer);
std::thread t2(Consumer);
t1.join();
t2.join();
::CloseHandle(handle);
return 0;
}
🚧 event 는 C++ 표준이 아니기에 다른 OS(리눅스, macOS 등)로 포팅하려면 추가적인 작업이 필요할 것이다.
불안정한 부분
- 100% 확률로 sleep을 오래한다고하면 이상하지 않지만, producer 코드를 바쁘게 데이터를 밀어넣는다고하자
void Producer()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
}
::SetEvent(handle);
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
- consumer 코드에서 큐 사이즈를 읽는 코드가 있다고 할때, 정확하게 읽을 수 있을까?
void Consumer()
{
while (true)
{
::WaitForSingleObject(handle, INFINITY);
std::unique_lock<std::mutex> lock(mtx);
if (q.empty() == false)
{
int data = q.front();
q.pop();
// std::cout << data << std::endl;
std::cout << q.size() << std::endl;
}
}
}
- 기존 로직을 생각하면 1개 생성 & 1개 소비이므로 0 이어야 하지만, q 의 사이즈가 무한으로 커진다
이는 , 이벤트와 그외 연산이 원자작 연산이 아니기 때문에 그러하다. 즉, consumer 스레드가 깨어나고, 처리하는 코드 사이에, producer 스레드에서 데이터를 계속 밀어넣고 있는 것이다
- 꼼수로 당장의 상황을 해결하는 방법을 생각해보면, lock 획득 후 큐에 쌓인 데이터를 모두 처리하는 것도 일종의 방법일 것이다
- if (q.empty() == false) 을 while (q.empty() == false)
🔥 위 상황을 condition_variable 를 이용하여 해결해보자
condition variable
condition variable 은 C++ 표준 스펙이다.
#incldue <mutex>
std::condition_variable cv;
- Kernel-level Object가 아닌, User-level Object 이다.
- 즉, 동일한 프로그램 내에서 처리가 가능한 것이다.
- 이를 좀더 풀어쓰면, Kernel-level Object으로는 다른 프로그램 간에도 동기화가 가능하지만, User-level Object 기반이므로 그건 안된다는 것이다
- condition_variable은 lock 과 짝을 지어서 동작하게 된다
- 전형적인 4단계가 있다.
- Lock을 잡고
- 공유변수 값을 수정
- Lock을 풀고
- 조건변수를 통해, 다른 스레드에게 통지
producer 코드
void Producer()
{
while (true)
{
// 1) Lock을 잡고
// 2) 공유 변수 값을 수정
// 3) Lock을 풀고
// 4) 조건변수를 통해, 다른 스레드에게 통지
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
}
cv.notify_one(); // wait 중인 스레드가 있으면, 딱 1개를 깨운다
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
👁️ consumer 코드
void Consumer()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []() { return q.empty() == false; });
if (q.empty() == false)
{
int data = q.front();
q.pop();
// std::cout << data << std::endl;
std::cout << q.size() << std::endl;
}
}
}
- 코드 분석
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []() { return q.empty() == false; });
두 줄의 동작을 분석해보자
- wait 메서드 (1번 인자: lock, 2번 인자: 깨어날 조건(aka. 탈출조건))
void condition_variable::wait(unique_lock<mutex>& __lk, _Predicate __pred);
- Lock을 잡으려고 시도한다
- 이미 lock이 잡혀있는 상태라면 넘어감
- 조건 확인
- 만족 → q 가 비어있지 않음: Lock 잡고 이후 코드들 진행
- 불만족 → q 가 비어있음: ⭐️ Lock을 풀고, 대기상태로 전환한다
- lock_guard가 아닌 unique_lock인 이유도, wait 내부적으로 lock을 풀어줘야 하는 경우가 있기 때문이다
- 전체적으로 볼때, 핵심은
- lock 이 전체적으로, 끝까지 잡힌 것이 아니라,
- wait에 의해서 조건에 따라 lock을 잡거나 푸는 것이다
- wait 상태일때, notify_one() 이 호출되면, wait 중인 스레드가 깨어나서 다시 처음스텝부터 반복한다
- 또한, condition_variable을 사용하므로, if (q.empty() == false) 문이 이미 처리된 것이다.
void Consumer()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []() { return q.empty() == false; });
// 1) Lock을 잡고
// 이미 lock이 잡혀있는 상태라면 넘어감
// 2) 조건 확인
// - 만족 → q 가 비어있지 않음: Lock 잡고 이후 코드들 진행
// - 불만족 → q 가 비어있음: ⭐️ Lock을 풀고, 대기상태로 전환한다
// - lock_guard가 아닌 unique_lock인 이유도, wait 내부적으로 lock을 풀어줘야 하는 경우가 있기 때문이다
{
int data = q.front();
q.pop();
// std::cout << data << std::endl;
std::cout << q.size() << std::endl;
}
}
}
생각해볼 문제 2가지
Q1. notify_one()을 했다면 consume 조건을 항상 만족하는가?
- notify_one() 으로 wait 인 스레드를 깨운 것이니, 굳이 상태체크가 필요없지 않을까?
A. 그렇지 않다 (Spurious Wakeup; 가짜 기상)
- 조건변수를 notify_one() 하는 시점에 Lock을 잡고있지 않다
- 깨어난 스레드가 다시 Lock을 잡을때까지, 다른 스레드가 개입해서 조건을 영향줄 수 있다.
- 따라서, []() { return q.empty() == false; } 조건 검사를 하는 것이다
Q2. Lock 범위에서 notify_one 해주면 어떨까?
{
std::unique_lock<std::mutex> lock(mtx);
q.push(100);
cv.notify_one();
}
- producer의 전형적 4단계의 순서를 지켜야하는가?
- Lock을 풀지 않고, 4) 조건변수를 통해, 다른 스레드에게 통지 하면 어떨까?
A. 동작은 잘 된다. 다만, 문제가 존재한다
- 문제:
- Lock을 잡은 상태에서 notify 하면
- consume에서 동작을 생각해보자
- wait에서 깨어나서 첫번째로 하는것은, “Lock 획득 시도”
- 그런데, producer에서 lock이 풀린 상태가 아니라면, 경합에 들어가는 것이다!
- 즉 불필요한 경합이 있는 것이다
테스트용 전체 코드
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <cassert>
std::mutex mtx;
std::queue<int> q;
// #incldue <mutex>
// User-level Object 이다. (커널 오브젝트가 아님)
std::condition_variable cv;
void Producer()
{
// while (true)
for (int i = 0; i < 10; ++i) // 10개의 데이터를 생산
{
// 1) Lock을 잡고
// 2) 공유 변수 값을 수정
// 3) Lock을 풀고
// 4) 조건변수를 통해, 다른 스레드에게 통지
{
std::unique_lock<std::mutex> lock(mtx);
q.push(i);
std::cout << "Produced: " << i << std::endl; // 디버그 출력
}
cv.notify_one(); // wait 중인 스레드가 있으면, 딱 1개를 깨운다
}
}
void Consumer()
{
// while (true)
for (int i = 0; i < 10; ++i) // 10개의 데이터를 소비
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []()
{ return q.empty() == false; });
// 1) Lock을 잡고
// 이미 lock이 잡혀있는 상태라면 넘어감
// 2) 조건 확인
// - 만족 → q 가 비어있지 않음: Lock 잡고 이후 코드들 진행
// - 불만족 → q 가 비어있음: ⭐️ Lock을 풀고, 대기상태로 전환한다
// - lock_guard가 아닌 unique_lock인 이유도, wait 내부적으로 lock을 풀어줘야 하는 경우가 있기 때문이다
{
int data = q.front();
q.pop();
std::cout << "Consumed: " << data << std::endl; // 디버그 출력
// assert: 소비한 데이터가 i와 일치하는지 확인
assert(data == i);
// std::cout << q.size() << std::endl;
}
}
}
int main()
{
std::thread t1(Producer);
std::thread t2(Consumer);
t1.join();
t2.join();
return 0;
}
event 와 굉장히 유사한 내용이지만, 좀 더 우아하게 동작된다. C++ 11 부터 표준스펙에 들어왓으므로, condition_variable(조건변수)를 사용하면 된다.
'C・C++ > 멀티스레드' 카테고리의 다른 글
[C++] 멀티 스레드로 소수 구하기 (0) | 2024.10.05 |
---|---|
[C++] Reader-Writer Lock 구현 (0) | 2024.10.05 |
[C++] Thread Local Storage (aka. TLS) (0) | 2024.10.04 |
[C++] Future (3) | 2024.10.04 |
[C++] Lock-Based Stack 구현 (1) | 2024.10.03 |
- Total
- Today
- Yesterday
- condition variable
- JPA
- S4
- Dispatcher Servlet
- thread
- S1
- Memory
- CPU
- 톰캣11
- generic swap
- generic sort
- servlet
- 객체 변조 방어
- 백준
- core c++
- Java
- PS
- pocu
- sleep lock
- C
- 연관관계 편의 메서드
- 개발 공부 자료
- 논문추천
- tomcat11
- OOP
- reader-writer lock
- 이진탐색
- tree
- 엔티티 설계 주의점
- Spring MVC
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |