Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import java.util.Objects;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Tracks the current value (and delta) for a Histogram metric.
*
* <p>This class generally shouldn't be used directly. The only exception is within a runner where a
* histogram is being reported for a specific step (rather than the histogram in the current
* context). In that case retrieving the underlying cell and reporting directly to it avoids a step
* of indirection.
*/
public class HistogramCell
implements org.apache.beam.sdk.metrics.Histogram, MetricCell<HistogramData> {

private final DirtyState dirty = new DirtyState();
private final HistogramData value;
private final MetricName name;

/**
* Generally, runners should construct instances using the methods in {@link
* MetricsContainerImpl}, unless they need to define their own version of {@link
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
*/
public HistogramCell(KV<MetricName, HistogramData.BucketType> kv) {
this.name = kv.getKey();
this.value = new HistogramData(kv.getValue());
}

@Override
public void reset() {
dirty.afterModification();
value.clear();
}

/** Increment the distribution by the given amount. */
@Override
public void update(double value) {
this.value.record(value);
dirty.afterModification();
}

@Override
public DirtyState getDirty() {
return dirty;
}

@Override
public HistogramData getCumulative() {
return value;
}

@Override
public MetricName getName() {
return name;
}

@Override
public boolean equals(@Nullable Object object) {
if (object instanceof HistogramCell) {
HistogramCell histogramCell = (HistogramCell) object;
return Objects.equals(dirty, histogramCell.dirty)
&& Objects.equals(value, histogramCell.value)
&& Objects.equals(name, histogramCell.name);
}

return false;
}

@Override
public int hashCode() {
return Objects.hash(dirty, value, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingCounter;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.util.HistogramData;

/**
* Define a metric on the current MetricContainer with a specific URN and a set of labels. This is a
Expand All @@ -32,4 +35,15 @@ public class LabeledMetrics {
public static Counter counter(MonitoringInfoMetricName metricName) {
return new DelegatingCounter(metricName);
}

public static Counter counter(MonitoringInfoMetricName metricName, boolean processWideContainer) {
return new DelegatingCounter(metricName, processWideContainer);
}

public static Histogram histogram(
MonitoringInfoMetricName metricName,
HistogramData.BucketType bucketType,
boolean processWideContainer) {
return new DelegatingHistogram(metricName, bucketType, processWideContainer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsLogger;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand All @@ -58,7 +61,7 @@
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class MetricsContainerImpl implements Serializable, MetricsContainer {
public class MetricsContainerImpl implements Serializable, MetricsContainer, MetricsLogger {
private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class);

private final @Nullable String stepName;
Expand All @@ -70,19 +73,24 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);

/** Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. */
public MetricsContainerImpl(@Nullable String stepName) {
this.stepName = stepName;
}

/** Reset the metrics. */
@Override
public void reset() {
reset(counters);
reset(distributions);
reset(gauges);
reset(histograms);
}

private void reset(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
private void reset(MetricsMap<?, ? extends MetricCell<?>> cells) {
for (MetricCell<?> cell : cells.values()) {
cell.reset();
}
Expand Down Expand Up @@ -122,6 +130,24 @@ public DistributionCell getDistribution(MetricName metricName) {
return distributions.tryGet(metricName);
}

/**
* Return a {@code HistogramCell} named {@code metricName}. If it doesn't exist, create a {@code
* Metric} with the specified name.
*/
@Override
public HistogramCell getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
return histograms.get(KV.of(metricName, bucketType));
}

/**
* Return a {@code HistogramCell} named {@code metricName}. If it doesn't exist, return {@code
* null}.
*/
public @Nullable HistogramCell tryGetHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
return histograms.tryGet(KV.of(metricName, bucketType));
}

/**
* Return a {@code GaugeCell} named {@code metricName}. If it doesn't exist, create a {@code
* Metric} with the specified name.
Expand Down Expand Up @@ -383,4 +409,75 @@ public boolean equals(@Nullable Object object) {
public int hashCode() {
return Objects.hash(stepName, counters, distributions, gauges);
}

/**
* Match a MetricName with a given namespace and a name. If the namespace or the name is null, it
* will be ignored for the match.
*/
private boolean matchMetricName(
MetricName metricName, @Nullable String namespace, @Nullable String name) {
return (namespace == null || namespace.equals(metricName.getNamespace()))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This equality looks werid, why would it be considered equal if the param is null?

Can you add a clairifying comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, equals prefix is misleading. It's more like matching and when the field will be ignored when the parameter is null. I renamed the method.

&& (name == null || name.equals(metricName.getName()));
}
/** Return a string representing the cumulative values of all metrics in this container. */
@Override
public String getCumulativeString(String namespace, String name) {
StringBuilder message = new StringBuilder();
for (Map.Entry<MetricName, CounterCell> cell : counters.entries()) {
if (!matchMetricName(cell.getKey(), namespace, name)) {
continue;
}
message.append(cell.getKey().toString());
message.append(" = ");
message.append(cell.getValue().getCumulative());
message.append("\n");
}
for (Map.Entry<MetricName, DistributionCell> cell : distributions.entries()) {
if (!matchMetricName(cell.getKey(), namespace, name)) {
continue;
}
message.append(cell.getKey().toString());
message.append(" = ");
DistributionData data = cell.getValue().getCumulative();
message.append(
String.format(
"{sum: %d, count: %d, min: %d, max: %d}",
data.sum(), data.count(), data.min(), data.max()));
message.append("\n");
}
for (Map.Entry<MetricName, GaugeCell> cell : gauges.entries()) {
if (!matchMetricName(cell.getKey(), namespace, name)) {
continue;
}
message.append(cell.getKey().toString());
message.append(" = ");
GaugeData data = cell.getValue().getCumulative();
message.append(String.format("{timestamp: %s, value: %d}", data.timestamp(), data.value()));
message.append("\n");
}
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
histograms.entries()) {
if (!matchMetricName(cell.getKey().getKey(), namespace, name)) {
continue;
}
message.append(cell.getKey().getKey().toString());
message.append(" = ");
HistogramData data = cell.getValue().getCumulative();
if (data.getTotalCount() > 0) {
message.append(
String.format(
"{count: %d, p50: %f, p90: %f, p99: %f}",
data.getTotalCount(), data.p50(), data.p90(), data.p99()));
} else {
message.append("{count: 0}");
}
message.append("\n");
}
return message.toString();
}

@Override
public Logger getMetricLogger() {
return LOG;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static final class Labels {
public static final String ENVIRONMENT = "ENVIRONMENT";
public static final String NAMESPACE = "NAMESPACE";
public static final String NAME = "NAME";
public static final String SERVICE = "SERVICE";
public static final String METHOD = "METHOD";

static {
checkArgument(PTRANSFORM.equals(extractLabel(MonitoringInfoLabels.TRANSFORM)));
Expand All @@ -75,6 +77,8 @@ public static final class Labels {
checkArgument(ENVIRONMENT.equals(extractLabel(MonitoringInfoLabels.ENVIRONMENT)));
checkArgument(NAMESPACE.equals(extractLabel(MonitoringInfoLabels.NAMESPACE)));
checkArgument(NAME.equals(extractLabel(MonitoringInfoLabels.NAME)));
checkArgument(SERVICE.equals(extractLabel(MonitoringInfoLabels.SERVICE)));
checkArgument(METHOD.equals(extractLabel(MonitoringInfoLabels.METHOD)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static MonitoringInfoMetricName of(MetricsApi.MonitoringInfo mi) {
return new MonitoringInfoMetricName(mi.getUrn(), mi.getLabelsMap());
}

public static MonitoringInfoMetricName named(String urn, HashMap<String, String> labels) {
public static MonitoringInfoMetricName named(String urn, Map<String, String> labels) {
return new MonitoringInfoMetricName(urn, labels);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.internal.CustomSources;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
Expand Down Expand Up @@ -290,6 +292,8 @@ public static void main(String[] args) throws Exception {
StreamingDataflowWorker worker =
StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options, sdkHarnessRegistry);

MetricsEnvironment.setProcessWideContainer(new MetricsContainerImpl(null));

JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
worker.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@
@Internal
public class DelegatingCounter implements Metric, Counter, Serializable {
private final MetricName name;
private final boolean processWideContainer;

public DelegatingCounter(MetricName name) {
this(name, false);
}

public DelegatingCounter(MetricName name, boolean processWideContainer) {
this.name = name;
this.processWideContainer = processWideContainer;
}

/** Increment the counter. */
Expand All @@ -38,7 +44,10 @@ public void inc() {
/** Increment the counter by the given amount. */
@Override
public void inc(long n) {
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
MetricsContainer container =
this.processWideContainer
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
if (container != null) {
container.getCounter(name).inc(n);
}
Expand Down
Loading