티스토리 뷰

개요

멀티 스레드 환경에서는 공유 변수(데이터) 에 대한 경쟁에 대한 처리 매커니즘이 필요하다

그를 위해, lock 을 이용한다

 

lock 을 이용하는데 사용되는 3가지 개념을 알아두자

Lock 구현 이론

(임계구역을 화장실이라 생각하자)

  1. 존버 메타 (화장실 앞에서 대기)
    스핀락이 그 예시이다.
    • busy waiting 이지만, user level 에서만 동작하므로, 다른 스레드가 lock 점유 시간이 짧을 것이라는 확신이 있다면, 좋은 선택이 될 것이다

  2.  랜덤 메타 (일단 자리로 돌아갔다올게)
    잠시 슬립하여 CPU 타임 슬라이스를 포기하는 것이다.
    이 메타는, 불확실성을 갖고있다. 슬립 사이에 다른 스레드가 들어갈 확률 존재한다

  3. 갑질 메타 (화장실이 비면 종업원에게 부탁하는거)
    이벤트 사용으로 구현한다. (C++ 11 에서는 조건변수(condition variable)가 표준 스펙으로 채택되었다)
    • 2.번 3.번 은 컨텍스트 스위칭이 발생한다. 즉, kernel level 까지 전환되기때문에 시간이 더 걸린다. 따라서, 대기상황이 길어질 것이라면 좋은 선택이 될 것이다.

보통 3가지의 개념을 섞어서 쓴다.


mutex 기반의 lock test

#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>

int sum = 0;
std::mutex mtx;

void Add()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<std::mutex> guard(mtx);
		++sum;
	}
}

void Sub()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<std::mutex> guard(mtx);
		--sum;
	}
}

int main()
{
	std::thread t1(Add);
	std::thread t2(Sub);

	t1.join();
	t2.join();

	std::cout << sum << std::endl;
	assert(sum == 0);

	return 0;
}

 


존버 메타 (Spinlock 구현)

테스트코드

#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>

int sum = 0;
std::mutex mtx;
SpinLock spinLock;

void Add()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<SpinLock> guard(spinLock);
		++sum;
	}
}

void Sub()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<SpinLock> guard(spinLock);
		--sum;
	}
}

int main()
{
	std::thread t1(Add);
	std::thread t2(Sub);

	t1.join();
	t2.join();

	std::cout << sum << std::endl;
	assert(sum == 0);

	return 0;
}

ver 1. 🔴

class SpinLock
{
public:
	void lock()
	{
		while (_locked)
		{
		}
		_locked = true;
	}

	void unlock()
	{
		_locked = false;
	}

private:
	bool _locked = false;
};
		while (_locked)
		{
		}

이 부분에서 컴파일러가 최적화해버린다. _locked를 false로

ver 2. 🔴

C++ 에서는 volatile 은 “(상수 폴딩)최적화하지 말아주세요” 의 의미를 갖는다

class SpinLock
{
public:
	void lock()
	{
		while (_locked)
		{
		}
		_locked = true;
	}

	void unlock()
	{
		_locked = false;
	}

private:
	volatile bool _locked = false;
};
  • 그러나 정상동작하지 않는다
    • “lock 상태 체크”와 “lock 상태 업데이트”가 원자적 연산이 아님
    • void lock() { // lock 상태 체크 while (_locked) { } // 여러 스레드에서 상태 체크를 동시에 통과할 수 있음 // lock 상태 업데이트 _locked = true; }
    • 여러 스레드에서 상태 체크를 동시에 통과할 수 있음

CAS(Compare-And-Swap) 연산 적용. atomic 클래스에서 메서드로 제공되고 있음

ver3. 🟢

atomic 적용 (해당 클래스 내부에서 CAS 지원)

코드로 보는 CAS

bool expected = false;
bool desired = true;
_locked.compare_exchange_strong(expected, desired);
  • _locked 가 false(expected)면 true(desire)로 바꿔주세요~

CAS 연산을 의사코드로 나타내면 아래와 같다

if (_locked == expected)
{
	expected = _locked;
	_locked = desired;
	return true;
}
else
{
	expected = _locked;
	return false;
}

따라서, lock 상태 체크 및 획득을 아래와 같이, while 문으로 사용하게된다

while (_locked.compare_exchange_strong(expected, desired) == false)
{
}

 

🚨 다만 주의할 점이 있는데, expected가 _locked 값으로 대입된다 (의사코드를 보라)

따라서, 다시 할당해줘야한다

while (_locked.compare_exchange_strong(expected, desired) == false)
{
	expected = false;
}

CAS 를 적용한 SpinLock의 구현코드

class SpinLock
{
public:
	void lock()
	{
		bool expected = false;
		bool desired = true;
		
		while (_locked.compare_exchange_strong(expected, desired) == false)
		{
			expected = false;
		}
	}

	void unlock()
	{
		_locked = false;
	}

private:
	std::atomic<bool> _locked = false;
};

여기서 unlock() 을 약간 리팩토링(store 메서드 적용)하면, 아래와 같다

class SpinLock
{
public:
	void lock()
	{
		bool expected = false;
		bool desired = true;
		
		while (_locked.compare_exchange_strong(expected, desired) == false)
		{
			expected = false;
		}
	}

	void unlock()
	{
		_locked.store(false);
	}

private:
	std::atomic<bool> _locked = false;
};

전체 코드

#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>

class SpinLock
{
public:
	void lock()
	{
		bool expected = false;
		bool desired = true;
		
		while (_locked.compare_exchange_strong(expected, desired) == false)
		{
			expected = false;
		}
	}

	void unlock()
	{
		_locked.store(false);
	}

private:
	std::atomic<bool> _locked = false;
};

int sum = 0;
std::mutex mtx;
SpinLock spinLock;

void Add()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<SpinLock> guard(spinLock);
		++sum;
	}
}

