This article walks through why RequestHandlerRetryAdvice cannot be made to work with Ftp.outboundGateway in Spring Integration. It also covers "006 - Spring Cloud Gateway - GatewayAutoConfiguration core configuration - GatewayProperties initialization, Route initialization", "502 Bad Gateway - Registered endpoint failed to handle the request", "502 Bad Gateway Registered endpoint failed to handle the request" and "An application is attempting to perform an action that requires privileges. Authentication is requir", to help you understand the topic better.
Contents:
- Unable to use RequestHandlerRetryAdvice with Ftp.outboundGateway in Spring Integration
- 006 - Spring Cloud Gateway - GatewayAutoConfiguration core configuration - GatewayProperties initialization, Route initialization
- 502 Bad Gateway - Registered endpoint failed to handle the request
- 502 Bad Gateway Registered endpoint failed to handle the request
- An application is attempting to perform an action that requires privileges. Authentication is requir
Unable to use RequestHandlerRetryAdvice with Ftp.outboundGateway in Spring Integration
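My situation is similar to the one described in this SO question. The difference is that I am not using a WebFlux.outboundGateway but an Ftp.outboundGateway, on which I invoke the AbstractRemoteFileOutboundGateway.Command.GET command. The common problem is that the RequestHandlerRetryAdvice I define is never used.
The configuration looks like this (stripped down to the relevant parts):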
@RestController
@RequestMapping( value = "/somepath" )
public class DownloadController
{
    private DownloadGateway downloadGateway;

    public DownloadController( DownloadGateway downloadGateway )
    {
        this.downloadGateway = downloadGateway;
    }

    @PostMapping( "/downloads" )
    public void download( @RequestParam( "filename" ) String filename )
    {
        Map<String, Object> headers = new HashMap<>();
        downloadGateway.triggerDownload( filename, headers );
    }
}

@MessagingGateway
public interface DownloadGateway
{
    @Gateway( requestChannel = "downloadFiles.input" )
    void triggerDownload( Object value, Map<String, Object> headers );
}

@Configuration
@EnableIntegration
public class FtpDefinition
{
    private FtpProperties ftpProperties;

    public FtpDefinition( FtpProperties ftpProperties )
    {
        this.ftpProperties = ftpProperties;
    }

    @Bean
    public DirectChannel gatewayDownloadsOutputChannel()
    {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow downloadFiles( RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile )
    {
        return f -> f.handle( getRemoteFile, getRetryAdvice() )
                     .channel( "gatewayDownloadsOutputChannel" );
    }

    private Consumer<GenericEndpointSpec<AbstractRemoteFileOutboundGateway<FTPFile>>> getRetryAdvice()
    {
        return e -> e.advice( ( (Supplier<RequestHandlerRetryAdvice>) () -> {
            RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
            advice.setRetryTemplate( getRetryTemplate() );
            return advice;
        } ).get() );
    }

    private RetryTemplate getRetryTemplate()
    {
        RetryTemplate result = new RetryTemplate();
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod( 5000 );
        result.setBackOffPolicy( backOffPolicy );
        return result;
    }

    @Bean
    public RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile( SessionFactory sessionFactory )
    {
        return Ftp.outboundGateway( sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET, "payload" )
                  .fileExistsMode( FileExistsMode.REPLACE )
                  .localDirectoryExpression( "'" + ftpProperties.getLocalDir() + "'" )
                  .autoCreateLocalDirectory( true );
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory()
    {
        DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
        sessionFactory.setHost( ftpProperties.getServers().get( 0 ).getHost() );
        sessionFactory.setPort( ftpProperties.getServers().get( 0 ).getPort() );
        sessionFactory.setUsername( ftpProperties.getServers().get( 0 ).getUser() );
        sessionFactory.setPassword( ftpProperties.getServers().get( 0 ).getPassword() );
        return sessionFactory;
    }
}

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class FtpTestApplication
{
    public static void main( String[] args )
    {
        SpringApplication.run( FtpTestApplication.class, args );
    }
}

@Configuration
@PropertySource( "classpath:ftp.properties" )
@ConfigurationProperties( prefix = "ftp" )
@Data
public class FtpProperties
{
    @NotNull
    private String localDir;

    @NotNull
    private List<Server> servers;

    @Data
    public static class Server
    {
        @NotNull
        private String host;

        @NotNull
        private int port;

        @NotNull
        private String user;

        @NotNull
        private String password;
    }
}
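The controller is mainly there for testing; the real implementation uses a poller. My FtpProperties holds a list of servers because, in the real implementation, I use a DelegatingSessionFactory to pick an instance based on some parameters.
Following Gary Russell's comment, I expected a failed download to be retried. However, if I interrupt the download on the server side (by issuing "Kick user" in a FileZilla instance), I immediately get a stack trace and no retry: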
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.[...]
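I also need to upload files, for which I use Ftp.outboundAdapter. In that case, with the same RetryTemplate, if I interrupt the upload on the server side, Spring Integration makes two more attempts, each after a 5 s delay, and only then logs java.net.SocketException: Connection reset, all as expected.
I did some debugging and noticed that, just before the first upload attempt through Ftp.outboundAdapter, a breakpoint in RequestHandlerRetryAdvice.doInvoke() is hit. When downloading through Ftp.outboundGateway, however, that breakpoint is _never_ hit.
Is something wrong with my configuration, or has anyone managed to get RequestHandlerRetryAdvice working with Ftp.outboundGateway / AbstractRemoteFileOutboundGateway.Command.GET?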
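Answer 1
Sorry for the delay; we are at SpringOne Platform this week.
The problem is caused by the fact that the gateway spec is a bean: the gateway ends up being initialized before the advice is applied.
I changed your code like this…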
@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(getRemoteFile(sessionFactory), getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}

...

private RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile(SessionFactory<FTPFile> sessionFactory) {
    return Ftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET, "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'/tmp'")
            .autoCreateLocalDirectory(true);
}
…and it worked.
It is generally better not to deal with Specs directly and to inline them in the flow definition instead…
@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(Ftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET, "payload")
                    .fileExistsMode(FileExistsMode.REPLACE)
                    .localDirectoryExpression("'/tmp'")
                    .autoCreateLocalDirectory(true),
                getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}
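To make the expected behaviour concrete (three attempts in total with a fixed pause between them, as the question reports for the upload case), here is a minimal plain-JDK sketch. It is not Spring Retry's implementation: RetrySketch and retry(...) are hypothetical names, and the pause is shortened to a few milliseconds so the example runs quickly.

```java
import java.util.function.Supplier;

// Plain-JDK illustration of "retry up to N attempts with a fixed pause".
// RetrySketch and retry(...) are hypothetical names, not Spring Retry API.
final class RetrySketch {

    static <T> T retry(int maxAttempts, long backOffMillis, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();              // success: stop retrying
            } catch (RuntimeException e) {
                last = e;                         // remember the failure
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backOffMillis); // fixed back-off before the next attempt
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw last;
                    }
                }
            }
        }
        throw last;                               // all attempts failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(3, 10L, () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated dropped connection");
            }
            return "downloaded";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The point of the answer is that this kind of wrapper only takes effect if it is actually placed around the handler; when the spec was a separate bean, the gateway was initialized before the advice could be applied.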
006 - Spring Cloud Gateway - GatewayAutoConfiguration core configuration - GatewayProperties initialization, Route initialization
1. GatewayProperties
1.1 Loading in GatewayAutoConfiguration
When Spring Cloud Gateway starts up, the core configuration class GatewayAutoConfiguration is initialized and loads the following:
NettyConfiguration: configuration of the underlying Netty communication layer
GlobalFilter (AdaptCachedBodyGlobalFilter,RouteToRequestUrlFilter,ForwardRoutingFilter,ForwardPathFilter,WebsocketRoutingFilter,WeightCalculatorWebFilter等)
FilteringWebHandler
GatewayProperties
PrefixPathGatewayFilterFactory
RoutePredicateFactory
RouteDefinitionLocator
RouteLocator
RoutePredicateHandlerMapping: finds the matching Route and handles the request
GatewayWebfluxEndpoint: the HTTP API for managing the gateway
Among these, GatewayAutoConfiguration contains the bean definition that creates the GatewayProperties instance.
From the GatewayAutoConfiguration source:
@Bean
public GatewayProperties gatewayProperties() {
    return new GatewayProperties();
}
1.2 The GatewayProperties source:
@ConfigurationProperties("spring.cloud.gateway")
@Validated
public class GatewayProperties {
    @NotNull
    @Valid
    private List<RouteDefinition> routes = new ArrayList();
    private List<FilterDefinition> defaultFilters = new ArrayList();
    private List<MediaType> streamingMediaTypes;

    public GatewayProperties() {
        this.streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON);
    }

    public List<RouteDefinition> getRoutes() {
        return this.routes;
    }

    public void setRoutes(List<RouteDefinition> routes) {
        this.routes = routes;
    }

    public List<FilterDefinition> getDefaultFilters() {
        return this.defaultFilters;
    }

    public void setDefaultFilters(List<FilterDefinition> defaultFilters) {
        this.defaultFilters = defaultFilters;
    }

    public List<MediaType> getStreamingMediaTypes() {
        return this.streamingMediaTypes;
    }

    public void setStreamingMediaTypes(List<MediaType> streamingMediaTypes) {
        this.streamingMediaTypes = streamingMediaTypes;
    }

    public String toString() {
        return "GatewayProperties{routes=" + this.routes + ", defaultFilters=" + this.defaultFilters + ", streamingMediaTypes=" + this.streamingMediaTypes + '}';
    }
}
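This class is loaded by default and binds the following configuration properties:
- spring.cloud.gateway.routes: route definitions, as a list
- spring.cloud.gateway.default-filters: default filter definitions, as a list
- spring.cloud.gateway.streamingMediaTypes: streaming media types, as a list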
spring:
  cloud:
    gateway:
      default-filters:
      - PrefixPath=/httpbin
      - AddResponseHeader=X-Response-Default-Foo, Default-Bar
      routes:
      - id: websocket_test
        uri: ws://localhost:9000
        order: 9000
        predicates:
        - Path=/echo
      - id: default_path_to_httpbin
        uri: ${test.uri}
        order: 10000
        predicates:
        - Path=/**
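Note: a default-filters entry such as PrefixPath=/httpbin is a plain string. FilterDefinition has a constructor that accepts such a text value, parses it, and populates the instance. Entries under predicates are handled the same way.
String format: name=param1,param2,param3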
public FilterDefinition(String text) {
    int eqIdx = text.indexOf("=");
    if (eqIdx <= 0) {
        this.setName(text);
    } else {
        this.setName(text.substring(0, eqIdx));
        String[] args = StringUtils.tokenizeToStringArray(text.substring(eqIdx + 1), ",");
        for(int i = 0; i < args.length; ++i) {
            this.args.put(NameUtils.generateName(i), args[i]);
        }
    }
}
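The parsing rule in that constructor can be tried out without Spring on the classpath. The sketch below is a plain-JDK stand-in, not the real FilterDefinition: ShortcutDefinitionSketch is a hypothetical class, String.split plus trim replaces StringUtils.tokenizeToStringArray, and the "_genkey_" key prefix is my assumption about what NameUtils.generateName produces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the "name=param1,param2" shortcut parsing shown above.
final class ShortcutDefinitionSketch {
    final String name;
    final Map<String, String> args = new LinkedHashMap<>();

    ShortcutDefinitionSketch(String text) {
        int eqIdx = text.indexOf('=');
        if (eqIdx <= 0) {
            // No '=' (or nothing before it): the whole text is the name, no arguments.
            this.name = text;
            return;
        }
        this.name = text.substring(0, eqIdx);
        int i = 0;
        for (String token : text.substring(eqIdx + 1).split(",")) {
            String value = token.trim();
            if (!value.isEmpty()) {
                // Positional arguments get generated keys (prefix is an assumption).
                args.put("_genkey_" + i++, value);
            }
        }
    }

    public static void main(String[] unused) {
        ShortcutDefinitionSketch d =
                new ShortcutDefinitionSketch("AddResponseHeader=X-Response-Default-Foo, Default-Bar");
        System.out.println(d.name + " " + d.args);
    }
}
```

With the YAML above, PrefixPath=/httpbin would give the name PrefixPath and a single positional argument /httpbin.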
2. Route initialization
2.1 GatewayAutoConfiguration loads the RouteLocator
/**
 * Creates a route locator that converts RouteDefinitions into Routes.
 */
@Bean
public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
        List<GatewayFilterFactory> GatewayFilters,
        List<RoutePredicateFactory> predicates,
        RouteDefinitionLocator routeDefinitionLocator) {
    return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, GatewayFilters, properties);
}

/**
 * Creates a route locator that caches routes.
 * @param routeLocators
 * @return
 */
@Bean
@Primary // among beans of the same type, the one annotated with @Primary is preferred
public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
    // 1. Build a composite route locator from the route locators already in the container.
    // 2. Wrap it in a route locator that adds caching.
    return new CachingRouteLocator(new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}
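Route locators are created in this order:
1. RouteDefinitionRouteLocator
2. CompositeRouteLocator
3. CachingRouteLocator
RouteDefinitionRouteLocator is where routes are actually obtained. CompositeRouteLocator and CachingRouteLocator wrap a locator to add extra behaviour, and CachingRouteLocator is the one ultimately exposed to the rest of the gateway.
2.2 The RouteLocator source: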
/**
 * Route locator: the service used to obtain route information.
 * RouteDefinitionRouteLocator, for example, obtains RouteDefinitions and converts them into Routes.
 */
public interface RouteLocator {
    /**
     * Returns the routes.
     */
    Flux<Route> getRoutes();
}
Next, the RouteLocator implementations.
2.3 Caching: CachingRouteLocator
// Wrapper around a route locator that adds a local cache of routes
public class CachingRouteLocator implements RouteLocator {
    // The wrapped (target) route locator
    private final RouteLocator delegate;
    /**
     * The routes.
     * A Flux is comparable to an RxJava Observable:
     * it emits 0..N items and then (optionally) completes or errors, so multiple items are handled as a stream.
     */
    private final Flux<Route> routes;
    // Local cache holding the routes obtained from the delegate
    private final Map<String, List> cache = new HashMap<>();

    public CachingRouteLocator(RouteLocator delegate) {
        this.delegate = delegate;
        routes = CacheFlux.lookup(cache, "routes", Route.class)
                .onCacheMissResume(() -> this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE));
    }

    @Override
    public Flux<Route> getRoutes() {
        return this.routes;
    }

    // Clears the cache so the next lookup reloads the routes
    public Flux<Route> refresh() {
        this.cache.clear();
        return this.routes;
    }

    @EventListener(RefreshRoutesEvent.class)
    void handleRefresh() {
        refresh();
    }
}
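1. Routes are cached locally in memory through the Map<String, List> cache field.
2. The class listens for RefreshRoutesEvent via @EventListener(RefreshRoutesEvent.class), which lets the cache be refreshed dynamically.
Note: routes are refreshed dynamically by having GatewayControllerEndpoint publish the refresh event: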
@RestControllerEndpoint(id = "gateway")
public class GatewayControllerEndpoint implements ApplicationEventPublisherAware {
    // POST to /gateway/refresh to refresh the cached route information
    @PostMapping("/refresh")
    public Mono<Void> refresh() {
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
        return Mono.empty();
    }
}
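The cache-then-refresh behaviour described above can be sketched without Reactor. This is only an analogy: CachingLocatorSketch is a hypothetical class, routes are plain strings, a List replaces Flux, and natural string ordering stands in for the order-based sort of the real class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, non-reactive analogue of a caching route locator.
final class CachingLocatorSketch {
    private final Supplier<List<String>> delegate;
    private List<String> cache; // null means "nothing cached yet"

    CachingLocatorSketch(Supplier<List<String>> delegate) {
        this.delegate = delegate;
    }

    // Loads from the delegate on a cache miss, otherwise returns the cached list.
    synchronized List<String> getRoutes() {
        if (cache == null) {
            List<String> loaded = new ArrayList<>(delegate.get());
            Collections.sort(loaded); // stand-in for sorting routes by order
            cache = Collections.unmodifiableList(loaded);
        }
        return cache;
    }

    // Analogue of handling a refresh event: drop the cache so the next call reloads.
    synchronized void refresh() {
        cache = null;
    }

    public static void main(String[] args) {
        int[] loads = {0};
        CachingLocatorSketch locator = new CachingLocatorSketch(() -> {
            loads[0]++;
            return Arrays.asList("b-route", "a-route");
        });
        locator.getRoutes();
        locator.getRoutes();   // served from the cache, delegate not called again
        locator.refresh();
        locator.getRoutes();   // reloaded after the refresh
        System.out.println("delegate called " + loads[0] + " times");
    }
}
```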
2.4 Composition: CompositeRouteLocator
// Combines multiple RouteLocator implementations and offers a single entry point for obtaining Routes
public class CompositeRouteLocator implements RouteLocator {
    /**
     * Emits 0..N items (RouteLocators) and then (optionally) completes or errors, so multiple items are handled as a stream.
     */
    private final Flux<RouteLocator> delegates;

    public CompositeRouteLocator(Flux<RouteLocator> delegates) {
        this.delegates = delegates;
    }

    @Override
    public Flux<Route> getRoutes() {
        //this.delegates.flatMap((routeLocator)-> routeLocator.getRoutes());
        return this.delegates.flatMap(RouteLocator::getRoutes);
    }
}
This class iterates over the delegate route locators passed in and combines the routes that each of them returns.
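The same flattening idea can be shown with java.util.stream instead of Flux. This is a sketch under simplifying assumptions: CompositeLocatorSketch and its nested Locator interface are hypothetical, routes are plain strings, and unlike Flux.flatMap a sequential Stream.flatMap always keeps the delegates' order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, non-reactive analogue of a composite route locator.
final class CompositeLocatorSketch {

    interface Locator {
        List<String> getRoutes();
    }

    // Returns a locator whose routes are the concatenation of all delegates' routes.
    static Locator composite(List<Locator> delegates) {
        return () -> delegates.stream()
                .flatMap(d -> d.getRoutes().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Locator fromConfig = () -> Arrays.asList("websocket_test", "default_path_to_httpbin");
        Locator fromCode = () -> Arrays.asList("custom_route"); // hypothetical extra route
        Locator all = composite(Arrays.asList(fromConfig, fromCode));
        System.out.println(all.getRoutes());
    }
}
```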
2.5 Converting route definitions into routes: RouteDefinitionRouteLocator


//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.springframework.cloud.gateway.route;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.config.GatewayProperties;
import org.springframework.cloud.gateway.event.FilterArgsEvent;
import org.springframework.cloud.gateway.event.PredicateArgsEvent;
import org.springframework.cloud.gateway.filter.FilterDefinition;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.OrderedGatewayFilter;
import org.springframework.cloud.gateway.filter.factory.GatewayFilterFactory;
import org.springframework.cloud.gateway.handler.AsyncPredicate;
import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition;
import org.springframework.cloud.gateway.handler.predicate.RoutePredicateFactory;
import org.springframework.cloud.gateway.route.Route.AsyncBuilder;
import org.springframework.cloud.gateway.support.ConfigurationUtils;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.validation.Validator;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
public class RouteDefinitionRouteLocator implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
protected final Log logger = LogFactory.getLog(this.getClass());
private final RouteDefinitionLocator routeDefinitionLocator;
private final Map<String, RoutePredicateFactory> predicates = new LinkedHashMap();
private final Map<String, GatewayFilterFactory> gatewayFilterFactories = new HashMap();
private final GatewayProperties gatewayProperties;
private final SpelExpressionParser parser = new SpelExpressionParser();
private BeanFactory beanFactory;
private ApplicationEventPublisher publisher;
@Autowired
private Validator validator;
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator, List<RoutePredicateFactory> predicates, List<GatewayFilterFactory> gatewayFilterFactories, GatewayProperties gatewayProperties) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.initFactories(predicates);
gatewayFilterFactories.forEach((factory) -> {
GatewayFilterFactory var10000 = (GatewayFilterFactory)this.gatewayFilterFactories.put(factory.name(), factory);
});
this.gatewayProperties = gatewayProperties;
}
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
private void initFactories(List<RoutePredicateFactory> predicates) {
predicates.forEach((factory) -> {
String key = factory.name();
if (this.predicates.containsKey(key)) {
this.logger.warn("A RoutePredicateFactory named " + key + " already exists, class: " + this.predicates.get(key) + ". It will be overwritten.");
}
this.predicates.put(key, factory);
if (this.logger.isInfoEnabled()) {
this.logger.info("Loaded RoutePredicateFactory [" + key + "]");
}
});
}
public Flux<Route> getRoutes() {
return this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute).map((route) -> {
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
private Route convertToRoute(RouteDefinition routeDefinition) {
AsyncPredicate<ServerWebExchange> predicate = this.combinePredicates(routeDefinition);
List<GatewayFilter> gatewayFilters = this.getFilters(routeDefinition);
return ((AsyncBuilder)Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters)).build();
}
private List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) {
List<GatewayFilter> filters = (List)filterDefinitions.stream().map((definition) -> {
GatewayFilterFactory factory = (GatewayFilterFactory)this.gatewayFilterFactories.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName());
} else {
Map<String, String> args = definition.getArgs();
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition " + id + " applying filter " + args + " to " + definition.getName());
}
Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);
Object configuration = factory.newConfig();
ConfigurationUtils.bind(configuration, properties, factory.shortcutFieldPrefix(), definition.getName(), this.validator);
GatewayFilter gatewayFilter = factory.apply(configuration);
if (this.publisher != null) {
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
}
return gatewayFilter;
}
}).collect(Collectors.toList());
ArrayList<GatewayFilter> ordered = new ArrayList(filters.size());
for(int i = 0; i < filters.size(); ++i) {
GatewayFilter gatewayFilter = (GatewayFilter)filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
} else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList();
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(this.loadGatewayFilters("defaultFilters", this.gatewayProperties.getDefaultFilters()));
}
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(this.loadGatewayFilters(routeDefinition.getId(), routeDefinition.getFilters()));
}
AnnotationAwareOrderComparator.sort(filters);
return filters;
}
private AsyncPredicate<ServerWebExchange> combinePredicates(RouteDefinition routeDefinition) {
List<PredicateDefinition> predicates = routeDefinition.getPredicates();
AsyncPredicate<ServerWebExchange> predicate = this.lookup(routeDefinition, (PredicateDefinition)predicates.get(0));
AsyncPredicate found;
for(Iterator var4 = predicates.subList(1, predicates.size()).iterator(); var4.hasNext(); predicate = predicate.and(found)) {
PredicateDefinition andPredicate = (PredicateDefinition)var4.next();
found = this.lookup(routeDefinition, andPredicate);
}
return predicate;
}
private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route, PredicateDefinition predicate) {
RoutePredicateFactory<Object> factory = (RoutePredicateFactory)this.predicates.get(predicate.getName());
if (factory == null) {
throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name " + predicate.getName());
} else {
Map<String, String> args = predicate.getArgs();
if (this.logger.isDebugEnabled()) {
this.logger.debug("RouteDefinition " + route.getId() + " applying " + args + " to " + predicate.getName());
}
Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);
Object config = factory.newConfig();
ConfigurationUtils.bind(config, properties, factory.shortcutFieldPrefix(), predicate.getName(), this.validator);
if (this.publisher != null) {
this.publisher.publishEvent(new PredicateArgsEvent(this, route.getId(), properties));
}
return factory.applyAsync(config);
}
}
}
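The core method of this class, getRoutes, obtains the route definitions from the routeDefinitionLocator passed in and converts each of them into a route in turn. As the code shows, the conversion itself is done by convertToRoute.
The annotated excerpts that follow use Predicate and Route.builder, whereas the decompiled listing above uses AsyncPredicate and Route.async, so they appear to come from a different version of the library; the flow they describe is the same.
2.5.1 Converting a RouteDefinition: convertToRoute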
// Converts a RouteDefinition into the corresponding Route
private Route convertToRoute(RouteDefinition routeDefinition) {
    // Obtain the Predicate described by the routeDefinition
    Predicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
    // Obtain the GatewayFilters described by the routeDefinition
    List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
    // Build the route
    return Route.builder(routeDefinition)
            .predicate(predicate)
            .replaceFilters(gatewayFilters)
            .build();
}
What convertToRoute does:
- obtains the Predicate for the routeDefinition (via combinePredicates)
- obtains the GatewayFilters for the routeDefinition (via getFilters)
- builds the Route
1. In convertToRoute, combinePredicates obtains the Predicate for the routeDefinition as follows:
// Returns the combined predicate
private Predicate<ServerWebExchange> combinePredicates(RouteDefinition routeDefinition) {
    // Get the PredicateDefinitions of the RouteDefinition
    List<PredicateDefinition> predicates = routeDefinition.getPredicates();
    Predicate<ServerWebExchange> predicate = lookup(routeDefinition, predicates.get(0));
    for (PredicateDefinition andPredicate : predicates.subList(1, predicates.size())) {
        Predicate<ServerWebExchange> found = lookup(routeDefinition, andPredicate);
        // Step 4
        // Return a composed predicate: the short-circuiting logical AND of this predicate and the other one
        predicate = predicate.and(found);
    }
    return predicate;
}

/**
 * Converts a PredicateDefinition into a predicate.
 * @param route
 * @param predicate
 * @return
 */
@SuppressWarnings("unchecked")
private Predicate<ServerWebExchange> lookup(RouteDefinition route, PredicateDefinition predicate) {
    // Step 1
    // Look up the predicate factory by name
    RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());
    if (factory == null) {
        throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name " + predicate.getName());
    }
    // Step 2
    // Get the arguments
    Map<String, String> args = predicate.getArgs();
    if (logger.isDebugEnabled()) {
        logger.debug("RouteDefinition " + route.getId() + " applying "
                + args + " to " + predicate.getName());
    }
    // Normalize the arguments
    Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);
    // Build the config object used to create the predicate
    Object config = factory.newConfig();
    ConfigurationUtils.bind(config, properties,
            factory.shortcutFieldPrefix(), predicate.getName(), validator);
    if (this.publisher != null) {
        this.publisher.publishEvent(new PredicateArgsEvent(this, route.getId(), properties));
    }
    // Step 3
    // Create the predicate through the predicate factory
    return factory.apply(config);
}
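How a Predicate is obtained:
- Look up the RoutePredicateFactory by the PredicateDefinition name
- Assemble the config from the PredicateDefinition args
- Have the RoutePredicateFactory create the Predicate from that config
- Combine multiple Predicates with a short-circuiting logical AND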
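The four steps above can be mimicked with java.util.function.Predicate alone. In this sketch everything is hypothetical: PredicateCombineSketch, the PathPrefix and PathSuffix factory names, and the use of a plain request path string where the gateway uses ServerWebExchange. Each definition is a two-element array holding a name and a single argument.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical miniature of "look up a factory by name, build a predicate, AND them together".
final class PredicateCombineSketch {

    // Factory name -> function turning one string argument into a predicate over a request path.
    static final Map<String, Function<String, Predicate<String>>> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put("PathPrefix", prefix -> path -> path.startsWith(prefix));
        FACTORIES.put("PathSuffix", suffix -> path -> path.endsWith(suffix));
    }

    // Steps 1 to 3: find the factory by name and let it create the predicate from the argument.
    static Predicate<String> lookup(String name, String arg) {
        Function<String, Predicate<String>> factory = FACTORIES.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("Unable to find predicate factory with name " + name);
        }
        return factory.apply(arg);
    }

    // Step 4: combine all predicates with a short-circuiting logical AND.
    static Predicate<String> combine(List<String[]> definitions) {
        Predicate<String> predicate = lookup(definitions.get(0)[0], definitions.get(0)[1]);
        for (String[] def : definitions.subList(1, definitions.size())) {
            predicate = predicate.and(lookup(def[0], def[1]));
        }
        return predicate;
    }

    public static void main(String[] args) {
        Predicate<String> p = combine(Arrays.asList(
                new String[] {"PathPrefix", "/api"},
                new String[] {"PathSuffix", ".json"}));
        System.out.println(p.test("/api/items.json"));
        System.out.println(p.test("/api/items.xml"));
    }
}
```

As in the original method, an empty definition list is not handled: the first definition is always read, so at least one predicate is assumed.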
2. In convertToRoute, getFilters obtains the GatewayFilters for the routeDefinition as follows:
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
    List<GatewayFilter> filters = new ArrayList<>();
    // Check whether gatewayProperties defines any default filters
    if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
        // Load the globally configured default filters
        filters.addAll(loadGatewayFilters("defaultFilters",
                this.gatewayProperties.getDefaultFilters()));
    }
    if (!routeDefinition.getFilters().isEmpty()) {
        // Load the filters declared in the route definition
        filters.addAll(loadGatewayFilters(routeDefinition.getId(), routeDefinition.getFilters()));
    }
    // Sort
    AnnotationAwareOrderComparator.sort(filters);
    return filters;
}

/**
 * Loads filters from their filter definitions.
 * @param id
 * @param filterDefinitions
 * @return
 */
@SuppressWarnings("unchecked")
private List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) {
    // Iterate over the filter definitions and convert each one into the corresponding filter
    List<GatewayFilter> filters = filterDefinitions.stream()
            .map(definition -> {
                // Step 1
                // Look up the filter factory by the name in the filter definition
                GatewayFilterFactory factory = this.gatewayFilterFactories.get(definition.getName());
                if (factory == null) {
                    throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName());
                }
                // Step 2
                // Get the arguments
                Map<String, String> args = definition.getArgs();
                if (logger.isDebugEnabled()) {
                    logger.debug("RouteDefinition " + id + " applying filter " + args + " to " + definition.getName());
                }
                // Normalize the args into configuration properties
                Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory);
                // Build the config object used to create the filter
                Object configuration = factory.newConfig();
                ConfigurationUtils.bind(configuration, properties,
                        factory.shortcutFieldPrefix(), definition.getName(), validator);
                // Step 3
                // Create the GatewayFilter through the filter factory
                GatewayFilter gatewayFilter = factory.apply(configuration);
                if (this.publisher != null) {
                    // Publish the event
                    this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
                }
                return gatewayFilter;
            })
            .collect(Collectors.toList());
    ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
    // Wrap the filters so that every one of them is Ordered and can be sorted
    for (int i = 0; i < filters.size(); i++) {
        GatewayFilter gatewayFilter = filters.get(i);
        if (gatewayFilter instanceof Ordered) {
            ordered.add(gatewayFilter);
        }
        else {
            ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
        }
    }
    return ordered;
}
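- getFilters loads the filter definitions (filterDefinitions) from both the global gatewayProperties configuration and the routeDefinition
- loadGatewayFilters converts each filterDefinition into the corresponding GatewayFilter
The conversion works as follows:
- Look up the GatewayFilterFactory by the filterDefinition name
- Assemble the config from the filterDefinition args
- Have the GatewayFilterFactory create the GatewayFilter from that config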
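One detail worth seeing in isolation is the ordering: within each group, a filter that declares no order gets its position plus one, and the merged list is then sorted. The sketch below reproduces only that arithmetic with plain JDK types. FilterOrderSketch, its Filter class and the filter named Early are hypothetical, and a stable List.sort on the integer order stands in for AnnotationAwareOrderComparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical miniature of "assign position-based orders, merge defaults and route filters, sort".
final class FilterOrderSketch {

    static final class Filter {
        final String name;
        final Integer order; // null means the filter declares no order of its own

        Filter(String name, Integer order) {
            this.name = name;
            this.order = order;
        }
    }

    // Filters without an explicit order get (index + 1), mirroring the wrapping loop above.
    static List<Filter> assignOrders(List<Filter> filters) {
        List<Filter> ordered = new ArrayList<>(filters.size());
        for (int i = 0; i < filters.size(); i++) {
            Filter f = filters.get(i);
            ordered.add(f.order != null ? f : new Filter(f.name, i + 1));
        }
        return ordered;
    }

    // Default filters first, then route filters, then a stable sort by order.
    static List<Filter> merge(List<Filter> defaults, List<Filter> routeFilters) {
        List<Filter> all = new ArrayList<>(assignOrders(defaults));
        all.addAll(assignOrders(routeFilters));
        all.sort(Comparator.comparingInt((Filter f) -> f.order));
        return all;
    }

    public static void main(String[] args) {
        List<Filter> merged = merge(
                Arrays.asList(new Filter("PrefixPath", null), new Filter("AddResponseHeader", null)),
                Arrays.asList(new Filter("StripPrefix", null), new Filter("Early", 0)));
        for (Filter f : merged) {
            System.out.println(f.order + " " + f.name);
        }
    }
}
```

Because the sort is stable and the default filters are added first, a default filter and a route filter that end up with the same order keep the default filter in front.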
502 Bad Gateway - Registered endpoint failed to handle the request
2017-08-07
Greetings from China! I have a question about the error message “502 Bad Gateway - Registered endpoint failed to handle the request” on SAP Cloud Platform, where I would like to consume an OData service from SAP Cloud for Customer (the C4C system).
I have a Node.js application in which I hard-code the endpoint of an OData service in the C4C system. The request is sent to C4C and the result is then displayed in the Node.js console.
The end point for C4C OData service:
https://qxl.dev.sapbydesign.com/sap/byd/odata/v1/opportunity/OpportunityCollection(‘00163E06551B1EE79E9E69D7F8FBCDCF’)
When I test this application locally, it works perfectly.
var sURL = "https://qxl-cust233.dev.sapbydesign.com/sap/byd/odata/v1/opportunity/OpportunityCollection('00163E06551B1EE79E9E69D7F8FBCDCF')";
var username = 'WANGJER'
var password = 'Saptest1'
var options = {
    url: sURL,
    auth: {
        user: username,
        password: password
    },
    headers: {
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
};
request(options, function (error, response, body) {
    // console.log('body:', body);
    var opportunity = JSON.parse(body);
    var line1 = "Opportunity name: " + opportunity.d.results.Name;
    console.log(line1);
    var line2 = "Responsible: " + opportunity.d.results.MainEmployeeResponsiblePartyName;
    console.log(line2);
    var line3 = "Opportunity id: " + opportunity.d.results.ObjectID;
    console.log(line3);
    var responseText = line1 + "\n" + line2 + "\n" + line3;
    res.send(responseText);
    res.send("another line"); // this line will cause error - repeated send response is not allowed
});
After I upload it to Cloud Foundry on SCP, it stops working: I set a breakpoint in the C4C system and found that the request sent from SCP never reaches C4C. After some time, SCP returns the error message “502 Bad Gateway - Registered endpoint failed to handle the request”.
This article was shared from CSDN, by 汪子熙.
502 Bad Gateway Registered endpoint failed to handle the request
502 Bad Gateway: Registered endpoint failed to handle the request.
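I ran into the error message above when accessing a Node.js application deployed on SAP Cloud Platform.
The logs show that it was caused by the Node.js application failing to connect to the Redis database on SAP Cloud Platform: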
8 Mar 2019, 17:15:45-[APP/PROC/WEB/0]
Redis has meet with some trouble: ReplyError: Ready check failed:NOAUTH Authentication required.
OK
An application is attempting to perform an action that requires privileges. Authentication is requir
An application is attempting to perform an action that requires privileges. Authentication is required to perform this action.
Resolution: run the command with sudo (sudo command...)