Redux-Saga의 작동 방식

제너레이터 함수에서 다음 yield 구문을 실행하기 위해서는 generator.next() 메서드를 실행해줘야 함. 하지만 각각의 제너레이터 함수에 대해 next()를 실행해주는 로직은 없는데 Redux-Saga 내부에서 어떠한 과정을 거치길래 이러한 동작이 가능할까?

1. Saga 미들웨어는 어떤 과정을 거쳐 각각의 제너레이터 함수에 대해 next() 메서드를 실행시킬까?
2. UI에서 액션이 dispatch될 때, 어떻게 suspend되어 있는 제너레이터 함수가 다시 재개될까? 

아래와 같은 rootSaga를 Redux에 설정하는 경우를 예로 들어보자.

import { createStore, applyMiddleware } from 'redux'
import createSagaMiddleware from 'redux-saga'

import { watchIncrementAsync } from './sagas'

const sagaMiddleware = createSagaMiddleware()
const store = createStore(reducer, applyMiddleware(sagaMiddleware))

sagaMiddleware.run(watchIncrementAsync)

 

import { put, takeEvery } from 'redux-saga/effects'

const delay = (ms) => new Promise(res => setTimeout(res, ms))

export function* incrementAsync() {
  yield delay(1000)
  yield put({ type: 'INCREMENT' })
}

export function* watchIncrementAsync() {
  yield takeEvery('INCREMENT_ASYNC', incrementAsync)
}

 
 

A. rootSaga 초기화

sagaMiddleware.run(rootSaga)를 통해 작성한 Watcher Saga를 초기화해주는 과정을 거침.

// redux-saga/packages/core/src/internal/middleware.js
import { runSaga } from './runSaga'

// ...
let boundRunSaga

boundRunSaga = runSaga.bind(null, {
  ...options,
  context,
  channel,
  dispatch,
  getState,
  sagaMonitor,
})

sagaMiddleware.run = (...args) => boundRunSaga(...args)

runSaga.bind(null, { ... })는 runSaga() 함수의 첫번째 parameter인 option을 { ... } bind하는 과정으로 우리가 눈여겨봐야 할 부분은 아님. 함수 boundRunSaga는 함수 runSaga의 첫번째 parameter가 채워진 함수이므로 sagaMiddleware.run(rootSaga)에서 전달해주는 rootSaga는 runSaga()에 두번째 인자인 saga로 전달됨. 

// redux-saga/packages/core/src/internal/runSaga.js
import proc from './proc'

export function runSaga(options, saga, ...args) {
  // 1. 제너레이터 함수를 실행해 rootIterator를 얻음
  const iterator = saga(...args)
  
  // ...
  
  return immediately(() => {
    // 2. 생성된 rootIterator를 proc() 함수에 전달
    const task = proc(env, iterator, context, effectId, getMetaInfo(saga), true, undefined)

    if (sagaMonitor) {
      sagaMonitor.effectResolved(effectId, task)
    }
    return task
  })
}

 

 

B. rootSaga의 task 생성

rootSaga 제너레이터 함수가 반환하는 rootIterator이 아래 proc() 함수에 전달되어 task가 생성됨.

// redux-saga/packages/core/src/internal/proc.js
export default function proc(env, iterator, /* ... */) {
  // 별다른 설정을 하지 않았다면 finalRunEffect === runEffect
  const finalRunEffect = env.finalizeRunEffect(runEffect)
  
  next()
  
  function next(arg, isErr) {
    let result
    if (isErr) {
      // ...
    } else if (shouldCancel(arg)) {
      // ...
    } else if (shouldTerminate(arg)) {
      // ...
    } else {
      // 3. rootIterator.next() 메서드를 실행해 결과 얻음
      // 첫번째 yield 구문까지 실행
      result = iterator.next(arg)
    }

    if (!result.done) {
      // 4. iterator의 done: false일 때, 이 함수를 실행  
      digestEffect(result.value, parentEffectId, next)
    } else {
      // ...
    }
  }
}

 
3. iterator.next(arg)으로 iterator가 반환하는 값을 얻게 되는데 그 값은 무엇일까? 현재 예시에서는 watchIncrementAsync()의 rootIterator.next()가 첫번째로 반환하는 값인 takeEvery() result.value로 전달됨.

export function* watchIncrementAsync() {
  yield takeEvery('INCREMENT_ASYNC', incrementAsync)
}

