-
Notifications
You must be signed in to change notification settings - Fork 4.4k
[BEAM-11408, BEAM-11772] Add explicit output typehints to ensure coder determinism for BQ with auto-sharding #14499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking βSign up for GitHubβ, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
β¦toBatches.WithShardedKey
R: @udim |
@@ -941,7 +943,8 @@ def _write_files_with_auto_sharding( | |||
destination_files_kv_pc = ( | |||
destination_data_kv_pc | |||
| 'ToHashableTableRef' >> beam.Map( | |||
lambda kv: (bigquery_tools.get_hashable_destination(kv[0]), kv[1])) | |||
lambda kv: (bigquery_tools.get_hashable_destination(kv[0]), kv[1])). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, but with the additional step of making this lambda a function (you can't add type annotations to lambdas).
@@ -1431,7 +1433,8 @@ def _restore_table_ref(sharded_table_ref_elems_kv): | |||
bigquery_tools.AppendDestinationsFn(self.table_reference), | |||
*self.table_side_inputs) | |||
| 'AddInsertIds' >> beam.ParDo(_StreamToBigQuery.InsertIdPrefixFn()) | |||
| 'ToHashableTableRef' >> beam.Map(_to_hashable_table_ref)) | |||
| 'ToHashableTableRef' >> beam.Map(_to_hashable_table_ref) | |||
).with_output_types(Tuple[str, Any]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The explicit Any
discards type information. Could you try annotating the types in _to_hashable_table_ref
instead?
V = TypeVar('V')
def _to_hashable_table_ref(table_ref_elem_kv: Tuple[Union[str, TABLE_REF_TYPE], V]) -> Tuple[str, V]:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! Made the change accordingly.
@@ -1417,7 +1417,12 @@ def _add_random_shard(element): | |||
value = element[1] | |||
return ((key, random.randint(0, DEFAULT_SHARDS_PER_DESTINATION)), value) | |||
|
|||
def _to_hashable_table_ref(table_ref_elem_kv): | |||
V = TypeVar('V') | |||
TABLE_REF_TYPE = TypeVar('TABLE_REF_TYPE') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I was lazy when I wrote TABLE_REF_TYPE - I meant it should be replaced with bigquery.TableReference
. (the Union should contain the types accepted by bigquery_tools.get_hashable_destination)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha. That makes more sense! Corrected.
V = TypeVar('V') | ||
TABLE_REF_TYPE = TypeVar('TABLE_REF_TYPE') | ||
|
||
def _to_hashable_table_ref( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be put in bigquery_tools.py to avoid code repetition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I moved it to bigquery_tools.py
retest this please |
f9956e5
to
62768fd
Compare
Move to_hashable_table_ref to bigquery_tools
62768fd
to
38f8390
Compare
@@ -69,6 +71,11 @@ | |||
except ImportError: | |||
pass | |||
|
|||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to explicitly import this otherwise many tests failed with "apache_beam.io.gcp.internal.clients.bigquery has no attribute TableReference" - I guess bigquery lib was not available in some environments. The pylint check also failed possibly because of this but I haven't figured out what's wrong. Any insights?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that makes sense - you can add # pylint: disable=ungrouped-imports, wrong-import-order, wrong-import-position
before the block to get pylint to pass.
Also fix the string representation of
ShardedKeyCoder
to print out the key coder.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.