devcken.io

Thoughts, stories and ideas.

ThreadLocal 변수와 Scala Future

원문: ThreadLocal Variables and Scala Futures

Thread-Local Storage (TLS)는 현재 실행 중인 스레드에 정적 변수를 추가할 수 있도록 해줍니다. TLS의 가장 흔한 용도는 메서드 파라메터없이 콜스택을 통해 글로벌 컨텍스트를 전달하는 것입니다. 덕분에, 웹애플리케이션에서 (현재 요청 URL과 같은) 데이터를 코드 기반을 통해 글로벌하게 이용 가능할 수 있도록 만들어 줍니다(로깅 또는 감사 목적에 매우 유용합니다).

TLS가 실패하는 경우는 스레드 간에 실행 경로가 변경되는 경우입니다. Future가 코드를 병렬화하는 곳이면 어디서나, 모든 TLS가 손실된 비동기 실행을 위한 스레드 풀의 무작위 스레드로부터 실행이 떨어져 나갑니다. Future는 Play!와 같은 새로운 반응형 웹 프레임워크의 심장이기에, 모든 이들이 TLS가 어떻게 동작하는지를 다시 생각해봐야 합니다.

해결책은 좀 더 간단한데, ExecutionContext 트레이트 내에 있습니다. ExecutionContext는 프로그램 로직을 실행할 수 있는 개체에 대한 추상화이며, Future를 만들기 위한 암시적 요구사항입니다:

import scala.concurrent.ExecutionContext.Implicits.global  
val f = Future { /*this block executes in another thread*/ }  

가장 흔한 구현은 ForkJoinPool인데, 효율적인 작업 훔치기 알고리즘을 구현하여 기초 스레드 풀을 향상시킨 것입니다. 그것은 Play!와 Akka를 기반으로 하는 병렬 처리 애플리케이션의 기초입니다.

기초 스레드 정보를 출력하는 작은 프로그램을 살펴보겠습니다:

import scala.concurrent.{ Await, Future }  
import scala.concurrent.duration._

def printThreadInfo(id: String) = println {  
  id + " : " + Thread.currentThread.getName
}

implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global

printThreadInfo("main")  
val fut1 = Future { printThreadInfo("fut1") }

Await.result(fut1, 1.second)

//Output:
//> main : main
//> fut1 : ForkJoinPool-1-worker-13

그러므로 Future는 확실히 메인 스레드가 아닌 다른 스레드 상에서 실행됩니다. TLS 안에 다른 값들을 저장할 수 있을까요?

여기서는 스칼라에 대해서 이야기하고 있으니, 자바의 ThreadLocal를 직접 사용하지 않고 DynamicVariable 클래스를 사용해보겠습니다. 실행 중인 스레드에 따라 동적으로 값이 주어지는 정적 변수이기에 그렇게 이름지어졌습니다.

DynamicVariablescala.language.dynamics의 동적 필드와는 아무련 관련이 없습니다.

import scala.concurrent.{ Await, Future }  
import scala.concurrent.duration._  
import scala.util.DynamicVariable

def printThreadInfo(id: String) = println {  
  id + " : " + Thread.currentThread.getName + " = " + dyn.value
}

//create a dynamic variable
val dyn = new DynamicVariable[Int](0)

implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global

val fut1 = dyn.withValue(1) { Future { printThreadInfo("fut1") } }  
val fut2 = dyn.withValue(2) { Future { printThreadInfo("fut2") } }  
val fut3 = dyn.withValue(3) { Future { printThreadInfo("fut3") } }

Await.result(fut1, 1.second)  
Await.result(fut2, 1.second)  
Await.result(fut3, 1.second)

//Output:
//> fut1 : ForkJoinPool-1-worker-13 = 1
//> fut2 : ForkJoinPool-1-worker-11 = 2
//> fut3 : ForkJoinPool-1-worker-9 = 3

//But wait, threads work when created, what happens if we reuse threads already in the pool?

