Combine 살펴보기: Publisher
Publisher
Declares that a type can transmit a sequence of values over time.
시간이 지남에 따라 값 시퀀스를 보낼 수 있는 타입을 정의함.
Apple Documentation
Combine의 퍼블리셔는 Observable의 개념이다. 퍼블리셔는 실제 구현이 아닌 Protocol로 정의되어 있다. RX의 경우에는 Observable Class의 구현을 제공하는 것과 차이가 있다. 따라서, Publisher는 여러 구현체를 가질 수 있고, Future, Just, …. 와 같은 편의 퍼블리셔를 제공한다.
Publisher 프로토콜을 채택하는 Publisher를 구현하기 위해서는 프로토콜 채택 후, Output, Failure: Error associatedType과 receive(subscriber:) 함수를 구현해야한다. 다만, 애플은 Publisher를 직접 구현하기는 권장하지 않고 다음과 같은 방식으로 커스텀 Publisher를 만들 것을 추천하고 있다.
- Subject를 이용한 생성 (PassthroughSubject, CurrentValueSubject)
- @Published property wrapper를 이용한 생성
Combine의 Publisher가 Subscriber에게 subscribe(_:) 함수로 구독할 수 있게 해준다. 구현적으로는 Publisher에서 Subscriber를 붙이는 함수는 recieve(subscriber:)이지만, 이는 subscribe(_:) 함수에서 내부적으로 이용되기를 기대하고, 실제 사용시에는 recieve(subscriber:)를 직접 사용하지 말고, subscribe(_:)를 이용할 것을 권장한다.
Subscriber가 구독을 시작하게 되면, Publisher는 Subscription의 구현을 Subscriber에 주게 된다.1RX의 Cancelable과 동일하다. Subscription을 통해서 Subcriber는 Publisher에 시퀀스를 요청할 수 있고, 필요에 따라 구독을 해제할 수 있다. 구독 후에, Publisher는 Subscriber의 recieve(_:) 함수를 통해서 값을 전송한다.2RXSwift에서는 on(_:) 함수를 통해 전송함.
Publisher가 제공하는 데이터의 시퀀스에 변형을 가하고 조정하기 위해 Operator를 이용한다.
Subject
A publisher that exposes a method for outside callers to publish elements.
원소를 발행하도록 외부 호출자에게 메소드를 노출하는 퍼블리셔.
Subject는 Publisher 프로토콜을 채택하고, 외부에서 값을 주입받아 발행할 수 있도록 하는 Publisher이다. 따라서, 외부 주입을 위한 send() 메소드들을 추가적으로 구현해야한다.
Combine의 Subject와 RX의 Subject는 똑같은 용도로 이용되긴 하지만 개념적으로는 살짝 다른 부분이 있는 것 같다. RX 홈페이지에서 Subject를 설명하기에,
주제(subject)는 옵저버나 Observable처럼 행동하는 ReactiveX의 일부 구현체에서 사용 가능한 일종의 교각 혹은 프록시라고 볼 수 있는데, 그 이유는 주제는 옵저버이기 때문에 하나 이상의 Observable을 구독할 수 있으며 동시에 Observable이기도 하기 때문에 항목들을 하나 하나 거치면서 재배출하고 관찰하며 새로운 항목들을 배출할 수도 있다.
라고 설명하고 있다. 따라서, Subject는 Observer이면서, Observable이다. RX에서는 Subject가 Observable의 상속이지만, ObserverType을 채택하고 있기 때문에 다른 옵저버와 동일하게 on() 메소드로 값을 받게 되어 있다. 그러나, Combine에서는 주입을 받을 수 있는 Publisher로 Subject를 해석하고 있기 때문에, Subscriber의 recive(:)가 아닌 send(:) 메소드를 사용하고, 따로 Subscriber를 채택하고 있지 않다.
일반적으로 내가 사용했을 때에도 보통은 주입을 하는 용도로만 Subject를 사용했지, Observer의 용도로 Subject를 사용할 생각은 해본 적이 없어서 Combine의 해석이 더 직관적인데, 다른 사람은 어떻게 해석하는 진 모르겠다.
Combine에서는 Subject의 구현체로 CurrentValueSubject와 PassThroughSubject를 제공하고 있다. CurrentValueSubject는 최근의 값에 대한 버퍼를 가지고 있어서 구독하는 순간 최근 값의 버퍼를 받게 되고, PassThroughSubject는 그렇지 않아서 값이 주입되는 시점에서만 Subscriber가 새로운 값을 받을 수 있게 된다.3각각 BehaviorSubject, PublishSubject와 유사하다. RX에서는 마지막 값 배출만을 위한 AysncSubject와 스트림을 전부 저장했다가 구독 시점에서 모두 배출해주는 ReplaySubject가 존재하지만, Combine에는 없다.
@Published
Combine을 이용해 저장소가 있는 Publisher를 만드는 데에 @Published 어노테이션을 쓴다. Publisher라는 Structure를 PropertyWrapper로 표현한 것인데, 기존과 동일한 방식으로 변수를 선언하면서도 @Published 어노테이션만 앞에 붙이면 변수에 대한 Publisher를 쉽게 만들 수 있다.(@Published는 Class에서만 사용할 수 있다. 아마도 Published 자체가 Structure이기 때문에 값 복사로 인한 혼돈을 방지하려고 그런 제한을 두지 않았냐는 생각이 들지만, 정확히 막혀있는 이유는 알 수 없다.)4이러한 커스텀 PropertyWrapper를 작성하는 것은 어렵지 않지만, 적용범위를 Class로 한정시키는 방법은 따로 공개되어 있지 않은 것 같다.
내부적으로 Published는 변수에 대한 Storage를 가지면서도, 타입 내부에 선언한 Publisher 구현체의 인스턴스를 지닌다. 그 Publisher는 ProjectedValue로 노출되고, 구독할 수 있다.($ 어노테이션) 따라서, 별다른 노력 없이 변수의 변화를 Subscriber를 통해 관찰할 수 있다.
구현과 개념적으로는 상당히 다른 부분이 있지만, 모델이 가진 값을 쉽게 구독할 수 있기 때문에 RX에서는 BehaviorRelay(←BehaviorSubject의 .completed, .error를 무시하고 계속 구동하는 wrapper)로 작성할 수 있는 부분을 Published로 작성할 수 있고, 나에게는 이 방식이 더 직관적으로 느껴진다.
Cancellable
A protocol indicating that an activity or action supports cancellation.
취소를 지원하는 활동이나 행동을 나타내는 프로토콜
Cancellable로 각 인스턴스에 대한 취소를 지원할 수 있다.5스트림 구독을 취소한다는 의미이지, 구독하면서 활동할 때 바로 행동을 취소시킬 수 있다는 이야기 아님. Cancellable이 생성된 이후 deinit만으로 구독이 끝나지는 않고, 명시적으로 cancel()을 호출해야한다. 또는, AnyCancellable을 통해 Cancellable을 감싸주면 AnyCancellable을 denit하는 시점에 자동으로 스트림구독을 취소한다.6그런데, 현재 버전에서는 Cancellable로 선언한 후에 nil로 초기화해도 잘 cancel되는 것으로 보인다. 어느 시점에 변화가 있었는 듯 함. 검색해보면 분명 cancel()을 호출해야 구독이 끝났는데, 지금은 AnyCancellable이 아닌 Cancellable의 denit으로도 cancel이 호출되고 있음.
RX의 Disposable과 동일한 개념이며, AnyCancellable을 Collection의 형태로 쓰는 것은 DIsposeBag()의 역할과 유사하다.
ConnectablePublisher
하나의 Publisher에 여러 개의 Subscriber가 붙을 때, 프로그래머가 특정 조건이 만족될 수 있을 때까지 발행을 지연시킬 수 있도록 도와주는 퍼블리셔이다. makeConnectable() 오퍼레이터로 만들 수 있으며, makeConnectable()이 만든 퍼블리셔에 connect()를 호출하기 전까지 발행을 지연시킨다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
struct ExamplePublisher: Publisher { typealias Output = Int typealias Failure = Never func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Int == S.Input { Swift.print("구독 요청함!") let subscription = InternalSubscription(subscriber: subscriber) subscriber.receive(subscription: subscription) } } extension ExamplePublisher { class InternalSubscription<S: Subscriber>: NSObject, Subscription where Never == S.Failure, Int == S.Input { let subscriber: S init(subscriber: S) { self.subscriber = subscriber } func request(_ demand: Subscribers.Demand) { Swift.print("서브스크립션이 리퀘스트를 받음!") let demand = subscriber.receive(32) } func cancel() { subscriber.receive(completion: .finished) } } } let publisher = ExamplePublisher() let connectablePublisher = publisher.makeConnectable() let sub1 = connectablePublisher .sink { value in print("First subscriber \(value)") } let sub2 = connectablePublisher .sink { value in print("second subscriber \(value)") } print("모두 구독함!") let connection = connectablePublisher.connect() while true { RunLoop.main.run() } 호출 결과: connectablePublisher를 이용했을 때, 모두 구독함! 구독 요청함! 서브스크립션이 리퀘스트를 받음! second subscriber 32 First subscriber 32 이용하지 않았을 때, 구독 요청함! 서브스크립션이 리퀘스트를 받음! First subscriber 32 구독 요청함! 서브스크립션이 리퀘스트를 받음! second subscriber 32 모두 구독함! |
예시로 작성한 코드를 살펴보면, ConnectablePublisher는 connect() 함수가 호출될 때까지, 대상 Publisher에 대한 구독을 지연하고, connect()가 호출되는 순간 구독을 개시하면서, 하위 Subscriber에 시퀀스를 나눠준다.
구독이 지연되기 때문에, Apple Document에서는 URLRequest와 같은 것을 Combine으로 작성할 때, 두 개 이상 서브스크라이버가 해당 요소를 이용할 필요가 있을 경우, connect()가 호출된 이후부터 요청이 시작되게 하여 경쟁 상태를 방지하는데 이용하라고 알려주고 있다.
또한, Mulicast 퍼블리셔 또는, TimerPublisher는 애초에 ConnectablePublisher인데, 이를 일반 Publihser와 같이 이용하려면 autoconnect() 오퍼레이터를 이용해서 connect() 메소드 호출 없이 이용할 수 있다고 알려주고 있다.
이는 RX 홈페이지에서 소개하는 “Hot” (일반적인 Publisher)과 “Cold”의 개념과 유시하고, RX에서도 동일한 방식의 인터페이스를 제공하고 있다.