Risolvere i problemi relativi ai job di streaming lenti o bloccati

Questa pagina spiega come risolvere i problemi relativi alle cause comuni di job di streaming Dataflow lenti o bloccati.

Se noti i seguenti sintomi, il job di streaming Dataflow potrebbe essere in esecuzione lentamente o bloccato:

Utilizza le informazioni nelle sezioni seguenti per identificare e diagnosticare il problema.

Identificare la causa principale

  1. Controlla le metriche aggiornamento dei dati e byte backlog.

    • Se entrambe le metriche aumentano in modo monotono, significa che la pipeline è bloccata e non procede.
    • Se l'aggiornamento dei dati aumenta, ma i byte del backlog rimangono normali, significa che uno o più elementi di lavoro sono bloccati nella pipeline.

    Cerca le fasi in cui queste metriche aumentano per identificare eventuali fasi con problemi e le operazioni eseguite in quella fase.

  2. Controlla il grafico dell'elaborazione parallela per verificare se una fase è bloccata a causa di un parallelismo eccessivo o insufficiente. Consulta la sezione Risolvere i problemi di parallelismo.

  3. Controlla i log dei job per rilevare problemi come limiti di quota, problemi di esaurimento delle scorte o esaurimento degli indirizzi IP.

  4. Controlla i log dei worker per individuare avvisi ed errori.

    • Se i log del worker contengono errori, visualizza l'analisi dello stack. Verifica se l'errore è causato da un bug nel codice.
    • Cerca gli errori di Dataflow. Consulta Risolvere gli errori di Dataflow.
    • Cerca errori che indicano che il job ha superato un limite, ad esempio le dimensioni massime del messaggio Pub/Sub.
    • Cerca errori di esaurimento della memoria, che possono causare il blocco di una pipeline. Se visualizzi errori di memoria insufficiente, segui i passaggi descritti in Risolvere i problemi di memoria insufficiente di Dataflow.
    • Per identificare un passaggio lento o bloccato, controlla i log del worker per i messaggi Operation ongoing. Visualizza l'analisi dello stack per vedere dove il passaggio trascorre il tempo. Per saperne di più, consulta Elaborazione bloccata o operazione in corso.
  5. Se un elemento di lavoro è bloccato su un worker specifico, riavvia la VM worker.

  6. Se non utilizzi Streaming Engine, controlla i log di Shuffler per avvisi ed errori. Se visualizzi un errore di timeout RPC sulla porta 12345 o 12346, è possibile che nel job manchi una regola firewall. Consulta Regole firewall per Dataflow.

  7. Controlla la presenza di tasti di scelta rapida.

  8. Se Runner v2 è abilitato, controlla la presenza di errori nei log dell'imbracatura. Per ulteriori informazioni, vedi Risolvere i problemi di Runner v2.

Esamina gli errori ripetuti

In un job di streaming, alcuni errori vengono riprovati all'infinito. Questi tentativi impediscono l'avanzamento della pipeline. Per identificare errori ripetuti, controlla i log dei worker per le eccezioni.

Identificare i worker in stato non integro

Se i worker che elaborano il job di streaming non sono integri, il job potrebbe essere lento o bloccato. Per identificare i worker non integri:

Identificare gli elementi in ritardo

Un elemento in ritardo è un elemento di lavoro lento rispetto ad altri elementi di lavoro nella fase. Per informazioni su come identificare e correggere gli elementi in ritardo, vedi Risolvere i problemi relativi agli elementi in ritardo nei job di streaming.

Risolvere i problemi di parallelismo

Per scalabilità ed efficienza, Dataflow esegue le fasi della pipeline in parallelo su più worker. L'unità più piccola di elaborazione parallela in Dataflow è una chiave. I messaggi in entrata per ogni fase unita sono associati a una chiave. La chiave è definita in uno dei seguenti modi:

  • La chiave è definita implicitamente dalle proprietà dell'origine, ad esempio le partizioni Kafka.
  • La chiave è definita esplicitamente dalla logica di aggregazione nella pipeline, ad esempio GroupByKey.

In Dataflow, i thread worker sono responsabili della gestione dell'elaborazione dei bundle di lavoro (messaggi) per una chiave. Il numero di thread disponibili per elaborare le chiavi del job è pari a num_of_workers * threads_per_worker. Il numero di thread per worker è determinato in base all'SDK (Java, Python o Go) e al tipo di job (batch o streaming).

