Implementación de ParDo y DoFn en Apache Beam en Detalles

Implementación de ParDo y DoFn en Apache Beam

Foto de ODISSEI en Unsplash

Explicación detallada del código para principiantes

Escribí un tutorial sobre algunas funciones de transformación comunes en Apache Beam en un tutorial anterior que cubría map, filter y combinePerKey(). Este tutorial será sobre la transformación ParDo, que no es más que otra forma de hacer Map. Pero la diferencia es que ParDo aplica la transformación en cada PCollection y devuelve cero o más elementos a la PCollection de salida. Por otro lado, la transformación Map genera exactamente un elemento para cada elemento de entrada. De esa manera, ParDo nos proporciona mucha flexibilidad para trabajar.

Otro aspecto importante de la transformación Pardo es que requiere el código del usuario en forma de DoFn. Veamos algunos ejemplos.

No dudes en descargar este conjunto de datos públicos y seguir:

Datos de muestra de ventas | Kaggle

Utilicé un cuaderno de Google Colab para trabajar con este código, por lo que es muy fácil de instalar. Aquí está el código para instalarlo:

!pip install --quiet apache_beam

Creé un directorio llamado ‘data’ para colocar el archivo CSV que usaremos y para colocar las salidas de nuestro ejercicio hoy.

mkdir -p data

Para comenzar, solo trabajaré en la cosa más simple del conjunto de datos. Leer el conjunto de datos y crear una lista con cada fila del conjunto de datos y enviarlas a un archivo de texto.

Leer un archivo de texto en un pipeline de beam es muy simple y directo. Tenemos un archivo CSV. Entonces, definiremos una clase CustomCoder() para esto, que codifica los objetos en una cadena de bytes primero, luego decodifica los bytes en sus objetos y, por último, especifica si el codificador está garantizado para codificar valores de manera determinista. Aquí está la documentación para el codificador.

from apache_beam.coders.coders import Coderclass CustomCoder(Coder):    """Un codificador personalizado utilizado para leer y escribir cadenas como UTF-8."""    def encode(self, value):        return value.encode("utf-8", "replace")    def decode(self, value):        return value.decode("utf-8", "ignore")    def is_deterministic(self):        return True

También hay una clase SplitRow() que simplemente utiliza la función .split() de Python.

class SplitRow(beam.DoFn):  def process(self, element)…

We will continue to update Zepes; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

Inteligencia Artificial

Dentro del acalorado centro del pesimismo de la IA

Anthropic, una start-up de inteligencia artificial centrada en la seguridad, está tratando de competir con ChatGPT mi...

Inteligencia Artificial

Hitos alcanzados en la nueva arquitectura de la computación cuántica

Los científicos han extendido el tiempo de coherencia para una clase única de bit cuántico (qubit) a 0.1 milisegundos...

Inteligencia Artificial

A pesar de los temores de trampas, las escuelas revocan las prohibiciones de ChatGPT

Algunos distritos que antes se apresuraban a bloquear los chatbots de inteligencia artificial ahora están tratando de...

Inteligencia Artificial

Los Gemelos Digitales Brindan un Camino más Verde para el Crecimiento del Hidrógeno

Un investigador cree que los gemelos digitales podrían ayudar a reducir los costos de producción de hidrógeno limpio ...