/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.mq.server;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.ReceiveRequest;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.JMSDestinationManager;
import org.jboss.mq.server.JMSTopic;
import org.jboss.mq.server.RoutedMessage;
import org.jboss.util.threadpool.ThreadPool;

public class ClientConsumer
implements Runnable {
    private static Logger log = Logger.getLogger(ClientConsumer.class);
    JMSDestinationManager server;
    ConnectionToken connectionToken;
    boolean enabled;
    boolean closed = false;
    HashMap subscriptions = new HashMap();
    HashMap removedSubscriptions = new HashMap();
    LinkedList blockedSubscriptions = new LinkedList();
    private LinkedList messages = new LinkedList();
    private boolean enqueued = false;
    private ThreadPool threadPool = null;

    public ClientConsumer(JMSDestinationManager server, ConnectionToken connectionToken) throws JMSException {
        this.server = server;
        this.connectionToken = connectionToken;
        this.threadPool = server.getThreadPool();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setEnabled(boolean enabled) throws JMSException {
        if (log.isTraceEnabled()) {
            log.trace("" + this + "->setEnabled(enabled=" + enabled + ")");
        }
        LinkedList linkedList = this.blockedSubscriptions;
        synchronized (linkedList) {
            this.enabled = enabled;
            if (enabled) {
                Iterator it = this.blockedSubscriptions.iterator();
                while (it.hasNext()) {
                    Subscription sub = (Subscription)it.next();
                    JMSDestination dest = this.server.getJMSDestination(sub.destination);
                    if (dest == null) continue;
                    dest.addReceiver(sub);
                }
                this.blockedSubscriptions.clear();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void queueMessageForSending(RoutedMessage r) {
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            if (this.closed) {
                return;
            }
            this.messages.add(r);
            if (!this.enqueued) {
                this.threadPool.run(this);
                this.enqueued = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addSubscription(Subscription req) throws JMSException {
        if (log.isTraceEnabled()) {
            log.trace("Adding subscription for: " + req);
        }
        req.connectionToken = this.connectionToken;
        req.clientConsumer = this;
        JMSDestination jmsdest = this.server.getJMSDestination(req.destination);
        if (jmsdest == null) {
            throw new InvalidDestinationException("The destination " + req.destination + " does not exist !");
        }
        jmsdest.addSubscriber(req);
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            this.subscriptions.put(new Integer(req.subscriptionId), req);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        boolean trace = log.isTraceEnabled();
        if (trace) {
            log.trace("" + this + "->close()");
        }
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            this.closed = true;
            if (this.enqueued) {
                this.enqueued = false;
            }
            this.messages.clear();
        }
        HashMap subscriptionsClone = null;
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            subscriptionsClone = (HashMap)this.subscriptions.clone();
        }
        Iterator<Object> i = subscriptionsClone.keySet().iterator();
        while (i.hasNext()) {
            Integer subscriptionId = (Integer)i.next();
            try {
                this.removeSubscription(subscriptionId);
            }
            catch (JMSException ignore) {}
        }
        HashMap removedSubsClone = null;
        HashMap ignore = this.subscriptions;
        synchronized (ignore) {
            removedSubsClone = (HashMap)this.removedSubscriptions.clone();
        }
        i = removedSubsClone.values().iterator();
        while (i.hasNext()) {
            Subscription removed = (Subscription)i.next();
            JMSDestination queue = this.server.getJMSDestination(removed.destination);
            if (queue == null) {
                log.warn("The subscription was registered with a destination that does not exist: " + removed);
            } else {
                try {
                    queue.nackMessages(removed);
                }
                catch (JMSException e) {
                    log.warn("Unable to nack removed subscription: " + removed, e);
                }
            }
            this.removeRemovedSubscription(removed.subscriptionId);
        }
    }

    public SpyMessage receive(int subscriberId, long wait) throws JMSException {
        Subscription req = this.getSubscription(subscriberId);
        if (req == null) {
            throw new JMSException("The provided subscription does not exist");
        }
        JMSDestination queue = this.server.getJMSDestination(req.destination);
        if (queue == null) {
            throw new InvalidDestinationException("The subscription's destination " + req.destination + " does not exist");
        }
        if (this.addBlockedSubscription(req, wait)) {
            return queue.receive(req, wait != -1L);
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeSubscription(int subscriptionId) throws JMSException {
        Subscription req;
        if (log.isTraceEnabled()) {
            log.trace("" + this + "->removeSubscription(subscriberId=" + subscriptionId + ")");
        }
        Integer subId = new Integer(subscriptionId);
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            req = (Subscription)this.subscriptions.remove(subId);
            if (req != null) {
                this.removedSubscriptions.put(subId, req);
            }
        }
        if (req == null) {
            throw new JMSException("The subscription had not been previously registered");
        }
        JMSDestination queue = this.server.getPossiblyClosingJMSDestination(req.destination);
        if (queue == null) {
            throw new InvalidDestinationException("The subscription was registered with a destination that does not exist !");
        }
        queue.removeSubscriber(req);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        try {
            ReceiveRequest[] job;
            LinkedList linkedList = this.messages;
            synchronized (linkedList) {
                if (this.closed) {
                    return;
                }
                job = new ReceiveRequest[this.messages.size()];
                Iterator iter = this.messages.iterator();
                int i = 0;
                while (iter.hasNext()) {
                    RoutedMessage rm = (RoutedMessage)iter.next();
                    job[i] = rm.toReceiveRequest();
                    iter.remove();
                    ++i;
                }
                this.enqueued = false;
            }
            this.connectionToken.clientIL.receive(job);
        }
        catch (Throwable t) {
            LinkedList linkedList = this.messages;
            synchronized (linkedList) {
                if (this.closed) {
                    log.warn("Could not send messages to a receiver.", t);
                } else {
                    log.trace("Could not send messages to a receiver. It is closed.", t);
                }
            }
            try {
                this.server.connectionFailure(this.connectionToken);
            }
            catch (Throwable ignore) {
                log.warn("Could not close the client connection..", ignore);
            }
        }
    }

    public String toString() {
        return "ClientConsumer:" + this.connectionToken.getClientID();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void acknowledge(AcknowledgementRequest request, Tx txId) throws JMSException {
        Subscription sub = this.retrieveSubscription(request.subscriberId);
        if (sub == null) {
            HashMap hashMap = this.subscriptions;
            synchronized (hashMap) {
                sub = (Subscription)this.removedSubscriptions.get(new Integer(request.subscriberId));
            }
        }
        if (sub == null) {
            throw new JMSException("The provided subscription does not exist");
        }
        JMSDestination queue = this.server.getJMSDestination(sub.destination);
        if (queue == null) {
            throw new InvalidDestinationException("The subscription's destination " + sub.destination + " does not exist");
        }
        queue.acknowledge(request, sub, txId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean addBlockedSubscription(Subscription sub, long wait) {
        LinkedList linkedList = this.blockedSubscriptions;
        synchronized (linkedList) {
            if (!this.enabled && wait != -1L) {
                this.blockedSubscriptions.add(sub);
            }
            return this.enabled;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeRemovedSubscription(int subId) {
        JMSDestination topic;
        Subscription sub = null;
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            sub = (Subscription)this.removedSubscriptions.remove(new Integer(subId));
        }
        if (sub != null && (topic = this.server.getPossiblyClosingJMSDestination(sub.destination)) != null && topic instanceof JMSTopic) {
            ((JMSTopic)topic).cleanupSubscription(sub);
        }
    }

    public Subscription getSubscription(int subscriberId) throws JMSException {
        Subscription req = this.retrieveSubscription(subscriberId);
        if (req == null) {
            throw new JMSException("The provided subscription does not exist");
        }
        return req;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Subscription retrieveSubscription(int subscriberId) throws JMSException {
        Integer id = new Integer(subscriberId);
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            return (Subscription)this.subscriptions.get(id);
        }
    }
}