Se la pipeline non ha abbastanza chiavi per una determinata fase, limita l'elaborazione parallela. Questa fase potrebbe diventare un collo di bottiglia.

Se la pipeline utilizza un numero molto elevato di chiavi per una determinata fase, può limitare la velocità effettiva della fase e accumulare backlog nelle fasi precedenti, perché c'è un sovraccarico per chiave. Le spese generali potrebbero includere la comunicazione del backend con i worker, le RPC esterne a un sink come BigQuery e altre elaborazioni. Ad esempio, se l'elaborazione di una chiave con un messaggio richiede 100 ms, potrebbe essere necessario anche circa 100 ms per elaborare 1000 messaggi in quel bundle di chiavi.

Identificare le fasi con parallelismo basso

Per identificare se la lentezza della pipeline è causata da un basso parallelismo, visualizza le metriche di utilizzo della CPU. Se l'utilizzo della CPU è basso, ma distribuito uniformemente tra i worker, il job potrebbe avere un parallelismo insufficiente. Se il job utilizza Streaming Engine, per verificare se una fase ha un parallelismo basso, nella scheda Metriche job, visualizza le metriche di parallelismo. Per risolvere il problema:

Identificare le fasi con parallelismo elevato

Una combinazione di bassa latenza di sistema, crescente frequenza di aggiornamento dei dati e aumento del backlog e delle CPU dei worker sottoutilizzate suggerisce che la pipeline viene limitata a causa di un numero elevato di chiavi. Controlla il grafico Elaborazione parallela per identificare le fasi con un numero elevato di chiavi.

Trasformazioni come Reshuffle possono generare milioni di chiavi se non specifichi esplicitamente withNumBuckets. Un numero elevato di chiavi può portare alla creazione di numerosi bundle di lavoro più piccoli, ognuno dei quali richiede un thread worker dedicato per l'elaborazione. Poiché i thread worker disponibili sono limitati, può verificarsi un backlog significativo di chiavi di elaborazione, causando ritardi in quanto attendono le risorse. Di conseguenza, i thread di lavoro non vengono utilizzati in modo efficiente.

Ti consigliamo di limitare il numero di chiavi impostando l'opzione withNumBuckets nella trasformazione Reshuffle. Il valore non deve superare il numero totale di thread in tutti i worker. Le chiavi di (threads_per_worker * max_workers) targeting nella pipeline potrebbero non essere ottimali. A volte sono possibili meno chiavi e bundle più grandi, che vengono elaborati in modo più efficiente da Dataflow grazie all'utilizzo di un numero inferiore di worker. Un numero inferiore di chiavi crea bundle di lavoro più grandi, che utilizzano in modo efficiente i thread di worker e aumentano il throughput dello stage.

Se nella pipeline sono presenti più passaggi Reshuffle, dividi il numero totale di thread per il conteggio dei passaggi Reshuffle per calcolare withNumBuckets.

Controllare la presenza di tasti di scelta rapida

Se le attività sono distribuite in modo non uniforme tra i worker e l'utilizzo dei worker è molto irregolare, la pipeline potrebbe avere un tasto di scelta rapida. Una hot key è una chiave che ha molti più elementi da elaborare rispetto alle altre chiavi.

Controlla le scorciatoie da tastiera utilizzando il seguente filtro dei log:

  resource.type="dataflow_step"
  resource.labels.job_id=JOB_ID
  jsonPayload.line:"hot_key_logger"

Sostituisci JOB_ID con l'ID del tuo job.

Per risolvere il problema, esegui uno o più dei seguenti passaggi:

  • Riassegna le chiavi ai tuoi dati. Per generare nuove coppie chiave-valore, applica una trasformazione ParDo. Per saperne di più, consulta la pagina di trasformazione ParDo Java o la pagina di trasformazione ParDo Python nella documentazione di Apache Beam.
  • Utilizza .withFanout nelle trasformazioni combinate. Per saperne di più, consulta la classe Combine.PerKey nell'SDK Java o l'operazione with_hot_key_fanout nell'SDK Python.
  • Se hai una pipeline Java che elabora PCollections senza limiti di volume, ti consigliamo di procedere nel seguente modo:
    • Utilizza Combine.Globally.withFanout anziché Combine.Globally.
    • Utilizza Combine.PerKey.withHotKeyFanout anziché Count.PerKey.

