-- Bronze Layer Kafka Ingestion DDL
-- =====================================================
-- FINVU BRONZE LAYER - KAFKA EVENT INGESTION (ENGINEERING-FOCUSED)
-- =====================================================
-- Purpose: Engineering-focused staging and ingestion layer for complete, lossless event capture
-- Pattern: Kafka Table Engine + Materialized View → MergeTree Storage
-- Performance Target: Near-real-time ingestion with <15 minute end-to-end latency
-- Philosophy: Maximum data capture, minimal processing; business logic lives in Silver
-- Based on: event patterns defined in eventConfig-aa.json
-- =====================================================
-- =====================================================
-- BRONZE SCHEMA SETUP
-- =====================================================
CREATE DATABASE IF NOT EXISTS bronze;
-- =====================================================
-- KAFKA SOURCE TABLE (Ephemeral - No Data Storage)
-- =====================================================
-- Kafka Table Engine for consuming from the Finvu AA events topic
-- Reference: https://clickhouse.com/docs/integrations/kafka/kafka-table-engine
-- Each message is captured whole as a raw JSON string (JSONAsString format);
-- all parsing is deferred to the materialized view. Kafka metadata (_topic,
-- _partition, _offset, _timestamp, _key) is exposed by the Kafka engine as
-- virtual columns and must NOT be declared as regular columns here.
CREATE TABLE bronze.kafka_finvu_events (
event_data String COMMENT 'Raw JSON event payload from Kafka'
) ENGINE = Kafka()
SETTINGS
-- Kafka Connection Settings
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'finvu-aa-events',
kafka_group_name = 'clickhouse-finvu-consumer',
kafka_format = 'JSONAsString', -- one message -> one String row; schema evolution handled downstream
-- Performance Settings (optimized for throughput)
kafka_num_consumers = 3, -- Scale with topic partitions
kafka_max_block_size = 65536, -- Batch size per consumed block
kafka_flush_interval_ms = 7500, -- Near real-time flushing
kafka_thread_per_consumer = 1, -- Dedicated thread per consumer
-- Error Handling (robust but not overly permissive)
kafka_skip_broken_messages = 100
COMMENT 'Kafka consumer table for Finvu AA events - engineering-optimized configuration';
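-- Quick consumer health probe (a sketch; system.kafka_consumers ships with
-- recent ClickHouse releases and reports assignments, offsets, and exceptions):
/*
SELECT *
FROM system.kafka_consumers
WHERE database = 'bronze' AND table = 'kafka_finvu_events'
FORMAT Vertical;
*/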
-- =====================================================
-- BRONZE STORAGE TABLE (Engineering-Optimized Event Staging)
-- =====================================================
-- Engineering-focused staging table for Silver layer processing
-- Complete event preservation with minimal extraction for routing/ordering
CREATE TABLE bronze.finvu_events (
-- Temporal Organization (Ingestion Performance)
event_date Date DEFAULT toDate(event_timestamp),
event_timestamp DateTime64(3) CODEC(Delta, ZSTD(3)),
ingestion_timestamp DateTime64(3) DEFAULT now64(3) COMMENT 'ClickHouse ingestion timestamp',
-- Event Identity (Minimal for Routing)
event_id String COMMENT 'Event identifier for deduplication',
event_name LowCardinality(String) COMMENT 'Event type for Silver layer routing',
event_source LowCardinality(String) COMMENT 'Source system classification',
event_type LowCardinality(String) COMMENT 'Transport type (HTTP/WS/etc)',
-- Complete Event Preservation (Schema Agnostic)
-- Stored as a raw String rather than the JSON type: invalid payloads can still
-- be captured, and Silver extracts fields with JSONExtract* as shown below
event_data String CODEC(ZSTD(3)) COMMENT 'Complete raw event JSON - business logic in Silver',
event_data_size UInt32 COMMENT 'Payload size in bytes for performance monitoring',
-- Engineering Metadata (Infrastructure Support)
data_quality LowCardinality(String) DEFAULT 'CAPTURED' COMMENT 'CAPTURED/CORRUPTED/PARTIAL',
processing_errors String DEFAULT '' COMMENT 'Ingestion error tracking',
is_duplicate Bool DEFAULT 0 COMMENT 'Duplicate detection flag',
-- Complete Kafka Lineage (Data Engineering Essential)
kafka_topic LowCardinality(String) COMMENT 'Source topic',
kafka_partition UInt32 COMMENT 'Source partition',
kafka_offset UInt64 COMMENT 'Source offset',
kafka_timestamp DateTime COMMENT 'Kafka message timestamp',
kafka_key String COMMENT 'Kafka message key',
-- Minimal Business Context (For Silver Layer Efficiency)
txn_id String DEFAULT '' COMMENT 'Transaction correlation ID',
session_id String DEFAULT '' COMMENT 'Session correlation ID',
request_url String DEFAULT '' COMMENT 'API endpoint tracking',
response_code UInt16 DEFAULT 0 COMMENT 'HTTP response code'
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date) -- Monthly partitions for efficient queries
ORDER BY (event_name, event_timestamp, event_id) -- Optimized for ingestion and Silver access
TTL event_date + INTERVAL 7 YEAR -- Regulatory compliance: 7-year retention (TTL must precede SETTINGS)
SETTINGS index_granularity = 8192
COMMENT 'Bronze staging table - complete event capture for Silver layer processing';
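-- Deduplication sketch: Kafka delivery is at-least-once, so Bronze stores
-- duplicates as-is; readers can pick one row per event_id with LIMIT BY
-- (shown here for today's partition; widen the window as needed):
/*
SELECT *
FROM bronze.finvu_events
WHERE event_date = today()
ORDER BY event_name, event_timestamp, event_id, ingestion_timestamp
LIMIT 1 BY event_id;
*/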
-- =====================================================
-- MATERIALIZED VIEW (Engineering-Focused Event Processing)
-- =====================================================
-- Real-time processing from Kafka to Bronze storage
-- Minimal extraction, maximum performance, business logic deferred to Silver
CREATE MATERIALIZED VIEW bronze.mv_kafka_to_finvu_events TO bronze.finvu_events AS
SELECT
-- Time handling (performance critical): payload timestamp, else Kafka
-- message timestamp, else ingestion time (the _timestamp virtual column is Nullable)
toDate(COALESCE(
parseDateTime64BestEffortOrNull(JSONExtractString(event_data, 'eventTimestamp')),
_timestamp,
now64(3)
)) AS event_date,
COALESCE(
parseDateTime64BestEffortOrNull(JSONExtractString(event_data, 'eventTimestamp')),
_timestamp,
now64(3)
) AS event_timestamp,
now64(3) AS ingestion_timestamp,
-- Basic event identification (JSONExtractString returns '' for missing keys,
-- so nullIf is required for the COALESCE fallbacks to fire)
COALESCE(
nullIf(JSONExtractString(event_data, 'eventId'), ''),
nullIf(JSONExtractString(event_data, 'id'), ''),
toString(generateUUIDv4())
) AS event_id,
COALESCE(
nullIf(JSONExtractString(event_data, 'eventName'), ''),
'UNKNOWN_EVENT'
) AS event_name,
-- Event source classification (for Silver layer routing)
CASE
WHEN JSONExtractString(event_data, 'eventSource') != '' THEN
JSONExtractString(event_data, 'eventSource')
WHEN JSONExtractString(event_data, 'eventName') LIKE 'USER:AA:%' THEN 'AAServer:UserEndpoint'
WHEN JSONExtractString(event_data, 'eventName') LIKE 'FIU:AA:%' THEN 'AAServer:ConsentFlow'
WHEN JSONExtractString(event_data, 'eventName') LIKE 'FIP:AA:%' THEN 'AAServer:Notifications'
WHEN JSONExtractString(event_data, 'eventName') LIKE 'AA:FIP:%' THEN 'AAServer:DataFlow'
WHEN JSONExtractString(event_data, 'eventName') LIKE 'AA:FIU:%' THEN 'AAServer:DataFlow'
ELSE 'AAServer:Undefined'
END AS event_source,
-- Event type (transport classification)
CASE
WHEN JSONExtractString(event_data, 'eventType') != '' THEN
JSONExtractString(event_data, 'eventType')
WHEN JSONExtractString(event_data, 'eventName') LIKE '%:AA:%Request' THEN 'HTTP_IN'
WHEN JSONExtractString(event_data, 'eventName') LIKE 'AA:%:%Request' THEN 'HTTP_OUT'
WHEN JSONExtractString(event_data, 'eventName') LIKE '%Websocket%' THEN 'WS_IN'
ELSE 'DEFAULT'
END AS event_type,
-- Complete event preservation (core Bronze responsibility): the raw payload
-- passes through untouched; invalid JSON is kept and flagged below, not dropped
event_data,
length(event_data) AS event_data_size,
-- Engineering quality assessment (not business quality)
CASE
WHEN event_data = '' OR NOT isValidJSON(event_data) THEN 'CORRUPTED'
WHEN length(event_data) < 50 THEN 'PARTIAL'
ELSE 'CAPTURED'
END AS data_quality,
-- Error tracking for engineering ops (the empty check must precede the JSON check)
CASE
WHEN length(event_data) = 0 THEN 'EMPTY_PAYLOAD'
WHEN NOT isValidJSON(event_data) THEN 'INVALID_JSON'
ELSE ''
END AS processing_errors,
-- Basic duplicate detection
CASE
WHEN JSONExtractString(event_data, 'isDuplicate') = 'true' THEN 1
ELSE 0
END AS is_duplicate,
-- Complete Kafka lineage (essential for data engineering)
_topic AS kafka_topic,
_partition AS kafka_partition,
_offset AS kafka_offset,
_timestamp AS kafka_timestamp,
_key AS kafka_key,
-- Minimal correlation IDs (for Silver layer efficiency)
JSONExtractString(event_data, 'txnId') AS txn_id,
JSONExtractString(event_data, 'sessionId') AS session_id,
JSONExtractString(event_data, 'requestUrl') AS request_url,
JSONExtractUInt(event_data, 'responseCode') AS response_code
FROM bronze.kafka_finvu_events
COMMENT 'Minimal processing for maximum ingestion performance - corrupted payloads are captured and flagged, never silently dropped';
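-- Operational sketch: ingestion can be paused for maintenance or replay by
-- detaching the Kafka table; the consumer group's committed offsets are kept
-- in Kafka, so consumption resumes where it stopped after re-attaching:
/*
DETACH TABLE bronze.kafka_finvu_events;
-- ... maintenance window / Silver backfill ...
ATTACH TABLE bronze.kafka_finvu_events;
*/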
-- =====================================================
-- PERFORMANCE OPTIMIZATION
-- =====================================================
-- Essential skip indices for Silver layer access patterns. event_name is the
-- leading ORDER BY column and is already served by the primary index, so it
-- needs no separate skip index.
ALTER TABLE bronze.finvu_events
ADD INDEX idx_event_source (event_source) TYPE set(10) GRANULARITY 4;
ALTER TABLE bronze.finvu_events
ADD INDEX idx_data_quality (data_quality) TYPE set(5) GRANULARITY 4;
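-- Indices added via ALTER apply only to newly written parts; materialize them
-- if data already exists. A bloom filter on txn_id (a sketch; tune the
-- false-positive rate to observed cardinality) can speed Silver correlation lookups:
/*
ALTER TABLE bronze.finvu_events MATERIALIZE INDEX idx_event_source;
ALTER TABLE bronze.finvu_events MATERIALIZE INDEX idx_data_quality;
ALTER TABLE bronze.finvu_events
ADD INDEX idx_txn_id (txn_id) TYPE bloom_filter(0.01) GRANULARITY 4;
*/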
-- =====================================================
-- ENGINEERING MONITORING VIEWS
-- =====================================================
-- 1. Ingestion Performance Monitor
CREATE VIEW bronze.v_ingestion_performance AS
SELECT
toStartOfHour(ingestion_timestamp) AS hour,
event_source,
count(*) AS events_ingested,
round(count(*) / 3600, 2) AS events_per_second,
countIf(data_quality = 'CORRUPTED') AS corrupted_events,
countIf(processing_errors != '') AS error_count,
max(ingestion_timestamp) AS latest_ingestion,
dateDiff('minute', max(event_timestamp), max(ingestion_timestamp)) AS lag_minutes
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 24 HOUR
GROUP BY hour, event_source
ORDER BY hour DESC, events_ingested DESC
COMMENT 'Engineering performance metrics for Bronze layer';
-- 2. Event Type Coverage Monitor
CREATE VIEW bronze.v_event_coverage AS
SELECT
event_source,
event_type,
count(DISTINCT event_name) AS unique_event_types,
count(*) AS total_events,
min(event_timestamp) AS first_seen,
max(event_timestamp) AS last_seen,
round(avg(event_data_size), 0) AS avg_event_size_bytes
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 24 HOUR
GROUP BY event_source, event_type
ORDER BY total_events DESC
COMMENT 'Complete event type coverage validation';
-- 3. Kafka Consumer Health Monitor
CREATE VIEW bronze.v_kafka_consumer_health AS
SELECT
kafka_topic,
kafka_partition,
count(*) AS events_consumed,
max(kafka_offset) AS latest_offset,
max(kafka_timestamp) AS latest_kafka_timestamp,
dateDiff('minute', max(kafka_timestamp), max(ingestion_timestamp)) AS consumer_lag_minutes,
countIf(processing_errors != '') AS processing_errors
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 1 HOUR
GROUP BY kafka_topic, kafka_partition
ORDER BY consumer_lag_minutes DESC
COMMENT 'Kafka consumer performance and health monitoring';
-- 4. Data Quality Trends
CREATE VIEW bronze.v_data_quality_trends AS
SELECT
toStartOfHour(ingestion_timestamp) AS hour,
data_quality,
count(*) AS event_count,
count(DISTINCT event_name) AS unique_event_types,
groupUniqArray(processing_errors) AS error_types,
round(avg(event_data_size), 2) AS avg_event_size
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 24 HOUR
AND data_quality != 'CAPTURED' -- Focus on problematic events
GROUP BY hour, data_quality
ORDER BY hour DESC, event_count DESC
COMMENT 'Data quality trends for operational alerts';
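-- Example alert probe (a sketch to wire into your alerting tool): flags any
-- source breaching the <15 minute ingestion-lag target from v_ingestion_performance:
/*
SELECT hour, event_source, lag_minutes, events_ingested
FROM bronze.v_ingestion_performance
WHERE lag_minutes > 15
ORDER BY lag_minutes DESC;
*/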
-- =====================================================
-- SILVER LAYER INTEGRATION EXAMPLES
-- =====================================================
/*
-- Silver Layer will perform comprehensive business logic extraction:
-- 1. Event Correlation & Stitching
SELECT
event_data,
-- Extract business context in Silver:
JSONExtractString(event_data, 'consentHandle') AS consent_handle,
JSONExtractString(event_data, 'fiu') AS fiu_id,
JSONExtractString(event_data, 'purposeCode') AS purpose_code,
-- Multi-path extraction for schema evolution:
COALESCE(
nullIf(JSONExtractString(event_data, 'consentHandle'), ''),
nullIf(JSONExtractString(event_data, 'consentId'), ''),
nullIf(JSONExtractString(event_data, 'consent_handle'), '')
) AS consent_handle_enhanced
FROM bronze.finvu_events
WHERE event_source = 'AAServer:ConsentFlow'
AND event_timestamp >= now() - INTERVAL 1 HOUR;
-- 2. Journey Reconstruction (Silver Layer Responsibility)
-- Correlate events by consent_handle, session_id, txn_id (see the sketch below)
-- Apply business rules for revenue attribution
-- Calculate journey metrics and KPIs
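-- Example correlation sketch (assumes txn_id was populated at ingestion):
SELECT
txn_id,
count() AS journey_events,
min(event_timestamp) AS journey_start,
max(event_timestamp) AS journey_end,
dateDiff('second', min(event_timestamp), max(event_timestamp)) AS journey_duration_s
FROM bronze.finvu_events
WHERE txn_id != ''
AND event_timestamp >= now() - INTERVAL 1 HOUR
GROUP BY txn_id;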
-- 3. Business Enrichment (Silver Layer Responsibility)
-- FIU client attribution and billing calculations
-- Purpose code revenue mapping (₹3 vs ₹0.10)
-- Customer journey analysis and funnel metrics
*/
-- =====================================================
-- DEPLOYMENT CHECKLIST
-- =====================================================
/*
ENGINEERING-FOCUSED BRONZE LAYER DEPLOYMENT CHECKLIST:
PHASE 1 - INFRASTRUCTURE SETUP:
☐ Kafka broker accessible from ClickHouse
☐ Topic 'finvu-aa-events' created with 6-12 partitions
☐ Consumer group 'clickhouse-finvu-consumer' configured
☐ Bronze database and tables created successfully
☐ Materialized view processing events without errors
☐ All 4 monitoring views created and functional
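Verification sketch for the Phase 1 items above (a smoke test, not exhaustive):
    SELECT count() AS events_last_5m
    FROM bronze.finvu_events
    WHERE ingestion_timestamp >= now() - INTERVAL 5 MINUTE;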
PHASE 2 - EVENT VALIDATION (Engineering Focus):
☐ All 43 event types being captured correctly
☐ 6 event sources properly classified (UserEndpoint, ConsentFlow, DataFlow, etc.)
☐ Event types (HTTP_IN/OUT, WS_IN/OUT, DEFAULT) correctly identified
☐ JSON preservation maintaining all original event data
☐ No missing events (compare Kafka consumer lag with ingestion rate)
☐ Complete Kafka lineage metadata captured
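Offset-continuity probe for the "no missing events" item above (a sketch; rows
skipped via kafka_skip_broken_messages and duplicates from consumer rebalances
will skew it):
    SELECT kafka_partition,
           max(kafka_offset) - min(kafka_offset) + 1 AS expected_messages,
           count() AS ingested_messages
    FROM bronze.finvu_events
    GROUP BY kafka_partition;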
PHASE 3 - PERFORMANCE & RELIABILITY:
☐ Ingestion rate >10K events/minute sustained
☐ Processing lag <15 minutes consistently
☐ All monitoring views responding <3 seconds
☐ Data quality assessment working (CAPTURED/CORRUPTED/PARTIAL)
☐ Index performance optimized for Silver layer access
☐ TTL and partitioning working for 7-year retention
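Partition/retention probe for the Phase 3 items above (sketch):
    SELECT partition, count() AS parts, sum(rows) AS total_rows,
           formatReadableSize(sum(bytes_on_disk)) AS size_on_disk
    FROM system.parts
    WHERE database = 'bronze' AND table = 'finvu_events' AND active
    GROUP BY partition
    ORDER BY partition;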
PHASE 4 - SILVER LAYER INTEGRATION:
☐ Silver layer can efficiently access Bronze staging data
☐ Complete event data available for business logic processing
☐ Event routing by event_source working correctly
☐ Temporal ordering maintained for journey reconstruction
☐ Kafka lineage available for debugging and replay
PHASE 5 - OPERATIONAL READINESS:
☐ Alerts configured for ingestion lag, error rates, data quality
☐ Monitoring dashboards created using provided views
☐ Runbooks created for common operational scenarios
☐ Performance tuning guidelines documented
☐ Schema evolution procedures documented
☐ Clear handoff procedures to Silver layer team
*/