package org.n52.eventing.connector.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.n52.eventing.rest.Configuration;
import org.n52.eventing.rest.publications.PublicationDataIngestor;
import org.n52.eventing.rest.publications.PublicationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/n52/eventing/connector/mqtt/PahoMqttConnector.class */
/**
 * Publication provider that connects to an MQTT broker through the Eclipse Paho client and hands
 * every received payload to the injected {@link PublicationDataIngestor}.
 *
 * <p>Configuration keys read in {@link #afterPropertiesSet()}:
 * <ul>
 *   <li>{@code connector.mqtt.host} - required, initialisation is skipped without it</li>
 *   <li>{@code connector.mqtt.publicationIdentifer} - required, initialisation is skipped without it</li>
 *   <li>{@code connector.mqtt.port} - defaults to 1883</li>
 *   <li>{@code connector.mqtt.protocol} - defaults to {@code tcp}</li>
 *   <li>{@code connector.mqtt.mimeType} - optional; payloads are only ingested as text when the
 *       value is one of the known text MIME types</li>
 *   <li>{@code connector.mqtt.topic} - defaults to {@code #}</li>
 * </ul>
 */
public class PahoMqttConnector implements PublicationProvider, InitializingBean, DisposableBean {
    private static final Logger LOG = LoggerFactory.getLogger(PahoMqttConnector.class);

    /** MIME types whose payloads are decoded to a String before being ingested. */
    private static final Set<String> STRING_MIME_TYPES = new HashSet<>(
            Arrays.asList("text/csv", "application/json", "application/xml", "text/xml", "text/plain"));

    /** Paho interprets the connection timeout in seconds (not milliseconds). */
    private static final int CONNECTION_TIMEOUT_SECONDS = 5;

    /** Timeout handed to {@code disconnectForcibly}, which is expressed in milliseconds. */
    private static final long DISCONNECT_TIMEOUT_MILLIS = 5000L;

    private String host;
    private String clientId;
    private MqttClient client;
    private MessageCallback callback;
    private boolean connected;

    @Inject
    private Optional<PublicationDataIngestor> dataIngestorOptional;

    @Autowired
    private Configuration config;
    private PublicationDataIngestor dataIngestor;
    private int port;
    private String protocol;
    private boolean parseToString;
    private String mimeType;
    private String publicationIdentifer;

    /**
     * Topics subscribed through {@link #subscribe(String, QualityOfService)}. They are replayed
     * after an automatic reconnect; concurrent because the replay runs on a Paho callback thread.
     */
    private final Map<String, QualityOfService> subscriptions = new ConcurrentHashMap<>();

    /** Receives the raw payload of every MQTT message delivered to the connector. */
    @FunctionalInterface
    public interface MessageCallback {
        /**
         * @param bArr the payload bytes of the received message
         */
        void receive(byte[] bArr);
    }

    /** MQTT quality-of-service levels together with the numeric level passed to the Paho client. */
    public enum QualityOfService {
        AT_MOST_ONCE(0),
        AT_LEAST_ONCE(1),
        EXACTLY_ONCE(2);

        private final int level;

        QualityOfService(int level) {
            this.level = level;
        }

        /**
         * @return the numeric QoS level (0, 1 or 2)
         */
        public int getLevel() {
            return this.level;
        }
    }

    /**
     * Reads the connector configuration, connects to the broker and subscribes to the configured
     * topic. Initialisation is skipped (with a warning) when no data ingestor, host or publication
     * identifier is available. A failing connect/subscribe is logged and does not propagate.
     *
     * @throws Exception declared by {@link InitializingBean}
     */
    public void afterPropertiesSet() throws Exception {
        if (this.dataIngestorOptional == null || !this.dataIngestorOptional.isPresent()) {
            LOG.warn("No dataIngestor available, skipping MQTT connector init");
            return;
        }
        this.dataIngestor = this.dataIngestorOptional.get();

        // NOTE(review): Configuration's generic signatures are not visible here, so wildcard
        // Optionals plus the original casts are used to stay source-compatible either way.
        Optional<?> hostParameter = this.config.getParameter("connector.mqtt.host");
        if (!hostParameter.isPresent()) {
            LOG.warn("No connector.mqtt.host specified, skipping connector init");
            return;
        }

        Optional<?> identifierParameter = this.config.getParameter("connector.mqtt.publicationIdentifer");
        if (!identifierParameter.isPresent()) {
            LOG.warn("No connector.mqtt.publicationIdentifer specified. It is required in order to relate data to the publication, skipping connector init");
            return;
        }
        this.publicationIdentifer = (String) identifierParameter.get();

        Integer brokerPort = (Integer) this.config.getParameterAsInt("connector.mqtt.port").orElse(1883);
        String brokerProtocol = (String) this.config.getParameter("connector.mqtt.protocol").orElse("tcp");

        // Only known text MIME types switch the connector into String ingestion mode.
        Optional<?> mimeTypeParameter = this.config.getParameter("connector.mqtt.mimeType");
        if (mimeTypeParameter.isPresent() && STRING_MIME_TYPES.contains(mimeTypeParameter.get())) {
            this.parseToString = true;
            this.mimeType = (String) mimeTypeParameter.get();
        }

        init(brokerProtocol, (String) hostParameter.get(), brokerPort.intValue(),
                UUID.randomUUID().toString(), this::forwardPayload);
        try {
            connect();
            subscribe((String) this.config.getParameter("connector.mqtt.topic").orElse("#"), QualityOfService.EXACTLY_ONCE);
        } catch (MqttException e) {
            LOG.warn("Could not connect to MQTT host: {}", e.getMessage());
            LOG.debug(e.getMessage(), e);
        }
    }

