티스토리 뷰

Reader-Writer Lock

개요

대부분의 확률에서는 read 할 경우에는 lock 이 없는것처럼 공유해서 사용하다가,

write가 들어가면 상호배타적 특성을 지니는 lock을 적용한다

#ifndef LOCK_H
#define LOCK_H

#include <mutex>
#include <atomic>

using Mutex = std::mutex;
using CondVar = std::condition_variable;
using UniqueLock = std::unique_lock<std::mutex>;
using LockGuard = std::lock_guard<std::mutex>;

/* -------------------------
		RW SpinLock
------------------------- */

/* -------------------------------------------
32비트 변수를 활용한 Lock
[WWWW WWWW][WWWW WWWW] [RRRR RRRR][RRRR RRRR]
W : WriteFlag (Exclusive Lock Owner ThreadId)
R : ReadFlag (Shared Lock Count)
------------------------------------------- */

class Lock
{
	enum : uint32_t
	{
		ACQUIRE_TIMEOUT_TICK = 10'000,
		MAX_SPIN_COUNT = 5'000,
		WRITE_THREAD_MASK = 0xFFFF'0000, // 상위 16비트를 뽑기위한 마스크
		READ_COUNT_MASK = 0x0000'FFFF, // 하위 16비트를 뽑기위한 마스크
		EMPTY_FLAG = 0x0000'0000
	};

public:
	void WriteLock();
	void WriteUnlock();
	void ReadLock();
	void ReadUnlock();

private:
	std::atomic<uint32_t> _lockFlag = EMPTY_FLAG;
	uint16_t _writeCount = 0;
};

#endif /* LOCK_H */

Lock::WriteLock()

void Lock::WriteLock()
{
	// 아무도 소유 및 공유하고 있지 않을 때, 경합해서 소유권을 얻는다
	
	// 의사코드
	if (_lockFlag == EMPTY_FLAG)
	{
		const uint32_t desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
		_lockFlag = desired;
	}
	while (true)
	{

	}
	
}
#define OUT

void Lock::WriteLock()
{
	// 아무도 소유 및 공유하고 있지 않을 때, 경합해서 소유권을 얻는다
	const uint32_t desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
	while (true)
	{
		for (uint32_t spinCount = 0; spinCount < MAX_SPIN_COUNT; ++spinCount)
		{
			uint32_t expected = EMPTY_FLAG;
			if (_lockFlag.compare_exchange_strong(OUT expected, desired))
			{
				++_writeCount;
				return;
			}
		}
	}
	
}
  • writeCount 따로 관리하는 이유
    • 여러가지 정책 중에서 이 락을 재규적으로 호출하는 걸 허락한다고 했으니까
    • lock을 잡고 있는 상태에서 또 writeLock을 호출하게 되면은
      • crash가 나는 게 아니라,
      • writeCount를 증가시켜 한 번 더 잡게 허락하는 것이다

정책에 의해서 동일한 스레드가 소유하고 있다면 무조건 성공이다

	// 동일한 스레드가 소유하고 있다면 무조건 성공
	const uint32_t lockThreadId = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadId)
	{
		++_writeCount;
		return;
	}

또한, 스핀락 시도 정책을 두어, 잠시 쉬다 오는 코드도 넣어주면 아래와 같다. 만약 너무 오래시도하고있다면, 문제가 있는 것으로 간주하고 CRASH를 내준다

전체코드

void Lock::WriteLock()
{
	// 동일한 스레드가 소유하고 있다면 무조건 성공
	const uint32_t lockThreadId = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadId)
	{
		++_writeCount;
		return;
	}

	// 아무도 소유 및 공유하고 있지 않을 때, 경합해서 소유권을 얻는다
	auto start = std::chrono::high_resolution_clock::now();

	const uint32_t desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
	while (true)
	{
		for (uint32_t spinCount = 0; spinCount < MAX_SPIN_COUNT; ++spinCount)
		{
			uint32_t expected = EMPTY_FLAG;
			if (_lockFlag.compare_exchange_strong(OUT expected, desired))
			{
				++_writeCount;
				return;
			}
		}

		auto now = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
		if (duration.count() >= ACQUIRE_TIMEOUT_TICK)
		{
			CRASH("LOCK_TIMEOUT");
		}
		
		// MAX_SPIN_COUNT 만큼 시도했다면, 잠시 쉬다오자
		std::this_thread::yield();
	}
}
  • 자세히 볼 부분 모음
for (uint32_t spinCount = 0; spinCount < MAX_SPIN_COUNT; ++spinCount)
{
	uint32_t expected = EMPTY_FLAG;
	if (_lockFlag.compare_exchange_strong(OUT expected, desired))
	{
		++_writeCount;
		return;
	}
}
  • lock 의 비트 (flags) 를 CAS로 작업하는 것이다
	const uint32_t desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
  • 즉, readCount 가
    • 0 이면, 수행함
    • 0 이 아니면(누군가 읽고 있음-누군가 readLock을 잡고있음), 계속 시도하게된다 (writeCount 증가)
  • 정리하면,
    • writeLock은
      • 내가 처음부터 잡고있거나,
      • 이 Lock을 아무도 잡지 않아서 readCount, writeCount도 없는 깨끗한 상태일때만 잡을 수 있다

