diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index 3ea09cf167..c46172eb04 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -210,3 +210,44 @@ called, either by calling any of the `PrimeUpdateAndCacheUtils` methods again or updated via `PrimaryUpdateAndCacheUtils`. See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache). + +### Trigger reconciliation on all events + +TLDR; We provide an execution mode where `reconcile` method is called on every event from event source. + +The framework optimizes execution for generic use cases, which in almost all cases falls into two categories: + +1. The controller does not use finalizers; thus when the primary resource is deleted, all the managed secondary + resources are cleaned up using the Kubernetes garbage collection mechanism, a.k.a., using owner references. +2. When finalizers are used (using `Cleaner` interface), thus when controller requires some explicit cleanup logic, typically for external + resources and when secondary resources are in different namespace than the primary resources (owner references + cannot be used in this case). + +Note that for example framework neither of those cases triggers the reconciler on the `Delete` event of the primary resource. +When finalizer is used, it calls `cleaner(..)` method when resource is marked for deletion and our (not other) finalizer +is present. When there is no finalizer, does not make sense to call the `reconciel(..)` method on a `Delete` event +since all the cleanup will be done by the garbage collector. This way we spare reconciliation cycles. + +However, there are cases when controllers do not strictly follow those patterns, typically when: +- Only some of the primary resources use finalizers, e.g., for some of the primary resources you need + to create an external resource for others not. +- You maintain some additional in memory caches (so not all the caches are encapsulated by an `EventSource`) + and you don't want to use finalizers. For those cases, you typically want to clean up your caches when the primary + resource is deleted. + +For such use cases you can set [`triggerReconcilerOnAllEvent`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java#L81) +to `true`, as a result, `reconcile` method will be triggered on ALL events (so also `Delete` events), and you +are free to optimize you reconciliation for the use cases above and possibly others. + +In this mode: +- even if the primary resource is already deleted from the Informer's cache, we will still pass the last known state + as the parameter for the reconciler. You can check if the resource is deleted using `Context.isPrimaryResourceDeleted()`. +- The retry, rate limiting, re-schedule, filters mechanisms work normally. (The internal caches related to the resource + are cleaned up only when there was a successful reconiliation after `Delete` event received for the primary resource + and reconciliation was not re-scheduled. +- you cannot use `Cleaner` interface. The framework assumes you will explicitly manage the finalizers. To + add finalizer you can use [`PrimeUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java#L308). +- you cannot use managed dependent resources since those manage the finalizers and other logic related to the normal + execution mode. + + \ No newline at end of file diff --git a/operator-framework-core/pom.xml b/operator-framework-core/pom.xml index c99b609113..c5c2f54986 100644 --- a/operator-framework-core/pom.xml +++ b/operator-framework-core/pom.xml @@ -79,6 +79,11 @@ awaitility test + + io.fabric8 + kube-api-test-client-inject + test + diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java index 891f199dbe..1c5a79d79e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java @@ -304,12 +304,15 @@ private

ResolvedControllerConfiguration

controllerCon final var dependentFieldManager = fieldManager.equals(CONTROLLER_NAME_AS_FIELD_MANAGER) ? name : fieldManager; + var triggerReconcilerOnAllEvent = + annotation != null && annotation.triggerReconcilerOnAllEvent(); + InformerConfiguration

informerConfig = InformerConfiguration.builder(resourceClass) .initFromAnnotation(annotation != null ? annotation.informer() : null, context) .buildForController(); - return new ResolvedControllerConfiguration

