Mantieni tutto organizzato con le raccolte
Salva e classifica i contenuti in base alle tue preferenze.
La funzionalitร di ribilanciamento dinamico del lavoro del servizio Dataflow consente al servizio di ripartire dinamicamente il lavoro in base alle condizioni di runtime. Queste condizioni possono includere quanto segue:
Squilibri nei compiti
I lavoratori impiegano piรน tempo del previsto per completare il lavoro
I lavoratori terminano il lavoro piรน velocemente del previsto
Il servizio Dataflow rileva automaticamente queste condizioni e puรฒ assegnare dinamicamente il lavoro a worker inutilizzati o sottoutilizzati per ridurre il tempo di elaborazione complessivo del job.
Limitazioni
Il ribilanciamento dinamico del lavoro si verifica solo quando il servizio Dataflow elabora alcuni dati di input in parallelo: quando legge i dati da un'origine di input esterna, quando lavora con un PCollection intermedio materializzato o quando lavora con il risultato di un'aggregazione come GroupByKey. Se un numero elevato di passaggi del job รจ fuso, il job ha meno PCollection intermedi e il riequilibrio dinamico del lavoro รจ limitato al numero di elementi nell'PCollection materializzato di origine. Se
vuoi assicurarti che il riequilibrio dinamico del lavoro possa essere applicato a un determinato
PCollection nella pipeline, puoi
impedire la fusione in diversi modi per garantire il parallelismo dinamico.
Il ribilanciamento dinamico del lavoro non puรฒ eseguire la riparallelizzazione dei dati con una granularitร inferiore a un singolo record.
Se i dati contengono singoli record che causano notevoli ritardi nel tempo di elaborazione, potrebbero comunque ritardare il job. Dataflow non puรฒ
suddividere e ridistribuire un singolo record "hot" a piรน worker.
Java
Se imposti un numero fisso di shard per l'output finale della pipeline (ad esempio scrivendo i dati utilizzando TextIO.Write.withNumShards), Dataflow limita la parallelizzazione in base al numero di shard che scegli.
Python
Se imposti un numero fisso di shard per l'output finale della pipeline (ad esempio scrivendo i dati utilizzando beam.io.WriteToText(..., num_shards=...)), Dataflow limita la parallelizzazione in base al numero di shard che scegli.
Vai
Se imposti un numero fisso di shard per l'output finale della pipeline, Dataflow limita la parallelizzazione in base al numero di shard che scegli.
Utilizzo di origini dati personalizzate
Java
Se la pipeline utilizza un'origine dati personalizzata che fornisci, devi implementare il metodo splitAtFraction per consentire all'origine di funzionare con la funzionalitร di riequilibrio dinamico del lavoro.
Se implementi splitAtFraction in modo errato, i record della tua origine potrebbero sembrare duplicati o eliminati. Consulta le informazioni di riferimento dell'API su RangeTracker per ricevere assistenza e suggerimenti sull'implementazione di splitAtFraction.
Python
Se la pipeline utilizza un'origine dati personalizzata da te fornita, il tuo RangeTracker deve implementare try_claim, try_split, position_at_fraction e fraction_consumed per consentire all'origine di funzionare con la funzionalitร di riequilibrio dinamico del lavoro.
Se la pipeline utilizza un'origine dati personalizzata che fornisci, devi implementare un RTracker valido per consentire all'origine di funzionare con la funzionalitร di riequilibrio dinamico del lavoro.
Il ribilanciamento dinamico del lavoro utilizza il valore restituito del metodo getProgress()
della tua origine personalizzata per l'attivazione. L'implementazione predefinita per getProgress() restituisce
null. Per assicurarti che la scalabilitร automatica venga attivata, assicurati che l'origine personalizzata sostituisca
getProgress() per restituire un valore appropriato.
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema รจ stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-09-02 UTC."],[[["\u003cp\u003eThe Dataflow service's Dynamic Work Rebalancing feature automatically redistributes work among workers based on runtime conditions such as work imbalances or varying processing times.\u003c/p\u003e\n"],["\u003cp\u003eDynamic work rebalancing is limited to parallel data processing stages, like reading from external sources or working with materialized \u003ccode\u003ePCollection\u003c/code\u003es, and is restricted by the number of elements or shards in those stages.\u003c/p\u003e\n"],["\u003cp\u003eIf you have custom data sources, dynamic work rebalancing requires implementing specific methods in your data source, such as \u003ccode\u003esplitAtFraction\u003c/code\u003e in Java or \u003ccode\u003etry_split\u003c/code\u003e and \u003ccode\u003eposition_at_fraction\u003c/code\u003e in Python, in order to function correctly.\u003c/p\u003e\n"],["\u003cp\u003eDynamic work rebalancing cannot further divide and redistribute a single record that is processing slower than the rest, potentially causing delays.\u003c/p\u003e\n"],["\u003cp\u003eSetting a fixed number of shards for your pipeline's output limits the parallelization that Dataflow can perform, thereby impacting the effectiveness of dynamic work rebalancing.\u003c/p\u003e\n"]]],[],null,["# Dynamic work rebalancing\n\nThe Dynamic Work Rebalancing feature of the Dataflow service allows the\nservice to dynamically repartition work based on runtime conditions. These\nconditions might include the following:\n\n- Imbalances in work assignments\n- Workers taking longer than expected to finish\n- Workers finishing faster than expected\n\nThe Dataflow service automatically detects these conditions and\ncan dynamically assign work to unused or underused workers to decrease\nthe overall processing time of your job.\n\nLimitations\n-----------\n\nDynamic work rebalancing only happens when the Dataflow service is\nprocessing some input data in parallel: when reading data from an external input\nsource, when working with a materialized intermediate `PCollection`, or when\nworking with the result of an aggregation like `GroupByKey`. If a large number\nof steps in your job are\n[fused](/dataflow/docs/pipeline-lifecycle#fusion_optimization), your job has fewer\nintermediate `PCollection`s, and dynamic work rebalancing is\nlimited to the number of elements in the source materialized `PCollection`. If\nyou want to ensure that dynamic work rebalancing can be applied to a particular\n`PCollection` in your pipeline, you can\n[prevent fusion](/dataflow/docs/pipeline-lifecycle#preventing_fusion) in a few\ndifferent ways to ensure dynamic parallelism.\n\nDynamic work rebalancing cannot reparallelize data finer than a single record.\nIf your data contains individual records that cause large delays in processing\ntime, they might still delay your job. Dataflow can't\nsubdivide and redistribute an individual \"hot\" record to multiple workers. \n\n### Java\n\nIf you set a fixed number of shards for the final output of your pipeline (for\nexample, by writing data using `TextIO.Write.withNumShards`),\nDataflow limits parallelization based on the number of\nshards that you choose.\n\n### Python\n\nIf you set a fixed number of shards for the final output of your pipeline (for\nexample, by writing data using `beam.io.WriteToText(..., num_shards=...)`),\nDataflow limits parallelization based on the number of\nshards that you choose.\n\n### Go\n\nIf you set a fixed number of shards for the final output of your pipeline,\nDataflow limits parallelization based on the number of shards\nthat you choose.\n| **Note:** The fixed-shards limitation can be considered temporary, and might be subject to change in future releases of the Dataflow service.\n\nWorking with Custom Data Sources\n--------------------------------\n\n### Java\n\nIf your pipeline uses a custom data source that you provide, you must\nimplement the method `splitAtFraction` to allow your source to work with the\ndynamic work rebalancing feature.\n| **Caution:** Using dynamic work rebalancing with custom data sources is an advanced use case. If you choose to implement `splitAtFraction`, it's critical that you test your code extensively and with maximum code coverage.\n\nIf you implement `splitAtFraction` incorrectly, records from your source might\nappear to get duplicated or dropped. See the\n[API reference information on RangeTracker](https://beam.apache.org/documentation/sdks/javadoc/current/index.html?org/apache/beam/sdk/io/range/RangeTracker.html) for help and tips on\nimplementing `splitAtFraction`.\n\n### Python\n\nIf your pipeline uses a custom data source that you provide, your\n`RangeTracker` must implement `try_claim`, `try_split`,\n`position_at_fraction`, and `fraction_consumed` to allow your source to work\nwith the dynamic work rebalancing feature.\n\nSee the\n[API reference information on RangeTracker](https://beam.apache.org/documentation/sdks/pydoc/current/apache_beam.io.iobase.html#apache_beam.io.iobase.RangeTracker)\nfor more information.\n\n### Go\n\nIf your pipeline uses a custom data source that you provide, you must\nimplement a valid `RTracker` to allow your source to work with the dynamic\nwork rebalancing feature.\n\nFor more information, see the [RTracker API reference information](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf#RTracker).\n\nDynamic work rebalancing uses the return value of the `getProgress()`\nmethod of your custom source to activate. The default implementation for `getProgress()` returns\n`null`. To ensure autoscaling activates, make sure your custom source overrides\n`getProgress()` to return an appropriate value."]]