Lock::WriteUnlock()

void Lock::WriteUnlock()
{
	const int32_t lockCount = --_writeCount;
	if (lockCount == 0)
	{
		_lockFlag.store(EMPTY_FLAG); // 스레드ID(상위 16비트)포함한 0으로 초기화
	}
}
  • 정책 추가
    • 연속된 W 가능 : W → W → W → … 🟢
    • W 와중에 R 도 가능 : W → R 🟢
    • 하지만, R 중 W 불가능 : R → W 🔴
	// ReadLock 다 풀기 전에는 WriteUnlock 불가능
	if ((_lockFlag.load() & READ_COUNT_MASK) != 0) 
	{
		CRASH("INVALID_UNLOCK_ORDER");
	}

전체코드

void Lock::WriteUnlock()
{
	// ReadLock 다 풀기 전에는 WriteUnlock 불가능
	if ((_lockFlag.load() & READ_COUNT_MASK) != 0) 
	{
		CRASH("INVALID_UNLOCK_ORDER");
	}

	const int32_t lockCount = --_writeCount;
	if (lockCount == 0)
	{
		_lockFlag.store(EMPTY_FLAG); // 스레드ID(상위 16비트)포함한 0으로 초기화
	}
}

Lock::ReadLock()

void Lock::ReadLock()
{
	// 아무도 소유하고 있지 않다(상위 16비트 == 0)면, 경합해서 공유카운트(하위 16비트)를 올려야 한다 
	while (true)
	{
		for (uint32_t spinCount = 0; spinCount < MAX_SPIN_COUNT; ++spinCount)
		{
			uint32_t expected = (_lockFlag.load() & READ_COUNT_MASK); // 상위 16비트를 0으로 세팅하기 위함
			if (_lockFlag.compare_exchange_strong(OUT expected, expected + 1))
			{
				return;
			}
			
		}
	
		// MAX_SPIN_COUNT 만큼 시도했다면, 잠시 쉬다오자
		std::this_thread::yield();
	}

}
  • 여기에 시간제한을 넘어가면 크래쉬를 발생시키자
	void Lock::ReadLock()
{
	// 동일한 스레드가 소유하고 있다면 무조건 성공

	// 아무도 소유하고 있지 않다(상위 16비트 == 0)면, 경합해서 공유카운트(하위 16비트)를 올려야 한다 
	auto start = std::chrono::high_resolution_clock::now();
	while (true)
	{
		for (uint32_t spinCount = 0; spinCount < MAX_SPIN_COUNT; ++spinCount)
		{
			uint32_t expected = (_lockFlag.load() & READ_COUNT_MASK); // 상위 16비트를 0으로 세팅하기 위함
			if (_lockFlag.compare_exchange_strong(OUT expected, expected + 1))
			{
				return;
			}
		}

		auto now = std::chrono::high_resolution_clock::now();
		auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
		if (duration.count() >= ACQUIRE_TIMEOUT_TICK)
		{
			CRASH("LOCK_TIMEOUT");
		}

		// MAX_SPIN_COUNT 만큼 시도했다면, 잠시 쉬다오자
		std::this_thread::yield();
	}

}
  • 또한, 동일한 스레드가 lock을 소유하고 있다면 무조건 성공이다
	// 동일한 스레드가 소유하고 있다면 무조건 성공
	const uint32_t lockThreadId = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadId)
	{
		_lockFlag.fetch_add(1);
		return;
	}
  • write lock 을 잡고있을때, read lock 을 잡으려는 시도이므로, 다른 스레드는 접근 할 수 없다.
  • 따라서, 그냥 1을 더해주겠다

Lock::ReadUnlock()

void Lock::ReadUnlock()
{
	_lockFlag.fetch_sub(1);
}
  • 다만, read count 값이 0 이면 안되겠지요?
    • 조건문으로 체크해줘야겠다

전체코드

void Lock::ReadUnlock()
{
	if ((_lockFlag.fetch_sub(1) & READ_COUNT_MASK) == 0) // fetch_XXX 는 연산 이전의 값을 뱉음
	{
		CRASH("MULTIPLE_UNLOCK");
	}
}

Lock Guard 로서 제공하기

  • 손수 lock, unlock을 하는 것은 실수의 여지가 많다
  • 나만의 LockGuard를 만들어서 제공해주자
  • RAII 패턴으로 래핑하는 것이다
/* -------------------------
		LockGuards
------------------------- */

class ReadLockGuard
{
public:

private:

};

class WriteLockGuard
{
public:

private:

};
/* -------------------------
		LockGuards
------------------------- */