이제 takeEvery()이 어떤 값인지 살펴보자.

 

 

C. takeEvery()

// redux-saga/packages/core/src/internal/sagaHelpers/takeEvery.js
import fsmIterator from './fsmIterator'

export default function takeEvery(patternOrChannel, worker, ...args) {
  const yTake = { done: false, value: take(patternOrChannel) }
  const yFork = (ac) => ({ done: false, value: fork(worker, ...args, ac) })

  let action,
    setAction = (ac) => (action = ac)

  return fsmIterator(/* ... */)
}

export default function fsmIterator(fsm, startState, name) {
  //... 

  return makeIterator(next, (error) => next(null, error), name)
}

takeEvery()는 yTake, yFork가 포함된 객체를 fsmIterator()에 전달해 iterator를 만듦. 여기서 생성된 iterator.next()는 fsm[nextState]()가 반환하는 객체의 effect를 반환함. 즉, 객체 yTake와 yFork(action).  

const yTake = { done: false, value: take(patternOrChannel) }
const yFork = (action) => ({ done: false, value: fork(worker, ...args, action) })

정리하면, takeEvery('INCREMENT_ASYNC', incrementAsync)는 아래와 같은 iterator를 반환함. (해당 iterator를 @ITERATOR라고 정의하자.)

takeEvery('INCREMENT_ASYNC', incrementAsync) === @ITERATOR

// @ITERATOR.next()의 반환값
{ done: false, value: take('INCREMENT_ASYNC') } // 홀수 번 호출
{ done: false, value: fork(incrementAsync, ...args, 'INCREMENT_ASYNC') } // 짝수 번 호출

 
4. takeEvery()가 iterator를 반환한다는 것을 알았지만, 3.에서 호출된 next()는 takeEvery()의 iterator.next()가 아니라 watchIncrementAsync()의 rootIterator.next()임에 유의해야 함. 즉, digestEffect()에 전달되는 result.value는 객체 @ITERATOR.

const result = { done: false, value: @ITERATOR }

digestEffect(result.value, parentEffectId, next)

 

 

D. Effect Creator의 처리

이 iterator @ITERATOR를 proc() 함수 내부에서 선언된 digestEffect()에 전달하면...

// digestEffect(yTake, parentEffectId, next)를 실행
function digestEffect(effect, parentEffectId, cb, label = '') {
  // ...
  
  finalRunEffect(effect, effectId, currCb)
}

아까 살펴본 대로 finalRunEffect === runEffect이므로 proc() 함수 내부에서 선언된 runEffect()가 실행됨

// runEffect(yTake, parentEffectId, currCb)를 실행
function runEffect(effect, effectId, currCb) {
  if (is.promise(effect)) {
    resolvePromise(effect, currCb)
  // 5. rootIterator.next()가 반환한 @ITERATOR는 이 분기에서 처리됨
  } else if (is.iterator(effect)) {
    proc(env, effect, task.context, effectId, meta, false, currCb)
  // 6. 5의 @ITERATOR.next()가 반환한 객체는 이 분기에서 처리됨  
  } else if (effect && effect[IO]) {
    const effectRunner = effectRunnerMap[effect.type]
    effectRunner(env, effect.payload, currCb, executingContext)
  } else {
    currCb(effect)
  }
}

 
5. rootIterator.next()가 반환한 값은 iterator이므로 다시 proc() 함수로 전달되어 실행됨. 3으로 다시 되돌아가 @ITERATOR에 대해 iterator.next()를 실행하면 이번에 얻어지는 result.value는 [IO] 속성을 갖는 일반 객체.

{ done: false, value: take('INCREMENT_ASYNC') }

// take('INCREMENT_ASYNC')의 반환값
{
  [IO]: true,
  combinator: false,
  type: effectTypes.TAKE,
  payload: { pattern: 'INCREMENT_ASYNC' },
}

 
6. 이제 이 객체가 runEffect()에 전달되는데 [IO] 속성이 true이므로 이번에는 5.에서의 분기가 아니라 effectRunner 분기에서 처리됨. @ITERATOR.next()가 첫번째로 반환한 값은 take() Effect Creator이므로 runTakeEffect()에서 처리됨.

// redux-saga/packages/core/src/internal/effectRunnerMap.js
function runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {
  const takeCb = (input) => {
    if (input instanceof Error) {
      // ...
    }
    if (isEnd(input) && !maybe) {
      // ...
    }
    cb(input)
  }
  
  try {
    // 7. runSaga()에서 생성된 channel의 take() 메서드 실행
    channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)
  } catch (err) {
    cb(err, true)
    return
  }
  cb.cancel = takeCb.cancel
}

