package org.n52.eventing.delivery.mqtt;

import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/n52/eventing/delivery/mqtt/PahoMqttDelivery.class */
public class PahoMqttDelivery {
    private static final Logger LOG = LoggerFactory.getLogger(PahoMqttDelivery.class);
    private final String protocol;
    private final String host;
    private final int port;
    private final String clientId;
    private MqttClient client;
    private boolean destroyed;

    public PahoMqttDelivery(String str, int i, String str2, String str3) {
        this.protocol = str2;
        this.host = str;
        this.port = i;
        this.clientId = str3;
    }

    public String getProtocol() {
        return this.protocol;
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public void connect() throws MqttException {
        this.client = new MqttClient(String.format("%s://%s:%s", this.protocol, this.host, Integer.valueOf(this.port)), this.clientId, new MemoryPersistence());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setConnectionTimeout(5000);
        mqttConnectOptions.setAutomaticReconnect(true);
        LOG.info("Connecting to {} with client-id {}", String.format("%s://%s:%s", this.protocol, this.host, Integer.valueOf(this.port)), this.clientId);
        this.client.connect(mqttConnectOptions);
        this.client.setCallback(new MqttCallbackExtended() { // from class: org.n52.eventing.delivery.mqtt.PahoMqttDelivery.1
            public void connectionLost(Throwable th) {
                PahoMqttDelivery.LOG.warn("Connection lost", th);
            }

            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                PahoMqttDelivery.LOG.trace("Delivery completed for message id '{}'", Integer.valueOf(iMqttDeliveryToken.getMessageId()));
            }

            public void connectComplete(boolean z, String str) {
                PahoMqttDelivery.LOG.info("Re-connected to MQTT broker.");
            }
        });
    }

    public void destroy() {
        try {
            if (this.client.isConnected()) {
                this.client.disconnectForcibly(5000L);
            }
        } catch (MqttException e) {
            LOG.warn(e.getMessage(), e);
        }
        this.destroyed = true;
    }

    public void publishToTopic(InputStream inputStream, String str) {
        if (this.destroyed) {
            return;
        }
        if (this.client == null || !this.client.isConnected()) {
            try {
                connect();
            } catch (MqttException e) {
                LOG.warn("Could not connect to MQTT host", e);
                LOG.debug(e.getMessage(), e);
                return;
            }
        }
        try {
            LOG.trace("Publishing message");
            this.client.publish(str, new MqttMessage(ByteStreams.toByteArray(inputStream)));
            LOG.trace("Message published");
        } catch (IOException | MqttException e2) {
            LOG.warn("Could publish message", e2);
            LOG.debug(e2.getMessage(), e2);
        }
    }
}