class ReadLockGuard
{
public:
	ReadLockGuard(Lock& lock) : _lock(lock) { _lock.ReadLock(); }
	~ReadLockGuard() { _lock.ReadUnlock(); }

private:
	Lock& _lock;
};

class WriteLockGuard
{
public:
	WriteLockGuard(Lock& lock) : _lock(lock) { _lock.WriteLock(); }
	~WriteLockGuard() { _lock.WriteUnlock(); }

private:
	Lock& _lock;
};

매크로 정리하기

#include <stdint.h>

#define OUT

/*---------------
	  Lock
---------------*/

#define USE_MANY_LOCKS(count)	Lock _locks[count];
#define USE_LOCK				USE_MANY_LOCKS(1)
#define	READ_LOCK_IDX(idx)		ReadLockGuard readLockGuard_##idx(_locks[idx]);
#define READ_LOCK				READ_LOCK_IDX(0)
#define	WRITE_LOCK_IDX(idx)		WriteLockGuard writeLockGuard_##idx(_locks[idx]);
#define WRITE_LOCK				WRITE_LOCK_IDX(0)

/*---------------
	  Crash
---------------*/

#if defined(_MSC_VER) // MSVC 컴파일러일 경우
    #define ASSUME(expr) __analysis_assume(expr)
#elif defined(__GNUC__) || defined(__clang__) // GCC 또는 Clang 컴파일러일 경우
    #define ASSUME(expr) __builtin_assume(expr)
#else
    #define ASSUME(expr) ((void)0) // 다른 컴파일러에서는 빈 정의로 처리
#endif

#define CRASH(cause)						\\
{											\\
	uint32_t* crash = nullptr;				\\
	ASSUME(crash != nullptr);				\\
	*crash = 0xDEADBEEF;					\\
}

동작 테스트

  • 상황
    • 멀티스레드 환경에서, 큐에 대해서 생산 및 소비 하는 상황
#include <queue>
#include "Lock.h"

class TestLock
{
	USE_LOCK; // == Lock _locks[1];

public:
	uint32_t TestRead()
	{

	}

	void TestPush()
	{

	}

	void TestPop()
	{

	}

private:
	std::queue<uint32_t> _queue;
};

TestLock testLock;

void ThreadWrite()
{
	while (true)
	{
		testLock.TestPush();
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
		testLock.TestPop();
	}
}

void ThreadRead()
{
	while (true)
	{
		uint32_t value = testLock.TestRead();
		std::cout << value << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
	}
}

int main()
{
	std::thread write1(ThreadWrite);
	std::thread write2(ThreadWrite);

	std::thread read1(ThreadRead);
	std::thread read2(ThreadRead);
	std::thread read3(ThreadRead);
	std::thread read4(ThreadRead);
	std::thread read5(ThreadRead);

	write1.join();
	write2.join();

	read1.join();
	read2.join();
	read3.join();
	read4.join();
	read5.join();
	
	return 0;
}

uint32_t TestRead()
{
	READ_LOCK;

	// TestPush(); // 문제가 있는 코드 (Read 중 Write 불가능)

	if (_queue.empty())
	{
		return -1;
	}
	return _queue.front();
}

void TestPush()
{
	WRITE_LOCK;

	_queue.push(rand() % 100);
}

void TestPop()
{
	WRITE_LOCK;

	if (_queue.empty() == false)
	{
		_queue.pop();
	}
}

전체코드

#include <queue>
#include <thread>
#include <iostream>

#include "Lock.h"

class TestLock
{
	USE_LOCK; // == Lock _locks[1];

public:
	uint32_t TestRead()
	{
		READ_LOCK;

		// TestPush(); // 문제가 있는 코드 (Read 중 Write 불가능)

		if (_queue.empty())
		{
			return -1;
		}
		return _queue.front();
	}

	void TestPush()
	{
		WRITE_LOCK;

		_queue.push(rand() % 100);
	}

	void TestPop()
	{
		WRITE_LOCK;

		if (_queue.empty() == false)
		{
			_queue.pop();
		}
	}

private:
	std::queue<uint32_t> _queue;
};

TestLock testLock;

void ThreadWrite()
{
	while (true)
	{
		testLock.TestPush();
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
		testLock.TestPop();
	}
}

void ThreadRead()
{
	while (true)
	{
		uint32_t value = testLock.TestRead();
		std::cout << value << std::endl;
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
	}
}

int main()
{
	std::thread write1(ThreadWrite);
	std::thread write2(ThreadWrite);

	std::thread read1(ThreadRead);
	std::thread read2(ThreadRead);
	std::thread read3(ThreadRead);
	std::thread read4(ThreadRead);
	std::thread read5(ThreadRead);

	write1.join();
	write2.join();

	read1.join();
	read2.join();
	read3.join();
	read4.join();
	read5.join();

	return 0;
}

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함