Controllare se la quota è insufficiente

Assicurati di avere una quota sufficiente per l'origine e il sink. Ad esempio, se la pipeline legge l'input da Pub/Sub o BigQuery, il progetto potrebbe non disporre di una quota sufficiente. Google Cloud Per ulteriori informazioni sui limiti di quota per questi servizi, consulta Quota Pub/Sub o Quota BigQuery.

Se il tuo job genera un numero elevato di errori 429 (Rate Limit Exceeded), potrebbe avere una quota insufficiente. Per verificare la presenza di errori, prova i seguenti passaggi:

  1. Vai alla consoleGoogle Cloud .
  2. Nel riquadro di navigazione, fai clic su API e servizi.
  3. Nel menu, fai clic su Raccolta.
  4. Utilizza la casella di ricerca per cercare Pub/Sub.
  5. Fai clic su API Cloud Pub/Sub.
  6. Fai clic su Gestisci.
  7. Nel grafico Traffico per codice di risposta, cerca i codici di errore client (4xx).

Puoi anche utilizzare Metrics Explorer per controllare l'utilizzo della quota. Se la pipeline utilizza un'origine o un sink BigQuery, per risolvere i problemi relativi alla quota, utilizza le metriche dell'API BigQuery Storage. Ad esempio, per creare un grafico che mostri il conteggio delle connessioni simultanee BigQuery, segui questi passaggi:

  1. Nella console Google Cloud , seleziona Monitoring:

    Vai a Monitoring

  2. Nel riquadro di navigazione, seleziona Metrics Explorer.

  3. Nel riquadro Seleziona una metrica, per Metrica, filtra in base a Progetto BigQuery > Scrittura > Conteggio connessioni simultanee.

Per istruzioni sulla visualizzazione delle metriche Pub/Sub, vedi Monitorare l'utilizzo della quota in "Monitorare Pub/Sub in Cloud Monitoring". Per istruzioni sulla visualizzazione delle metriche BigQuery, consulta Visualizzare l'utilizzo e i limiti delle quote in "Creare dashboard, grafici e avvisi".

Strumenti per il debug

Quando una pipeline è lenta o bloccata, i seguenti strumenti possono aiutarti a diagnosticare il problema.

  • Per correlare gli incidenti e identificare i colli di bottiglia, utilizza Cloud Monitoring per Dataflow.
  • Per monitorare le prestazioni della pipeline, utilizza Cloud Profiler.
  • Alcune trasformazioni sono più adatte alle pipeline ad alto volume rispetto ad altre. I messaggi di log possono identificare una trasformazione dell'utente bloccata nelle pipeline batch o di streaming.
  • Per saperne di più su un job bloccato, utilizza le metriche dei job Dataflow. Il seguente elenco include metriche utili:
    • La metrica Byte backlog (backlog_bytes) misura la quantità di input non elaborato in byte per fase. Utilizza questa metrica per trovare un passaggio unito che non ha throughput. Allo stesso modo, la metrica Elementi nel backlog (backlog_elements) misura il numero di elementi di input non elaborati per una fase.
    • La metrica Chiavi di parallelismo dell'elaborazione (processing_parallelism_keys) misura il numero di chiavi di elaborazione parallela per una determinata fase della pipeline negli ultimi cinque minuti. Utilizza questa metrica per eseguire indagini nei seguenti modi:
      • Restringi il problema a fasi specifiche e conferma gli avvisi relativi ai tasti di scelta rapida, ad esempio A hot key ... was detected.
      • Trova i colli di bottiglia del throughput causati da un parallelismo insufficiente. Questi colli di bottiglia possono causare pipeline lente o bloccate.
    • La metrica Ritardo sistema (system_lag) e la metrica Ritardo sistema per fase (per_stage_system_lag) misurano il tempo massimo per cui un elemento di dati è stato elaborato o è in attesa di elaborazione. Utilizza queste metriche per identificare le fasi inefficienti e i colli di bottiglia delle origini dati.

Per ulteriori metriche non incluse nell'interfaccia web di monitoraggio di Dataflow, consulta l'elenco completo delle metriche di Dataflow in Google Cloud Metriche.