void Sub()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<SpinLock> guard(spinLock);
		--sum;
	}
}

int main()
{
	std::thread t1(Add);
	std::thread t2(Sub);

	t1.join();
	t2.join();

	std::cout << sum << std::endl;
	assert(sum == 0);

	return 0;
}

랜덤 메타 (Sleep)

  • 슬립을 이용해서 , CPU 스케줄링의 타임 슬라이스 를 포기함.. 혹은 소유권을 포기했다고 표현할수도 있겠다
    • spinlock의 경우는 타임슬라이스 내에서 무한루프로 계속 시도하는 것이다
  • 타임 슬라이스를 포기하는 방법에는 sleep, yield 가 존재한다
    • 개념상으로는 sleep_for(0ms) == yield() 인 것이다
std::this_thread::sleep_for(std::chrono::milliseconds(100));

std::this_thread::yield();

이것을 spinlock 구현에 더하면, 구현하고자한 개념이다

  • 하나 기억하고 넘어갈 것이 있다. 커널모드와 유저모드 전환은 비용이 꽤 된다. 따라서 시스템콜 호출은 최대한 자제하는 것이 좋다. 특히나, cout 을 박아버리면 환장할 노릇 일 것이다

전체코드

#include <atomic>
#include <cassert>
#include <iostream>
#include <thread>
#include <mutex>

class SpinLock
{
public:
	void lock()
	{
		bool expected = false;
		bool desired = true;
		
		while (_locked.compare_exchange_strong(expected, desired) == false)
		{
			expected = false;

			// std::this_thread::sleep_for(std::chrono::milliseconds(100));
			std::this_thread::yield();
		}
	}

	void unlock()
	{
		_locked.store(false);
	}

private:
	std::atomic<bool> _locked = false;
};

int sum = 0;
std::mutex mtx;
SpinLock spinLock;

void Add()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<SpinLock> guard(spinLock);
		++sum;
	}
}

void Sub()
{
	for (int i = 0; i < 100'000; ++i)
	{
		std::lock_guard<SpinLock> guard(spinLock);
		--sum;
	}
}

int main()
{
	std::thread t1(Add);
	std::thread t2(Sub);

	t1.join();
	t2.join();

	std::cout << sum << std::endl;
	assert(sum == 0);

	return 0;
}

 


갑질 메타 (event, condition variable)

  • 내 차례(lock release 시) 커널을 이용하여 event를 받는 것
  • 유저레벨에서는 스레드 순서에 대한 결론을 내릴 수 없으므로, 상위 개념에서 관리하는 것

event 개념으로

  1. auto reset event
  2. manual reset event

를 생각할 수 있다.

event 자체는 true/false 값이다

release 시 커널의 event 를

커널을 이용(컨텍스트 스위칭, 커널 리소스 소요)하므로, 어느정도 대기가 있다고 하면 좋은 선택이 될것이다

event는 락을 대상하지 않고, 일반적인 상황으로 실습해보자

spinlock 과 차이를 숙지하자

 

상황

#include <mutex>

std::mutex mtx;

void Producer()
{

}

void Consumer()
{
	
}

int main()
{
	return 0;
}
  • 한쪽 스레드에서는 데이터 생산
  • 한쪽 스레드에서는 데이터 소비

ex)

