
Commit 23f80f3

Move gcs & wasb task handlers to their respective provider packages (#9714)

1 parent 564192c

9 files changed: +425 −332 lines

UPDATING.md — 8 additions & 0 deletions
```diff
@@ -61,6 +61,14 @@ More tips can be found in the guide:
 https://developers.google.com/style/inclusive-documentation

 -->
+### GCSTaskHandler has been moved
+The `GCSTaskHandler` class from `airflow.utils.log.gcs_task_handler` has been moved to
+`airflow.providers.google.cloud.log.gcs_task_handler`. This is because it has items specific to `google cloud`.
+
+### WasbTaskHandler has been moved
+The `WasbTaskHandler` class from `airflow.utils.log.wasb_task_handler` has been moved to
+`airflow.providers.microsoft.azure.log.wasb_task_handler`. This is because it has items specific to `azure`.
+
 ### StackdriverTaskHandler has been moved
 The `StackdriverTaskHandler` class from `airflow.utils.log.stackdriver_task_handler` has been moved to
 `airflow.providers.google.cloud.log.stackdriver_task_handler`. This is because it has items specific to `google cloud`.
```
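For code that imported these handlers from `airflow.utils.log`, the change is purely an import-path move; the class names and constructor signatures are unchanged. A minimal before/after sketch:

```python
# Before this commit (old core-Airflow paths):
# from airflow.utils.log.gcs_task_handler import GCSTaskHandler
# from airflow.utils.log.wasb_task_handler import WasbTaskHandler

# After this commit (provider-package paths):
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler
```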
airflow/providers/google/cloud/log/gcs_task_handler.py — 182 additions & 0 deletions (new file)
```python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from urllib.parse import urlparse

from cached_property import cached_property

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class GCSTaskHandler(FileTaskHandler, LoggingMixin):
    """
    GCSTaskHandler is a python log handler that handles and reads
    task instance logs. It extends airflow FileTaskHandler and
    uploads to and reads from GCS remote storage. Upon log reading
    failure, it reads from the host machine's local disk.
    """

    def __init__(self, base_log_folder, gcs_log_folder, filename_template):
        super().__init__(base_log_folder, filename_template)
        self.remote_base = gcs_log_folder
        self.log_relative_path = ''
        self._hook = None
        self.closed = False
        self.upload_on_close = True

    @cached_property
    def hook(self):
        """Returns a GCS hook."""
        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
        try:
            from airflow.providers.google.cloud.hooks.gcs import GCSHook
            return GCSHook(
                google_cloud_storage_conn_id=remote_conn_id
            )
        except Exception as e:  # pylint: disable=broad-except
            self.log.error(
                'Could not create a GoogleCloudStorageHook with connection id '
                '"%s". %s\n\nPlease make sure that airflow[gcp] is installed '
                'and the GCS connection exists.', remote_conn_id, str(e)
            )

    def set_context(self, ti):
        super().set_context(ti)
        # The log relative path is used to construct the local and remote
        # log paths for uploading log files to GCS and reading from the
        # remote location.
        self.log_relative_path = self._render_filename(ti, ti.try_number)
        self.upload_on_close = not ti.raw

    def close(self):
        """Close and upload the local log file to GCS remote storage."""
        # When the application exits, the system shuts down all handlers by
        # calling their close method. Here we check if the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        super().close()

        if not self.upload_on_close:
            return

        local_loc = os.path.join(self.local_base, self.log_relative_path)
        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
        if os.path.exists(local_loc):
            # read the local log file and upload it to the remote location
            with open(local_loc, 'r') as logfile:
                log = logfile.read()
            self.gcs_write(log, remote_loc)

        # Mark closed so we don't double write if close is called twice
        self.closed = True

    def _read(self, ti, try_number, metadata=None):
        """
        Read logs of the given task instance and try_number from GCS.
        On failure, read the log from the task instance host machine.

        :param ti: task instance object
        :param try_number: task instance try_number to read logs from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
        """
        # Explicitly getting the log relative path is necessary because the
        # given task instance might differ from the one passed to the
        # set_context method.
        log_relative_path = self._render_filename(ti, try_number)
        remote_loc = os.path.join(self.remote_base, log_relative_path)

        try:
            remote_log = self.gcs_read(remote_loc)
            log = '*** Reading remote log from {}.\n{}\n'.format(
                remote_loc, remote_log)
            return log, {'end_of_log': True}
        except Exception as e:  # pylint: disable=broad-except
            log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
                remote_loc, str(e))
            self.log.error(log)
            local_log, metadata = super()._read(ti, try_number)
            log += local_log
            return log, metadata

    def gcs_read(self, remote_log_location):
        """
        Returns the log found at the remote_log_location.

        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: str (path)
        """
        bkt, blob = self.parse_gcs_url(remote_log_location)
        return self.hook.download(bkt, blob).decode('utf-8')

    def gcs_write(self, log, remote_log_location, append=True):
        """
        Writes the log to the remote_log_location. Fails silently if no hook
        was created.

        :param log: the log to write to the remote_log_location
        :type log: str
        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: str (path)
        :param append: if False, any existing log file is overwritten. If True,
            the new log is appended to any existing logs.
        :type append: bool
        """
        if append:
            try:
                old_log = self.gcs_read(remote_log_location)
                log = '\n'.join([old_log, log]) if old_log else log
            except Exception as e:  # pylint: disable=broad-except
                if not hasattr(e, 'resp') or e.resp.get('status') != '404':  # pylint: disable=no-member
                    log = '*** Previous log discarded: {}\n\n'.format(str(e)) + log

        try:
            bkt, blob = self.parse_gcs_url(remote_log_location)
            from tempfile import NamedTemporaryFile
            with NamedTemporaryFile(mode='w+') as tmpfile:
                tmpfile.write(log)
                # Force the file to be flushed, since we're doing the
                # upload from within the file context (it hasn't been
                # closed).
                tmpfile.flush()
                self.hook.upload(bkt, blob, tmpfile.name)
        except Exception as e:  # pylint: disable=broad-except
            self.log.error('Could not write logs to %s: %s', remote_log_location, e)

    @staticmethod
    def parse_gcs_url(gsurl):
        """
        Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
        tuple containing the corresponding bucket and blob.
        """
        parsed_url = urlparse(gsurl)
        if not parsed_url.netloc:
            raise AirflowException('Please provide a bucket name')

        bucket = parsed_url.netloc
        blob = parsed_url.path.strip('/')
        return bucket, blob
```
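As a rough orientation (not part of the commit): deployments normally wire this handler up through Airflow's remote-logging configuration rather than constructing it directly, but a hand-built sketch shows the constructor and the `parse_gcs_url` helper. The folder paths, bucket name, and filename template below are hypothetical:

```python
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler

# Hypothetical values, for illustration only; real setups configure these
# via the [logging] section of airflow.cfg (remote_logging,
# remote_base_log_folder, remote_log_conn_id, ...).
handler = GCSTaskHandler(
    base_log_folder='/usr/local/airflow/logs',    # local disk fallback
    gcs_log_folder='gs://my-logs-bucket/logs',    # remote base location
    filename_template='{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log',
)

# parse_gcs_url splits a gs://<bucket>/<blob> URL into a (bucket, blob) tuple:
assert GCSTaskHandler.parse_gcs_url('gs://my-logs-bucket/logs/1.log') == (
    'my-logs-bucket', 'logs/1.log')
```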
airflow/providers/microsoft/azure/log/__init__.py — 16 additions & 0 deletions (new package `__init__.py`, license header only)
```python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
```
airflow/providers/microsoft/azure/log/wasb_task_handler.py — 184 additions & 0 deletions (new file)
```python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import shutil

from azure.common import AzureHttpError
from cached_property import cached_property

from airflow.configuration import conf
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class WasbTaskHandler(FileTaskHandler, LoggingMixin):
    """
    WasbTaskHandler is a python log handler that handles and reads
    task instance logs. It extends airflow FileTaskHandler and
    uploads to and reads from Wasb remote storage.
    """

    def __init__(self, base_log_folder, wasb_log_folder, wasb_container,
                 filename_template, delete_local_copy):
        super().__init__(base_log_folder, filename_template)
        self.wasb_container = wasb_container
        self.remote_base = wasb_log_folder
        self.log_relative_path = ''
        self._hook = None
        self.closed = False
        self.upload_on_close = True
        self.delete_local_copy = delete_local_copy

    @cached_property
    def hook(self):
        """Returns a WasbHook."""
        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
        try:
            from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
            return WasbHook(remote_conn_id)
        except AzureHttpError:
            self.log.error(
                'Could not create a WasbHook with connection id "%s". '
                'Please make sure that airflow[azure] is installed and '
                'the Wasb connection exists.', remote_conn_id
            )

    def set_context(self, ti):
        super().set_context(ti)
        # The local and remote locations are needed to open and
        # upload the local log file to Wasb remote storage.
        self.log_relative_path = self._render_filename(ti, ti.try_number)
        self.upload_on_close = not ti.raw

    def close(self):
        """Close and upload the local log file to Wasb remote storage."""
        # When the application exits, the system shuts down all handlers by
        # calling their close method. Here we check if the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        super().close()

        if not self.upload_on_close:
            return

        local_loc = os.path.join(self.local_base, self.log_relative_path)
        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
        if os.path.exists(local_loc):
            # read the local log file and upload it to the remote location
            with open(local_loc, 'r') as logfile:
                log = logfile.read()
            self.wasb_write(log, remote_loc, append=True)

            if self.delete_local_copy:
                shutil.rmtree(os.path.dirname(local_loc))
        # Mark closed so we don't double write if close is called twice
        self.closed = True

    def _read(self, ti, try_number, metadata=None):
        """
        Read logs of the given task instance and try_number from Wasb remote storage.
        On failure, read the log from the task instance host machine.

        :param ti: task instance object
        :param try_number: task instance try_number to read logs from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
        """
        # Explicitly getting the log relative path is necessary because the
        # given task instance might differ from the one passed to the
        # set_context method.
        log_relative_path = self._render_filename(ti, try_number)
        remote_loc = os.path.join(self.remote_base, log_relative_path)

        if self.wasb_log_exists(remote_loc):
            # If the Wasb remote file exists, we do not fetch logs from the
            # task instance's local machine even if there are errors reading
            # remote logs, as the returned remote_log will contain error messages.
            remote_log = self.wasb_read(remote_loc, return_error=True)
            log = '*** Reading remote log from {}.\n{}\n'.format(
                remote_loc, remote_log)
            return log, {'end_of_log': True}

        return super()._read(ti, try_number)

    def wasb_log_exists(self, remote_log_location):
        """
        Check if remote_log_location exists in remote storage.

        :param remote_log_location: log's location in remote storage
        :return: True if location exists else False
        """
        try:
            return self.hook.check_for_blob(self.wasb_container, remote_log_location)
        except Exception:  # pylint: disable=broad-except
            pass
        return False

    def wasb_read(self, remote_log_location, return_error=False):
        """
        Returns the log found at the remote_log_location. Returns '' if no
        logs are found or there is an error.

        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: str (path)
        :param return_error: if True, returns a string error message if an
            error occurs. Otherwise returns '' when an error occurs.
        :type return_error: bool
        """
        try:
            return self.hook.read_file(self.wasb_container, remote_log_location)
        except AzureHttpError:
            msg = 'Could not read logs from {}'.format(remote_log_location)
            self.log.exception(msg)
            # return error if needed
            if return_error:
                return msg

    def wasb_write(self, log, remote_log_location, append=True):
        """
        Writes the log to the remote_log_location. Fails silently if no hook
        was created.

        :param log: the log to write to the remote_log_location
        :type log: str
        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: str (path)
        :param append: if False, any existing log file is overwritten. If True,
            the new log is appended to any existing logs.
        :type append: bool
        """
        if append and self.wasb_log_exists(remote_log_location):
            old_log = self.wasb_read(remote_log_location)
            log = '\n'.join([old_log, log]) if old_log else log

        try:
            self.hook.load_string(
                log,
                self.wasb_container,
                remote_log_location,
            )
        except AzureHttpError:
            self.log.exception('Could not write logs to %s',
                               remote_log_location)
```
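Similarly for the Azure handler (again a sketch, not part of the commit): the constructor additionally takes the blob container name and a `delete_local_copy` flag that removes the local log directory after a successful upload. All values below are hypothetical:

```python
from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler

# Hypothetical values, for illustration only; real setups configure these
# via the [logging] section of airflow.cfg.
handler = WasbTaskHandler(
    base_log_folder='/usr/local/airflow/logs',   # local disk location
    wasb_log_folder='wasb-logs',                 # remote base path
    wasb_container='airflow-logs',               # Azure Blob Storage container
    filename_template='{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log',
    delete_local_copy=True,                      # remove local logs after upload
)
```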
