序
本文主要研究一下servicecomb-saga的alpha-server
启动alpha-server
alpha-server是servicecomb-saga的分布式事务协调中心,采用spring boot开发,可以直接从jar包启动,需要依赖mysql或pg数据库,同时初始化数据。启动命令如下:
# Launch the alpha-server executable jar with the "prd" Spring profile active,
# passing the PostgreSQL datasource URL, username and password as Spring properties.
java -jar -Dspring.profiles.active=prd \
alpha-server-0.2.0-exec.jar \
--spring.datasource.url=jdbc:postgresql://localhost:5432/postgres \
--spring.datasource.username=postgres \
--spring.datasource.password=postgres
AlphaConfig
/**
 * Spring configuration of the alpha server.
 *
 * <p>Declares the repository, callback and scheduler beans and, while building the
 * {@link TxConsistentService} bean, runs the {@link EventScanner} and starts the gRPC
 * endpoint on a separate thread.
 */
@EntityScan(basePackages = "org.apache.servicecomb.saga.alpha")
@Configuration
class AlphaConfig {

  /** Queue handed to both {@link PushBackOmegaCallback} and {@link PendingTaskRunner}. */
  private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>();

  /** Single-threaded scheduler exposed as a bean and shut down in {@link #shutdown()}. */
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  /** Value passed to {@link PendingTaskRunner}; defaults to 3000 when the property is unset. */
  @Value("${alpha.compensation.retry.delay:3000}")
  private int delay;

  /** Registry of omega callbacks, stored as a map of maps; starts out empty. */
  @Bean
  Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
    return new ConcurrentHashMap<>();
  }

  /**
   * Builds the callback used by the event scanner: a {@link CompositeOmegaCallback} over the
   * registry, wrapped in a {@link PushBackOmegaCallback} that shares the pending queue.
   */
  @Bean
  OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
    CompositeOmegaCallback composite = new CompositeOmegaCallback(callbacks);
    return new PushBackOmegaCallback(pendingCompensations, composite);
  }

  /** Wraps the event envelope repository in a {@link SpringTxEventRepository}. */
  @Bean
  TxEventRepository springTxEventRepository(TxEventEnvelopeRepository eventRepo) {
    return new SpringTxEventRepository(eventRepo);
  }

  /** Builds a {@link SpringCommandRepository} from the event and command repositories. */
  @Bean
  CommandRepository springCommandRepository(
      TxEventEnvelopeRepository eventRepo,
      CommandEntityRepository commandRepository) {
    return new SpringCommandRepository(eventRepo, commandRepository);
  }

  /** Wraps the timeout entity repository in a {@link SpringTxTimeoutRepository}. */
  @Bean
  TxTimeoutRepository springTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) {
    return new SpringTxTimeoutRepository(timeoutRepo);
  }

  /** Exposes the scheduler owned by this configuration as a bean. */
  @Bean
  ScheduledExecutorService compensationScheduler() {
    return scheduler;
  }

  /** Creates the gRPC server configuration object. */
  @Bean
  GrpcServerConfig grpcServerConfig() {
    return new GrpcServerConfig();
  }

  /**
   * Creates the {@link TxConsistentService} bean.
   *
   * <p>As side effects it first runs an {@link EventScanner} built from the given scheduler,
   * repositories, callback and polling interval (default 500), then starts the gRPC server
   * on a new thread.
   */
  @Bean
  TxConsistentService txConsistentService(
      @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
      GrpcServerConfig serverConfig,
      ScheduledExecutorService scheduler,
      TxEventRepository eventRepository,
      CommandRepository commandRepository,
      TxTimeoutRepository timeoutRepository,
      OmegaCallback omegaCallback,
      Map<String, Map<String, OmegaCallback>> omegaCallbacks) {

    // The scanner is run before the service and the gRPC endpoint are created.
    EventScanner scanner = new EventScanner(
        scheduler,
        eventRepository,
        commandRepository,
        timeoutRepository,
        omegaCallback,
        eventPollingInterval);
    scanner.run();

    TxConsistentService service = new TxConsistentService(eventRepository);

    // The gRPC server is started on its own thread rather than on the calling thread.
    ServerStartable grpcServer = buildGrpc(serverConfig, service, omegaCallbacks);
    Thread grpcThread = new Thread(grpcServer::start);
    grpcThread.start();

    return service;
  }

  /** Assembles the gRPC server from its configuration and a new event endpoint. */
  private ServerStartable buildGrpc(
      GrpcServerConfig config,
      TxConsistentService service,
      Map<String, Map<String, OmegaCallback>> callbacks) {
    GrpcTxEventEndpointImpl endpoint = new GrpcTxEventEndpointImpl(service, callbacks);
    return new GrpcStartable(config, endpoint);
  }

  /** Runs the {@link PendingTaskRunner} over the pending queue once the bean is constructed. */
  @PostConstruct
  void init() {
    PendingTaskRunner runner = new PendingTaskRunner(pendingCompensations, delay);
    runner.run();
  }

  /** Shuts the scheduler down immediately when the bean is destroyed. */
  @PreDestroy
  void shutdown() {
    scheduler.shutdownNow();
  }
}
- 这里主要是启动grpc server
- repository部分有SpringCommandRepository、SpringTxEventRepository、SpringTxTimeoutRepository
小结
- 从整个alpha工程结构来看,相对比较粗糙,config、repository等类都没有按包名归类
- 另外整个工程没有做成spring-boot-starter,不方便使用jar包嵌入进行扩展,有待改进
doc
- saga-spring-demo
本文暂时没有评论,来添加一个吧(●'◡'●)