package org.n52.amqp;

import java.io.IOException;
import java.net.URI;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:org/n52/amqp/Connection.class */
public class Connection {
    private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
    private final String username;
    private final String password;
    private final URI remoteURI;
    private boolean open = true;

    public Connection(URI uri, String str, String str2) {
        this.username = str;
        this.password = str2;
        this.remoteURI = uri;
    }

    public boolean isOpen() {
        return this.open;
    }

    public URI getRemoteURI() {
        return this.remoteURI;
    }

    public String getUsername() {
        return this.username;
    }

    public String getPassword() {
        return this.password;
    }

    public void close() {
        this.open = false;
        LOG.info("Closed connection for client {}", this.remoteURI);
    }

    public Publisher createPublisher() throws PublisherCreationFailedException {
        try {
            return new Publisher(this);
        } catch (Exception e) {
            throw new PublisherCreationFailedException("Could not create publisher", e);
        }
    }

    public Observable<AmqpMessage> createObservable() {
        return Observable.create(subscriber -> {
            LOG.debug("Creating observable for {}", this.remoteURI);
            Messenger messenger = null;
            while (this.open && !subscriber.isUnsubscribed()) {
                messenger = Messenger.Factory.create();
                try {
                    messenger.start();
                    messenger.subscribe(this.remoteURI.toString());
                    while (!messenger.stopped()) {
                        LOG.debug("start receiving");
                        messenger.recv();
                        while (messenger.incoming() > 0) {
                            LOG.debug("starting receiving loop");
                            Message message = messenger.get();
                            Section body = message.getBody();
                            if (body instanceof AmqpValue) {
                                subscriber.onNext(createMessage(message));
                            } else {
                                LOG.warn("Unsupported type of body: " + (body == null ? "n/a" : body.getClass()));
                            }
                        }
                    }
                } catch (IOException e) {
                    LOG.warn("Could not spawn subscriber", e);
                    return;
                }
            }
            if (messenger != null && !messenger.stopped()) {
                messenger.stop();
            }
            if (!subscriber.isUnsubscribed()) {
                subscriber.unsubscribe();
            }
            subscriber.onCompleted();
        });
    }

    private AmqpMessage createMessage(Message message) {
        return new AmqpMessage(message.getBody().getValue(), createContentType(message.getContentType()), message.getSubject());
    }

    private ContentType createContentType(String str) {
        if (str == null) {
            return null;
        }
        return new ContentType(str);
    }
}
