Controller 파이프라인
Controller는 watcher, reflector, scheduler, runner를 하나로 엮은 최상위 추상화입니다. Controller::new()부터 reconciler 호출까지 데이터가 어떤 경로로 흐르는지 추적합니다.
전체 데이터 흐름
각 단계:
- watcher() — API 서버를 watch하여
Event<K>스트림을 생성합니다 - reflector() — Event를 Store에 캐싱하면서 그대로 통과시킵니다
- .applied_objects() —
Event::Apply(K)와Event::InitApply(K)에서K만 추출합니다 - trigger_self() —
K→ReconcileRequest<K>로 변환합니다 - owns()/watches() — 관련 리소스에 대한 추가 trigger 스트림을 생성합니다
- select_all() — 모든 trigger 스트림을 하나로 합칩니다
- debounced_scheduler() — 동일
ObjectRef에 대한 중복을 제거하고 지연을 적용합니다 - Runner — 동시성을 제어하고, 같은 객체의 동시 reconcile을 방지합니다
- reconciler — 사용자 코드를 실행합니다
- Action/Error — 결과에 따라 scheduler로 피드백합니다
Controller 구조체
pub struct Controller<K> {
trigger_selector: SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
trigger_backoff: Box<dyn Backoff + Send>,
reader: Store<K>,
config: Config,
}
- trigger_selector: 모든 trigger 스트림(
trigger_self+owns+watches)을SelectAll로 합친 것입니다 - reader: reflector가 관리하는 Store의 읽기 핸들입니다. reconciler에서
Arc<K>를 꺼낼 때 사용합니다 - config: debounce duration, 동시 실행 수 제한 등을 설정합니다
Trigger 시스템
trigger_self — 주 리소스 변경
주 리소스(Controller::new에 지정한 타입)가 변경되면 해당 리소스에 대한 ReconcileRequest를 생성합니다.
// 내부 흐름
watcher Event → .applied_objects() → trigger_self() → ReconcileRequest<K>
ReconcileRequest는 대상 리소스의 ObjectRef와 trigger 이유를 담고 있습니다:
pub struct ReconcileRequest<K: Resource> {
pub obj_ref: ObjectRef<K>,
pub reason: ReconcileReason,
}
ReconcileReason에는 다음 variant가 있습니다:
| Variant | 설명 |
|---|---|
Unknown | reconcile_on()으로 주입된 외부 trigger |
ObjectUpdated | 주 리소스가 변경됨 |
RelatedObjectUpdated { obj_ref } | 관련 리소스가 변경됨 (owns/watches) |
ReconcilerRequestedRetry | reconciler가 Action::requeue()로 재실행 요청 |
ErrorPolicyRequestedRetry | error_policy가 재실행 요청 |
BulkReconcile | reconcile_all_on()으로 전체 재reconcile 요청 |
Custom { reason } | 사용자 정의 사유 (큐 스트림에 직접 주입 시) |
trigger_owners — 자식 리소스 변경
controller.owns::<ConfigMap>(api, wc)
- ConfigMap에 대한 별도 watcher를 생성합니다
- ConfigMap이 변경되면
metadata.ownerReferences를 확인합니다 - 부모(주 리소스)의
ObjectRef를 추출합니다 - 부모에 대한
ReconcileRequest를 발행합니다
ownerReferences에서 부모를 식별할 때, kind와 apiVersion이 Controller의 주 리소스 타입과 일치하는 항목만 추출합니다.
trigger_others — 관련 리소스 변경
controller.watches::<Secret>(api, wc, |secret| {
// Secret에서 관련 주 리소스의 ObjectRef 목록 반환
let name = secret.labels().get("app")?.clone();
let ns = secret.namespace()?;
Some(ObjectRef::new(&name).within(&ns))
})
- Secret에 대한 별도 watcher를 생성합니다
- Secret이 변경되면 사용자 정의 mapper 함수를 호출합니다
- mapper가 반환한
ObjectRef들에 대해ReconcileRequest를 발행합니다
사용 사례: Secret 변경 → 해당 Secret을 참조하는 모든 리소스를 재reconcile합니다.
외부 이벤트 소스 — reconcile_on
reconcile_on()으로 watch 이벤트 외의 외부 트리거를 Controller에 연결할 수 있습니다.
use tokio::sync::mpsc;
use futures::stream::ReceiverStream;
let (tx, rx) = mpsc::channel::<ObjectRef<MyResource>>(256);
Controller::new(api, wc)
.reconcile_on(ReceiverStream::new(rx))
.run(reconcile, error_policy, ctx)
외부 webhook, 메시지 큐, 타이머 등에서 ObjectRef를 보내면 해당 리소스의 reconcile이 트리거됩니다. 이때 ReconcileReason은 Unknown으로 설정됩니다.
Stream 기반 API
기본 Controller::new()는 내부에서 watcher를 생성하지만, for_stream()을 사용하면 미리 필터링된 스트림을 직접 주입할 수 있습니다. owns_stream()과 watches_stream()도 동일한 패턴입니다.
// 외부에서 생성한 watcher 스트림을 Controller에 주입
let (reader, writer) = reflector::store();
let stream = reflector(writer, watcher(api.clone(), wc))
.applied_objects();
Controller::for_stream(stream, reader)
.owns_stream::<ConfigMap>(cm_stream)
.watches_stream::<Secret, _>(secret_stream, |secret| { /* mapper */ })
.run(reconcile, error_policy, ctx)
다중 스트림 조합
여러 관련 리소스의 이벤트를 하나의 Controller에 통합하는 고급 패턴입니다. for_stream()과 watches_stream()을 조합하면 외부에서 구성한 스트림을 Controller에 주입할 수 있습니다.
use futures::stream;
use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
// 주 리소스 스트림
let (reader, writer) = reflector::store();
let main_stream = reflector(writer, watcher(main_api, wc.clone()))
.applied_objects()
.default_backoff();
// 관련 리소스 스트림들
let cm_stream = watcher(cm_api, wc.clone())
.applied_objects()
.default_backoff();
let secret_stream = watcher(secret_api, wc.clone())
.applied_objects()
.default_backoff();
Controller::for_stream(main_stream, reader)
.watches_stream(cm_stream, |cm| {
// ConfigMap → 주 리소스 매핑
let name = cm.labels().get("app")?.clone();
let ns = cm.namespace()?;
Some(ObjectRef::new(&name).within(&ns))
})
.watches_stream(secret_stream, |secret| {
// Secret → 주 리소스 매핑
let name = secret.labels().get("app")?.clone();
let ns = secret.namespace()?;
Some(ObjectRef::new(&name).within(&ns))
})
.run(reconcile, error_policy, ctx)
이 패턴은 owns()/watches()가 내부에서 생성하는 watcher를 외부에서 직접 구성하는 것입니다. 스트림에 .modify()로 필드를 제거하거나, shared reflector로 여러 Controller가 공유하는 등의 세밀한 제어가 가능합니다. 제네릭 컨트롤러에서 공유 패턴을 다룹니다.
이 API들은 불안정 기능 플래그 뒤에 있습니다:
reconcile_on()→unstable-runtime-reconcile-onfor_stream(),owns_stream(),watches_stream()→unstable-runtime-stream-control
kube = { version = "...", features = ["unstable-runtime-reconcile-on", "unstable-runtime-stream-control"] }
Scheduler — 중복 제거와 지연
debounced_scheduler()는 DelayQueue + HashMap으로 중복 제거와 시간 기반 스케줄링을 수행합니다.
동작 방식:
- 같은
ObjectRef에 대한 여러 trigger가 들어오면 가장 이른 시간 하나만 유지합니다 - debounce가 설정되면, 설정된 기간 내의 추가 trigger를 무시합니다
debounce = 1초일 때:
t=0.0 trigger(A) → 1초 후 실행 예약
t=0.3 trigger(A) → 이미 예약됨, 무시
t=1.0 A 실행
t=1.2 trigger(A) → 다시 1초 후 예약
debounce가 필요한 이유: reconciler가 status를 업데이트하면 → 새 resourceVersion 생성 → watch 이벤트 발생 → 불필요한 재reconcile이 됩니다. debounce가 이 burst를 흡수합니다. 자세한 내용은 Reconciler 패턴에서 다룹니다.
Runner — 동시성 제어
Runner는 scheduler에서 나온 항목을 실제로 reconciler에 전달하면서 동시성을 제어합니다.
핵심 동작:
- readiness gate:
Store::wait_until_ready()가 완료될 때까지 reconcile을 시작하지 않습니다 - hold_unless: 이미 reconcile 중인
ObjectRef는 scheduler에 남겨둡니다 - max_concurrent_executions: 전체 동시 reconcile 수를 제한합니다
hold_unless 패턴이 핵심입니다. 같은 객체에 대한 동시 reconcile을 방지합니다:
- A 객체 reconcile 중 → A에 대한 새 trigger 도착 → scheduler에서 대기
- A 완료 → scheduler에서 A를 다시 꺼내 실행
이렇게 하면 같은 리소스에 대한 reconcile이 순서대로 실행됩니다.
Reconcile 결과 처리
reconciler의 결과에 따라 scheduler에 피드백합니다.
성공 시
async fn reconcile(obj: Arc<MyResource>, ctx: Arc<Context>) -> Result<Action, Error> {
// ... reconcile 로직 ...
// 5분 후 다시 확인
Ok(Action::requeue(Duration::from_secs(300)))
// 또는: 다음 watch 이벤트까지 대기
Ok(Action::await_change())
}
| Action | 동작 |
|---|---|
requeue(Duration) | 지정된 시간 후 scheduler에 재실행 예약 |
await_change() | 능동적 requeue 없음, 다음 watch 이벤트 대기 |
실패 시
fn error_policy(
obj: Arc<MyResource>,
error: &Error,
ctx: Arc<Context>,
) -> Action {
Action::requeue(Duration::from_secs(5))
}
error_policy가 반환한 Action에 따라 scheduler에 예약합니다.
Config
use kube::runtime::controller::Config;
controller.with_config(
Config::default()
.debounce(Duration::from_secs(1)) // trigger 간 최소 간격
.concurrency(10) // 최대 동시 reconcile 수
)
| 설정 | 기본값 | 설명 |
|---|---|---|
debounce | 0 (없음) | trigger 간 최소 간격. burst 흡수에 유용합니다 |
concurrency | 0 (무제한) | 동시 reconcile 수 제한. 0은 무제한입니다 |
Shutdown
// SIGTERM/SIGINT에 반응
controller.shutdown_on_signal()
// 커스텀 shutdown trigger
controller.graceful_shutdown_on(async {
// 어떤 조건...
})
graceful shutdown 동작:
- 새 reconcile 시작을 중단합니다
- 진행 중인 reconcile은 완료될 때까지 대기합니다
내부적으로 tokio::signal을 사용합니다.