package org.n52.ses.wsn;

import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.namespace.QName;
import org.apache.muse.core.routing.MessageHandler;
import org.apache.muse.util.ReflectUtils;
import org.apache.muse.util.xml.XmlSerializable;
import org.apache.muse.util.xml.XmlUtils;
import org.apache.muse.ws.addressing.EndpointReference;
import org.apache.muse.ws.addressing.WsaConstants;
import org.apache.muse.ws.addressing.soap.SoapFault;
import org.apache.muse.ws.notification.Filter;
import org.apache.muse.ws.notification.NotificationMessage;
import org.apache.muse.ws.notification.Policy;
import org.apache.muse.ws.notification.faults.SubscribeCreationFailedFault;
import org.apache.muse.ws.notification.faults.TopicNotSupportedFault;
import org.apache.muse.ws.notification.faults.UnacceptableInitialTerminationTimeFault;
import org.apache.muse.ws.notification.impl.FilterCollection;
import org.apache.muse.ws.notification.impl.SimpleNotificationProducer;
import org.apache.muse.ws.notification.topics.Topic;
import org.apache.muse.ws.resource.WsResource;
import org.n52.oxf.xmlbeans.tools.XmlUtil;
import org.n52.ses.api.IFilterEngine;
import org.n52.ses.api.common.FreeResourceListener;
import org.n52.ses.api.ws.EngineCoveredFilter;
import org.n52.ses.api.ws.INotificationMessage;
import org.n52.ses.api.ws.ISubscriptionManager;
import org.n52.ses.requestlogger.RequestLoggerWrapper;
import org.n52.ses.util.common.ConfigurationRegistry;
import org.n52.ses.util.common.NamedThreadFactory;
import org.n52.ses.wsbr.SesTopicFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;

/* loaded from: input_file:org/n52/ses/wsn/SESNotificationProducer.class */
public class SESNotificationProducer extends SimpleNotificationProducer implements FreeResourceListener {
    public static final String CONTEXT_PATH = "Broker";
    private ExecutorService notfiyPool;
    private static final Logger logger = LoggerFactory.getLogger(SESNotificationProducer.class);
    private static Object FIRST_RUN_MUTEX = new Object();
    private static boolean FIRST_RUN = true;

    public void freeResources() {
        if (this.notfiyPool != null) {
            this.notfiyPool.shutdownNow();
        }
    }

    public void publish(QName qName, XmlSerializable xmlSerializable) throws SoapFault {
        publish(qName, new XmlSerializable[]{xmlSerializable});
    }

    public void publish(QName qName, XmlSerializable[] xmlSerializableArr) throws SoapFault {
        Element[] elementArr = new Element[xmlSerializableArr.length];
        for (int i = 0; i < xmlSerializableArr.length; i++) {
            elementArr[i] = xmlSerializableArr[i].toXML();
        }
        publish(qName, elementArr);
    }

    protected MessageHandler createSubscribeHandler() {
        SubscribeWithPolicyHandler subscribeWithPolicyHandler = new SubscribeWithPolicyHandler();
        subscribeWithPolicyHandler.setMethod(ReflectUtils.getFirstMethod(getClass(), "subscribe"));
        return subscribeWithPolicyHandler;
    }

    public void publish(QName qName, Element[] elementArr) throws SoapFault {
        Topic topic;
        NotificationMessage createNotificationMessage = createNotificationMessage();
        for (Element element : elementArr) {
            createNotificationMessage.addMessageContent(element);
        }
        createNotificationMessage.setTopic(qName);
        for (SESSubscriptionManager sESSubscriptionManager : getSubscriptions()) {
            if (!sESSubscriptionManager.isHasEngineCoveredFilter()) {
                sESSubscriptionManager.publish(createNotificationMessage);
            }
        }
        if (qName == null || (topic = getTopic(qName)) == null) {
            return;
        }
        topic.setCurrentMessage(createNotificationMessage);
    }