( + return new ResolvedControllerConfiguration<>( name, generationAware, associatedReconcilerClass, @@ -323,7 +326,8 @@ private

ResolvedControllerConfiguration

controllerCon null, dependentFieldManager, this, - informerConfig); + informerConfig, + triggerReconcilerOnAllEvent); } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index 3898493c82..9c9f43485d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -92,4 +92,8 @@ default String fieldManager() { } C getConfigurationFor(DependentResourceSpec spec); + + default boolean triggerReconcilerOnAllEvent() { + return false; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index d2e37a397d..c7fb6f7f6d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -30,6 +30,7 @@ public class ControllerConfigurationOverrider { private Duration reconciliationMaxInterval; private Map configurations; private final InformerConfiguration.Builder config; + private boolean triggerReconcilerOnAllEvent; private ControllerConfigurationOverrider(ControllerConfiguration original) { this.finalizer = original.getFinalizerName(); @@ -42,6 +43,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration original) { this.rateLimiter = original.getRateLimiter(); this.name = original.getName(); this.fieldManager = original.fieldManager(); + this.triggerReconcilerOnAllEvent = original.triggerReconcilerOnAllEvent(); } public ControllerConfigurationOverrider withFinalizer(String finalizer) { @@ -154,6 +156,12 @@ public ControllerConfigurationOverrider withFieldManager(String dependentFiel return this; } + public ControllerConfigurationOverrider withTriggerReconcilerOnAllEvent( + boolean triggerReconcilerOnAllEvent) { + this.triggerReconcilerOnAllEvent = triggerReconcilerOnAllEvent; + return this; + } + /** * Sets a max page size limit when starting the informer. This will result in pagination while * populating the cache. This means that longer lists will take multiple requests to fetch. See @@ -198,6 +206,7 @@ public ControllerConfiguration build() { fieldManager, original.getConfigurationService(), config.buildForController(), + triggerReconcilerOnAllEvent, original.getWorkflowSpec().orElse(null)); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java index 3c26659ed2..ac0ded6b11 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java @@ -29,6 +29,7 @@ public class ResolvedControllerConfiguration

private final Map configurations; private final ConfigurationService configurationService; private final String fieldManager; + private final boolean triggerReconcilerOnAllEvent; private WorkflowSpec workflowSpec; public ResolvedControllerConfiguration(ControllerConfiguration

other) { @@ -44,6 +45,7 @@ public ResolvedControllerConfiguration(ControllerConfiguration

other) { other.fieldManager(), other.getConfigurationService(), other.getInformerConfig(), + other.triggerReconcilerOnAllEvent(), other.getWorkflowSpec().orElse(null)); } @@ -59,6 +61,7 @@ public ResolvedControllerConfiguration( String fieldManager, ConfigurationService configurationService, InformerConfiguration

informerConfig, + boolean triggerReconcilerOnAllEvent, WorkflowSpec workflowSpec) { this( name, @@ -71,7 +74,8 @@ public ResolvedControllerConfiguration( configurations, fieldManager, configurationService, - informerConfig); + informerConfig, + triggerReconcilerOnAllEvent); setWorkflowSpec(workflowSpec); } @@ -86,7 +90,8 @@ protected ResolvedControllerConfiguration( Map configurations, String fieldManager, ConfigurationService configurationService, - InformerConfiguration

informerConfig) { + InformerConfiguration

informerConfig, + boolean triggerReconcilerOnAllEvent) { this.informerConfig = informerConfig; this.configurationService = configurationService; this.name = ControllerConfiguration.ensureValidName(name, associatedReconcilerClassName); @@ -99,6 +104,7 @@ protected ResolvedControllerConfiguration( this.finalizer = ControllerConfiguration.ensureValidFinalizerName(finalizer, getResourceTypeName()); this.fieldManager = fieldManager; + this.triggerReconcilerOnAllEvent = triggerReconcilerOnAllEvent; } protected ResolvedControllerConfiguration( @@ -117,7 +123,8 @@ protected ResolvedControllerConfiguration( null, null, configurationService, - InformerConfiguration.builder(resourceClass).buildForController()); + InformerConfiguration.builder(resourceClass).buildForController(), + false); } @Override @@ -207,4 +214,9 @@ public C getConfigurationFor(DependentResourceSpec spec) { public String fieldManager() { return fieldManager; } + + @Override + public boolean triggerReconcilerOnAllEvent() { + return triggerReconcilerOnAllEvent; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java index f47deb9734..36df3666e6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java @@ -72,4 +72,21 @@ default Stream getSecondaryResourcesAsStream(Class expectedType) { * @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise */ boolean isNextReconciliationImminent(); + + /** + * To check if the primary resource is already deleted. This value can be true only if you turn on + * {@link + * io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration#triggerReconcilerOnAllEvent()} + * + * @return true Delete event received for primary resource + */ + boolean isPrimaryResourceDeleted(); + + /** + * Check this only if {@link #isPrimaryResourceDeleted()} is true. + * + * @return true if the primary resource is deleted, but the last known state is only available + * from the caches of the underlying Informer, not from Delete event. + */ + boolean isPrimaryResourceFinalStateUnknown(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index d407ed0fc6..db0f799172 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -77,4 +77,11 @@ MaxReconciliationInterval maxReconciliationInterval() default * @return the name used as field manager for SSA operations */ String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER; + + /** + * By settings to true, reconcile method will be triggered on every event, thus even for Delete + * event. You cannot use {@link Cleaner} or managed dependent resources in that case. See + * documentation for further details. + */ + boolean triggerReconcilerOnAllEvent() default false; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index 2acf8d13ca..5a9166d047 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -24,12 +24,21 @@ public class DefaultContext

implements Context

{ private final ControllerConfiguration

controllerConfiguration; private final DefaultManagedWorkflowAndDependentResourceContext

defaultManagedDependentResourceContext; - - public DefaultContext(RetryInfo retryInfo, Controller

controller, P primaryResource) { + private final boolean primaryResourceDeleted; + private final boolean primaryResourceFinalStateUnknown; + + public DefaultContext( + RetryInfo retryInfo, + Controller

controller, + P primaryResource, + boolean primaryResourceDeleted, + boolean primaryResourceFinalStateUnknown) { this.retryInfo = retryInfo; this.controller = controller; this.primaryResource = primaryResource; this.controllerConfiguration = controller.getConfiguration(); + this.primaryResourceDeleted = primaryResourceDeleted; + this.primaryResourceFinalStateUnknown = primaryResourceFinalStateUnknown; this.defaultManagedDependentResourceContext = new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this); } @@ -119,6 +128,16 @@ public boolean isNextReconciliationImminent() { .isNextReconciliationImminent(ResourceID.fromResource(primaryResource)); } + @Override + public boolean isPrimaryResourceDeleted() { + return primaryResourceDeleted; + } + + @Override + public boolean isPrimaryResourceFinalStateUnknown() { + return primaryResourceFinalStateUnknown; + } + public DefaultContext

setRetryInfo(RetryInfo retryInfo) { this.retryInfo = retryInfo; return this; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index c61cc837c1..18664e606c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -1,19 +1,26 @@ package io.javaoperatorsdk.operator.api.reconciler; +import java.lang.reflect.InvocationTargetException; import java.time.LocalTime; import java.time.temporal.ChronoUnit; +import java.util.function.Predicate; import java.util.function.UnaryOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; +import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; + /** * Utility methods to patch the primary resource state and store it to the related cache, to make * sure that the latest version of the resource is present for the next reconciliation. The main use @@ -152,7 +159,7 @@ public static

P updateAndCacheResource( long cachePollPeriodMillis) { if (log.isDebugEnabled()) { - log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate)); + log.debug("Update and cache: {}", ResourceID.fromResource(resourceToUpdate)); } P modified = null; int retryIndex = 0; @@ -229,4 +236,143 @@ private static

P pollLocalCache( throw new OperatorException(e); } } + + /** Adds finalizer using JSON Patch. Retries conflicts and unprocessable content (HTTP 422) */ + @SuppressWarnings("unchecked") + public static

P addFinalizer(Context

context, String finalizer) { + return addFinalizer(context.getClient(), context.getPrimaryResource(), finalizer); + } + + /** Adds finalizer using JSON Patch. Retries conflicts and unprocessable content (HTTP 422) */ + @SuppressWarnings("unchecked") + public static

P addFinalizer( + KubernetesClient client, P resource, String finalizerName) { + return conflictRetryingPatch( + client, + resource, + r -> { + r.addFinalizer(finalizerName); + return r; + }, + r -> !r.hasFinalizer(finalizerName)); + } + + public static

P removeFinalizer( + Context

context, String finalizerName) { + return removeFinalizer(context.getClient(), context.getPrimaryResource(), finalizerName); + } + + public static

P removeFinalizer( + KubernetesClient client, P resource, String finalizerName) { + return conflictRetryingPatch( + client, + resource, + r -> { + r.removeFinalizer(finalizerName); + return r; + }, + r -> r.hasFinalizer(finalizerName)); + } + + @SuppressWarnings("unchecked") + public static

P conflictRetryingPatch( + KubernetesClient client, + P resource, + UnaryOperator

unaryOperator, + Predicate

preCondition) { + if (log.isDebugEnabled()) { + log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resource)); + } + int retryIndex = 0; + while (true) { + try { + if (!preCondition.test(resource)) { + return resource; + } + return client.resource(resource).edit(unaryOperator); + } catch (KubernetesClientException e) { + log.trace("Exception during patch for resource: {}", resource); + retryIndex++; + // only retry on conflict (409) and unprocessable content (422) which + // can happen if JSON Patch is not a valid request since there was + // a concurrent request which already removed another finalizer: + // List element removal from a list is by index in JSON Patch + // so if addressing a second finalizer but first is meanwhile removed + // it is a wrong request. + if (e.getCode() != 409 && e.getCode() != 422) { + throw e; + } + if (retryIndex >= DEFAULT_MAX_RETRY) { + throw new OperatorException( + "Exceeded maximum (" + + DEFAULT_MAX_RETRY + + ") retry attempts to patch resource: " + + ResourceID.fromResource(resource)); + } + log.debug( + "Retrying patch for resource name: {}, namespace: {}; HTTP code: {}", + resource.getMetadata().getName(), + resource.getMetadata().getNamespace(), + e.getCode()); + var operation = client.resources(resource.getClass()); + if (resource.getMetadata().getNamespace() != null) { + resource = + (P) + operation + .inNamespace(resource.getMetadata().getNamespace()) + .withName(resource.getMetadata().getName()) + .get(); + } else { + resource = (P) operation.withName(resource.getMetadata().getName()).get(); + } + } + } + } + + /** Adds finalizer using Server-Side Apply. */ + public static

P addFinalizerWithSSA( + Context

context, P originalResource, String finalizerName) { + return addFinalizerWithSSA( + context.getClient(), + originalResource, + finalizerName, + context.getControllerConfiguration().fieldManager()); + } + + /** Adds finalizer using Server-Side Apply. */ + @SuppressWarnings("unchecked") + public static

P addFinalizerWithSSA( + KubernetesClient client, P originalResource, String finalizerName, String fieldManager) { + if (log.isDebugEnabled()) { + log.debug( + "Adding finalizer (using SSA) for resource: {} version: {}", + getUID(originalResource), + getVersion(originalResource)); + } + try { + P resource = (P) originalResource.getClass().getConstructor().newInstance(); + ObjectMeta objectMeta = new ObjectMeta(); + objectMeta.setName(originalResource.getMetadata().getName()); + objectMeta.setNamespace(originalResource.getMetadata().getNamespace()); + resource.setMetadata(objectMeta); + resource.addFinalizer(finalizerName); + return client + .resource(resource) + .patch( + new PatchContext.Builder() + .withFieldManager(fieldManager) + .withForce(true) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build()); + } catch (InstantiationException + | IllegalAccessException + | InvocationTargetException + | NoSuchMethodException e) { + throw new RuntimeException( + "Issue with creating custom resource instance with reflection." + + " Custom Resources must provide a no-arg constructor. Class: " + + originalResource.getClass().getName(), + e); + } + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index a53d52c429..851c6b011d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -61,6 +61,11 @@ public class Controller

private static final String RESOURCE = "resource"; private static final String STATUS = "status"; private static final String BOTH = "both"; + public static final String CLEANER_NOT_SUPPORTED_ON_ALL_EVENT_ERROR_MESSAGE = + "Cleaner is not supported when triggerReconcilerOnAllEvent enabled."; + public static final String + MANAGED_WORKFLOWS_NOT_SUPPORTED_TRIGGER_RECONCILER_ON_ALL_EVENT_ERROR_MESSAGE = + "Managed workflows are not supported when triggerReconcilerOnAllEvent enabled."; private final Reconciler

reconciler; private final ControllerConfiguration

configuration; @@ -97,6 +102,16 @@ public Controller( explicitWorkflowInvocation = configuration.getWorkflowSpec().map(WorkflowSpec::isExplicitInvocation).orElse(false); + if (configuration.triggerReconcilerOnAllEvent()) { + if (isCleaner) { + throw new OperatorException(CLEANER_NOT_SUPPORTED_ON_ALL_EVENT_ERROR_MESSAGE); + } + if (managedWorkflow != null && !managedWorkflow.isEmpty()) { + throw new OperatorException( + MANAGED_WORKFLOWS_NOT_SUPPORTED_TRIGGER_RECONCILER_ON_ALL_EVENT_ERROR_MESSAGE); + } + } + eventSourceManager = new EventSourceManager<>(this); eventProcessor = new EventProcessor<>(eventSourceManager, configurationService); eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); @@ -486,4 +501,8 @@ public boolean workflowContainsDependentForType(Class clazz) { return managedWorkflow.getDependentResourcesByName().values().stream() .anyMatch(d -> d.resourceType().equals(clazz)); } + + public boolean isCleaner() { + return isCleaner; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index e029e287a0..4469cbc80e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -23,6 +23,7 @@ import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.Retry; @@ -130,7 +131,7 @@ public synchronized void handleEvent(Event event) { } private void handleMarkedEventForResource(ResourceState state) { - if (state.deleteEventPresent()) { + if (state.deleteEventPresent() && !triggerOnAllEvent()) { cleanupForDeletedEvent(state.getId()); } else if (!state.processedMarkForDeletionPresent()) { submitReconciliationExecution(state); @@ -143,7 +144,8 @@ private void submitReconciliationExecution(ResourceState state) { final var resourceID = state.getId(); Optional

maybeLatest = cache.get(resourceID); maybeLatest.ifPresent(MDCUtils::addResourceInfo); - if (!controllerUnderExecution && maybeLatest.isPresent()) { + if (!controllerUnderExecution + && (maybeLatest.isPresent() || (triggerOnAllEvent() && state.deleteEventPresent()))) { var rateLimit = state.getRateLimit(); if (rateLimit == null) { rateLimit = rateLimiter.initState(); @@ -155,9 +157,16 @@ private void submitReconciliationExecution(ResourceState state) { return; } state.setUnderProcessing(true); - final var latest = maybeLatest.get(); - ExecutionScope

executionScope = new ExecutionScope<>(state.getRetry()); - state.unMarkEventReceived(); + final var latest = maybeLatest.orElseGet(() -> getResourceFromState(state)); + // passing the latest resources for a corner case when delete event received + // during processing an event + ExecutionScope

executionScope = + new ExecutionScope<>( + latest, + state.getRetry(), + state.deleteEventPresent(), + state.isDeleteFinalStateUnknown()); + state.unMarkEventReceived(triggerOnAllEvent()); metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata); log.debug("Executing events for custom resource. Scope: {}", executionScope); executor.execute(new ReconcilerExecutor(resourceID, executionScope)); @@ -182,12 +191,25 @@ private void submitReconciliationExecution(ResourceState state) { } } + @SuppressWarnings("unchecked") + private P getResourceFromState(ResourceState state) { + if (triggerOnAllEvent()) { + log.debug("Getting resource from state for {}", state.getId()); + return (P) state.getLastKnownResource(); + } else { + throw new IllegalStateException( + "No resource found, this indicates issue with implementation."); + } + } + private void handleEventMarking(Event event, ResourceState state) { final var relatedCustomResourceID = event.getRelatedCustomResourceID(); if (event instanceof ResourceEvent resourceEvent) { if (resourceEvent.getAction() == ResourceAction.DELETED) { log.debug("Marking delete event received for: {}", relatedCustomResourceID); - state.markDeleteEventReceived(); + state.markDeleteEventReceived( + resourceEvent.getResource().orElseThrow(), + ((ResourceDeleteEvent) resourceEvent).isDeletedFinalStateUnknown()); } else { if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) { log.debug( @@ -202,10 +224,12 @@ private void handleEventMarking(Event event, ResourceState state) { // removed, but also the informers websocket is disconnected and later reconnected. So // meanwhile the resource could be deleted and recreated. In this case we just mark a new // event as below. - markEventReceived(state); + state.markEventReceived(triggerOnAllEvent()); } } else if (!state.deleteEventPresent() && !state.processedMarkForDeletionPresent()) { - markEventReceived(state); + state.markEventReceived(triggerOnAllEvent()); + } else if (triggerOnAllEvent() && state.deleteEventPresent()) { + state.markAdditionalEventAfterDeleteEvent(); } else if (log.isDebugEnabled()) { log.debug( "Skipped marking event as received. Delete event present: {}, processed mark for" @@ -215,11 +239,6 @@ private void handleEventMarking(Event event, ResourceState state) { } } - private void markEventReceived(ResourceState state) { - log.debug("Marking event received for: {}", state.getId()); - state.markEventReceived(); - } - private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) { return resourceEvent.getResource().map(HasMetadata::isMarkedForDeletion).orElse(false); } @@ -252,20 +271,22 @@ synchronized void eventProcessingFinished( // Either way we don't want to retry. if (isRetryConfigured() && postExecutionControl.exceptionDuringExecution() - && !state.deleteEventPresent()) { + && (!state.deleteEventPresent() || triggerOnAllEvent())) { handleRetryOnException( executionScope, postExecutionControl.getRuntimeException().orElseThrow()); return; } cleanupOnSuccessfulExecution(executionScope); metrics.finishedReconciliation(executionScope.getResource(), metricsMetadata); - if (state.deleteEventPresent()) { + if ((triggerOnAllEvent() && executionScope.isDeleteEvent()) + || (!triggerOnAllEvent() && state.deleteEventPresent())) { cleanupForDeletedEvent(executionScope.getResourceID()); } else if (postExecutionControl.isFinalizerRemoved()) { state.markProcessedMarkForDeletion(); metrics.cleanupDoneFor(resourceID, metricsMetadata); } else { - if (state.eventPresent()) { + if (state.eventPresent() || (triggerOnAllEvent() && state.deleteEventPresent())) { + log.debug("Submitting for reconciliation."); submitReconciliationExecution(state); } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); @@ -328,8 +349,10 @@ TimerEventSource

retryEventSource() { private void handleRetryOnException(ExecutionScope

executionScope, Exception exception) { final var state = getOrInitRetryExecution(executionScope); var resourceID = state.getId(); - boolean eventPresent = state.eventPresent(); - state.markEventReceived(); + boolean eventPresent = + state.eventPresent() + || (triggerOnAllEvent() && state.isAdditionalEventPresentAfterDeleteEvent()); + state.markEventReceived(triggerOnAllEvent()); retryAwareErrorLogging(state.getRetry(), eventPresent, exception, executionScope); if (eventPresent) { @@ -458,6 +481,7 @@ private ReconcilerExecutor(ResourceID resourceID, ExecutionScope

executionSco } @Override + @SuppressWarnings("unchecked") public void run() { if (!running) { // this is needed for the case when controller stopped, but there is a graceful shutdown @@ -466,14 +490,37 @@ public void run() { log.debug("Event processor not running skipping resource processing: {}", resourceID); return; } + log.debug("Running reconcile executor for: {}", executionScope); // change thread name for easier debugging final var thread = Thread.currentThread(); final var name = thread.getName(); try { + // we try to get the most up-to-date resource from cache var actualResource = cache.get(resourceID); if (actualResource.isEmpty()) { - log.debug("Skipping execution; primary resource missing from cache: {}", resourceID); - return; + if (triggerOnAllEvent()) { + log.debug( + "Resource not found in the cache, checking for delete event resource: {}", + resourceID); + var state = resourceStateManager.get(resourceID); + if (executionScope.isDeleteEvent()) { + actualResource = + (Optional

) + state + .filter(ResourceState::deleteEventPresent) + .map(ResourceState::getLastKnownResource); + if (actualResource.isEmpty()) { + throw new IllegalStateException("this should not happen"); + } + } else { + log.debug("Skipping execution since delete event received meanwhile"); + eventProcessingFinished(executionScope, PostExecutionControl.defaultDispatch()); + return; + } + } else { + log.debug("Skipping execution; primary resource missing from cache: {}", resourceID); + return; + } } actualResource.ifPresent(executionScope::setResource); MDCUtils.addResourceInfo(executionScope.getResource()); @@ -509,4 +556,9 @@ public synchronized boolean isUnderProcessing(ResourceID resourceID) { public synchronized boolean isRunning() { return running; } + + // shortening + private boolean triggerOnAllEvent() { + return controllerConfiguration.triggerReconcilerOnAllEvent(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 8b07bf110b..c96d5b1ea2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -38,7 +38,9 @@ public class EventSourceManager

private final ExecutorServiceManager executorServiceManager; public EventSourceManager(Controller

controller) { - this(controller, new EventSources<>()); + this( + controller, + new EventSources<>(controller.getConfiguration().triggerReconcilerOnAllEvent())); } EventSourceManager(Controller

controller, EventSources

eventSources) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index 6a8b290c4f..f870b63962 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -22,10 +22,18 @@ class EventSources

{ new ConcurrentSkipListMap<>(); private final Map sourceByName = new HashMap<>(); - private final TimerEventSource

retryAndRescheduleTimerEventSource = - new TimerEventSource<>("RetryAndRescheduleTimerEventSource"); + private final TimerEventSource

retryAndRescheduleTimerEventSource; private ControllerEventSource

controllerEventSource; + public EventSources(boolean triggerReconcilerOnAllEvent) { + retryAndRescheduleTimerEventSource = + new TimerEventSource<>("RetryAndRescheduleTimerEventSource", triggerReconcilerOnAllEvent); + } + + EventSources() { + this(false); + } + public void add(EventSource eventSource) { final var name = eventSource.name(); var existing = sourceByName.get(name); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java index 90899a6e1a..bef40c0317 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java @@ -8,9 +8,15 @@ class ExecutionScope { // the latest custom resource from cache private R resource; private final RetryInfo retryInfo; + private boolean deleteEvent; + private boolean isDeleteFinalStateUnknown; - ExecutionScope(RetryInfo retryInfo) { + ExecutionScope( + R resource, RetryInfo retryInfo, boolean deleteEvent, boolean isDeleteFinalStateUnknown) { this.retryInfo = retryInfo; + this.deleteEvent = deleteEvent; + this.isDeleteFinalStateUnknown = isDeleteFinalStateUnknown; + this.resource = resource; } public ExecutionScope setResource(R resource) { @@ -26,17 +32,34 @@ public ResourceID getResourceID() { return ResourceID.fromResource(resource); } + public boolean isDeleteEvent() { + return deleteEvent; + } + + public void setDeleteEvent(boolean deleteEvent) { + this.deleteEvent = deleteEvent; + } + + public boolean isDeleteFinalStateUnknown() { + return isDeleteFinalStateUnknown; + } + + public void setDeleteFinalStateUnknown(boolean deleteFinalStateUnknown) { + isDeleteFinalStateUnknown = deleteFinalStateUnknown; + } + @Override public String toString() { - if (resource == null) { - return "ExecutionScope{resource: null}"; - } else - return "ExecutionScope{" - + " resource id: " - + ResourceID.fromResource(resource) - + ", version: " - + resource.getMetadata().getResourceVersion() - + '}'; + return "ExecutionScope{" + + "resource=" + + ResourceID.fromResource(resource) + + ", retryInfo=" + + retryInfo + + ", deleteEvent=" + + deleteEvent + + ", isDeleteFinalStateUnknown=" + + isDeleteFinalStateUnknown + + '}'; } public RetryInfo getRetryInfo() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java index 41d7a4f493..70b69f5ad2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java @@ -81,7 +81,9 @@ private PostExecutionControl

handleDispatch(ExecutionScope

executionScope) originalResource.getMetadata().getNamespace()); final var markedForDeletion = originalResource.isMarkedForDeletion(); - if (markedForDeletion && shouldNotDispatchToCleanupWhenMarkedForDeletion(originalResource)) { + if (!triggerOnAllEvent() + && markedForDeletion + && shouldNotDispatchToCleanupWhenMarkedForDeletion(originalResource)) { log.debug( "Skipping cleanup of resource {} because finalizer(s) {} don't allow processing yet", getName(originalResource), @@ -90,9 +92,16 @@ private PostExecutionControl

handleDispatch(ExecutionScope

executionScope) } Context

context = - new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution); - if (markedForDeletion) { - return handleCleanup(resourceForExecution, originalResource, context); + new DefaultContext<>( + executionScope.getRetryInfo(), + controller, + resourceForExecution, + executionScope.isDeleteEvent(), + executionScope.isDeleteFinalStateUnknown()); + + // checking the cleaner for all-event-mode + if (!triggerOnAllEvent() && markedForDeletion) { + return handleCleanup(resourceForExecution, originalResource, context, executionScope); } else { return handleReconcile(executionScope, resourceForExecution, originalResource, context); } @@ -110,7 +119,8 @@ private PostExecutionControl

handleReconcile( P originalResource, Context

context) throws Exception { - if (controller.useFinalizer() + if (!triggerOnAllEvent() + && controller.useFinalizer() && !originalResource.hasFinalizer(configuration().getFinalizerName())) { /* * We always add the finalizer if missing and the controller is configured to use a finalizer. @@ -156,7 +166,7 @@ private PostExecutionControl

reconcileExecution( P updatedCustomResource = null; if (useSSA) { if (updateControl.isNoUpdate()) { - return createPostExecutionControl(null, updateControl); + return createPostExecutionControl(null, updateControl, executionScope); } else { toUpdate = updateControl.getResource().orElseThrow(); } @@ -177,7 +187,7 @@ private PostExecutionControl

reconcileExecution( if (updateControl.isPatchStatus()) { customResourceFacade.patchStatus(toUpdate, originalResource); } - return createPostExecutionControl(updatedCustomResource, updateControl); + return createPostExecutionControl(updatedCustomResource, updateControl, executionScope); } private PostExecutionControl

handleErrorStatusHandler( @@ -237,7 +247,7 @@ public boolean isLastAttempt() { } private PostExecutionControl

createPostExecutionControl( - P updatedCustomResource, UpdateControl

updateControl) { + P updatedCustomResource, UpdateControl

updateControl, ExecutionScope

executionScope) { PostExecutionControl

postExecutionControl; if (updatedCustomResource != null) { postExecutionControl = @@ -245,17 +255,31 @@ private PostExecutionControl

createPostExecutionControl( } else { postExecutionControl = PostExecutionControl.defaultDispatch(); } - updatePostExecutionControlWithReschedule(postExecutionControl, updateControl); + updatePostExecutionControlWithReschedule(postExecutionControl, updateControl, executionScope); return postExecutionControl; } private void updatePostExecutionControlWithReschedule( - PostExecutionControl

postExecutionControl, BaseControl baseControl) { - baseControl.getScheduleDelay().ifPresent(postExecutionControl::withReSchedule); + PostExecutionControl

postExecutionControl, + BaseControl baseControl, + ExecutionScope

executionScope) { + baseControl + .getScheduleDelay() + .ifPresent( + r -> { + if (executionScope.isDeleteEvent()) { + log.warn("No re-schedules allowed when delete event present. Will be ignored."); + } else { + postExecutionControl.withReSchedule(r); + } + }); } private PostExecutionControl

handleCleanup( - P resourceForExecution, P originalResource, Context

context) { + P resourceForExecution, + P originalResource, + Context

context, + ExecutionScope

executionScope) { if (log.isDebugEnabled()) { log.debug( "Executing delete for resource: {} with version: {}", @@ -264,7 +288,7 @@ private PostExecutionControl

handleCleanup( } DeleteControl deleteControl = controller.cleanup(resourceForExecution, context); final var useFinalizer = controller.useFinalizer(); - if (useFinalizer) { + if (useFinalizer && !triggerOnAllEvent()) { // note that we don't reschedule here even if instructed. Removing finalizer means that // cleanup is finished, nothing left to be done final var finalizerName = configuration().getFinalizerName(); @@ -298,7 +322,7 @@ private PostExecutionControl

handleCleanup( deleteControl, useFinalizer); PostExecutionControl

postExecutionControl = PostExecutionControl.defaultDispatch(); - updatePostExecutionControlWithReschedule(postExecutionControl, deleteControl); + updatePostExecutionControlWithReschedule(postExecutionControl, deleteControl, executionScope); return postExecutionControl; } @@ -510,4 +534,8 @@ private Resource resource(R resource) { : resourceOperation.resource(resource); } } + + private boolean triggerOnAllEvent() { + return configuration().triggerReconcilerOnAllEvent(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java index 5d4e74d681..b7636818fa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java @@ -1,10 +1,16 @@ package io.javaoperatorsdk.operator.processing.event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; class ResourceState { + private static final Logger log = LoggerFactory.getLogger(ResourceState.class); + /** * Manages the state of received events. Basically there can be only three distinct states * relevant for event processing. Either an event is received, so we eventually process or no @@ -21,6 +27,8 @@ private enum EventingState { PROCESSED_MARK_FOR_DELETION, /** Delete event present, from this point other events are not relevant */ DELETE_EVENT_PRESENT, + /** */ + ADDITIONAL_EVENT_PRESENT_AFTER_DELETE_EVENT } private final ResourceID id; @@ -29,6 +37,8 @@ private enum EventingState { private RetryExecution retry; private EventingState eventing; private RateLimitState rateLimit; + private HasMetadata lastKnownResource; + private boolean isDeleteFinalStateUnknown; public ResourceState(ResourceID id) { this.id = id; @@ -63,26 +73,50 @@ public void setUnderProcessing(boolean underProcessing) { this.underProcessing = underProcessing; } - public void markDeleteEventReceived() { + public void markDeleteEventReceived( + HasMetadata lastKnownResource, boolean isDeleteFinalStateUnknown) { eventing = EventingState.DELETE_EVENT_PRESENT; + this.lastKnownResource = lastKnownResource; + this.isDeleteFinalStateUnknown = isDeleteFinalStateUnknown; } public boolean deleteEventPresent() { - return eventing == EventingState.DELETE_EVENT_PRESENT; + return eventing == EventingState.DELETE_EVENT_PRESENT + || eventing == EventingState.ADDITIONAL_EVENT_PRESENT_AFTER_DELETE_EVENT; + } + + public boolean isAdditionalEventPresentAfterDeleteEvent() { + return eventing == EventingState.ADDITIONAL_EVENT_PRESENT_AFTER_DELETE_EVENT; } public boolean processedMarkForDeletionPresent() { return eventing == EventingState.PROCESSED_MARK_FOR_DELETION; } - public void markEventReceived() { - if (deleteEventPresent()) { + public void markEventReceived(boolean isAllEventMode) { + if (!isAllEventMode && deleteEventPresent()) { throw new IllegalStateException("Cannot receive event after a delete event received"); } - eventing = EventingState.EVENT_PRESENT; + log.debug("Marking event received for: {}", getId()); + if (eventing == EventingState.DELETE_EVENT_PRESENT) { + eventing = EventingState.ADDITIONAL_EVENT_PRESENT_AFTER_DELETE_EVENT; + } else { + eventing = EventingState.EVENT_PRESENT; + } + } + + public void markAdditionalEventAfterDeleteEvent() { + if (!deleteEventPresent()) { + throw new IllegalStateException( + "Cannot mark additional event after delete event, if in current state not delete event" + + " present"); + } + log.debug("Marking additional event after delete event: {}", getId()); + eventing = EventingState.ADDITIONAL_EVENT_PRESENT_AFTER_DELETE_EVENT; } public void markProcessedMarkForDeletion() { + log.debug("Marking processed mark for deletion: {}", getId()); eventing = EventingState.PROCESSED_MARK_FOR_DELETION; } @@ -94,7 +128,15 @@ public boolean noEventPresent() { return eventing == EventingState.NO_EVENT_PRESENT; } - public void unMarkEventReceived() { + public boolean isDeleteFinalStateUnknown() { + return isDeleteFinalStateUnknown; + } + + public HasMetadata getLastKnownResource() { + return lastKnownResource; + } + + public void unMarkEventReceived(boolean isAllEventReconcileMode) { switch (eventing) { case EVENT_PRESENT: eventing = EventingState.NO_EVENT_PRESENT; @@ -102,7 +144,17 @@ public void unMarkEventReceived() { case PROCESSED_MARK_FOR_DELETION: throw new IllegalStateException("Cannot unmark processed marked for deletion."); case DELETE_EVENT_PRESENT: - throw new IllegalStateException("Cannot unmark delete event."); + if (!isAllEventReconcileMode) { + throw new IllegalStateException("Cannot unmark delete event."); + } + break; + case ADDITIONAL_EVENT_PRESENT_AFTER_DELETE_EVENT: + if (!isAllEventReconcileMode) { + throw new IllegalStateException( + "This state should not happen in non all-event-reconciliation mode"); + } + eventing = EventingState.DELETE_EVENT_PRESENT; + break; case NO_EVENT_PRESENT: // do nothing break; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java index 481fd317ff..b55b432d4b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java @@ -33,6 +33,10 @@ public ResourceState getOrCreate(ResourceID resourceID) { return states.computeIfAbsent(resourceID, ResourceState::new); } + public Optional get(ResourceID resourceID) { + return Optional.ofNullable(states.get(resourceID)); + } + public ResourceState remove(ResourceID resourceID) { return states.remove(resourceID); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index eb9f65eafc..a505a97702 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -62,7 +62,8 @@ public synchronized void start() { } } - public void eventReceived(ResourceAction action, T resource, T oldResource) { + public void eventReceived( + ResourceAction action, T resource, T oldResource, Boolean deletedFinalStateUnknown) { try { if (log.isDebugEnabled()) { log.debug( @@ -76,8 +77,18 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) { MDCUtils.addResourceInfo(resource); controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource); if (isAcceptedByFilters(action, resource, oldResource)) { - getEventHandler() - .handleEvent(new ResourceEvent(action, ResourceID.fromResource(resource), resource)); + if (deletedFinalStateUnknown != null) { + getEventHandler() + .handleEvent( + new ResourceDeleteEvent( + action, + ResourceID.fromResource(resource), + resource, + deletedFinalStateUnknown)); + } else { + getEventHandler() + .handleEvent(new ResourceEvent(action, ResourceID.fromResource(resource), resource)); + } } else { log.debug("Skipping event handling resource {}", ResourceID.fromResource(resource)); } @@ -103,19 +114,19 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso @Override public void onAdd(T resource) { super.onAdd(resource); - eventReceived(ResourceAction.ADDED, resource, null); + eventReceived(ResourceAction.ADDED, resource, null, null); } @Override public void onUpdate(T oldCustomResource, T newCustomResource) { super.onUpdate(oldCustomResource, newCustomResource); - eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource); + eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource, null); } @Override public void onDelete(T resource, boolean b) { super.onDelete(resource, b); - eventReceived(ResourceAction.DELETED, resource, null); + eventReceived(ResourceAction.DELETED, resource, null, b); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java new file mode 100644 index 0000000000..73d856e922 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java @@ -0,0 +1,22 @@ +package io.javaoperatorsdk.operator.processing.event.source.controller; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class ResourceDeleteEvent extends ResourceEvent { + + private final boolean deletedFinalStateUnknown; + + public ResourceDeleteEvent( + ResourceAction action, + ResourceID resourceID, + HasMetadata resource, + boolean deletedFinalStateUnknown) { + super(action, resourceID, resource); + this.deletedFinalStateUnknown = deletedFinalStateUnknown; + } + + public boolean isDeletedFinalStateUnknown() { + return deletedFinalStateUnknown; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index 53c0d328a8..3f4d136f6c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -21,13 +21,15 @@ public class TimerEventSource extends AbstractEventSource private Timer timer; private final Map onceTasks = new ConcurrentHashMap<>(); + private boolean triggerReconcilerOnAllEvent; public TimerEventSource() { super(Void.class); } - public TimerEventSource(String name) { + public TimerEventSource(String name, boolean triggerReconcilerOnAllEvent) { super(Void.class, name); + this.triggerReconcilerOnAllEvent = triggerReconcilerOnAllEvent; } @SuppressWarnings("unused") @@ -50,7 +52,11 @@ public void scheduleOnce(ResourceID resourceID, long delay) { @Override public void onResourceDeleted(R resource) { - cancelOnceSchedule(ResourceID.fromResource(resource)); + // for triggerReconcilerOnAllEvent the cancelOnceSchedule will be called on + // successful delete event processing + if (!triggerReconcilerOnAllEvent) { + cancelOnceSchedule(ResourceID.fromResource(resource)); + } } public void cancelOnceSchedule(ResourceID customResourceUid) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java index afef9e6703..d3f0fbac68 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java @@ -32,6 +32,10 @@ public static TestCustomResource testCustomResource1() { return testCustomResource(new ResourceID("test1", "default")); } + public static ResourceID testCustomResource1Id() { + return new ResourceID("test1", "default"); + } + public static TestCustomResource testCustomResource(ResourceID id) { TestCustomResource resource = new TestCustomResource(); resource.setMetadata( diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java index b289d68b22..1f59b8912c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java @@ -18,7 +18,8 @@ class DefaultContextTest { private final Secret primary = new Secret(); private final Controller mockController = mock(); - private final DefaultContext context = new DefaultContext<>(null, mockController, primary); + private final DefaultContext context = + new DefaultContext<>(null, mockController, primary, false, false); @Test @SuppressWarnings("unchecked") diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsIntegrationTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsIntegrationTest.java new file mode 100644 index 0000000000..86cf6d2b22 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsIntegrationTest.java @@ -0,0 +1,76 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubeapitest.junit.EnableKubeAPIServer; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; + +import static org.assertj.core.api.Assertions.assertThat; + +@EnableKubeAPIServer +class PrimaryUpdateAndCacheUtilsIntegrationTest { + + public static final String DEFAULT_NS = "default"; + public static final String TEST_RESOURCE_NAME = "test1"; + public static final String FINALIZER = "int.test/finalizer"; + static KubernetesClient client; + + @Test + void testFinalizerAddAndRemoval() { + var cm = createConfigMap(); + PrimaryUpdateAndCacheUtils.addFinalizer(client, cm, FINALIZER); + + cm = getTestConfigMap(); + assertThat(cm.getMetadata().getFinalizers()).containsExactly(FINALIZER); + + PrimaryUpdateAndCacheUtils.removeFinalizer(client, cm, FINALIZER); + + cm = getTestConfigMap(); + assertThat(cm.getMetadata().getFinalizers()).isEmpty(); + client.resource(cm).delete(); + } + + private static ConfigMap getTestConfigMap() { + return client.configMaps().inNamespace(DEFAULT_NS).withName(TEST_RESOURCE_NAME).get(); + } + + @Test + void testFinalizerAddRetryOnOptimisticLockFailure() { + var cm = createConfigMap(); + // update resource, so it has a new version on the server + cm.setData(Map.of("k", "v")); + client.resource(cm).update(); + + PrimaryUpdateAndCacheUtils.addFinalizer(client, cm, FINALIZER); + + cm = getTestConfigMap(); + assertThat(cm.getMetadata().getFinalizers()).containsExactly(FINALIZER); + + cm.setData(Map.of("k2", "v2")); + client.resource(cm).update(); + + PrimaryUpdateAndCacheUtils.removeFinalizer(client, cm, FINALIZER); + cm = getTestConfigMap(); + assertThat(cm.getMetadata().getFinalizers()).isEmpty(); + + client.resource(cm).delete(); + } + + private static ConfigMap createConfigMap() { + return client + .resource( + new ConfigMapBuilder() + .withMetadata( + new ObjectMetaBuilder() + .withName(TEST_RESOURCE_NAME) + .withNamespace(DEFAULT_NS) + .build()) + .build()) + .create(); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java index 82ecdb111a..10ddc79dfd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java @@ -8,6 +8,7 @@ import io.fabric8.kubernetes.api.model.Secret; import io.javaoperatorsdk.operator.MockKubernetesClient; +import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.BaseConfigurationService; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.MockControllerConfiguration; @@ -23,7 +24,10 @@ import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.monitoring.Metrics.NOOP; +import static io.javaoperatorsdk.operator.processing.Controller.CLEANER_NOT_SUPPORTED_ON_ALL_EVENT_ERROR_MESSAGE; +import static io.javaoperatorsdk.operator.processing.Controller.MANAGED_WORKFLOWS_NOT_SUPPORTED_TRIGGER_RECONCILER_ON_ALL_EVENT_ERROR_MESSAGE; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -76,6 +80,54 @@ void usesFinalizerIfThereIfReconcilerImplementsCleaner() { assertThat(controller.useFinalizer()).isTrue(); } + @Test + void cleanerNotAllowedWithTriggerOnAllEventEnabled() { + Reconciler reconciler = mock(Reconciler.class, withSettings().extraInterfaces(Cleaner.class)); + final var configuration = MockControllerConfiguration.forResource(Secret.class); + when(configuration.getConfigurationService()).thenReturn(new BaseConfigurationService()); + when(configuration.triggerReconcilerOnAllEvent()).thenReturn(true); + + var exception = + assertThrows( + OperatorException.class, + () -> + new Controller( + reconciler, configuration, MockKubernetesClient.client(Secret.class))); + + assertThat(exception.getMessage()).isEqualTo(CLEANER_NOT_SUPPORTED_ON_ALL_EVENT_ERROR_MESSAGE); + } + + @Test + void managedWorkflowNotAllowedWithOnAllEventEnabled() { + Reconciler reconciler = mock(Reconciler.class); + final var configuration = MockControllerConfiguration.forResource(Secret.class); + + var configurationService = mock(ConfigurationService.class); + var mockWorkflowFactory = mock(ManagedWorkflowFactory.class); + var mockManagedWorkflow = mock(ManagedWorkflow.class); + + when(configuration.getConfigurationService()).thenReturn(configurationService); + var workflowSpec = mock(WorkflowSpec.class); + when(configuration.getWorkflowSpec()).thenReturn(Optional.of(workflowSpec)); + when(configurationService.getMetrics()).thenReturn(NOOP); + when(configurationService.getWorkflowFactory()).thenReturn(mockWorkflowFactory); + when(mockWorkflowFactory.workflowFor(any())).thenReturn(mockManagedWorkflow); + var managedWorkflowMock = workflow(true); + when(mockManagedWorkflow.resolve(any(), any())).thenReturn(managedWorkflowMock); + + when(configuration.triggerReconcilerOnAllEvent()).thenReturn(true); + + var exception = + assertThrows( + OperatorException.class, + () -> + new Controller( + reconciler, configuration, MockKubernetesClient.client(Secret.class))); + + assertThat(exception.getMessage()) + .isEqualTo(MANAGED_WORKFLOWS_NOT_SUPPORTED_TRIGGER_RECONCILER_ON_ALL_EVENT_ERROR_MESSAGE); + } + @ParameterizedTest @CsvSource({ "true, true, true, false", @@ -122,7 +174,8 @@ void callsCleanupOnWorkflowWhenHasCleanerAndReconcilerIsNotCleaner( new Controller( reconciler, configuration, MockKubernetesClient.client(Secret.class)); - controller.cleanup(new Secret(), new DefaultContext<>(null, controller, new Secret())); + controller.cleanup( + new Secret(), new DefaultContext<>(null, controller, new Secret(), false, false)); verify(managedWorkflowMock, times(workflowCleanerExecuted ? 1 : 0)).cleanup(any(), any()); } @@ -131,6 +184,7 @@ private Workflow workflow(boolean hasCleaner) { var workflow = mock(Workflow.class); when(workflow.cleanup(any(), any())).thenReturn(mock(WorkflowCleanupResult.class)); when(workflow.hasCleaner()).thenReturn(hasCleaner); + when(workflow.isEmpty()).thenReturn(false); return workflow; } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 9819eb7ee9..860f843261 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.config.BaseConfigurationService; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; @@ -24,6 +25,7 @@ import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -34,6 +36,7 @@ import static io.javaoperatorsdk.operator.TestUtils.markForDeletion; import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; +import static io.javaoperatorsdk.operator.TestUtils.testCustomResource1; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; @@ -41,6 +44,7 @@ import static org.mockito.Mockito.after; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -129,7 +133,7 @@ void ifExecutionInProgressWaitsUntilItsFinished() { void schedulesAnEventRetryOnException() { TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(null); + ExecutionScope executionScope = new ExecutionScope(null, null, false, false); executionScope.setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException("test")); @@ -271,7 +275,8 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() { var cr = testCustomResource(crID); eventProcessor.eventProcessingFinished( - new ExecutionScope(null).setResource(cr), PostExecutionControl.defaultDispatch()); + new ExecutionScope(null, null, false, false).setResource(cr), + PostExecutionControl.defaultDispatch()); verify(retryTimerEventSourceMock, times(1)).cancelOnceSchedule(eq(crID)); } @@ -324,7 +329,8 @@ void startProcessedMarkedEventReceivedBefore() { @Test void notUpdatesEventSourceHandlerIfResourceUpdated() { TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceStatusPatched(customResource); @@ -337,7 +343,8 @@ void notUpdatesEventSourceHandlerIfResourceUpdated() { void notReschedulesAfterTheFinalizerRemoveProcessed() { TestCustomResource customResource = testCustomResource(); markForDeletion(customResource); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceFinalizerRemoved(customResource); @@ -350,7 +357,8 @@ void notReschedulesAfterTheFinalizerRemoveProcessed() { void skipEventProcessingIfFinalizerRemoveProcessed() { TestCustomResource customResource = testCustomResource(); markForDeletion(customResource); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceFinalizerRemoved(customResource); @@ -367,7 +375,8 @@ void skipEventProcessingIfFinalizerRemoveProcessed() { void newResourceAfterMissedDeleteEvent() { TestCustomResource customResource = testCustomResource(); markForDeletion(customResource); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.customResourceFinalizerRemoved(customResource); var newResource = testCustomResource(); @@ -403,7 +412,8 @@ void rateLimitsReconciliationSubmission() { @Test void schedulesRetryForMarReconciliationInterval() { TestCustomResource customResource = testCustomResource(); - ExecutionScope executionScope = new ExecutionScope(null).setResource(customResource); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(customResource); PostExecutionControl postExecutionControl = PostExecutionControl.defaultDispatch(); eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); @@ -425,7 +435,8 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { eventSourceManagerMock, metricsMock)); eventProcessorWithRetry.start(); - ExecutionScope executionScope = new ExecutionScope(null).setResource(testCustomResource()); + ExecutionScope executionScope = + new ExecutionScope(null, null, false, false).setResource(testCustomResource()); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException()); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); @@ -454,7 +465,7 @@ void executionOfReconciliationShouldNotStartIfProcessorStopped() throws Interrup eventProcessor = spy( new EventProcessor( - controllerConfiguration(null, rateLimiterMock, configurationService), + controllerConfiguration(null, rateLimiterMock, configurationService, false), reconciliationDispatcherMock, eventSourceManagerMock, null)); @@ -482,11 +493,202 @@ void cleansUpForDeleteEventEvenIfProcessorNotStarted() { null)); eventProcessor.handleEvent(prepareCREvent(resourceID)); - eventProcessor.handleEvent(new ResourceEvent(ResourceAction.DELETED, resourceID, null)); + eventProcessor.handleEvent( + new ResourceDeleteEvent( + ResourceAction.DELETED, resourceID, TestUtils.testCustomResource(), true)); eventProcessor.handleEvent(prepareCREvent(resourceID)); // no exception thrown } + @Test + void triggerOnAllEventProcessesDeleteEvent() { + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent(null, rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()); + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + + verify(reconciliationDispatcherMock, times(2)).handleExecution(any()); + } + + // this is a special corner case that needs special care + @Test + void triggerOnAllEventDeleteEventInstantlyAfterEvent() { + var reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()); + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent(null, rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + verify(reconciliationDispatcherMock, times(1)).handleExecution(any()); + } + + @Test + void triggerOnAllEventDeleteEventAfterEventProcessed() { + var reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()); + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent(null, rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + verify(reconciliationDispatcherMock, times(2)).handleExecution(any()); + } + + @Test + void triggerOnAllEventRetriesDeleteEventError() { + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent( + GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()) + .thenReturn(PostExecutionControl.exceptionDuringExecution(new RuntimeException())) + .thenReturn(PostExecutionControl.defaultDispatch()); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + // retry event + eventProcessor.handleEvent(new Event(TestUtils.testCustomResource1Id())); + + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + + verify(reconciliationDispatcherMock, times(3)).handleExecution(any()); + } + + @Test + void processesAdditionalEventWhileInDeleteModeRetry() { + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent( + GenericRetry.defaultLimitedExponentialRetry() + .setInitialInterval(1000) + .setMaxAttempts(-1), + rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()) + .thenReturn(PostExecutionControl.exceptionDuringExecution(new RuntimeException())); + + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + verify(reconciliationDispatcherMock, atMost(2)).handleExecution(any()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + // retry event + eventProcessor.handleEvent(new Event(TestUtils.testCustomResource1Id())); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + verify(reconciliationDispatcherMock, times(3)).handleExecution(any()); + } + + @Test + void afterRetryExhaustedAdditionalEventTriggerReconciliationWhenDeleteEventPresent() { + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent( + GenericRetry.defaultLimitedExponentialRetry() + .setInitialInterval(100) + .setIntervalMultiplier(1) + .setMaxAttempts(1), + rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()) + .thenReturn(PostExecutionControl.exceptionDuringExecution(new RuntimeException())); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + eventProcessor.handleEvent(new Event(TestUtils.testCustomResource1Id())); + await() + .untilAsserted(() -> verify(reconciliationDispatcherMock, times(3)).handleExecution(any())); + + eventProcessor.handleEvent(new Event(TestUtils.testCustomResource1Id())); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + verify(reconciliationDispatcherMock, times(4)).handleExecution(any()); + } + + @Test + void passesResourceFromStateToDispatcher() { + eventProcessor = + spy( + new EventProcessor( + controllerConfigTriggerAllEvent(null, rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, + null)); + when(reconciliationDispatcherMock.handleExecution(any())) + .thenReturn(PostExecutionControl.defaultDispatch()); + when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class)); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + + eventProcessor.handleEvent(prepareCRDeleteEvent1()); + waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id()); + var captor = ArgumentCaptor.forClass(ExecutionScope.class); + verify(reconciliationDispatcherMock, times(2)).handleExecution(captor.capture()); + assertThat(captor.getAllValues().get(1).getResource()).isNotNull(); + } + + @Test + void onAllEventRateLimiting() {} + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( @@ -504,6 +706,20 @@ private ResourceEvent prepareCREvent() { return prepareCREvent(new ResourceID(UUID.randomUUID().toString(), TEST_NAMESPACE)); } + private ResourceEvent prepareCREvent1() { + return prepareCREvent(testCustomResource1()); + } + + private ResourceEvent prepareCRDeleteEvent1() { + when(controllerEventSourceMock.get(eq(TestUtils.testCustomResource1Id()))) + .thenReturn(Optional.empty()); + return new ResourceDeleteEvent( + ResourceAction.DELETED, + ResourceID.fromResource(TestUtils.testCustomResource1()), + TestUtils.testCustomResource1(), + true); + } + private ResourceEvent prepareCREvent(HasMetadata hasMetadata) { when(controllerEventSourceMock.get(eq(ResourceID.fromResource(hasMetadata)))) .thenReturn(Optional.of(hasMetadata)); @@ -528,17 +744,25 @@ private void overrideData(ResourceID id, HasMetadata applyTo) { } ControllerConfiguration controllerConfiguration(Retry retry, RateLimiter rateLimiter) { - return controllerConfiguration(retry, rateLimiter, new BaseConfigurationService()); + return controllerConfiguration(retry, rateLimiter, new BaseConfigurationService(), false); + } + + ControllerConfiguration controllerConfigTriggerAllEvent(Retry retry, RateLimiter rateLimiter) { + return controllerConfiguration(retry, rateLimiter, new BaseConfigurationService(), true); } ControllerConfiguration controllerConfiguration( - Retry retry, RateLimiter rateLimiter, ConfigurationService configurationService) { + Retry retry, + RateLimiter rateLimiter, + ConfigurationService configurationService, + boolean triggerOnAllEvent) { ControllerConfiguration res = mock(ControllerConfiguration.class); when(res.getName()).thenReturn("Test"); when(res.getRetry()).thenReturn(retry); when(res.getRateLimiter()).thenReturn(rateLimiter); when(res.maxReconciliationInterval()).thenReturn(Optional.of(Duration.ofMillis(1000))); when(res.getConfigurationService()).thenReturn(configurationService); + when(res.triggerReconcilerOnAllEvent()).thenReturn(triggerOnAllEvent); return res; } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index 89f3655356..4b13f2e29d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -394,6 +394,7 @@ void propagatesRetryInfoToContextIfFinalizerSet() { reconciliationDispatcher.handleExecution( new ExecutionScope( + null, new RetryInfo() { @Override public int getAttemptCount() { @@ -404,7 +405,9 @@ public int getAttemptCount() { public boolean isLastAttempt() { return true; } - }) + }, + false, + false) .setResource(testCustomResource)); ArgumentCaptor contextArgumentCaptor = ArgumentCaptor.forClass(Context.class); @@ -495,6 +498,7 @@ void callErrorStatusHandlerIfImplemented() { reconciliationDispatcher.handleExecution( new ExecutionScope( + null, new RetryInfo() { @Override public int getAttemptCount() { @@ -505,7 +509,9 @@ public int getAttemptCount() { public boolean isLastAttempt() { return true; } - }) + }, + false, + false) .setResource(testCustomResource)); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); @@ -528,7 +534,7 @@ void callErrorStatusHandlerEvenOnFirstError() { var postExecControl = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null, false, false).setResource(testCustomResource)); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); assertThat(postExecControl.exceptionDuringExecution()).isTrue(); @@ -549,7 +555,7 @@ void errorHandlerCanInstructNoRetryWithUpdate() { var postExecControl = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null, false, false).setResource(testCustomResource)); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); @@ -571,7 +577,7 @@ void errorHandlerCanInstructNoRetryNoUpdate() { var postExecControl = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null, false, false).setResource(testCustomResource)); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); verify(customResourceFacade, times(0)).patchStatus(eq(testCustomResource), any()); @@ -588,7 +594,7 @@ void errorStatusHandlerCanPatchResource() { reconciler.errorHandler = () -> ErrorStatusUpdateControl.patchStatus(testCustomResource); reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null, false, false).setResource(testCustomResource)); verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any()); verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any()); @@ -611,7 +617,7 @@ void ifRetryLimitedToZeroMaxAttemptsErrorHandlerGetsCorrectLastAttempt() { reconciler.errorHandler = () -> ErrorStatusUpdateControl.noStatusUpdate(); reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null, false, false).setResource(testCustomResource)); verify(reconciler, times(1)) .updateErrorStatus( @@ -675,7 +681,7 @@ void reSchedulesFromErrorHandler() { var res = reconciliationDispatcher.handleExecution( - new ExecutionScope(null).setResource(testCustomResource)); + new ExecutionScope(null, null, false, false).setResource(testCustomResource)); assertThat(res.getReScheduleDelay()).contains(delay); assertThat(res.getRuntimeException()).isEmpty(); @@ -703,6 +709,9 @@ void reconcilerContextUsesTheSameInstanceOfResourceAsParam() { .isNotSameAs(testCustomResource); } + @Test + void procAllEventModeNoReSchedulesAllowedForDeleteEvent() {} + private ObservedGenCustomResource createObservedGenCustomResource() { ObservedGenCustomResource observedGenCustomResource = new ObservedGenCustomResource(); observedGenCustomResource.setMetadata(new ObjectMeta()); @@ -723,7 +732,7 @@ private void removeFinalizers(CustomResource customResource) { } public ExecutionScope executionScopeWithCREvent(T resource) { - return (ExecutionScope) new ExecutionScope<>(null).setResource(resource); + return (ExecutionScope) new ExecutionScope<>(null, null, false, false).setResource(resource); } private class TestReconciler diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java index 487ba25885..626b833c38 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java @@ -34,7 +34,7 @@ public void returnsNoEventPresentIfNotMarkedYet() { @Test public void marksEvent() { - state.markEventReceived(); + state.markEventReceived(false); assertThat(state.eventPresent()).isTrue(); assertThat(state.deleteEventPresent()).isFalse(); @@ -42,7 +42,7 @@ public void marksEvent() { @Test public void marksDeleteEvent() { - state.markDeleteEventReceived(); + state.markDeleteEventReceived(TestUtils.testCustomResource(), true); assertThat(state.deleteEventPresent()).isTrue(); assertThat(state.eventPresent()).isFalse(); @@ -50,9 +50,9 @@ public void marksDeleteEvent() { @Test public void afterDeleteEventMarkEventIsNotRelevant() { - state.markEventReceived(); + state.markEventReceived(false); - state.markDeleteEventReceived(); + state.markDeleteEventReceived(TestUtils.testCustomResource(), true); assertThat(state.deleteEventPresent()).isTrue(); assertThat(state.eventPresent()).isFalse(); @@ -60,8 +60,8 @@ public void afterDeleteEventMarkEventIsNotRelevant() { @Test public void cleansUp() { - state.markEventReceived(); - state.markDeleteEventReceived(); + state.markEventReceived(false); + state.markDeleteEventReceived(TestUtils.testCustomResource(), true); manager.remove(sampleResourceID); @@ -75,16 +75,16 @@ public void cannotMarkEventAfterDeleteEventReceived() { Assertions.assertThrows( IllegalStateException.class, () -> { - state.markDeleteEventReceived(); - state.markEventReceived(); + state.markDeleteEventReceived(TestUtils.testCustomResource(), true); + state.markEventReceived(false); }); } @Test public void listsResourceIDSWithEventsPresent() { - state.markEventReceived(); - state2.markEventReceived(); - state.unMarkEventReceived(); + state.markEventReceived(false); + state2.markEventReceived(false); + state.unMarkEventReceived(false); var res = manager.resourcesWithEventPresent(); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index 6548bbddc7..f6e4be31c9 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -53,10 +53,10 @@ void skipsEventHandlingIfGenerationNotIncreased() { TestCustomResource oldCustomResource = TestUtils.testCustomResource(); oldCustomResource.getMetadata().setFinalizers(List.of(FINALIZER)); - source.eventReceived(ResourceAction.UPDATED, customResource, oldCustomResource); + source.eventReceived(ResourceAction.UPDATED, customResource, oldCustomResource, null); verify(eventHandler, times(1)).handleEvent(any()); - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); + source.eventReceived(ResourceAction.UPDATED, customResource, customResource, null); verify(eventHandler, times(1)).handleEvent(any()); } @@ -64,12 +64,12 @@ void skipsEventHandlingIfGenerationNotIncreased() { void dontSkipEventHandlingIfMarkedForDeletion() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); // mark for deletion customResource1.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString()); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(2)).handleEvent(any()); } @@ -77,11 +77,11 @@ void dontSkipEventHandlingIfMarkedForDeletion() { void normalExecutionIfGenerationChanges() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); customResource1.getMetadata().setGeneration(2L); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(2)).handleEvent(any()); } @@ -92,10 +92,10 @@ void handlesAllEventIfNotGenerationAware() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(2)).handleEvent(any()); } @@ -103,7 +103,7 @@ void handlesAllEventIfNotGenerationAware() { void eventWithNoGenerationProcessedIfNoFinalizer() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); } @@ -112,7 +112,7 @@ void eventWithNoGenerationProcessedIfNoFinalizer() { void callsBroadcastsOnResourceEvents() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null); verify(testController.getEventSourceManager(), times(1)) .broadcastOnResourceEvent( @@ -128,8 +128,8 @@ void filtersOutEventsOnAddAndUpdate() { source = new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null)); setUpSource(source, true, controllerConfig); - source.eventReceived(ResourceAction.ADDED, cr, null); - source.eventReceived(ResourceAction.UPDATED, cr, cr); + source.eventReceived(ResourceAction.ADDED, cr, null, null); + source.eventReceived(ResourceAction.UPDATED, cr, cr, null); verify(eventHandler, never()).handleEvent(any()); } @@ -141,9 +141,9 @@ void genericFilterFiltersOutAddUpdateAndDeleteEvents() { source = new ControllerEventSource<>(new TestController(null, null, res -> false)); setUpSource(source, true, controllerConfig); - source.eventReceived(ResourceAction.ADDED, cr, null); - source.eventReceived(ResourceAction.UPDATED, cr, cr); - source.eventReceived(ResourceAction.DELETED, cr, cr); + source.eventReceived(ResourceAction.ADDED, cr, null, null); + source.eventReceived(ResourceAction.UPDATED, cr, cr, null); + source.eventReceived(ResourceAction.DELETED, cr, cr, true); verify(eventHandler, never()).handleEvent(any()); } @@ -208,7 +208,8 @@ public TestConfiguration( .withOnAddFilter(onAddFilter) .withOnUpdateFilter(onUpdateFilter) .withGenericFilter(genericFilter) - .buildForController()); + .buildForController(), + false); } } } diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java index 0ebcef2d5c..1f96fb4b87 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java @@ -126,8 +126,12 @@ public T serverSideApply(T resource) { return kubernetesClient.resource(resource).inNamespace(namespace).serverSideApply(); } + public T update(T resource) { + return kubernetesClient.resource(resource).inNamespace(namespace).update(); + } + public T replace(T resource) { - return kubernetesClient.resource(resource).inNamespace(namespace).replace(); + return kubernetesClient.resource(resource).inNamespace(namespace).replace(resource); } public boolean delete(T resource) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventCustomResource.java new file mode 100644 index 0000000000..59cb1e3d02 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventCustomResource.java @@ -0,0 +1,13 @@ +package io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("aecs") +public class TriggerReconcilerOnAllEventCustomResource + extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventIT.java new file mode 100644 index 0000000000..659b02c4c3 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventIT.java @@ -0,0 +1,241 @@ +package io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; + +import static io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile.TriggerReconcilerOnAllEventReconciler.ADDITIONAL_FINALIZER; +import static io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile.TriggerReconcilerOnAllEventReconciler.FINALIZER; +import static io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile.TriggerReconcilerOnAllEventReconciler.NO_MORE_EXCEPTION_ANNOTATION_KEY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class TriggerReconcilerOnAllEventIT { + + public static final String TEST = "test1"; + public static final int MAX_RETRY_ATTEMPTS = 2; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler( + new TriggerReconcilerOnAllEventReconciler(), + o -> + o.withRetry( + new GenericRetry() + .setInitialInterval(800) + .setMaxAttempts(MAX_RETRY_ATTEMPTS) + .setIntervalMultiplier(1))) + .build(); + + @Test + void eventsPresent() { + var reconciler = extension.getReconcilerOfType(TriggerReconcilerOnAllEventReconciler.class); + extension.serverSideApply(testResource()); + await() + .untilAsserted( + () -> { + assertThat(reconciler.isResourceEventPresent()).isTrue(); + assertThat(getResource().hasFinalizer(FINALIZER)).isTrue(); + }); + + extension.delete(getResource()); + + await() + .untilAsserted( + () -> { + var r = getResource(); + assertThat(r).isNull(); + assertThat(reconciler.isDeleteEventPresent()).isTrue(); + assertThat(reconciler.isEventOnMarkedForDeletion()).isTrue(); + }); + } + + @Test + void deleteEventPresentWithoutFinalizer() { + var reconciler = extension.getReconcilerOfType(TriggerReconcilerOnAllEventReconciler.class); + reconciler.setUseFinalizer(false); + extension.serverSideApply(testResource()); + + await().untilAsserted(() -> assertThat(reconciler.isResourceEventPresent()).isTrue()); + + extension.delete(getResource()); + + await() + .untilAsserted( + () -> { + var r = getResource(); + assertThat(r).isNull(); + assertThat(reconciler.isDeleteEventPresent()).isTrue(); + }); + } + + @Test + void retriesExceptionOnDeleteEvent() { + var reconciler = extension.getReconcilerOfType(TriggerReconcilerOnAllEventReconciler.class); + reconciler.setUseFinalizer(false); + reconciler.setThrowExceptionOnFirstDeleteEvent(true); + + extension.serverSideApply(testResource()); + + await().untilAsserted(() -> assertThat(reconciler.isResourceEventPresent()).isTrue()); + + extension.delete(getResource()); + + await() + .untilAsserted( + () -> { + var r = getResource(); + assertThat(r).isNull(); + assertThat(reconciler.isDeleteEventPresent()).isTrue(); + }); + } + + @Test + void additionalFinalizer() { + var reconciler = extension.getReconcilerOfType(TriggerReconcilerOnAllEventReconciler.class); + reconciler.setUseFinalizer(true); + var res = testResource(); + res.addFinalizer(ADDITIONAL_FINALIZER); + + extension.create(res); + + extension.delete(getResource()); + + await() + .untilAsserted( + () -> { + var r = getResource(); + assertThat(r).isNotNull(); + assertThat(r.getMetadata().getFinalizers()).containsExactly(ADDITIONAL_FINALIZER); + }); + var eventCount = reconciler.getEventCount(); + + res = getResource(); + res.removeFinalizer(ADDITIONAL_FINALIZER); + extension.update(res); + + await() + .untilAsserted( + () -> { + var r = getResource(); + assertThat(r).isNull(); + assertThat(reconciler.getEventCount()).isEqualTo(eventCount + 1); + }); + } + + @Test + void additionalEventDuringRetryOnDeleteEvent() { + + var reconciler = extension.getReconcilerOfType(TriggerReconcilerOnAllEventReconciler.class); + reconciler.setThrowExceptionIfNoAnnotation(true); + reconciler.setWaitAfterFirstRetry(true); + var res = testResource(); + res.addFinalizer(ADDITIONAL_FINALIZER); + extension.create(res); + extension.delete(getResource()); + + await() + .pollDelay(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.getEventCount()).isGreaterThan(2); + }); + var eventCount = reconciler.getEventCount(); + + await() + .untilAsserted( + () -> { + assertThat(reconciler.isWaiting()); + }); + + res = getResource(); + res.getMetadata().getAnnotations().put("my-annotation", "true"); + extension.update(res); + reconciler.setContinuerOnRetryWait(true); + + await() + .pollDelay(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.getEventCount()).isEqualTo(eventCount + 1); + }); + + // second retry + await() + .pollDelay(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.getEventCount()).isEqualTo(eventCount + 2); + }); + + addNoMoreExceptionAnnotation(); + + await() + .untilAsserted( + () -> { + var r = getResource(); + assertThat(r.getMetadata().getFinalizers()).doesNotContain(FINALIZER); + }); + + removeAdditionalFinalizerWaitForResourceDeletion(); + } + + @Test + void additionalEventAfterExhaustedRetry() { + + var reconciler = extension.getReconcilerOfType(TriggerReconcilerOnAllEventReconciler.class); + reconciler.setThrowExceptionIfNoAnnotation(true); + var res = testResource(); + res.addFinalizer(ADDITIONAL_FINALIZER); + extension.create(res); + extension.delete(getResource()); + + await() + .pollDelay(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.getEventCount()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); + }); + + addNoMoreExceptionAnnotation(); + + await() + .pollDelay(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.getEventCount()).isGreaterThan(MAX_RETRY_ATTEMPTS + 1); + }); + + removeAdditionalFinalizerWaitForResourceDeletion(); + } + + private void removeAdditionalFinalizerWaitForResourceDeletion() { + var res = getResource(); + res.removeFinalizer(ADDITIONAL_FINALIZER); + extension.update(res); + await().untilAsserted(() -> assertThat(getResource()).isNull()); + } + + private void addNoMoreExceptionAnnotation() { + TriggerReconcilerOnAllEventCustomResource res; + res = getResource(); + res.getMetadata().getAnnotations().put(NO_MORE_EXCEPTION_ANNOTATION_KEY, "true"); + extension.update(res); + } + + TriggerReconcilerOnAllEventCustomResource getResource() { + return extension.get(TriggerReconcilerOnAllEventCustomResource.class, TEST); + } + + TriggerReconcilerOnAllEventCustomResource testResource() { + var res = new TriggerReconcilerOnAllEventCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST).build()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventReconciler.java new file mode 100644 index 0000000000..5dee44a300 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventReconciler.java @@ -0,0 +1,184 @@ +package io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +@ControllerConfiguration(triggerReconcilerOnAllEvent = true, generationAwareEventProcessing = false) +public class TriggerReconcilerOnAllEventReconciler + implements Reconciler { + + private static final Logger log = + LoggerFactory.getLogger(TriggerReconcilerOnAllEventReconciler.class); + + private volatile boolean throwExceptionOnFirstDeleteEvent = false; + private volatile boolean throwExceptionIfNoAnnotation = false; + + private volatile boolean waitAfterFirstRetry = false; + private volatile boolean continuerOnRetryWait = false; + private volatile boolean waiting = false; + + private volatile boolean isFirstDeleteEvent = true; + + public static final String FINALIZER = "all.event.mode/finalizer"; + public static final String ADDITIONAL_FINALIZER = "all.event.mode/finalizer2"; + public static final String NO_MORE_EXCEPTION_ANNOTATION_KEY = "no.more.exception"; + + protected volatile boolean useFinalizer = true; + + private final AtomicInteger eventCounter = new AtomicInteger(0); + + private boolean deleteEventPresent = false; + private boolean eventOnMarkedForDeletion = false; + private boolean resourceEventPresent = false; + + @Override + public UpdateControl reconcile( + TriggerReconcilerOnAllEventCustomResource primary, + Context context) + throws InterruptedException { + log.info("Reconciling"); + increaseEventCount(); + + if (!primary.isMarkedForDeletion()) { + setResourceEventPresent(true); + } + + if (!primary.isMarkedForDeletion() && getUseFinalizer() && !primary.hasFinalizer(FINALIZER)) { + log.info("Adding finalizer"); + PrimaryUpdateAndCacheUtils.addFinalizer(context, FINALIZER); + return UpdateControl.noUpdate(); + } + + if (waitAfterFirstRetry + && context.getRetryInfo().isPresent() + && context.getRetryInfo().orElseThrow().getAttemptCount() == 1) { + waiting = true; + while (!continuerOnRetryWait) { + Thread.sleep(50); + } + waiting = false; + } + + if (throwExceptionIfNoAnnotation + && !primary.getMetadata().getAnnotations().containsKey(NO_MORE_EXCEPTION_ANNOTATION_KEY)) { + throw new RuntimeException("On purpose exception for missing annotation"); + } + + if (primary.isMarkedForDeletion() && !context.isPrimaryResourceDeleted()) { + setEventOnMarkedForDeletion(true); + if (getUseFinalizer() && primary.hasFinalizer(FINALIZER)) { + log.info("Removing finalizer"); + PrimaryUpdateAndCacheUtils.removeFinalizer(context, FINALIZER); + } + } + + if (context.isPrimaryResourceDeleted() + && isFirstDeleteEvent() + && isThrowExceptionOnFirstDeleteEvent()) { + isFirstDeleteEvent = false; + throw new RuntimeException("On purpose exception"); + } + + if (context.isPrimaryResourceDeleted()) { + setDeleteEventPresent(true); + } + log.info("Reconciliation finished"); + return UpdateControl.noUpdate(); + } + + public boolean isFirstDeleteEvent() { + return isFirstDeleteEvent; + } + + public void setFirstDeleteEvent(boolean firstDeleteEvent) { + isFirstDeleteEvent = firstDeleteEvent; + } + + public boolean isThrowExceptionOnFirstDeleteEvent() { + return throwExceptionOnFirstDeleteEvent; + } + + public void setThrowExceptionOnFirstDeleteEvent(boolean throwExceptionOnFirstDeleteEvent) { + this.throwExceptionOnFirstDeleteEvent = throwExceptionOnFirstDeleteEvent; + } + + public boolean isThrowExceptionIfNoAnnotation() { + return throwExceptionIfNoAnnotation; + } + + public void setThrowExceptionIfNoAnnotation(boolean throwExceptionIfNoAnnotation) { + this.throwExceptionIfNoAnnotation = throwExceptionIfNoAnnotation; + } + + public boolean isWaitAfterFirstRetry() { + return waitAfterFirstRetry; + } + + public void setWaitAfterFirstRetry(boolean waitAfterFirstRetry) { + this.waitAfterFirstRetry = waitAfterFirstRetry; + } + + public boolean isContinuerOnRetryWait() { + return continuerOnRetryWait; + } + + public void setContinuerOnRetryWait(boolean continuerOnRetryWait) { + this.continuerOnRetryWait = continuerOnRetryWait; + } + + public boolean isWaiting() { + return waiting; + } + + public void setWaiting(boolean waiting) { + this.waiting = waiting; + } + + public int getEventCount() { + return eventCounter.get(); + } + + public void increaseEventCount() { + eventCounter.incrementAndGet(); + } + + public boolean getUseFinalizer() { + return useFinalizer; + } + + public void setUseFinalizer(boolean useFinalizer) { + this.useFinalizer = useFinalizer; + } + + public boolean isDeleteEventPresent() { + return deleteEventPresent; + } + + public void setDeleteEventPresent(boolean deleteEventPresent) { + this.deleteEventPresent = deleteEventPresent; + } + + public boolean isEventOnMarkedForDeletion() { + return eventOnMarkedForDeletion; + } + + public void setEventOnMarkedForDeletion(boolean eventOnMarkedForDeletion) { + this.eventOnMarkedForDeletion = eventOnMarkedForDeletion; + } + + public boolean isResourceEventPresent() { + return resourceEventPresent; + } + + public void setResourceEventPresent(boolean resourceEventPresent) { + this.resourceEventPresent = resourceEventPresent; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventSpec.java new file mode 100644 index 0000000000..9254be3b25 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/triggerallevent/onlyreconcile/TriggerReconcilerOnAllEventSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.triggerallevent.onlyreconcile; + +public class TriggerReconcilerOnAllEventSpec { + + private String value; + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/sample-operators/mysql-schema/src/main/resources/log4j2.xml b/sample-operators/mysql-schema/src/main/resources/log4j2.xml index 01484221f9..5ab4735126 100644 --- a/sample-operators/mysql-schema/src/main/resources/log4j2.xml +++ b/sample-operators/mysql-schema/src/main/resources/log4j2.xml @@ -6,7 +6,7 @@ - +