runTakeEffect()에 두번째로 전달된 인수는 effect creator가 반환하는 payload 객체. 즉, { pattern: 'INCREMENT_ASYNC' }이므로, channel 속성은 undefined. 따라서, channel은 처음에 runSaga()에서 생성된 env.channel이 사용됨. 또한, pattern이 string이므로 matcher() 함수가 반환하는 함수가 두번째 인수로 전달됨.

export const string = (pattern) => (input) => input.type === String(pattern)

// matcher('INCREMENT_ASYNC')
// Store에 dispatch되는 { type: 'INCREMENT_ASYNC' } Action 객체인지 체크
(input) => input.type === 'INCREMENT_ASYNC'

7. ① takeCb() 함수와 ② Store에서 구독할 Action 객체를 체크할 matcher 함수를 runSaga()에서 생성된 channel.take() 메서드에 전달해 실행함.

takeCb() 함수의 역할: Redux에서 해당하는 Action이 dispatch되면...
1. 해당 take()를 포함하는 제너레이터 함수의 iterator.next()를 실행(proc() 내부의 next() 함수 실행)
2. 취소, 에러 발생 등 상황에 따라 Redux-Saga 기능 실행

 

 

E. 각각의 Iterator가 Saga 미들웨어를 구독하도록 설정

8. 이제 channel이 어떤 객체인지 되돌아가서 살펴보자. 별도로 channel을 설정하지 않았다면 stdChannel()의 반환값이 사용됨.

export default function createSagaMiddleware({ 
  context = {}, 
  // 8. 기본값으로 사용되는 channel
  channel = stdChannel(), 
  sagaMonitor, 
  ...options 
} = {}) {
  // ...
}
// redux-saga/packages/core/src/internal/channel.js
export function stdChannel() {
  // multicastChannel()의 반환값이 기본 channel로 사용됨
  const chan = multicastChannel()
  const { put } = chan
  
  chan.put = (input) => {
    if (input[SAGA_ACTION]) {
      put(input)
      return
    }
    asap(() => {
      put(input)
    })
  }
  
  return chan
}
export function multicastChannel() {  
  let currentTakers = []
  let nextTakers = currentTakers
  // ...
  
  return {
    [MULTICAST]: true,
    put(input) { /* ... */ },
    take(cb, matcher = matchers.wildcard) {
      if (closed) {
        cb(END)
        return
      }
      cb[MATCH] = matcher
      ensureCanMutateNextTakers()
      nextTakers.push(cb)

      cb.cancel = once(() => {
        ensureCanMutateNextTakers()
        remove(nextTakers, cb)
      })
    },
    close,
  }
}

기본값으로 사용되는 channel 객체는 위와 같은 형태. 

// 1. cb: currCb()는 runEffect()에 전달되는 콜백 함수
const takeCb = (input) => {
  if (input instanceof Error) {
    // ...
  }
  if (isEnd(input) && !maybe) {
    // ...
  }
  currCb(input)
}
  
// 2. cb[MATCH]: matcher('INCREMENT_ASYNC')
(input) => input.type === 'INCREMENT_ASYNC'

// * 위 함수들의 인자인 input은 Action 객체
function runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {
  const takeCb = (input) => { /* ... */ }
  
  try {
    // 7. runSaga()에서 생성된 channel의 take() 메서드 실행
    channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)
  } catch (err) {
    cb(err, true)
    return
  }
  cb.cancel = takeCb.cancel
}

다시 runTakeEffect() 함수로 돌아가 살펴보면 위 takeCb() 함수가 channel.take() 메서드에 전달됨. 그렇게 되면 channel 객체의 내부 변수인 nextTakers 배열에 위 함수가 추가됨.

 

 

F. Saga 미들웨어의 Action 구독

function sagaMiddleware({ getState, dispatch }) {
    boundRunSaga = runSaga.bind(null, { /* ... */ })

    return (next) => (action) => {
      if (sagaMonitor && sagaMonitor.actionDispatched) {
        sagaMonitor.actionDispatched(action)
      }
      const result = next(action) // hit reducers
      // 구독되어 있는 saga 함수들 실행
      channel.put(action)
      return result
    }
}