    public WsResource subscribe(EndpointReference endpointReference, Filter filter, Date date, Policy policy) throws TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault {
        if (logger.isDebugEnabled()) {
            logger.debug("subscribing... " + endpointReference.getAddress());
        }
        if (ConfigurationRegistry.getInstance() != null && endpointReference.getAddress().toString().equals(ConfigurationRegistry.getInstance().getSesPortTypeEPR().getAddress().toString())) {
            logger.warn("There was an attempt to create an infinite loop. The EndpointReference linked to this service's PortType. Subscription rejected.");
            throw new SubscribeCreationFailedFault("There was an attempt to create an infinite loop. The EndpointReference linked to this service's PortType. Subscription rejected.");
        }
        FilterCollection findEngineCoveredFilters = findEngineCoveredFilters(filter, new FilterCollection());
        WsResource subscribe = super.subscribe(endpointReference, filter, date, policy);
        subscribe.getEndpointReference().setMetadata(new QName(SesTopicFactory.NAMESPACE, SESSubscriptionManager.CONTEXT_PATH, "sesinst"), new QName(SesTopicFactory.NAMESPACE, "SubscriptionManagerService", "sesinst"), "SubscriptionManagerPort", ConfigurationRegistry.getInstance().getSubMgrWsdl());
        SESSubscriptionManager capability = subscribe.getCapability("http://docs.oasis-open.org/wsn/bw-2/SubscriptionManager");
        try {
            IFilterEngine filterEngine = ConfigurationRegistry.getInstance().getFilterEngine();
            if (filterEngine == null) {
                throw new SubscribeCreationFailedFault("Could not access an instanceof IFilterEngine! Subscriptions may not match!");
            }
            if (filterEngine.registerFilter(capability, findEngineCoveredFilters)) {
                capability.setHasEngineCoveredFilter(true);
            }
            return subscribe;
        } catch (Exception e) {
            try {
                subscribe.shutdown();
            } catch (SoapFault e2) {
                logger.warn("Could not remove false resource '" + XmlUtils.toString(subscribe.getEndpointReference().getParameter(WsaConstants.DEFAULT_RESOURCE_ID_QNAME)) + "'.\nPlease remove manually from the router-entries\\" + SESSubscriptionManager.CONTEXT_PATH + " folder or it will throw an exception at restart.");
            }
            throw new SubscribeCreationFailedFault(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FilterCollection findEngineCoveredFilters(Filter filter, FilterCollection filterCollection) {
        if (filter instanceof FilterCollection) {
            Iterator it = ((FilterCollection) filter).getFilters().iterator();
            while (it.hasNext()) {
                findEngineCoveredFilters((Filter) it.next(), filterCollection);
            }
        } else if (filter instanceof EngineCoveredFilter) {
            filterCollection.addFilter(filter);
        }
        return filterCollection;
    }

    public void shutdown() throws SoapFault {
        super.shutdown();
        this.notfiyPool.shutdownNow();
    }

    public void initialize() throws SoapFault {
        logger.info("initialising SESNotificationProducer..");
        super.initialize();
        synchronized (FIRST_RUN_MUTEX) {
            if (FIRST_RUN) {
                ConfigurationRegistry configurationRegistry = ConfigurationRegistry.getInstance();
                configurationRegistry.registerFreeResourceListener(this);
                RequestLoggerWrapper.init(configurationRegistry);
                SesTopicFactory.addDefaultTopics(this);
                this.notfiyPool = Executors.newFixedThreadPool(Integer.parseInt(configurationRegistry.getPropertyForKey("MAX_THREADS")), new NamedThreadFactory("NotifyHandlerPool"));
                new Thread(new Runnable() { // from class: org.n52.ses.wsn.SESNotificationProducer.1
                    @Override // java.lang.Runnable
                    public void run() {
                        ConfigurationRegistry.getInstance().waitForAllPersistentPublishers();
                        for (ISubscriptionManager iSubscriptionManager : ConfigurationRegistry.getInstance().getReresubs()) {
                            if (iSubscriptionManager instanceof SESSubscriptionManager) {
                                SESSubscriptionManager sESSubscriptionManager = (SESSubscriptionManager) iSubscriptionManager;
                                SESNotificationProducer.this.addSubscription(sESSubscriptionManager.getWsResource());
                                if (sESSubscriptionManager.getFilter() instanceof FilterCollection) {
                                    try {
                                        if (ConfigurationRegistry.getInstance().getFilterEngine().registerFilter(sESSubscriptionManager, SESNotificationProducer.this.findEngineCoveredFilters(sESSubscriptionManager.getFilter(), new FilterCollection()))) {
                                            sESSubscriptionManager.setHasEngineCoveredFilter(true);
                                        }
                                    } catch (Exception e) {
                                        SESNotificationProducer.logger.warn(e.getMessage(), e);
                                    }
                                }
                                sESSubscriptionManager.resumeSubscription();
                            }
                        }
                    }
                }).start();
                FIRST_RUN = false;
            }
        }
    }

    public void initializeCompleted() throws SoapFault {
        super.initializeCompleted();
        getResource().getCapability("http://docs.oasis-open.org/wsn/bw-2/NotificationConsumer").addMessageListener(new SESMessageListener(this));
    }

    public void publishCompleteMessage(final NotificationMessage notificationMessage) throws SoapFault {
        IFilterEngine filterEngine = ConfigurationRegistry.getInstance().getFilterEngine();
        if (filterEngine == null) {
            logger.warn("Could not access an instanceof IFilterEngine! Subscriptions may not match!");
        } else {
            filterEngine.filter(new INotificationMessage() { // from class: org.n52.ses.wsn.SESNotificationProducer.2
                public String xmlToString() {
                    return XmlUtil.toString(notificationMessage.toXML());
                }

                public Object getNotificationMessage() {
                    return notificationMessage;
                }
            });
        }
        if (this.notfiyPool == null) {
            return;
        }
        this.notfiyPool.submit(new Runnable() { // from class: org.n52.ses.wsn.SESNotificationProducer.3
            @Override // java.lang.Runnable
            public void run() {
                Iterator it = notificationMessage.getMessageContentNames().iterator();
                while (it.hasNext()) {
                    Element messageContent = notificationMessage.getMessageContent((QName) it.next());
                    if (messageContent != null) {
                        try {
                            SESNotificationProducer.this.publish(notificationMessage.getTopic(), messageContent);
                        } catch (SoapFault e) {
                            SESNotificationProducer.logger.warn(e.getMessage(), e);
                        }
                    }
                }
            }
        });
    }
}
