Arrow y S3

big data
R
C++
S3
AWS
2023
Author

José Luis Cañadas Reche

Published

February 19, 2023

Intro

Apache Arrow está de moda, permite trabajar de forma muy rápida con ficheros parquet por ejemplo, está escrito en C++, aunque también hay implementación en Rust, de hecho la librería polars está escrita en Rust.

Luego hay APis que permiten usar Arrow desde Java Julia, Go, Python o incluso R. Vamos a ver un par de ejemplos de como usar arrow desde R. El primero de ellos es sólo copia de la excelente presentación de Danielle Navarro que os dejo aquí . Y el segundo ejemplo es como ver lo mismo pero con unos datos fake que he dejado en un bucket de S3 (del que no puedo poner la dirección)

Ejemplo 1

Una cosa maravillosa de Arrow es que puedes conectarte a un bucket remoto de S3 (o google cloud storage) y hacer consultas sobre varios millones de datos sin necesidad de que esos datos se traigan enteros a tu pc y sin necesidad de que te quepan en RAM. ¿Cómo lo hace? pues ni la más remota idea. Pero podéis leer las slides de Danielle para haceros una idea

Cargamos librerías, nos conectamos a un bucket de s3 y vemos que hay

Mostrar / ocultar código
library(arrow)
library(tidyverse)
library(tictoc) 
library(duckdb) # compraberomos más tarde si usar duckdb mejora

bucket <- s3_bucket("voltrondata-labs-datasets", anonymous = TRUE)
bucket$ls("nyc-taxi")
#>  [1] "nyc-taxi/year=2009" "nyc-taxi/year=2010" "nyc-taxi/year=2011"
#>  [4] "nyc-taxi/year=2012" "nyc-taxi/year=2013" "nyc-taxi/year=2014"
#>  [7] "nyc-taxi/year=2015" "nyc-taxi/year=2016" "nyc-taxi/year=2017"
#> [10] "nyc-taxi/year=2018" "nyc-taxi/year=2019" "nyc-taxi/year=2020"
#> [13] "nyc-taxi/year=2021" "nyc-taxi/year=2022"
Mostrar / ocultar código
tic()
bucket <- s3_bucket("voltrondata-labs-datasets/nyc-taxi", anonymous = TRUE)
remote_taxi <- open_dataset(bucket) 
remote_taxi
#> FileSystemDataset with 158 Parquet files
#> vendor_name: string
#> pickup_datetime: timestamp[ms]
#> dropoff_datetime: timestamp[ms]
#> passenger_count: int64
#> trip_distance: double
#> pickup_longitude: double
#> pickup_latitude: double
#> rate_code: string
#> store_and_fwd: string
#> dropoff_longitude: double
#> dropoff_latitude: double
#> payment_type: string
#> fare_amount: double
#> extra: double
#> mta_tax: double
#> tip_amount: double
#> tolls_amount: double
#> total_amount: double
#> improvement_surcharge: double
#> congestion_surcharge: double
#> pickup_location_id: int64
#> dropoff_location_id: int64
#> year: int32
#> month: int32
toc()
#> 8.916 sec elapsed

Cuánto tardaría en hacer el cálculo de cuántos viajes ha habido en Enero de 2019 y ver el número de viajes en los que ha habido más de un pasajero. (Viendo el htop de mi linuxmint se ve que no hay casi uso de mis cpus)

Mostrar / ocultar código
tic()
result <- remote_taxi |> 
    filter(year == 2019, month == 1) |>
    summarize(
        all_trips = n(),
        shared_trips = sum(passenger_count > 1, na.rm = TRUE)
    ) |>
    mutate(pct_shared = shared_trips / all_trips * 100) |>
    collect()

result |> 
    print(n = 200)
#> # A tibble: 1 × 3
#>   all_trips shared_trips pct_shared
#>       <int>        <int>      <dbl>
#> 1   7667255      2094091       27.3
toc()
#> 12.982 sec elapsed

No está mal , ¿verdad?

Ejemplo 2

Mostrar / ocultar código
BUCKET   = Sys.getenv("BUCKET_COMUN")

ROLE_ARN = Sys.getenv("ROLE_ARN")
S3_FOLDER = Sys.getenv("S3_FOLDER")




bucket_comun <- s3_bucket(bucket = BUCKET, 
                             role_arn = ROLE_ARN)


ds <- open_dataset(bucket_comun$path(S3_FOLDER),
                   partitioning = c("year", "month", "day"))

Cuántos datos

Mostrar / ocultar código

tic()
res <- ds %>% 
    filter(year == 2021 & month == 3)  |> 
    select(year, month, day ) |> 
    group_by(day) |> 
    summarise(
        n_filas = n()
    ) |> 
    collect()

res |> 
    arrange(day) |> 
    print(n  = 10)
#> # A tibble: 30 × 2
#>      day n_filas
#>    <int>   <int>
#>  1     1 6260454
#>  2     2 6245312
#>  3     3 6243455
#>  4     4 6242304
#>  5     5 6241816
#>  6     6 6241089
#>  7     7 6241633
#>  8     8 6241651
#>  9    10 6240299
#> 10    11 6239817
#> # … with 20 more rows
toc()
#> 33.974 sec elapsed

Y usando duckdb como engine de consulta .

Mostrar / ocultar código

tic()
res <- ds %>% 
    filter(year == 2021 & month == 3)  |> 
    select(year, month, day ) |> 
    to_duckdb() %>%
    group_by(day) |> 
    summarise(
        n_filas = n()
    ) |> 
    collect()

res |> 
    arrange(day) |> 
    print(n  = 10)
#> # A tibble: 30 × 2
#>      day n_filas
#>    <int>   <dbl>
#>  1     1 6260454
#>  2     2 6245312
#>  3     3 6243455
#>  4     4 6242304
#>  5     5 6241816
#>  6     6 6241089
#>  7     7 6241633
#>  8     8 6241651
#>  9    10 6240299
#> 10    11 6239817
#> # … with 20 more rows
toc()
#> 38.924 sec elapsed

Pues a mi la verdad, arrow me parece impresionante. Poder hacer cosas como calcular medias, contar, filtrar etc, sobre conjuntos de datos que están en remoto sin tener una computadadora muy potente.

Nota

Para instalar la última versión de Arrow en R, recomiendo que os leáis esta documentación

Feliz domingo