sagaMiddleware는 다른 Redux 미들웨어와 동일하게 다음 미들웨어를 실행하는 next() 함수, dispatch된 Action 객체를 나타내는 action을 인자로 가지는 커링 함수 형태인데, Saga 미들웨어가 추가된 환경에서는 Action 객체가 dispatch되면 아래와 같은 channel.put() 메서드가 실행됨.

export function multicastChannel() {  
  let currentTakers = []
  let nextTakers = currentTakers
  // ...
  
  return {
    [MULTICAST]: true,
    // input은 액션 객체: { type: '', payload: '' }
    put(input) {
      if (closed) {
        return
      }

      if (isEnd(input)) {
        close()
        return
      }

      const takers = (currentTakers = nextTakers)

      // 초기화 과정에서 currentTakers에 추가되었던 함수들을 모두 실행
      for (let i = 0, len = takers.length; i < len; i++) {
        const taker = takers[i]

        // type이 함수 taker의 MATCH 속성과 일치할 때만 실행
        if (taker[MATCH](input)) {
          taker.cancel()
          taker(input)
        }
      }
    },
    take(cb, matcher = matchers.wildcard) { /* ... */ },
    close,
  }
}

Saga 미들웨어 초기화 후 multicastChannel()가 종료되더라도 take, put 메서드는 클로저로서 nextTakers를 계속 참조하고 있기 때문에 Action이 dispatch될 때마다 take() Effect creator에 의해 nextTakers에 추가된 함수들이 실행될 수 있음.  

 

export function* watchIncrementAsync() {
  while (true) {
    yield take('INCREMENT_ASYNC')
    yield fork(incrementAsync)
  }
}

정리하면, 위와 같은 제너레이터 함수를 rootSaga에 추가했다면 rootSaga 초기화 시 take()에 의해 'INCREMENT_ASYNC' Action 타입에 watchIncrementAsync의 iterator.next()가 실행되도록(다음 yield 구문인 fork()가 실행됨) 구독하게 됨. 

 

 

G. fork()

take() Effect creator이 rootIterator.next()를 실행해 fork()가 실행될 수 있었는데 fork()는 어떤 방식으로 rootIterator.next()를 실행할까? 앞서 살펴본 대로 구독하는 Action이 dispatch되면 proc() 내부의 next() 함수가 실행됨. 이번에는 rootIterator.next()가 fork()이므로 result.value는 아래와 같은 객체.

// fork(incrementAsync)의 반환값
{
  [IO]: true,
  combinator: false,
  type: effectTypes.FORK,
  payload: { context: null, fn: incrementAsync, args: [] },
}
// redux-saga/packages/core/src/internal/proc.js
export default function proc(env, iterator, /* ... */) {
  // 별다른 설정을 하지 않았다면 finalRunEffect === runEffect
  const finalRunEffect = env.finalizeRunEffect(runEffect)
  
  next()
  
  function next(arg, isErr) {
    let result
    if (isErr) {
      // ...
    } else if (shouldCancel(arg)) {
      // ...
    } else if (shouldTerminate(arg)) {
      // ...
    } else {
      // 3. rootIterator.next() 메서드를 실행해 결과 얻음
      // 첫번째 yield 구문까지 실행
      result = iterator.next(arg)
    }

    if (!result.done) {
      // 4. iterator의 done: false일 때, 이 함수를 실행  
      digestEffect(result.value, parentEffectId, next)
    } else {
      // ...
    }
  }
}

rootIterator가 종료된 게 아니므로 result.done === false. 따라서, digestEffect → finalRunEffect( = runEffect) 순서로 함수가 실행됨을 상기해보면 

function runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {
  const takeCb = (input) => { /* ... */ }
  
  try {
    // 7. runSaga()에서 생성된 channel의 take() 메서드 실행
    channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)
  } catch (err) {
    cb(err, true)
    return
  }
  cb.cancel = takeCb.cancel
}

이번에는 runForkEffect()가 실행되어 effect를 처리하게 됨.

function runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {
  // 9. fork()에 전달한 fn으로 iterator를 다시 만듦
  const taskIterator = createTaskIterator({ context, fn, args })
  const meta = getIteratorMetaInfo(taskIterator, fn)

  immediately(() => {
    const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)
    
    // detached는 spawn()으로 생성된 effect일 때 true
    if (detached) {
      cb(child)
    } else {
      if (child.isRunning()) {
        parent.queue.addTask(child)
        cb(child)
      } else if (child.isAborted()) {
        parent.queue.abort(child.error())
      } else {
        cb(child)
      }
    }
  })
}
function createTaskIterator({ context, fn, args }) {
  try {
    const result = fn.apply(context, args)

    // fn이 제너레이터 함수일 때
    if (is.iterator(result)) {
      return result
    }
    let resolved = false

    // 10. 이러한 next() 메서드를 가지는 iterator를 반환
    const next = (arg) => {
      if (!resolved) {
      // 14. resolved가 true로 바뀌었기 때문에 iterator.next()가 실행되면 else 구문이 실행됨
        resolved = true
        // 비동기 함수라면 { value: Promise, done: false }
        return { value: result, done: !is.promise(result) }
      } else {
        return { value: arg, done: true }
      }
    }

    return makeIterator(next)
  } catch (err) {
    /* ... */
  }
}

10. createTaskIterator()에서 또다른 iterator가 만들어지는데 이 iterator를 forkIterator라고 하자. 이 iterator를 가지고 다시 proc() 함수를 실행함. proc의 next에서 forkIterator.next()가 실행되므로 result는 아래와 같음.

{ value: Promise { <pending>: returnValue }, done: false }

 

result.done === false이므로 digestEffect → finalRunEffect( = runEffect) 순서로 실행되어 is.promise(effect)의 if 구문이 실행됨.

// 11. runEffect(Promise, parentEffectId, currCb)를 실행
function runEffect(effect, effectId, currCb) {
  if (is.promise(effect)) {
    resolvePromise(effect, currCb)
  } else if (is.iterator(effect)) {
    // ...
  } else if (effect && effect[IO]) {
    // ...
  } else {
    // ...
  }
}
// 12. forkIterator.next()가 반환한 Promise를 처리
function resolvePromise(promise, cb) {
  const cancelPromise = promise[CANCEL]

  if (is.func(cancelPromise)) {
    cb.cancel = cancelPromise
  }
  promise.then(cb, (error) => {
    cb(error, true)
  })
}

이때, resolvePromise()의 Promise.then()에서 실행되는 cb() 함수는 forkIterator.next()를 처리하는 함수임에 유의해야 함. Promise의 결과값인 value  forkIterator.next(value)처럼 전달됨.

function runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {
  // ...

  immediately(() => {
    const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)
    
    if (detached) {
      cb(child)
    } else {
      if (child.isRunning()) {
        parent.queue.addTask(child)
        cb(child)
      } else if (child.isAborted()) {
        parent.queue.abort(child.error())
      } else {
      // 13. fork()의 task가 rootIterator.next()에 전달됨
        cb(child)
      }
    }
  })
}

13. 여기까지 fork()의 task를 만드는 게 종료되고 runForkEffect()에 전달된 cb()가 실행될 차례임. 이 cb()는 take(), fork()를 포함하는 제너레이터 함수의  rootIterator와 관련된 콜백함수인데 cb()에 전달되는 child가 바로 fork()가 반환하는 Task 객체임. 이 Task 객체가 rootIterator.next()에 전달되기 때문에 아래처럼 Task 객체를 이용해  업을 취소해줄 수 있음. 

function* main() {
  while (true) {
    yield take('START_BACKGROUND_SYNC')
    const bgSyncTask = yield fork(bgSync)

    // wait for the user stop action
    yield take('STOP_BACKGROUND_SYNC')
    // user clicked stop. cancel the background task
    // this will cause the forked bgSync task to jump into its finally block
    yield cancel(bgSyncTask)
  }
}

한편, forkIterator.next()가 실행되기도 전에 rootIterator.next()가 실행된 것에서 알 수 있듯이 ② fork()는 non-blocking effect임을 알 수 있음.

14. 이후, fork()에 전달한 비동기 함수가 resolve되었다면 forkIterator.next()가 실행되고 아래 값이 proc의 next()에서 처리됨.

{ value: PromiseResult, done: true }

 

 

 

 

take() 
Saga 미들웨어의 channel에 제너레이터 함수의 iterator.next()를 등록해 해당 Action이 dispatch될 때 iterator.next()이 실행되도록 함

fork()
fork에 전달된 함수와 제너레이터 함수의 iterator.next()를 순차적으로 실행함.
해당 함수가 비동기 함수라면 Promise가 resolve되기 전에 제너레이터 함수의 iterator.next()가 실행될 수 있음.(Non-blocking)