Validación de datos para aplicaciones de PySpark utilizando Pandera

Validación de datos en PySpark con Pandera

 

Si eres un profesional de datos, apreciarás que la validación de datos tiene una importancia fundamental para garantizar precisión y consistencia. Esto se vuelve especialmente crucial cuando se trata de conjuntos de datos grandes o datos que provienen de diversas fuentes. Sin embargo, la biblioteca de Python Pandera puede ayudar a agilizar y automatizar el proceso de validación de datos. Pandera es una biblioteca de código abierto meticulosamente diseñada para simplificar las tareas de validación de esquemas y datos. Se basa en la robustez y versatilidad de pandas e introduce una API intuitiva y expresiva diseñada específicamente para fines de validación de datos.

Este artículo presenta brevemente las características clave de Pandera, antes de pasar a explicar cómo se puede integrar la validación de datos de Pandera con flujos de trabajo de procesamiento de datos que utilizan PySpark SQL nativo desde la última versión (Pandera 0.16.0).

Pandera está diseñado para funcionar con otras bibliotecas populares de Python como pandas, pyspark.pandas, Dask, etc. Esto facilita la incorporación de la validación de datos en sus flujos de trabajo de procesamiento de datos existentes. Hasta hace poco, Pandera no contaba con soporte nativo para PySpark SQL, pero para cerrar esta brecha, un equipo de QuantumBlack, AI by McKinsey compuesto por Ismail Negm-PARI, Neeraj Malhotra, Jaskaran Singh Sidana, Kasper Janehag, Oleksandr Lazarchuk, junto con el fundador de Pandera, Niels Bantilan, desarrollaron el soporte nativo para PySpark SQL y lo contribuyeron a Pandera. El texto de este artículo también fue preparado por el equipo y está escrito en sus propias palabras a continuación.

 

Las características clave de Pandera

 

Si no estás familiarizado con el uso de Pandera para validar tus datos, te recomendamos revisar el artículo “Valida tu DataFrame de pandas con Pandera” de Khuyen Tran, que describe los conceptos básicos. En resumen, aquí explicamos brevemente las características clave y los beneficios de una API simple e intuitiva, funciones de validación incorporadas y personalización.

 

API simple e intuitiva

 

Una de las características destacadas de Pandera es su API simple e intuitiva. Puedes definir el esquema de tus datos utilizando una sintaxis declarativa que es fácil de leer y entender. Esto facilita la escritura de código de validación de datos que es eficiente y efectivo.

Aquí tienes un ejemplo de definición de esquema en Pandera:

class InputSchema(pa.DataFrameModel):
   year: Series[int] = pa.Field()
   month: Series[int] = pa.Field()
   day: Series[int] = pa.Field()

 

Funciones de validación incorporadas

 

Pandera proporciona un conjunto de funciones incorporadas (más comúnmente llamadas comprobaciones) para realizar validaciones de datos. Cuando invocamos validate() en un esquema de Pandera, se realizarán tanto las validaciones de esquema como las de datos. Las validaciones de datos invocarán funciones de check en segundo plano.

Aquí tienes un ejemplo simple de cómo ejecutar una check de datos en un objeto dataframe utilizando Pandera.

class InputSchema(pa.DataFrameModel):
   year: Series[int] = pa.Field(gt=2000, coerce=True)
   month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
   day: Series[int] = pa.Field(ge=0, le=365, coerce=True)

InputSchema.validate(df)

 

Como se puede ver arriba, para el campo year hemos definido una comprobación gt=2000 que impone que todos los valores en este campo deben ser mayores que 2000, de lo contrario Pandera generará un error de validación.

Aquí tienes una lista de todas las comprobaciones integradas disponibles en Pandera por defecto:

eq: comprueba si el valor es igual a un literal dado
ne: comprueba si el valor no es igual a un literal dado
gt: comprueba si el valor es mayor que un literal dado
ge: comprueba si el valor es mayor o igual a un literal dado
lt: comprueba si el valor es menor que un literal dado
le: comprueba si el valor es menor o igual a un literal dado
in_range: comprueba si el valor está en un rango dado
isin: comprueba si el valor está en una lista dada de literales
notin: comprueba si el valor no está en una lista dada de literales
str_contains: comprueba si el valor contiene un literal de cadena
str_endswith: comprueba si el valor termina con un literal de cadena
str_length: comprueba si la longitud del valor coincide
str_matches: comprueba si el valor coincide con un literal de cadena
str_startswith: comprueba si el valor comienza con un literal de cadena

 