    /** Hands a received payload to the data ingestor, as UTF-8 text or as raw bytes. */
    private void forwardPayload(byte[] payload) {
        if (this.parseToString) {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            this.dataIngestor.ingestData(new String(payload, StandardCharsets.UTF_8), this.publicationIdentifer, this.mimeType);
        } else {
            this.dataIngestor.ingestData(payload, this.publicationIdentifer);
        }
    }

    /**
     * Stores the connection settings used by {@link #connect()}. According to the decompiler note
     * on the original, this method was declared {@code protected} in the original source.
     *
     * @param protocol URI scheme of the broker, e.g. {@code tcp}
     * @param host broker host name
     * @param port broker port
     * @param clientId MQTT client id
     * @param callback receiver for message payloads
     */
    public void init(String protocol, String host, int port, String clientId, MessageCallback callback) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
        this.clientId = clientId;
        this.callback = callback;
    }

    /**
     * Creates a new Paho client for the configured broker, connects it with automatic reconnect
     * enabled and installs the message callback.
     *
     * @throws MqttException if the client cannot be created or the connection attempt fails
     */
    public void connect() throws MqttException {
        // A fresh client starts without subscriptions, so forget the ones of a previous client.
        this.subscriptions.clear();

        String serverUri = String.format("%s://%s:%s", this.protocol, this.host, Integer.valueOf(this.port));
        this.client = new MqttClient(serverUri, this.clientId, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setConnectionTimeout(CONNECTION_TIMEOUT_SECONDS);
        options.setAutomaticReconnect(true);

        LOG.info("Connecting to {} with client-id {}", serverUri, this.clientId);
        this.client.connect(options);
        this.client.setCallback(new MqttCallbackExtended() {
            public void connectionLost(Throwable cause) {
                LOG.warn("Connection lost", cause);
            }

            public void messageArrived(String topic, MqttMessage message) throws Exception {
                LOG.trace("New message on topic '{}': {}", topic, message);
                try {
                    PahoMqttConnector.this.callback.receive(message.getPayload());
                } catch (RuntimeException e) {
                    // A failing ingestor must not propagate into the Paho client.
                    LOG.warn("Error in callback: {}", e.getMessage());
                    LOG.debug(e.getMessage(), e);
                }
            }

            public void deliveryComplete(IMqttDeliveryToken token) {
                LOG.info("Delivery completed for message id '{}'", Integer.valueOf(token.getMessageId()));
            }

            public void connectComplete(boolean reconnect, String serverURI) {
                LOG.info("Re-connected to MQTT broker.");
                if (reconnect) {
                    // The connect options use Paho's default clean session, so the broker has
                    // dropped the subscriptions of the previous connection.
                    resubscribeAll();
                }
            }
        });
        this.connected = true;
    }

    /** Re-issues every recorded subscription; failures are logged per topic and do not abort the rest. */
    private void resubscribeAll() {
        for (Map.Entry<String, QualityOfService> subscription : this.subscriptions.entrySet()) {
            try {
                this.client.subscribe(subscription.getKey(), subscription.getValue().getLevel());
                LOG.info("Re-subscribed to topic {}", subscription.getKey());
            } catch (MqttException e) {
                LOG.warn("Could not re-subscribe to topic {}: {}", subscription.getKey(), e.getMessage());
                LOG.debug(e.getMessage(), e);
            }
        }
    }

    /**
     * Subscribes the client to a topic filter and records the subscription so that it can be
     * restored after an automatic reconnect.
     *
     * @param topic the topic filter to subscribe to
     * @param qualityOfService the requested quality of service
     * @throws MqttException if the subscription fails
     */
    public void subscribe(String topic, QualityOfService qualityOfService) throws MqttException {
        this.client.subscribe(topic, qualityOfService.getLevel());
        this.subscriptions.put(topic, qualityOfService);
        LOG.info("Subscribed to topic {}", topic);
    }

    /**
     * Disconnects and closes the MQTT client. Does nothing when no client was ever created, which
     * is the case whenever {@link #afterPropertiesSet()} skipped initialisation.
     */
    public void destroy() {
        if (this.client == null) {
            return;
        }
        try {
            if (this.connected && this.client.isConnected()) {
                this.client.disconnectForcibly(DISCONNECT_TIMEOUT_MILLIS);
            }
        } catch (MqttException e) {
            LOG.warn(e.getMessage(), e);
        }
        try {
            // Release the client's resources; this is attempted even if the disconnect failed.
            this.client.close();
        } catch (MqttException e) {
            LOG.warn(e.getMessage(), e);
        } finally {
            this.connected = false;
        }
    }

    /**
     * @return the configured text MIME type, or {@code null} when none of the supported text MIME
     *         types was configured
     */
    public String getContentType() {
        return this.mimeType;
    }

    /**
     * @return a description consisting of a fixed prefix and the publication identifier
     */
    public String getDescription() {
        return "MQTT Data feed: " + this.publicationIdentifer;
    }

    /**
     * @return the configured publication identifier
     */
    public String getIdentifier() {
        return this.publicationIdentifer;
    }
}
