/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.service;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.AbstractServiceConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.EndpointHealth;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.BucketServiceMapping;
import com.couchbase.client.core.service.EndpointStateZipper;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.service.strategies.SelectionStrategy;
import com.couchbase.client.core.state.AbstractStateMachine;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;

public abstract class PooledService
extends AbstractStateMachine<LifecycleState>
implements Service {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Service.class);
    private final String hostname;
    private final String bucket;
    private final String username;
    private final String password;
    private final int port;
    private final CoreEnvironment env;
    private final CoreContext ctx;
    private final int minEndpoints;
    private final int maxEndpoints;
    private final boolean fixedEndpoints;
    private final EndpointStateZipper endpointStates;
    private final RingBuffer<ResponseEvent> responseBuffer;
    private final Service.EndpointFactory endpointFactory;
    private final List<Endpoint> endpoints;
    private final LifecycleState initialState;
    private final SelectionStrategy selectionStrategy;
    private final Subscription idleSubscription;
    private final Object epMutex = new Object();
    private volatile int pendingRequests;
    private volatile boolean disconnect;

    PooledService(final String hostname, String bucket, String username, String password, int port, CoreContext ctx, final AbstractServiceConfig serviceConfig, Service.EndpointFactory endpointFactory, SelectionStrategy selectionStrategy) {
        super(serviceConfig.minEndpoints() == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED);
        this.preCheckEndpointSettings(serviceConfig);
        this.initialState = serviceConfig.minEndpoints() == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED;
        this.hostname = hostname;
        this.bucket = bucket;
        this.username = username;
        this.password = password;
        this.port = port;
        this.env = ctx.environment();
        this.ctx = ctx;
        this.minEndpoints = serviceConfig.minEndpoints();
        this.maxEndpoints = serviceConfig.maxEndpoints();
        this.responseBuffer = ctx.responseRingBuffer();
        this.endpointFactory = endpointFactory;
        this.endpoints = new CopyOnWriteArrayList<Endpoint>();
        this.fixedEndpoints = this.minEndpoints == this.maxEndpoints;
        this.selectionStrategy = selectionStrategy;
        this.pendingRequests = 0;
        this.disconnect = false;
        this.endpointStates = new EndpointStateZipper(this.initialState);
        this.endpointStates.states().subscribe((Action1)new Action1<LifecycleState>(){

            public void call(LifecycleState lifecycleState) {
                PooledService.this.transitionState(lifecycleState);
            }
        });
        this.idleSubscription = serviceConfig.idleTime() == 0 ? null : Observable.interval((long)serviceConfig.idleTime(), (TimeUnit)TimeUnit.SECONDS, (Scheduler)this.env.scheduler()).subscribe((Subscriber)new Subscriber<Long>(){

            public void onCompleted() {
                LOGGER.trace("Completed Idle Timer Subscription");
            }

            public void onError(Throwable e) {
                LOGGER.warn("Error while subscribing to Idle Timer", e);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onNext(Long aLong) {
                ArrayList<Endpoint> toDisconnect = new ArrayList<Endpoint>();
                Iterator iterator = PooledService.this.epMutex;
                synchronized (iterator) {
                    boolean removed;
                    int maxToRemove = PooledService.this.endpoints.size() - PooledService.this.minEndpoints;
                    do {
                        removed = false;
                        for (int i = 0; i < PooledService.this.endpoints.size(); ++i) {
                            Endpoint e = (Endpoint)PooledService.this.endpoints.get(i);
                            if (e == null) continue;
                            long diffs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - e.lastResponse());
                            if (!e.isFree() || diffs < (long)serviceConfig.idleTime()) continue;
                            if (maxToRemove > 0) {
                                --maxToRemove;
                            } else {
                                LOGGER.debug("Would remove {}, but minimum threshold reached, ignoring for this run.", (Object)PooledService.logIdent(hostname, PooledService.this));
                                continue;
                            }
                            LOGGER.debug(PooledService.logIdent(hostname, PooledService.this) + "Endpoint {} idle for longer than {}s, disconnecting.", (Object)e, (Object)serviceConfig.idleTime());
                            PooledService.this.endpoints.remove(i);
                            PooledService.this.endpointStates.deregister(e);
                            removed = true;
                            toDisconnect.add(e);
                            LOGGER.debug(PooledService.logIdent(hostname, PooledService.this) + "New number of endpoints is {}", (Object)PooledService.this.endpoints.size());
                        }
                    } while (removed);
                }
                for (Endpoint ep : toDisconnect) {
                    ep.disconnect().subscribe((Subscriber)new Subscriber<LifecycleState>(){

                        public void onCompleted() {
                        }

                        public void onError(Throwable e) {
                            LOGGER.warn("Got an error while disconnecting endpoint!", e);
                        }

                        public void onNext(LifecycleState state) {
                        }
                    });
                }
                PooledService.this.ensureMinimum();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ensureMinimum() {
        int belowMin = this.minEndpoints - this.endpoints.size();
        if (belowMin > 0) {
            LOGGER.debug(PooledService.logIdent(this.hostname, this) + "Service is {} below minimum, filling up.", (Object)belowMin);
            Object object = this.epMutex;
            synchronized (object) {
                for (int i = 0; i < belowMin; ++i) {
                    Endpoint endpoint = this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.ctx);
                    this.endpoints.add(endpoint);
                    this.endpointStates.register(endpoint, endpoint);
                    endpoint.connect().subscribe((Subscriber)new Subscriber<LifecycleState>(){

                        public void onCompleted() {
                        }

                        public void onError(Throwable e) {
                            LOGGER.warn("Got an error while connecting endpoint!", e);
                        }

                        public void onNext(LifecycleState state) {
                        }
                    });
                }
                LOGGER.debug(PooledService.logIdent(this.hostname, this) + "New number of endpoints is {}", (Object)this.endpoints.size());
            }
        }
    }

    private void preCheckEndpointSettings(AbstractServiceConfig serviceConfig) {
        int minEndpoints = serviceConfig.minEndpoints();
        int maxEndpoints = serviceConfig.maxEndpoints();
        boolean pipelining = serviceConfig.isPipelined();
        if (minEndpoints < 0 || maxEndpoints < 0) {
            throw new IllegalArgumentException("The minEndpoints and maxEndpoints must not be negative");
        }
        if (maxEndpoints == 0) {
            throw new IllegalArgumentException("The maxEndpoints must be greater than 0");
        }
        if (maxEndpoints < minEndpoints) {
            throw new IllegalArgumentException("The maxEndpoints must not be smaller than mindEndpoints");
        }
        if (pipelining && minEndpoints != maxEndpoints) {
            throw new IllegalArgumentException("Pipelining and non-fixed size of endpoints is currently not supported.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Observable<LifecycleState> connect() {
        if (this.state() == LifecycleState.CONNECTED || this.state() == LifecycleState.CONNECTING) {
            LOGGER.debug(PooledService.logIdent(this.hostname, this) + "Already connected or connecting, skipping connect.");
            return Observable.just(this.state());
        }
        LOGGER.debug(PooledService.logIdent(this.hostname, this) + "Got instructed to connect.");
        Object object = this.epMutex;
        synchronized (object) {
            int numToConnect = this.minEndpoints - this.endpoints.size();
            if (numToConnect == 0) {
                LOGGER.debug("No endpoints needed to connect, skipping.");
                return Observable.just(this.state());
            }
            for (int i = 0; i < numToConnect; ++i) {
                Endpoint endpoint = this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.ctx);
                this.endpoints.add(endpoint);
                this.endpointStates.register(endpoint, endpoint);
            }
            LOGGER.debug(PooledService.logIdent(this.hostname, this) + "New number of endpoints is {}", (Object)this.endpoints.size());
        }
        return Observable.from(this.endpoints).flatMap((Func1)new Func1<Endpoint, Observable<LifecycleState>>(){

            public Observable<LifecycleState> call(Endpoint endpoint) {
                LOGGER.debug(PooledService.logIdent(PooledService.this.hostname, PooledService.this) + "Connecting Endpoint during Service connect.");
                return endpoint.connect();
            }
        }).lastOrDefault((Object)this.initialState).map((Func1)new Func1<LifecycleState, LifecycleState>(){

            public LifecycleState call(LifecycleState state) {
                return (LifecycleState)((Object)PooledService.this.state());
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Observable<LifecycleState> disconnect() {
        ArrayList<Endpoint> endpoints;
        this.disconnect = true;
        LOGGER.debug(PooledService.logIdent(this.hostname, this) + "Got instructed to disconnect.");
        Object object = this.epMutex;
        synchronized (object) {
            endpoints = new ArrayList<Endpoint>(this.endpoints);
            this.endpoints.clear();
            LOGGER.debug(PooledService.logIdent(this.hostname, this) + "New number of endpoints is {}", (Object)endpoints.size());
        }
        return Observable.from(endpoints).flatMap((Func1)new Func1<Endpoint, Observable<LifecycleState>>(){

            public Observable<LifecycleState> call(Endpoint endpoint) {
                LOGGER.debug(PooledService.logIdent(PooledService.this.hostname, PooledService.this) + "Disconnecting Endpoint during Service disconnect.");
                return endpoint.disconnect();
            }
        }).lastOrDefault((Object)this.initialState).map((Func1)new Func1<LifecycleState, LifecycleState>(){

            public LifecycleState call(LifecycleState state) {
                PooledService.this.endpointStates.terminate();
                if (PooledService.this.idleSubscription != null && !PooledService.this.idleSubscription.isUnsubscribed()) {
                    PooledService.this.idleSubscription.unsubscribe();
                }
                return (LifecycleState)((Object)PooledService.this.state());
            }
        });
    }

    @Override
    public void send(CouchbaseRequest request) {
        Endpoint endpoint;
        if (request instanceof SignalFlush) {
            this.sendFlush((SignalFlush)request);
            return;
        }
        Endpoint endpoint2 = endpoint = this.endpoints.size() > 0 ? this.selectionStrategy.select(request, this.endpoints) : null;
        if (!request.isActive()) {
            return;
        }
        if (endpoint == null) {
            if (this.fixedEndpoints || this.endpoints.size() + this.pendingRequests >= this.maxEndpoints) {
                RetryHelper.retryOrCancel(this.env, request, this.responseBuffer);
            } else {
                this.maybeOpenAndSend(request);
            }
        } else {
            endpoint.send(request);
        }
    }

    private void maybeOpenAndSend(final CouchbaseRequest request) {
        ++this.pendingRequests;
        LOGGER.debug(PooledService.logIdent(this.hostname, this) + "Need to open a new Endpoint (size {}), pending requests {}", (Object)this.endpoints.size(), (Object)this.pendingRequests);
        final Endpoint endpoint = this.endpointFactory.create(this.hostname, this.bucket, this.username, this.password, this.port, this.ctx);
        final Subscription subscription = PooledService.whenState(endpoint, LifecycleState.CONNECTED, new Action1<LifecycleState>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void call(LifecycleState lifecycleState) {
                block7: {
                    try {
                        if (PooledService.this.disconnect) {
                            RetryHelper.retryOrCancel(PooledService.this.env, request, PooledService.this.responseBuffer);
                            break block7;
                        }
                        endpoint.send(request);
                        endpoint.send(SignalFlush.INSTANCE);
                        Object object = PooledService.this.epMutex;
                        synchronized (object) {
                            PooledService.this.endpoints.add(endpoint);
                            PooledService.this.endpointStates.register(endpoint, endpoint);
                            LOGGER.debug(PooledService.logIdent(PooledService.this.hostname, PooledService.this) + "New number of endpoints is {}", (Object)PooledService.this.endpoints.size());
                        }
                    }
                    finally {
                        PooledService.this.pendingRequests--;
                    }
                }
            }
        });
        endpoint.connect().subscribe((Subscriber)new Subscriber<LifecycleState>(){

            public void onCompleted() {
            }

            public void onError(Throwable e) {
                PooledService.this.unsubscribeAndRetry(subscription, request);
            }

            public void onNext(LifecycleState state) {
                if (state == LifecycleState.DISCONNECTING || state == LifecycleState.DISCONNECTED) {
                    PooledService.this.unsubscribeAndRetry(subscription, request);
                }
            }
        });
    }

    private void unsubscribeAndRetry(Subscription subscription, CouchbaseRequest request) {
        if (subscription != null && !subscription.isUnsubscribed()) {
            subscription.unsubscribe();
        }
        --this.pendingRequests;
        RetryHelper.retryOrCancel(this.env, request, this.responseBuffer);
    }

    private void sendFlush(SignalFlush signalFlush) {
        for (Endpoint endpoint : this.endpoints) {
            if (endpoint == null) continue;
            endpoint.send(signalFlush);
        }
    }

    @Override
    public Observable<EndpointHealth> diagnostics() {
        ArrayList<Observable> diags = new ArrayList<Observable>();
        for (Endpoint endpoint : this.endpoints()) {
            diags.add(endpoint.diagnostics(this.type()).toObservable());
        }
        return Observable.merge(diags);
    }

    @Override
    public BucketServiceMapping mapping() {
        return this.type().mapping();
    }

    static String logIdent(String hostname, Service service) {
        return "[" + hostname + "][" + service.getClass().getSimpleName() + "]: ";
    }

    private static Subscription whenState(Endpoint endpoint, final LifecycleState wanted, Action1<LifecycleState> then) {
        return endpoint.states().filter((Func1)new Func1<LifecycleState, Boolean>(){

            public Boolean call(LifecycleState state) {
                return state == wanted;
            }
        }).take(1).subscribe(then);
    }

    protected List<Endpoint> endpoints() {
        return this.endpoints;
    }
}