한쪽 (클라이언트에서 데이터를 수신받는) 스레드에서는 데이터를 큐에 밀어넣고

다른쪽 (게임 컨텐츠를 관리하는) 스레드에서 관련된 내용들을 추출

#include <mutex>
#include <queue>
#include <thread>
#include <iostream>

std::mutex mtx;
std::queue<int> q;

void Producer()
{
	while (true)
	{
		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(100);
		}

		std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}

void Consumer()
{
	while (true)
	{
		std::unique_lock<std::mutex> lock(mtx);
		if (q.empty() == false)
		{
			int data = q.front();
			q.pop();
			std::cout << data << std::endl;
		}
	}

}

int main()
{
	std::thread t1(Producer);
	std::thread t2(Consumer);

	t1.join();
	t2.join();

	return 0;
}
  • 참고로 unique_lock == lock_guard + 제어옵션(defer 등) 이라고 생각하면된다
  • 위 동작에서 아쉬운 점이 존재한다.
    • 지금은 0.1초마다 데이터를 넣고있는데, 그게 아니라 정말 어쩌다가 한번 넣는 상황이라고 가정해보자
      • 100ms → 10000ms
      • consumer 스레드들은 그걸 모르기 때문에 while 문을 돌면서 계속 체크할 수 밖에 없다
      • 또한, 공유데이터에 접근하므로 lock 까지 걸고 있다
      • 의미없는 CPU 점유

event !

event 세팅

#include <mutex>
#include <queue>
#include <thread>
#include <iostream>

std::mutex mtx;
std::queue<int> q;
HANDLE handle;

void Producer()
{
	while (true)
	{
		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(100);
		}

		std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}

void Consumer()
{
	while (true)
	{
		std::unique_lock<std::mutex> lock(mtx);
		if (q.empty() == false)
		{
			int data = q.front();
			q.pop();
			std::cout << data << std::endl;
		}
	}

}

int main()
{
	// 커널 오브젝트
		// Usage Count
		// Signal (파란불) / Non-Signal (빨간불) << bool 
	// event이므로 
		// Auto / Manual << bool
	handle = ::CreateEvent(NULL/*보안속성*/, FALSE/*bManualReset*/, FALSE/*bInitialState*/, NULL);

	std::thread t1(Producer);
	std::thread t2(Consumer);

	t1.join();
	t2.join();

	::CloseHandle(handle);

	return 0;
}

producer 에서 데이터 밀어넣고, Signal 상태로 업데이트

void Producer()
{
	while (true)
	{
		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(100);
		}
		::SetEvent(handle);
		
		std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}

consumer 에서는 signal 이 없다면 재워지게됨.

void Consumer()
{
	while (true)
	{
		::WaitForSingleObject(handle, INFINITY);

		std::unique_lock<std::mutex> lock(mtx);
		if (q.empty() == false)
		{
			int data = q.front();
			q.pop();
			std::cout << data << std::endl;
		}
	}
}
  • signal 상태가 된다면 깨워짐
  • 또한, createEvenet때, 3번째 인자(bManualReset) 을 FALSE, 즉 autoReset 이므로 , 깨워진 후에는 signal 상태가 Non-signal 이 된다

전체코드

#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <windows.h>

std::mutex mtx;
std::queue<int> q;
HANDLE handle;

void Producer()
{
	while (true)
	{
		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(100);
		}
		::SetEvent(handle);

		std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}

void Consumer()
{
	while (true)
	{
		::WaitForSingleObject(handle, INFINITY);

		std::unique_lock<std::mutex> lock(mtx);
		if (q.empty() == false)
		{
			int data = q.front();
			q.pop();
			std::cout << data << std::endl;
		}
	}

}

int main()
{
	// 커널 오브젝트
		// Usage Count
		// Signal (파란불) / Non-Signal (빨간불) << bool 
	// event이므로 
		// Auto / Manual << bool
	handle = ::CreateEvent(NULL/*보안속성*/, FALSE/*bManualReset*/, FALSE/*bInitialState*/, NULL);

	std::thread t1(Producer);
	std::thread t2(Consumer);

	t1.join();
	t2.join();

	::CloseHandle(handle);

	return 0;
}

 

🚧 event 는 C++ 표준이 아니기에 다른 OS(리눅스, macOS 등)로 포팅하려면 추가적인 작업이 필요할 것이다.

 

불안정한 부분

  • 100% 확률로 sleep을 오래한다고하면 이상하지 않지만, producer 코드를 바쁘게 데이터를 밀어넣는다고하자
