📘 Laboratorio 1: ETL
📘 Laboratorio 1: ETL
🎯 Objetivo
Implementar una arquitectura de datos moderna (Medallón) en Supabase para el caso Aletza, construyendo pipelines ETL/ELT que transformen datos crudos en información valiosa para análisis y modelos de IA.
Al finalizar, podrás:
- Diseñar capas Bronze (Raw), Silver (Cleansed) y Gold (Aggregated)
- Implementar procesos de carga incremental
- Crear pipelines con funciones y procedimientos
- Orquestar transformaciones con SQL
- Preparar datos para consumo en BI y Machine Learning
📋 Requisitos Previos
- Tener ejecutados los Laboratorios 1-4
- Conectado a Supabase con el SQL Editor abierto
- Tablas originales:
usuarios,perfiles_voz,sesiones,mensajes,logs_ejecucion
🏗️ Paso 1: Arquitectura Medallón en Supabase
1.1 Creación de Esquemas por Capa
-- Crear esquemas para cada capa de la arquitectura Medallón
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;
-- Verificar esquemas creados
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name IN ('bronze', 'silver', 'gold');
1.2 Capa Bronze (Raw) - Datos sin Transformar
-- Tabla Bronze: usuarios_raw (copia exacta de la fuente)
CREATE TABLE bronze.usuarios_raw (
id INTEGER,
username TEXT,
email TEXT,
creado_en TIMESTAMPTZ,
pais TEXT,
ciudad TEXT,
activo BOOLEAN,
fecha_carga TIMESTAMPTZ DEFAULT NOW(),
origen TEXT DEFAULT 'source_system'
);
-- Tabla Bronze: mensajes_raw
CREATE TABLE bronze.mensajes_raw (
id INTEGER,
sesion_id INTEGER,
remitente_id INTEGER,
contenido TEXT,
modalidad TEXT,
metadata_ia JSONB,
enviado_en TIMESTAMPTZ,
fecha_carga TIMESTAMPTZ DEFAULT NOW(),
origen TEXT DEFAULT 'source_system'
);
-- Tabla Bronze: logs_raw
CREATE TABLE bronze.logs_ejecucion_raw (
id INTEGER,
mensaje_id INTEGER,
modulo TEXT,
tiempo_ejecucion_ms INTEGER,
exito BOOLEAN,
error TEXT,
ejecutado_en TIMESTAMPTZ,
fecha_carga TIMESTAMPTZ DEFAULT NOW(),
origen TEXT DEFAULT 'source_system'
);
-- Tabla Bronze: sesiones_raw
CREATE TABLE bronze.sesiones_raw (
id INTEGER,
usuario_id INTEGER,
token TEXT,
canal TEXT,
iniciada_en TIMESTAMPTZ,
ultima_actividad TIMESTAMPTZ,
expira_en TIMESTAMPTZ,
cerrada BOOLEAN,
fecha_carga TIMESTAMPTZ DEFAULT NOW(),
origen TEXT DEFAULT 'source_system'
);
-- Tabla Bronze: perfiles_voz_raw
CREATE TABLE bronze.perfiles_voz_raw (
id INTEGER,
usuario_id INTEGER,
palabra_magica TEXT,
vector_voz JSONB,
muestras_tomadas INTEGER,
estado TEXT,
creado_en TIMESTAMPTZ,
fecha_carga TIMESTAMPTZ DEFAULT NOW(),
origen TEXT DEFAULT 'source_system'
);
-- Verificar tablas creadas
SELECT table_name, table_schema
FROM information_schema.tables
WHERE table_schema = 'bronze'
ORDER BY table_name;
1.3 Carga Inicial a Bronze (Full Load)
-- Cargar datos de producción a bronze
INSERT INTO bronze.usuarios_raw (id, username, email, creado_en, pais, ciudad, activo)
SELECT id, username, email, creado_en, pais, ciudad, TRUE
FROM public.usuarios;
INSERT INTO bronze.mensajes_raw (id, sesion_id, remitente_id, contenido, modalidad, metadata_ia, enviado_en)
SELECT id, sesion_id, remitente_id, contenido, modalidad, metadata_ia, enviado_en
FROM public.mensajes;
INSERT INTO bronze.logs_ejecucion_raw (id, mensaje_id, modulo, tiempo_ejecucion_ms, exito, error, ejecutado_en)
SELECT id, mensaje_id, modulo, tiempo_ejecucion_ms, exito, error, ejecutado_en
FROM public.logs_ejecucion;
INSERT INTO bronze.sesiones_raw (id, usuario_id, token, canal, iniciada_en, ultima_actividad, expira_en, cerrada)
SELECT id, usuario_id, token, canal, iniciada_en, ultima_actividad, expira_en, cerrada
FROM public.sesiones;
INSERT INTO bronze.perfiles_voz_raw (id, usuario_id, palabra_magica, vector_voz, muestras_tomadas, estado, creado_en)
SELECT id, usuario_id, palabra_magica, vector_voz, muestras_tomadas, estado, creado_en
FROM public.perfiles_voz;
-- Verificar cargas
SELECT 'bronze.usuarios_raw' AS tabla, COUNT(*) FROM bronze.usuarios_raw
UNION ALL
SELECT 'bronze.mensajes_raw', COUNT(*) FROM bronze.mensajes_raw
UNION ALL
SELECT 'bronze.logs_ejecucion_raw', COUNT(*) FROM bronze.logs_ejecucion_raw;
1.4 Función para Carga Incremental a Bronze
-- Función que carga nuevos registros a bronze (incremental)
CREATE OR REPLACE FUNCTION bronze.cargar_incremental_usuarios()
RETURNS INTEGER
LANGUAGE plpgsql
AS $$
DECLARE
v_nuevos INTEGER;
BEGIN
-- Insertar usuarios nuevos (que no existen en bronze)
INSERT INTO bronze.usuarios_raw (id, username, email, creado_en, pais, ciudad, activo)
SELECT u.id, u.username, u.email, u.creado_en, u.pais, u.ciudad, u.activo
FROM public.usuarios u
LEFT JOIN bronze.usuarios_raw b ON u.id = b.id
WHERE b.id IS NULL;
GET DIAGNOSTICS v_nuevos = ROW_COUNT;
RETURN v_nuevos;
END;
$$;
-- Probar función
SELECT bronze.cargar_incremental_usuarios();
🧹 Paso 2: Capa Silver (Cleansed) - Datos Limpiados
2.1 Crear Tablas Silver
-- Tabla Silver: usuarios_cleansed (datos limpios y normalizados)
CREATE TABLE silver.usuarios_cleansed (
id INTEGER PRIMARY KEY,
username TEXT NOT NULL,
email TEXT NOT NULL,
pais TEXT,
ciudad TEXT,
activo BOOLEAN DEFAULT TRUE,
email_valido BOOLEAN,
dominio_email TEXT,
fecha_registro DATE,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Silver: mensajes_cleansed
CREATE TABLE silver.mensajes_cleansed (
id INTEGER PRIMARY KEY,
sesion_id INTEGER,
remitente_id INTEGER,
contenido_limpio TEXT,
modalidad TEXT,
intencion TEXT,
sentimiento TEXT,
longitud_contenido INTEGER,
fecha_envio DATE,
hora_envio TIME,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Silver: logs_cleansed
CREATE TABLE silver.logs_cleansed (
id INTEGER PRIMARY KEY,
mensaje_id INTEGER,
modulo TEXT,
tiempo_ejecucion_ms INTEGER,
tiempo_segundos NUMERIC,
exito BOOLEAN,
fecha_ejecucion DATE,
hora_ejecucion TIME,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Silver: sesiones_cleansed
CREATE TABLE silver.sesiones_cleansed (
id INTEGER PRIMARY KEY,
usuario_id INTEGER,
canal TEXT,
fecha_inicio DATE,
hora_inicio TIME,
duracion_minutos NUMERIC,
activa BOOLEAN,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Silver: perfiles_voz_cleansed
CREATE TABLE silver.perfiles_voz_cleansed (
id INTEGER PRIMARY KEY,
usuario_id INTEGER,
estado TEXT,
muestras_tomadas INTEGER,
perfil_completo BOOLEAN,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
2.2 Procedimiento ETL para Silver (Limpieza y Transformación)
-- Procedimiento que transforma datos de Bronze a Silver
CREATE OR REPLACE PROCEDURE silver.transformar_usuarios()
LANGUAGE plpgsql
AS $$
BEGIN
-- Limpiar y transformar datos de usuarios
INSERT INTO silver.usuarios_cleansed (
id, username, email, pais, ciudad, activo,
email_valido, dominio_email, fecha_registro
)
SELECT
u.id,
TRIM(u.username) AS username,
LOWER(TRIM(u.email)) AS email,
INITCAP(TRIM(u.pais)) AS pais,
INITCAP(TRIM(u.ciudad)) AS ciudad,
u.activo,
u.email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' AS email_valido,
SPLIT_PART(u.email, '@', 2) AS dominio_email,
DATE(u.creado_en) AS fecha_registro
FROM bronze.usuarios_raw u
ON CONFLICT (id) DO UPDATE SET
username = EXCLUDED.username,
email = EXCLUDED.email,
pais = EXCLUDED.pais,
ciudad = EXCLUDED.ciudad,
activo = EXCLUDED.activo,
email_valido = EXCLUDED.email_valido,
dominio_email = EXCLUDED.dominio_email,
fecha_registro = EXCLUDED.fecha_registro,
fecha_carga = NOW();
RAISE NOTICE 'Usuarios transformados a Silver';
END;
$$;
-- Procedimiento para transformar mensajes
CREATE OR REPLACE PROCEDURE silver.transformar_mensajes()
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO silver.mensajes_cleansed (
id, sesion_id, remitente_id, contenido_limpio, modalidad,
intencion, sentimiento, longitud_contenido, fecha_envio, hora_envio
)
SELECT
m.id,
m.sesion_id,
m.remitente_id,
TRIM(REGEXP_REPLACE(m.contenido, '\s+', ' ', 'g')) AS contenido_limpio,
m.modalidad,
m.metadata_ia->>'intencion' AS intencion,
m.metadata_ia->>'sentimiento' AS sentimiento,
LENGTH(COALESCE(m.contenido, '')) AS longitud_contenido,
DATE(m.enviado_en) AS fecha_envio,
m.enviado_en::TIME AS hora_envio
FROM bronze.mensajes_raw m
ON CONFLICT (id) DO UPDATE SET
contenido_limpio = EXCLUDED.contenido_limpio,
intencion = EXCLUDED.intencion,
sentimiento = EXCLUDED.sentimiento,
longitud_contenido = EXCLUDED.longitud_contenido,
fecha_carga = NOW();
RAISE NOTICE 'Mensajes transformados a Silver';
END;
$$;
-- Procedimiento para transformar logs
CREATE OR REPLACE PROCEDURE silver.transformar_logs()
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO silver.logs_cleansed (
id, mensaje_id, modulo, tiempo_ejecucion_ms, tiempo_segundos, exito,
fecha_ejecucion, hora_ejecucion
)
SELECT
l.id,
l.mensaje_id,
l.modulo,
l.tiempo_ejecucion_ms,
ROUND(l.tiempo_ejecucion_ms / 1000.0, 3) AS tiempo_segundos,
l.exito,
DATE(l.ejecutado_en) AS fecha_ejecucion,
l.ejecutado_en::TIME AS hora_ejecucion
FROM bronze.logs_ejecucion_raw l
ON CONFLICT (id) DO UPDATE SET
tiempo_ejecucion_ms = EXCLUDED.tiempo_ejecucion_ms,
tiempo_segundos = EXCLUDED.tiempo_segundos,
exito = EXCLUDED.exito,
fecha_carga = NOW();
RAISE NOTICE 'Logs transformados a Silver';
END;
$$;
-- Ejecutar transformaciones
CALL silver.transformar_usuarios();
CALL silver.transformar_mensajes();
CALL silver.transformar_logs();
-- Verificar datos en Silver
SELECT COUNT(*) FROM silver.usuarios_cleansed;
SELECT COUNT(*) FROM silver.mensajes_cleansed;
SELECT COUNT(*) FROM silver.logs_cleansed;
📊 Paso 3: Capa Gold (Aggregated) - Datos Analíticos
3.1 Crear Tablas Gold
-- Tabla Gold: hechos_mensajes (fact table)
CREATE TABLE gold.hechos_mensajes (
id SERIAL PRIMARY KEY,
fecha DATE NOT NULL,
hora TIME NOT NULL,
usuario_id INTEGER,
sesion_id INTEGER,
modalidad TEXT,
intencion TEXT,
sentimiento TEXT,
longitud_contenido INTEGER,
tiempo_procesamiento_ms INTEGER,
modulo_principal TEXT,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Gold: dim_usuarios (dimension table)
CREATE TABLE gold.dim_usuarios (
usuario_id INTEGER PRIMARY KEY,
username TEXT,
pais TEXT,
ciudad TEXT,
email_valido BOOLEAN,
dominio_email TEXT,
fecha_registro DATE,
nivel_actividad TEXT,
total_mensajes INTEGER,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Gold: dim_tiempo (dimension table)
CREATE TABLE gold.dim_tiempo (
fecha DATE PRIMARY KEY,
año INTEGER,
mes INTEGER,
dia INTEGER,
dia_semana INTEGER,
nombre_dia TEXT,
trimestre INTEGER,
es_fin_semana BOOLEAN
);
-- Tabla Gold: resumen_diario (aggregated metrics)
CREATE TABLE gold.resumen_diario (
fecha DATE PRIMARY KEY,
total_usuarios_activos INTEGER,
total_sesiones INTEGER,
total_mensajes INTEGER,
mensajes_por_modalidad JSONB,
intenciones_top JSONB,
tiempo_promedio_respuesta NUMERIC,
tasa_exito_ia NUMERIC,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Tabla Gold: resumen_modulos (performance metrics)
CREATE TABLE gold.resumen_modulos (
modulo TEXT PRIMARY KEY,
total_ejecuciones INTEGER,
tasa_exito NUMERIC,
tiempo_promedio_ms NUMERIC,
tiempo_minimo_ms NUMERIC,
tiempo_maximo_ms NUMERIC,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
3.2 Procedimiento para Poblar Gold
-- Eliminar si existe
DROP PROCEDURE IF EXISTS gold.actualizar_capa_gold();
-- Procedimiento principal para poblar capa Gold
CREATE OR REPLACE PROCEDURE gold.actualizar_capa_gold()
LANGUAGE plpgsql
AS $$
DECLARE
v_fecha_inicio DATE;
v_fecha_fin DATE;
BEGIN
-- Obtener última fecha procesada
SELECT COALESCE(MAX(fecha), '2024-01-01') INTO v_fecha_inicio
FROM gold.resumen_diario;
v_fecha_fin := CURRENT_DATE - 1;
RAISE NOTICE 'Procesando fechas desde % hasta %', v_fecha_inicio, v_fecha_fin;
-- 1. Actualizar dimensiones - Dim Usuarios
INSERT INTO gold.dim_usuarios (
usuario_id, username, pais, ciudad, email_valido, dominio_email,
fecha_registro, nivel_actividad, total_mensajes
)
SELECT
u.id,
u.username,
u.pais,
u.ciudad,
u.email_valido,
u.dominio_email,
u.fecha_registro,
CASE
WHEN COALESCE(COUNT(m.id), 0) > 10 THEN 'Alto'
WHEN COALESCE(COUNT(m.id), 0) > 5 THEN 'Medio'
ELSE 'Bajo'
END AS nivel_actividad,
COUNT(m.id) AS total_mensajes
FROM silver.usuarios_cleansed u
LEFT JOIN silver.mensajes_cleansed m ON u.id = m.remitente_id
GROUP BY u.id, u.username, u.pais, u.ciudad, u.email_valido, u.dominio_email, u.fecha_registro
ON CONFLICT (usuario_id) DO UPDATE SET
nivel_actividad = EXCLUDED.nivel_actividad,
total_mensajes = EXCLUDED.total_mensajes,
fecha_carga = NOW();
RAISE NOTICE '✅ Dim Usuarios actualizada';
-- 2. Poblar tabla de hechos (solo nuevos registros)
INSERT INTO gold.hechos_mensajes (
fecha, hora, usuario_id, sesion_id, modalidad, intencion,
sentimiento, longitud_contenido, tiempo_procesamiento_ms, modulo_principal
)
SELECT
m.fecha_envio,
m.hora_envio,
m.remitente_id,
m.sesion_id,
m.modalidad,
m.intencion,
m.sentimiento,
m.longitud_contenido,
l.tiempo_ejecucion_ms,
l.modulo
FROM silver.mensajes_cleansed m
LEFT JOIN silver.logs_cleansed l ON m.id = l.mensaje_id
WHERE m.fecha_envio >= v_fecha_inicio;
RAISE NOTICE '✅ Hechos de mensajes actualizados';
-- 3. Actualizar resumen diario
INSERT INTO gold.resumen_diario (
fecha, total_usuarios_activos, total_sesiones, total_mensajes,
mensajes_por_modalidad, intenciones_top, tiempo_promedio_respuesta, tasa_exito_ia
)
WITH
-- CTE 1: Estadísticas diarias base
daily_base AS (
SELECT
m.fecha_envio AS fecha,
COUNT(DISTINCT m.remitente_id) AS usuarios_activos,
COUNT(DISTINCT m.sesion_id) AS sesiones,
COUNT(*) AS mensajes,
AVG(l.tiempo_ejecucion_ms) AS tiempo_promedio,
AVG(CASE WHEN l.exito THEN 100 ELSE 0 END) AS tasa_exito
FROM silver.mensajes_cleansed m
LEFT JOIN silver.logs_cleansed l ON m.id = l.mensaje_id
WHERE m.fecha_envio >= v_fecha_inicio
GROUP BY m.fecha_envio
),
-- CTE 2: Modalidades por día
modalidades_dia AS (
SELECT
fecha_envio,
jsonb_object_agg(modalidad, cantidad) AS mensajes_por_modalidad
FROM (
SELECT
fecha_envio,
modalidad,
COUNT(*) AS cantidad
FROM silver.mensajes_cleansed
WHERE fecha_envio >= v_fecha_inicio
GROUP BY fecha_envio, modalidad
) sub
GROUP BY fecha_envio
),
-- CTE 3: Intenciones top por día
intenciones_top_dia AS (
SELECT
fecha_envio,
jsonb_agg(intencion ORDER BY cantidad DESC) AS intenciones_top
FROM (
SELECT
fecha_envio,
intencion,
COUNT(*) AS cantidad,
ROW_NUMBER() OVER (PARTITION BY fecha_envio ORDER BY COUNT(*) DESC) AS rn
FROM silver.mensajes_cleansed
WHERE fecha_envio >= v_fecha_inicio
AND intencion IS NOT NULL
GROUP BY fecha_envio, intencion
) sub
WHERE rn <= 3
GROUP BY fecha_envio
)
SELECT
db.fecha,
db.usuarios_activos,
db.sesiones,
db.mensajes,
COALESCE(md.mensajes_por_modalidad, '{}'::JSONB) AS mensajes_por_modalidad,
COALESCE(itd.intenciones_top, '[]'::JSONB) AS intenciones_top,
ROUND(COALESCE(db.tiempo_promedio, 0), 2) AS tiempo_promedio_respuesta,
ROUND(COALESCE(db.tasa_exito, 0), 2) AS tasa_exito_ia
FROM daily_base db
LEFT JOIN modalidades_dia md ON db.fecha = md.fecha_envio
LEFT JOIN intenciones_top_dia itd ON db.fecha = itd.fecha_envio
ON CONFLICT (fecha) DO UPDATE SET
total_usuarios_activos = EXCLUDED.total_usuarios_activos,
total_sesiones = EXCLUDED.total_sesiones,
total_mensajes = EXCLUDED.total_mensajes,
mensajes_por_modalidad = EXCLUDED.mensajes_por_modalidad,
intenciones_top = EXCLUDED.intenciones_top,
tiempo_promedio_respuesta = EXCLUDED.tiempo_promedio_respuesta,
tasa_exito_ia = EXCLUDED.tasa_exito_ia,
fecha_carga = NOW();
RAISE NOTICE '✅ Resumen diario actualizado';
-- 4. Actualizar resumen de módulos
INSERT INTO gold.resumen_modulos (
modulo, total_ejecuciones, tasa_exito, tiempo_promedio_ms,
tiempo_minimo_ms, tiempo_maximo_ms
)
SELECT
modulo,
COUNT(*) AS total,
ROUND(100.0 * SUM(CASE WHEN exito THEN 1 ELSE 0 END) / COUNT(*), 2) AS tasa_exito,
ROUND(AVG(tiempo_ejecucion_ms), 2) AS tiempo_promedio,
MIN(tiempo_ejecucion_ms) AS tiempo_minimo,
MAX(tiempo_ejecucion_ms) AS tiempo_maximo
FROM silver.logs_cleansed
WHERE fecha_ejecucion >= v_fecha_inicio
GROUP BY modulo
ON CONFLICT (modulo) DO UPDATE SET
total_ejecuciones = EXCLUDED.total_ejecuciones,
tasa_exito = EXCLUDED.tasa_exito,
tiempo_promedio_ms = EXCLUDED.tiempo_promedio_ms,
tiempo_minimo_ms = EXCLUDED.tiempo_minimo_ms,
tiempo_maximo_ms = EXCLUDED.tiempo_maximo_ms,
fecha_carga = NOW();
RAISE NOTICE '✅ Resumen de módulos actualizado';
RAISE NOTICE '🎯 Capa Gold actualizada exitosamente para fechas desde % hasta %', v_fecha_inicio, v_fecha_fin;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE '❌ Error en actualización de Gold: %', SQLERRM;
RAISE;
END;
$$;
⏰ Paso 4: Carga Incremental y CDC
4.1 Marca de Agua (Watermark) para Incremental
-- Crear tabla de control de cargas
CREATE TABLE IF NOT EXISTS control_cargas (
id SERIAL PRIMARY KEY,
proceso TEXT NOT NULL,
ultima_ejecucion TIMESTAMPTZ,
ultimo_id_procesado INTEGER,
registros_procesados INTEGER,
estado TEXT,
fecha_carga TIMESTAMPTZ DEFAULT NOW()
);
-- Función para registrar ejecuciones
CREATE OR REPLACE FUNCTION registrar_ejecucion(
p_proceso TEXT,
p_ultimo_id INTEGER,
p_registros INTEGER,
p_estado TEXT
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO control_cargas (proceso, ultima_ejecucion, ultimo_id_procesado, registros_procesados, estado)
VALUES (p_proceso, NOW(), p_ultimo_id, p_registros, p_estado);
RAISE NOTICE 'Ejecución registrada: % - % registros', p_proceso, p_registros;
END;
$$;
4.2 Pipeline Completo con Carga Incremental
-- Pipeline completo: Bronze → Silver → Gold
CREATE OR REPLACE PROCEDURE pipeline_completo_aletza()
LANGUAGE plpgsql
AS $$
DECLARE
v_nuevos_usuarios INTEGER;
v_nuevos_mensajes INTEGER;
v_nuevos_logs INTEGER;
BEGIN
-- 1. Cargar datos nuevos a Bronze
RAISE NOTICE 'Iniciando carga a Bronze...';
v_nuevos_usuarios := bronze.cargar_incremental_usuarios();
-- Nota: Similar para otras tablas (implementar según necesidad)
-- 2. Transformar a Silver
RAISE NOTICE 'Transformando a Silver...';
CALL silver.transformar_usuarios();
CALL silver.transformar_mensajes();
CALL silver.transformar_logs();
-- 3. Actualizar Gold
RAISE NOTICE 'Actualizando Gold...';
CALL gold.actualizar_capa_gold();
-- 4. Registrar ejecución
PERFORM registrar_ejecucion('pipeline_completo', NULL,
v_nuevos_usuarios + v_nuevos_mensajes + v_nuevos_logs, 'EXITOSO');
RAISE NOTICE 'Pipeline completado exitosamente. Nuevos registros: %',
v_nuevos_usuarios + v_nuevos_mensajes + v_nuevos_logs;
END;
$$;
-- Ejecutar pipeline completo
CALL pipeline_completo_aletza();