Funciones de Validación Personalizadas

 

Además de las comprobaciones de validación incorporadas, Pandera te permite definir tus propias funciones de validación personalizadas. Esto te brinda la flexibilidad de definir tus propias reglas de validación basadas en el caso de uso.

Por ejemplo, puedes definir una función lambda para la validación de datos como se muestra aquí:

schema = pa.DataFrameSchema({
   "column2": pa.Column(str, [
       pa.Check(lambda s: s.str.startswith("valor")),
       pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
   ]),
})

 

Añadiendo Soporte para DataFrames de PySpark SQL a Pandera

 

Durante el proceso de añadir soporte a PySpark SQL, nos adherimos a dos principios fundamentales: 

  • consistencia de interfaz y experiencia de usuario
  • optimización de rendimiento para PySpark. 

Primero, profundicemos en el tema de la consistencia, porque es importante que, desde la perspectiva del usuario, tengan un conjunto consistente de APIs y una interfaz independientemente del framework elegido. Como Pandera proporciona múltiples frameworks para elegir, era aún más crítico tener una experiencia de usuario consistente en las APIs de PySpark SQL.

Con esto en mente, podemos definir el esquema de Pandera utilizando PySpark SQL de la siguiente manera:

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pandera.pyspark as pa

spark = SparkSession.builder.getOrCreate()


class PanderaSchema(DataFrameModel):
       """Esquema de prueba"""
       id: T.IntegerType() = Field(gt=5)
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()
       description: T.ArrayType(T.StringType()) = Field()
       meta: T.MapType(T.StringType(), T.StringType()) = Field()


data_fail = [
       (5, "Pan", 44.4, ["descripción del producto"], {"product_category": "lácteos"}),
       (15, "Mantequilla", 99.0, ["más detalles aquí"], {"product_category": "panadería"}),
   ]

spark_schema = T.StructType(
       [
           T.StructField("id", T.IntegerType(), False),
           T.StructField("product", T.StringType(), False),
           T.StructField("price", T.DecimalType(20, 5), False),
           T.StructField("description", T.ArrayType(T.StringType(), False), False),
           T.StructField(
               "meta", T.MapType(T.StringType(), T.StringType(), False), False
           ),
       ],
   )
df_fail = spark_df(spark, data_fail, spark_schema)

 

En el código anterior, PanderaSchema define el esquema para el dataframe de PySpark entrante. Tiene 5 campos con diferentes dtypes y aplicación de comprobaciones de datos en los campos id y product_name.

class PanderaSchema(DataFrameModel):
       """Esquema de prueba"""
       id: T.IntegerType() = Field(gt=5)
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()
       description: T.ArrayType(T.StringType()) = Field()
       meta: T.MapType(T.StringType(), T.StringType()) = Field()

 

A continuación, creamos unos datos ficticios y aplicamos el esquema de PySpark SQL nativo definido en spark_schema.

spark_schema = T.StructType(
       [
           T.StructField("id", T.IntegerType(), False),
           T.StructField("product", T.StringType(), False),
           T.StructField("price", T.DecimalType(20, 5), False),
           T.StructField("description", T.ArrayType(T.StringType(), False), False),
           T.StructField(
               "meta", T.MapType(T.StringType(), T.StringType(), False), False
           ),
       ],
   )

df_fail = spark_df(spark, data_fail, spark_schema)

 

Esto se hace para simular fallos de validación de esquema y datos.

Aquí está el contenido del dataframe df_fail:

