package org.springframework.integration.core;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageDeliveryException;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.MessagingException;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.support.channel.ChannelResolver;
import org.springframework.integration.support.converter.MessageConverter;
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/core/MessagingTemplate.class */
public class MessagingTemplate implements MessagingOperations, BeanFactoryAware, InitializingBean {
    private volatile MessageChannel defaultChannel;
    private volatile ChannelResolver channelResolver;
    private volatile boolean initialized;
    protected final Log logger = LogFactory.getLog(getClass());
    private volatile MessageConverter messageConverter = new SimpleMessageConverter();
    private volatile long sendTimeout = -1;
    private volatile long receiveTimeout = -1;
    private final Object initializationMonitor = new Object();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/core/MessagingTemplate$TemporaryReplyChannel.class */
    public static class TemporaryReplyChannel implements PollableChannel {
        private volatile Message<?> message;
        private final long receiveTimeout;
        private final CountDownLatch latch = new CountDownLatch(1);

        public TemporaryReplyChannel(long j) {
            this.receiveTimeout = j;
        }

        @Override // org.springframework.integration.core.PollableChannel
        public Message<?> receive() {
            return receive(-1L);
        }

        @Override // org.springframework.integration.core.PollableChannel
        public Message<?> receive(long j) {
            try {
                if (this.receiveTimeout < 0) {
                    this.latch.await();
                } else {
                    this.latch.await(this.receiveTimeout, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return this.message;
        }

        @Override // org.springframework.integration.MessageChannel
        public boolean send(Message<?> message) {
            return send(message, -1L);
        }

        @Override // org.springframework.integration.MessageChannel
        public boolean send(Message<?> message, long j) {
            this.message = message;
            this.latch.countDown();
            return true;
        }
    }

    public MessagingTemplate() {
    }

    public MessagingTemplate(MessageChannel messageChannel) {
        this.defaultChannel = messageChannel;
    }

    public void setDefaultChannel(MessageChannel messageChannel) {
        this.defaultChannel = messageChannel;
    }

    public void setChannelResolver(ChannelResolver channelResolver) {
        Assert.notNull(channelResolver, "'channelResolver' must not be null");
        this.channelResolver = channelResolver;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' must not be null");
        this.messageConverter = messageConverter;
    }

    public void setSendTimeout(long j) {
        this.sendTimeout = j;
    }

    public void setReceiveTimeout(long j) {
        this.receiveTimeout = j;
    }

    @Override // org.springframework.beans.factory.BeanFactoryAware
    public void setBeanFactory(BeanFactory beanFactory) {
        if (this.channelResolver != null || beanFactory == null) {
            return;
        }
        this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
    }

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() {
        synchronized (this.initializationMonitor) {
            if (this.initialized) {
                return;
            }
            this.initialized = true;
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <P> void send(Message<P> message) {
        send(getRequiredDefaultChannel(), message);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <P> void send(MessageChannel messageChannel, Message<P> message) {
        doSend(messageChannel, message);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <P> void send(String str, Message<P> message) {
        send(resolveChannelName(str), message);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <T> void convertAndSend(T t) {
        Message message = this.messageConverter.toMessage(t);
        if (message != null) {
            send(message);
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <T> void convertAndSend(MessageChannel messageChannel, T t) {
        Message message = this.messageConverter.toMessage(t);
        if (message != null) {
            send(messageChannel, message);
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <T> void convertAndSend(String str, T t) {
        Message message = this.messageConverter.toMessage(t);
        if (message != null) {
            send(str, message);
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <T> void convertAndSend(T t, MessagePostProcessor messagePostProcessor) {
        Message<?> postProcessMessage = messagePostProcessor.postProcessMessage(this.messageConverter.toMessage(t));
        if (postProcessMessage != null) {
            send(postProcessMessage);
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <T> void convertAndSend(MessageChannel messageChannel, T t, MessagePostProcessor messagePostProcessor) {
        Message<?> postProcessMessage = messagePostProcessor.postProcessMessage(this.messageConverter.toMessage(t));
        if (postProcessMessage != null) {
            send(messageChannel, postProcessMessage);
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <T> void convertAndSend(String str, T t, MessagePostProcessor messagePostProcessor) {
        Message<?> postProcessMessage = messagePostProcessor.postProcessMessage(this.messageConverter.toMessage(t));
        if (postProcessMessage != null) {
            send(str, postProcessMessage);
        }
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <P> Message<P> receive() {
        MessageChannel requiredDefaultChannel = getRequiredDefaultChannel();
        Assert.state(requiredDefaultChannel instanceof PollableChannel, "The 'defaultChannel' must be a PollableChannel for receive operations.");
        return receive((PollableChannel) requiredDefaultChannel);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <P> Message<P> receive(PollableChannel pollableChannel) {
        return doReceive(pollableChannel);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public <P> Message<P> receive(String str) {
        MessageChannel resolveChannelName = resolveChannelName(str);
        Assert.isInstanceOf(PollableChannel.class, resolveChannelName, "A PollableChannel is required for receive operations. ");
        return receive((PollableChannel) resolveChannelName);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object receiveAndConvert() throws MessagingException {
        Message receive = receive();
        if (receive != null) {
            return this.messageConverter.fromMessage(receive);
        }
        return null;
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object receiveAndConvert(PollableChannel pollableChannel) throws MessagingException {
        Message receive = receive(pollableChannel);
        if (receive != null) {
            return this.messageConverter.fromMessage(receive);
        }
        return null;
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object receiveAndConvert(String str) throws MessagingException {
        Message receive = receive(str);
        if (receive != null) {
            return this.messageConverter.fromMessage(receive);
        }
        return null;
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Message<?> sendAndReceive(Message<?> message) {
        return sendAndReceive(getRequiredDefaultChannel(), message);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Message<?> sendAndReceive(MessageChannel messageChannel, Message<?> message) {
        return doSendAndReceive(messageChannel, message);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Message<?> sendAndReceive(String str, Message<?> message) {
        return sendAndReceive(resolveChannelName(str), message);
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object convertSendAndReceive(Object obj) {
        return this.messageConverter.fromMessage(sendAndReceive(this.messageConverter.toMessage(obj)));
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object convertSendAndReceive(MessageChannel messageChannel, Object obj) {
        return this.messageConverter.fromMessage(sendAndReceive(messageChannel, this.messageConverter.toMessage(obj)));
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object convertSendAndReceive(String str, Object obj) {
        return this.messageConverter.fromMessage(sendAndReceive(str, this.messageConverter.toMessage(obj)));
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object convertSendAndReceive(Object obj, MessagePostProcessor messagePostProcessor) {
        return this.messageConverter.fromMessage(sendAndReceive(messagePostProcessor.postProcessMessage(this.messageConverter.toMessage(obj))));
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object convertSendAndReceive(MessageChannel messageChannel, Object obj, MessagePostProcessor messagePostProcessor) {
        return this.messageConverter.fromMessage(sendAndReceive(messageChannel, messagePostProcessor.postProcessMessage(this.messageConverter.toMessage(obj))));
    }

    @Override // org.springframework.integration.core.MessagingOperations
    public Object convertSendAndReceive(String str, Object obj, MessagePostProcessor messagePostProcessor) {
        return this.messageConverter.fromMessage(sendAndReceive(str, messagePostProcessor.postProcessMessage(this.messageConverter.toMessage(obj))));
    }

    private void doSend(MessageChannel messageChannel, Message<?> message) {
        Assert.notNull(messageChannel, "channel must not be null");
        long j = this.sendTimeout;
        if (!(j >= 0 ? messageChannel.send(message, j) : messageChannel.send(message))) {
            throw new MessageDeliveryException(message, "failed to send message to channel '" + messageChannel + "' within timeout: " + j);
        }
    }

    private <P> Message<P> doReceive(PollableChannel pollableChannel) {
        Assert.notNull(pollableChannel, "channel must not be null");
        long j = this.receiveTimeout;
        Message<P> message = (Message<P>) (j >= 0 ? pollableChannel.receive(j) : pollableChannel.receive());
        if (message == null && this.logger.isTraceEnabled()) {
            this.logger.trace("failed to receive message from channel '" + pollableChannel + "' within timeout: " + j);
        }
        return message;
    }

    private <S, R> Message<R> doSendAndReceive(MessageChannel messageChannel, Message<S> message) {
        Object replyChannel = message.getHeaders().getReplyChannel();
        Object errorChannel = message.getHeaders().getErrorChannel();
        TemporaryReplyChannel temporaryReplyChannel = new TemporaryReplyChannel(this.receiveTimeout);
        doSend(messageChannel, MessageBuilder.fromMessage(message).setReplyChannel(temporaryReplyChannel).setErrorChannel(temporaryReplyChannel).build());
        Message<R> doReceive = doReceive(temporaryReplyChannel);
        if (doReceive != null) {
            doReceive = MessageBuilder.fromMessage(doReceive).setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel).setHeader("errorChannel", errorChannel).build();
        }
        return doReceive;
    }

    private MessageChannel getRequiredDefaultChannel() {
        Assert.state(this.defaultChannel != null, "No 'defaultChannel' specified for MessagingTemplate. Unable to invoke methods without an explicit channel argument.");
        return this.defaultChannel;
    }

    private ChannelResolver getRequiredChannelResolver() {
        Assert.state(this.channelResolver != null, "No 'channelResolver' specified for MessagingTemplate. Unable to invoke methods with a channel name argument.");
        return this.channelResolver;
    }

    protected MessageChannel resolveChannelName(String str) {
        return getRequiredChannelResolver().resolveChannelName(str);
    }
}