void Producer()
{
	while (true)
	{
		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(100);
		}
		::SetEvent(handle);

		// std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}

 

  • consumer 코드에서 큐 사이즈를 읽는 코드가 있다고 할때, 정확하게 읽을 수 있을까?
void Consumer()
{
	while (true)
	{
		::WaitForSingleObject(handle, INFINITY);

		std::unique_lock<std::mutex> lock(mtx);
		if (q.empty() == false)
		{
			int data = q.front();
			q.pop();
			// std::cout << data << std::endl;
			std::cout << q.size() << std::endl;
		}
	}
}
  • 기존 로직을 생각하면 1개 생성 & 1개 소비이므로 0 이어야 하지만, q 의 사이즈가 무한으로 커진다

이는 , 이벤트와 그외 연산이 원자작 연산이 아니기 때문에 그러하다. 즉, consumer 스레드가 깨어나고, 처리하는 코드 사이에, producer 스레드에서 데이터를 계속 밀어넣고 있는 것이다

  • 꼼수로 당장의 상황을 해결하는 방법을 생각해보면, lock 획득 후 큐에 쌓인 데이터를 모두 처리하는 것도 일종의 방법일 것이다
    • if (q.empty() == false) 을 while (q.empty() == false)

🔥 위 상황을 condition_variable 를 이용하여 해결해보자

 

condition variable

condition variable 은 C++ 표준 스펙이다.

#incldue <mutex>

std::condition_variable cv; 
  • Kernel-level Object가 아닌, User-level Object 이다.
    • 즉, 동일한 프로그램 내에서 처리가 가능한 것이다.
    • 이를 좀더 풀어쓰면, Kernel-level Object으로는 다른 프로그램 간에도 동기화가 가능하지만, User-level Object 기반이므로 그건 안된다는 것이다
  • condition_variable은 lock 과 짝을 지어서 동작하게 된다
  • 전형적인 4단계가 있다.
    1. Lock을 잡고
    2. 공유변수 값을 수정
    3. Lock을 풀고
    4. 조건변수를 통해, 다른 스레드에게 통지

producer 코드

void Producer()
{
	while (true)
	{
		// 1) Lock을 잡고
		// 2) 공유 변수 값을 수정
		// 3) Lock을 풀고
		// 4) 조건변수를 통해, 다른 스레드에게 통지
		
		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(100);
		}
		
		cv.notify_one(); // wait 중인 스레드가 있으면, 딱 1개를 깨운다
		
		// std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}

👁️ consumer 코드

void Consumer()
{
	while (true)
	{
		std::unique_lock<std::mutex> lock(mtx);
		cv.wait(lock, []() { return q.empty() == false; });

		if (q.empty() == false)
		{
			int data = q.front();
			q.pop();
			// std::cout << data << std::endl;
			std::cout << q.size() << std::endl;
			
		}
	}
}
  • 코드 분석
		std::unique_lock<std::mutex> lock(mtx);
		cv.wait(lock, []() { return q.empty() == false; });

두 줄의 동작을 분석해보자

 

  • wait 메서드 (1번 인자: lock, 2번 인자: 깨어날 조건(aka. 탈출조건))
void condition_variable::wait(unique_lock<mutex>& __lk, _Predicate __pred);

 

  1. Lock을 잡으려고 시도한다
    • 이미 lock이 잡혀있는 상태라면 넘어감
  2. 조건 확인
    • 만족 → q 가 비어있지 않음: Lock 잡고 이후 코드들 진행
    • 불만족 → q 가 비어있음: ⭐️ Lock을 풀고, 대기상태로 전환한다
      • lock_guard가 아닌 unique_lock인 이유도, wait 내부적으로 lock을 풀어줘야 하는 경우가 있기 때문이다

 

  • 전체적으로 볼때, 핵심은
    • lock 이 전체적으로, 끝까지 잡힌 것이 아니라,
    • wait에 의해서 조건에 따라 lock을 잡거나 푸는 것이다
    • wait 상태일때, notify_one() 이 호출되면, wait 중인 스레드가 깨어나서 다시 처음스텝부터 반복한다
  • 또한, condition_variable을 사용하므로, if (q.empty() == false) 문이 이미 처리된 것이다.
void Consumer()
{
	while (true)
	{
		std::unique_lock<std::mutex> lock(mtx);
		cv.wait(lock, []() { return q.empty() == false; });
		// 1) Lock을 잡고
			// 이미 lock이 잡혀있는 상태라면 넘어감 
		// 2) 조건 확인
			// - 만족 → q 가 비어있지 않음: Lock 잡고 이후 코드들 진행
			// - 불만족 → q 가 비어있음: ⭐️ Lock을 풀고, 대기상태로 전환한다
			//     - lock_guard가 아닌 unique_lock인 이유도, wait 내부적으로 lock을 풀어줘야 하는 경우가 있기 때문이다

		{
			int data = q.front();
			q.pop();
			// std::cout << data << std::endl;
			std::cout << q.size() << std::endl;
		}
	}
}

생각해볼 문제 2가지

Q1. notify_one()을 했다면 consume 조건을 항상 만족하는가?

  • notify_one() 으로 wait 인 스레드를 깨운 것이니, 굳이 상태체크가 필요없지 않을까?

A. 그렇지 않다 (Spurious Wakeup; 가짜 기상)

  • 조건변수를 notify_one() 하는 시점에 Lock을 잡고있지 않다
  • 깨어난 스레드가 다시 Lock을 잡을때까지, 다른 스레드가 개입해서 조건을 영향줄 수 있다.
  • 따라서, []() { return q.empty() == false; } 조건 검사를 하는 것이다

 

Q2. Lock 범위에서 notify_one 해주면 어떨까?

{
	std::unique_lock<std::mutex> lock(mtx);
	q.push(100);
	cv.notify_one();
}
  • producer의 전형적 4단계의 순서를 지켜야하는가?
  • Lock을 풀지 않고, 4) 조건변수를 통해, 다른 스레드에게 통지 하면 어떨까?

A. 동작은 잘 된다. 다만, 문제가 존재한다

  • 문제:
    • Lock을 잡은 상태에서 notify 하면
    • consume에서 동작을 생각해보자
      • wait에서 깨어나서 첫번째로 하는것은, “Lock 획득 시도”
    • 그런데, producer에서 lock이 풀린 상태가 아니라면, 경합에 들어가는 것이다!
    • 즉 불필요한 경합이 있는 것이다

테스트용 전체 코드

#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <cassert>

std::mutex mtx;
std::queue<int> q;

// #incldue <mutex>
// User-level Object 이다. (커널 오브젝트가 아님)
std::condition_variable cv;

void Producer()
{
	// while (true)
	for (int i = 0; i < 10; ++i) // 10개의 데이터를 생산
	{
		// 1) Lock을 잡고
		// 2) 공유 변수 값을 수정
		// 3) Lock을 풀고
		// 4) 조건변수를 통해, 다른 스레드에게 통지

		{
			std::unique_lock<std::mutex> lock(mtx);
			q.push(i);
			std::cout << "Produced: " << i << std::endl; // 디버그 출력
		}

		cv.notify_one(); // wait 중인 스레드가 있으면, 딱 1개를 깨운다
	}
}

void Consumer()
{
	// while (true)
	for (int i = 0; i < 10; ++i) // 10개의 데이터를 소비
	{
		std::unique_lock<std::mutex> lock(mtx);
		cv.wait(lock, []()
				{ return q.empty() == false; });
		// 1) Lock을 잡고
		// 이미 lock이 잡혀있는 상태라면 넘어감
		// 2) 조건 확인
		// - 만족 → q 가 비어있지 않음: Lock 잡고 이후 코드들 진행
		// - 불만족 → q 가 비어있음: ⭐️ Lock을 풀고, 대기상태로 전환한다
		//     - lock_guard가 아닌 unique_lock인 이유도, wait 내부적으로 lock을 풀어줘야 하는 경우가 있기 때문이다

		{
			int data = q.front();
			q.pop();
			std::cout << "Consumed: " << data << std::endl; // 디버그 출력
			// assert: 소비한 데이터가 i와 일치하는지 확인
			assert(data == i);

			// std::cout << q.size() << std::endl;
		}
	}
}

int main()
{
	std::thread t1(Producer);
	std::thread t2(Consumer);

	t1.join();
	t2.join();

	return 0;
}

 

event 와 굉장히 유사한 내용이지만, 좀 더 우아하게 동작된다. C++ 11 부터 표준스펙에 들어왓으므로, condition_variable(조건변수)를 사용하면 된다.

'C・C++ > 멀티스레드' 카테고리의 다른 글

[C++] 멀티 스레드로 소수 구하기  (0) 2024.10.05
[C++] Reader-Writer Lock 구현  (0) 2024.10.05
[C++] Thread Local Storage (aka. TLS)  (0) 2024.10.04
[C++] Future  (3) 2024.10.04
[C++] Lock-Based Stack 구현  (1) 2024.10.03
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함