df_fail.show()

   +---+-------+--------+--------------------+--------------------+
   | id|product|   price|         description|                meta|
   +---+-------+--------+--------------------+--------------------+
   |  5|  Pan  |44.40000|[descripción del p...|{product_category...|
   | 15| Mantequilla|99.00000| [más detalles aquí]|{product_category...|
   +---+-------+--------+--------------------+--------------------+

 

A continuación, podemos invocar la función de validación de Pandera para realizar validaciones a nivel de esquema y datos de la siguiente manera:

df_out = PanderaSchema.validate(check_obj=df)

 

Exploraremos el contenido de df_out en breve.

 

Optimización de rendimiento para PySpark

 

Nuestra contribución fue diseñada específicamente para lograr un rendimiento óptimo al trabajar con dataframes de PySpark, lo cual es crucial cuando se trabaja con conjuntos de datos grandes para manejar los desafíos únicos del entorno de computación distribuida de PySpark.

Pandera utiliza la arquitectura de computación distribuida de PySpark para procesar eficientemente conjuntos de datos grandes manteniendo la consistencia y precisión de los datos. Reescribimos las funciones de validación personalizadas de Pandera para el rendimiento de PySpark para permitir una validación más rápida y eficiente de conjuntos de datos grandes, al tiempo que se reduce el riesgo de errores e inconsistencias de datos en volumen.

 

Informes de error completos

 

Agregamos otra función a Pandera para generar informes de error detallados en forma de un objeto diccionario de Python. Estos informes son accesibles a través del dataframe devuelto por la función de validación. Proporcionan un resumen completo de todas las validaciones a nivel de esquema y datos, según las configuraciones del usuario.

Esta función resulta valiosa para que los desarrolladores identifiquen y aborden rápidamente cualquier problema relacionado con los datos. Al utilizar el informe de error generado, los equipos pueden compilar una lista completa de problemas de esquema y datos dentro de su aplicación. Esto les permite priorizar y resolver problemas con eficiencia y precisión.

Es importante tener en cuenta que esta función está actualmente disponible exclusivamente para PySpark SQL, lo que ofrece a los usuarios una experiencia mejorada al trabajar con informes de errores en Pandera.

En el ejemplo de código anterior, recuerde que habíamos invocado validate() en el dataframe de spark:

df_out = PanderaSchema.validate(check_obj=df)

 

Devuelve un objeto dataframe. Usando los accesores podemos extraer el informe de error de la siguiente manera:

print(df_out.pandera.errors)

 

{
  "SCHEMA":{
     "COLUMN_NOT_IN_DATAFRAME":[
        {
           "schema":"PanderaSchema",
           "column":"PanderaSchema",
           "check":"column_in_dataframe",
           "error":"columna 'product_name' no está en el dataframe Row(id=5, product='Bread', price=None, description=['descripción del producto'], meta={'product_category': 'dairy'})"
        }
     ],
     "WRONG_DATATYPE":[
        {
           "schema":"PanderaSchema",
           "column":"description",
           "check":"dtype('ArrayType(StringType(), True)')",
           "error":"se esperaba que la columna 'description' tuviera el tipo ArrayType(StringType(), True), se obtuvo ArrayType(StringType(), False)"
        },
        {
           "schema":"PanderaSchema",
           "column":"meta",
           "check":"dtype('MapType(StringType(), StringType(), True)')",
           "error":"se esperaba que la columna 'meta' tuviera el tipo MapType(StringType(), StringType(), True), se obtuvo MapType(StringType(), StringType(), False)"
        }
     ]
  },
  "DATA":{
     "DATAFRAME_CHECK":[
        {
           "schema":"PanderaSchema",
           "column":"id",
           "check":"greater_than(5)",
           "error":"la columna 'id' con tipo IntegerType() no pasó la validación greater_than(5)"
        }
     ]
  }
}

 

Como se puede ver arriba, el informe de error se agrega en 2 niveles en un objeto diccionario de Python para ser fácilmente consumido por aplicaciones posteriores como la visualización de errores a lo largo del tiempo utilizando herramientas como Grafana:

  1. tipo de validación = SCHEMA o DATA
  2. categoría de errores = DATAFRAME_CHECK o WRONG_DATATYPE, etc.

Este nuevo formato para reestructurar la generación de informes de errores se introdujo en la versión 0.16.0 como parte de nuestra contribución.

Interruptor ON/OFF

Para aplicaciones que dependen de PySpark, tener un interruptor On/Off es una característica importante que puede marcar una gran diferencia en términos de flexibilidad y gestión de riesgos. Específicamente, el interruptor On/Off permite a los equipos desactivar las validaciones de datos en producción sin necesidad de realizar cambios en el código.

Esto es especialmente importante para las canalizaciones de big data donde el rendimiento es crítico. En muchos casos, la validación de datos puede llevar mucho tiempo de procesamiento, lo que puede afectar el rendimiento general de la canalización. Con el interruptor On/Off, los equipos pueden desactivar rápidamente y fácilmente la validación de datos si es necesario, sin tener que pasar por el proceso que consume mucho tiempo de modificar el código.

Nuestro equipo introdujo el interruptor On/Off en Pandera para que los usuarios puedan desactivar fácilmente la validación de datos en producción simplemente cambiando una configuración. Esto proporciona la flexibilidad necesaria para priorizar el rendimiento, cuando sea necesario, sin sacrificar la calidad o exactitud de los datos en desarrollo.

Para habilitar las validaciones, configure lo siguiente en sus variables de entorno:

export PANDERA_VALIDATION_ENABLED=False

Esto será detectado por Pandera para desactivar todas las validaciones en la aplicación. Por defecto, la validación está habilitada.

Actualmente, esta función solo está disponible para PySpark SQL a partir de la versión 0.16.0, ya que es un nuevo concepto introducido por nuestra contribución.

Control granular de la ejecución de Pandera

Además de la función del interruptor On/Off, también hemos introducido un control más granular sobre la ejecución del flujo de validación de Pandera. Esto se logra mediante la introducción de configuraciones personalizables que permiten a los usuarios controlar la ejecución en tres niveles diferentes:

  1. SCHEMA_ONLY: Esta configuración realiza solo validaciones de esquema. Verifica que los datos cumplan con la definición del esquema, pero no realiza ninguna validación adicional a nivel de datos.
  2. DATA_ONLY: Esta configuración realiza solo validaciones a nivel de datos. Verifica los datos según las restricciones y reglas definidas, pero no valida el esquema.
  3. SCHEMA_AND_DATA: Esta configuración realiza tanto validaciones de esquema como a nivel de datos. Verifica los datos tanto según la definición del esquema como según las restricciones y reglas definidas.

Al proporcionar este control granular, los usuarios pueden elegir el nivel de validación que mejor se adapte a su caso de uso específico. Por ejemplo, si la principal preocupación es asegurarse de que los datos cumplan con el esquema definido, se puede utilizar la configuración SCHEMA_ONLY para reducir el tiempo total de procesamiento. Alternativamente, si se sabe que los datos cumplen con el esquema y el enfoque está en garantizar la calidad de los datos, se puede utilizar la configuración DATA_ONLY para priorizar las validaciones a nivel de datos.

El control mejorado sobre la ejecución de Pandera permite a los usuarios alcanzar un equilibrio afinado entre precisión y eficiencia, lo que permite una experiencia de validación más enfocada y optimizada.

export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY

Por defecto, las validaciones están habilitadas y la profundidad está configurada en SCHEMA_AND_DATA, que se puede cambiar a SCHEMA_ONLY o DATA_ONLY según sea necesario para el caso de uso.

Actualmente, esta función solo está disponible para PySpark SQL a partir de la versión 0.16.0, ya que es un nuevo concepto introducido por nuestra contribución.

Metadatos a nivel de columna y dataframe

Nuestro equipo agregó una nueva función a Pandera que permite a los usuarios almacenar metadatos adicionales a nivel de campo y de esquema/modelo. Esta función está diseñada para permitir a los usuarios incrustar información contextual en sus definiciones de esquema que puede ser aprovechada por otras aplicaciones.

Por ejemplo, al almacenar detalles sobre una columna específica, como el tipo de datos, el formato o las unidades, los desarrolladores pueden asegurarse de que las aplicaciones posteriores puedan interpretar y utilizar los datos correctamente. De manera similar, al almacenar información sobre qué columnas de un esquema se necesitan para un caso de uso específico, los desarrolladores pueden optimizar las canalizaciones de procesamiento de datos, reducir los costos de almacenamiento y mejorar el rendimiento de las consultas.

A nivel de esquema, los usuarios pueden almacenar información para ayudar a categorizar diferentes esquemas en toda la aplicación. Estos metadatos pueden incluir detalles como el propósito del esquema, la fuente de los datos o el rango de fechas de los datos. Esto puede ser particularmente útil para administrar flujos de trabajo de procesamiento de datos complejos, donde se utilizan múltiples esquemas para diferentes propósitos y es necesario realizar un seguimiento y una gestión eficientes.

class PanderaSchema(DataFrameModel):
       """Clase de Esquema Pandera"""
       id: T.IntegerType() = Field(
           gt=5,
           metadata={"usecase": ["RetailPricing", "ConsumerBehavior"],
              "category": "product_pricing"},
       )
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()


       class Config:
           """Configuración de la clase pandera"""
           name = "product_info"
           strict = True
           coerce = True
           metadata = {"category": "product-details"}

 

En el ejemplo anterior, hemos introducido información adicional en el objeto de esquema en sí mismo. Esto está permitido en 2 niveles: campo y esquema.

Para extraer los metadatos a nivel de esquema (incluyendo todos los campos en él), proporcionamos funciones auxiliares como:

PanderaSchema.get_metadata()
El resultado será un objeto de diccionario de la siguiente manera:
{
       "product_info": {
           "columns": {
               "id": {"usecase": ["RetailPricing", "ConsumerBehavior"],
                      "category": "product_pricing"},
               "product_name": None,
               "price": None,
           },
           "dataframe": {"category": "product-details"},
       }
}

 

Actualmente, esta característica es un nuevo concepto en la versión 0.16.0 y se ha agregado para PySpark SQL y Pandas.

 

Resumen

 

Hemos introducido varias características y conceptos nuevos, incluido un interruptor de encendido/apagado que permite a los equipos deshabilitar las validaciones en producción sin cambios de código, un control granular sobre el flujo de validación de Pandera y la capacidad de almacenar metadatos adicionales en los niveles de columna y dataframe. Puede encontrar aún más detalles en la documentación actualizada de Pandera para la versión 0.16.0.

Como explicó Niels Bantilan, fundador de Pandera, en una publicación reciente del blog sobre el lanzamiento de Pandera 0.16.0:

 

Para demostrar la capacidad de extensión de Pandera con la nueva especificación de esquema y la API de backend, colaboramos con el equipo de QuantumBlack para implementar un esquema y backend para Pyspark SQL … ¡y completamos un MVP en cuestión de unos pocos meses!

 

Esta reciente contribución al código abierto de Pandera beneficiará a los equipos que trabajan con PySpark y otras tecnologías de big data.

Los siguientes miembros del equipo en QuantumBlack, AI by McKinsey son responsables de esta contribución reciente: Ismail Negm-PARI, Neeraj Malhotra, Jaskaran Singh Sidana, Kasper Janehag, Oleksandr Lazarchuk. Me gustaría agradecer especialmente a Neeraj por su ayuda en la preparación de este artículo para su publicación.     Jo Stitchbury es una escritora técnica con experiencia. Escribe sobre ciencia y análisis de datos, inteligencia artificial y la industria del software.  

We will continue to update Zepes; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

Inteligencia Artificial

EE.UU. busca malware chino que podría interrumpir las operaciones militares

Funcionarios de seguridad de Estados Unidos dicen que la Casa Blanca está buscando malware supuestamente oculto por C...

Inteligencia Artificial

Desbloqueando la transparencia de la IA Cómo el agrupamiento de características de Anthropic mejora la interpretabilidad de las redes neuronales

En un reciente artículo, “Hacia la monosemanticidad: descomposición de modelos de lenguaje con aprendizaje de d...

Inteligencia Artificial

Inteligencia Artificial (IA) y Web3 ¿Cómo están conectados?

¿Qué es la IA? En pocas palabras, la Inteligencia Artificial (IA) es la capacidad de las máquinas para realizar funci...

Inteligencia Artificial

La amenaza de la desinformación climática propagada por la tecnología de IA generativa

Explora cómo la IA generativa puede propagar información errónea sobre el clima y aprende estrategias efectivas para ...

Investigación

Usando Inteligencia Artificial, científicos encuentran un medicamento que podría combatir infecciones resistentes a los medicamentos.

El algoritmo de aprendizaje automático identificó un compuesto que mata Acinetobacter baumannii, una bacteria que ace...