val fut4 = dyn.withValue(4) { Future { printThreadInfo("fut4") } }  
val fut5 = dyn.withValue(5) { Future { printThreadInfo("fut5") } }

Await.result(fut4, 1.second)  
Await.result(fut5, 1.second)

//Output:
//> fut4 : ForkJoinPool-1-worker-11 = 2
//> fut5 : ForkJoinPool-1-worker-11 = 2

그래서 DynamicVariable이 TLS에서 새로운 스레드로 올바르게 전달되는 문제를 다루지만, 스레드가 이미 만들어져 풀에서 다시 재사용되는 경우, TLS는 복사되지 않고 이전 사용 중 할당된 이전 값을 갖습니다.

ExecutionContext는 모든 스레드 스케줄링을 처리하는데, 그들이 실행되기 전에 TLS를 해당 스레드로 복사하라고 할 수 있을까요? 이 트레이트는 매우 간단합니다. execute는 수정하기 위한 가장 확실한 선택입니다:

/**
 * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
 */
trait ExecutionContext {  

  /** Runs a block of code on this execution context.
   */
  def execute(runnable: Runnable): Unit

  /** Reports that an asynchronous computation failed.
   */
  def reportFailure(t: Throwable): Unit

  /** Prepares for the execution of a task. Returns the prepared
   *  execution context. A valid implementation of `prepare` is one
   *  that simply returns `this`.
   */
  def prepare(): ExecutionContext = this
}

ForkJoinPool 구현을 사용하고 있으므로 수정할 새로운 상속 클래스를 만듭니다. 생성자 파라메터로 DynamicVariable을 보낸다면, 다른 클로저에 대해 염려하지 않아도 됩니다.

import scala.concurrent.forkjoin._

class ForkJoinPoolWithDynamicVariable[T](dynamicVariable: DynamicVariable[T]) extends ForkJoinPool {  
  override def execute(task: Runnable) {
    //need to inject dynamicVariable.value into task
    super.execute(task)
  }
}

그래서 execute는 메인 스레드 내에서 실행되지만, taskFuture의 스레드 풀 내에서 실행됩니다. 어쨌든 dynamicVariableRunnable 내에 주입해야 합니다. dynamicVariable의 값에 클로저를 넣은 후 task를 실행하는 Runnable을 만들어 보겠습니다.

override def execute(task: Runnable) {  
  val copyValue = dynamicVariable.value
  super.execute(new Runnable {
    override def run = {
      dynamicVariable.value = copyValue
      task.run
    }
  })
}

기본적으로, copyValue는 메인 스레드의 dynamicVariable을 읽은 후 run이 thread-pool 스레드 내에서 실행되는 동안 dynamicVariable에 적합한 값을 할당합니다. T가 제네릭이므로, Map을 포함해 어떤 스칼라 클래스든지 될 수 있으므로, 하나의 DynamicVariable은 대부분의 시나리오에 대해 충분히 유연합니다. 우리가 해야 할 일은 새로운 ExecutorService를 사용하는 것인데, 바꾸면:

implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global  

새로운 클래스를 보면:

val dyn = new DynamicVariable[T](/* default for T */)

implicit val executionContext = scala.concurrent.ExecutionContext.fromExecutorService(  
  new ForkJoinPoolWithDynamicVariable(dyn)
)

가비지 컬렉션에 대해 살짝 알아두어야 할 것이 있습니다. 스레드가 존재하는 한, ThreadPool 변수에 대한 참조를 유지합니다. 만약 thread-pool이 스레드를 재활용하지 않고 스레드가 TLS를 해제하지 않은 채로 풀로 되돌아 간다면, 그러한 객체들은 해제되지 않습니다. 일반적인 경우 이것이 큰 이슈는 아니지만, 더 큰 객체의 경우 사용한 뒤에 명시적으로 해제하거나 허용된다면 WeakReference를 사용하는 것이 현명할 겁니다.