/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import jakarta.persistence.Query;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * {@link IJobPersistence} implementation that stores batch2 job instances and work chunks
 * through the injected Spring Data repositories and the JPA {@link EntityManager}.
 */
public class JpaJobPersistenceImpl implements IJobPersistence {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

    /** Entity property name used as the sort key when paging job instances by creation time. */
    public static final String CREATE_TIME = "myCreateTime";

    /** JSON key of the request URL inside the serialized job parameters. */
    private static final String ORIGINAL_REQUEST_URL = "originalRequestUrl";

    /**
     * Shared lenient mapper used to re-write job parameters (accepts unquoted field names and
     * single quotes). Configured once here and never reconfigured afterwards, so it is built a
     * single time instead of on every call.
     */
    private static final ObjectMapper ourLenientParamsMapper = new ObjectMapper()
            .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
            .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);

    private final IBatch2JobInstanceRepository myJobInstanceRepository;
    private final IBatch2WorkChunkRepository myWorkChunkRepository;
    private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
    private final EntityManager myEntityManager;
    private final IHapiTransactionService myTransactionService;
    private final IInterceptorBroadcaster myInterceptorBroadcaster;

    /**
     * Constructor
     *
     * @param theJobInstanceRepository repository for job instance entities; must not be null
     * @param theWorkChunkRepository repository for work chunk entities; must not be null
     * @param theWorkChunkMetadataViewRepo repository for the work chunk metadata view
     * @param theTransactionService service used to run repository calls in a transaction
     * @param theEntityManager entity manager used for locking finds and ad-hoc update queries
     * @param theInterceptorBroadcaster broadcaster used to fire pre-storage batch job hooks
     */
    public JpaJobPersistenceImpl(
            IBatch2JobInstanceRepository theJobInstanceRepository,
            IBatch2WorkChunkRepository theWorkChunkRepository,
            IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
            IHapiTransactionService theTransactionService,
            EntityManager theEntityManager,
            IInterceptorBroadcaster theInterceptorBroadcaster) {
        // NOTE(review): only the two repositories are validated; the remaining collaborators are
        // accepted as-is - confirm whether callers may legitimately pass null for them.
        Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
        Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
        myJobInstanceRepository = theJobInstanceRepository;
        myWorkChunkRepository = theWorkChunkRepository;
        myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
        myTransactionService = theTransactionService;
        myEntityManager = theEntityManager;
        myInterceptorBroadcaster = theInterceptorBroadcaster;
    }

    /**
     * Persists a new work chunk built from the given creation event.
     * The chunk gets a random UUID as id, and its create and start times are set to "now".
     *
     * @param theBatchWorkChunk the creation event describing the chunk
     * @return the id generated for the stored chunk
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
        Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setSequence(theBatchWorkChunk.sequence);
        entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
        entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
        entity.setTargetStepId(theBatchWorkChunk.targetStepId);
        entity.setInstanceId(theBatchWorkChunk.instanceId);
        entity.setSerializedData(theBatchWorkChunk.serializedData);
        entity.setCreateTime(new Date());
        entity.setStartTime(new Date());
        entity.setStatus(getOnCreateStatus(theBatchWorkChunk));

        ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
        ourLog.trace(
                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
        myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));

        return entity.getId();
    }

    /**
     * Gets the initial onCreate state for the given workchunk.
     * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
     * chunks in the previous step are COMPLETED.
     * Non gated job chunks start in READY
     */
    private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
        if (theBatchWorkChunk.isGatedExecution) {
            return WorkChunkStatusEnum.GATE_WAITING;
        } else {
            return WorkChunkStatusEnum.READY;
        }
    }

    /**
     * Moves a chunk to IN_PROGRESS so a worker can start processing it.
     *
     * @param theChunkId id of the chunk to start
     * @return the chunk as re-read after the status update, or empty when no row was updated
     *         (the chunk does not exist or is not in one of the allowed prior states)
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
        // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
        Batch2WorkChunkEntity chunkLock =
                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
        // find() yields null for an unknown id and detach(null) would throw, so only detach a real
        // entity; an unknown id then simply falls through to the "0 rows modified" path below.
        if (chunkLock != null) {
            // remove from the current state to avoid stale data.
            myEntityManager.detach(chunkLock);
        }

        // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here. On chunk failure, we probably shouldn't be allowed.
        // But how does re-run happen if k8s kills a processor mid run?
        List<WorkChunkStatusEnum> priorStates =
                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
        int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);

        if (rowsModified == 0) {
            ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
            return Optional.empty();
        } else {
            Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
            return chunk.map(this::toChunk);
        }
    }

    /**
     * Stores a new job instance after invoking the pre-storage batch job hooks.
     *
     * @param theInstance the instance to store; its instance id must be blank
     * @return the random UUID assigned to the stored instance
     * @throws IllegalArgumentException if the instance already carries an instance id
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public String storeNewInstance(JobInstance theInstance) {
        Validate.isTrue(isBlank(theInstance.getInstanceId()));

        invokePreStorageBatchHooks(theInstance);

        Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setDefinitionId(theInstance.getJobDefinitionId());
        entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
        entity.setStatus(theInstance.getStatus());
        entity.setParams(theInstance.getParameters());
        entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
        entity.setFastTracking(theInstance.isFastTracking());
        entity.setCreateTime(new Date());
        entity.setStartTime(new Date());
        entity.setReport(theInstance.getReport());
        entity.setTriggeringUsername(theInstance.getTriggeringUsername());
        entity.setTriggeringClientId(theInstance.getTriggeringClientId());

        entity = myJobInstanceRepository.save(entity);
        return entity.getId();
    }

    /**
     * Fetches one page of instances for a job definition, filtered by status and cutoff date.
     * The filtering itself is delegated to the repository query.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(
            String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
        return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
                theJobDefinitionId, theStatuses, theCutoff, thePageable));
    }

    /**
     * Fetches one page of instances for a job definition in any of the requested statuses,
     * sorted by creation time ascending.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
            String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
                theJobDefinitionId, theRequestedStatuses, pageRequest));
    }

    /**
     * Fetches one page of instances for a job definition, sorted by creation time ascending.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstancesByJobDefinitionId(
            String theJobDefinitionId, int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
    }

    /**
     * Fetches a page of job instances, optionally restricted to a single status.
     *
     * @param theRequest paging, sorting and optional status filter; a null, empty or blank status
     *                   means "no status filter"
     * @return the requested page
     * @throws IllegalArgumentException if a non-blank status does not name a {@link StatusEnum} constant
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
        PageRequest pageRequest =
                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());

        String jobStatus = theRequest.getJobStatus();
        // A missing status must not reach StatusEnum.valueOf(), which throws on null.
        if (isBlank(jobStatus)) {
            Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
            return pageOfEntities.map(this::toInstance);
        }

        StatusEnum status = StatusEnum.valueOf(jobStatus);
        List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
        Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
        return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
    }

    /** Converts a list of instance entities into their {@link JobInstance} model form. */
    private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
        return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
    }

    /**
     * Looks up a single job instance by id.
     *
     * @param theInstanceId id of the instance
     * @return the instance, or empty if the repository has no entity with that id
     */
    @Override
    @Nonnull
    public Optional<JobInstance> fetchInstance(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
    }

    /**
     * Fetches one page of instances matching a job definition and parameters, optionally
     * restricted by status.
     * When statuses are supplied and the definition is the bulk export job, the
     * {@code originalRequestUrl} parameter is stripped of its query string before matching.
     *
     * @param theRequest job definition id, serialized parameters and optional statuses
     * @param thePage zero-based page index
     * @param theBatchSize page size
     * @return the matching instances for the requested page
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
        String definitionId = theRequest.getJobDefinition();
        String params = theRequest.getParameters();
        Set<StatusEnum> statuses = theRequest.getStatuses();

        Pageable pageable = PageRequest.of(thePage, theBatchSize);

        List<Batch2JobInstanceEntity> instanceEntities;

        if (statuses != null && !statuses.isEmpty()) {
            if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
                // Parse and re-serialize the parameters once; null means they could not be rewritten.
                String truncatedParams = originalRequestUrlTruncation(params);
                if (truncatedParams != null) {
                    params = truncatedParams;
                }
            }
            instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
                    definitionId, params, statuses, pageable);
        } else {
            instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
        }
        return toInstanceList(instanceEntities);
    }

    /**
     * Re-serializes the given JSON parameters with the query string removed from the
     * {@code originalRequestUrl} value, if that field is present.
     *
     * @param theParams serialized job parameters
     * @return the re-serialized parameters when they parse to a JSON object; {@code null} when they
     *         parse to anything else or when parsing/serialization fails (the failure is logged)
     */
    private String originalRequestUrlTruncation(String theParams) {
        try {
            JsonNode rootNode = ourLenientParamsMapper.readTree(theParams);

            if (rootNode instanceof ObjectNode) {
                ObjectNode objectNode = (ObjectNode) rootNode;

                if (objectNode.has(ORIGINAL_REQUEST_URL)) {
                    String url = objectNode.get(ORIGINAL_REQUEST_URL).asText();
                    // Keep everything before the first '?'. Unlike split()[0], this also copes
                    // with a URL consisting only of '?' characters.
                    int queryStart = url.indexOf('?');
                    if (queryStart >= 0) {
                        objectNode.put(ORIGINAL_REQUEST_URL, url.substring(0, queryStart));
                    }
                }
                return ourLenientParamsMapper.writeValueAsString(objectNode);
            }
        } catch (Exception e) {
            // Best effort: the caller falls back to the untouched parameters when null is returned.
            ourLog.info("Error Truncating Original Request Url", e);
        }
        return null;
    }

    /**
     * Fetches one page of all job instances, sorted by creation time ascending.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
        // default sort is myCreateTime Asc
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
                        .map(this::toInstance)
                        .collect(Collectors.toList()));
    }

    /**
     * Moves a chunk from READY to QUEUED and reports the number of updated rows to the callback.
     *
     * @param theChunkId id of the chunk to enqueue
     * @param theCallback receives the row count returned by the status update
     */
    @Override
    public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
        int updated = myWorkChunkRepository.updateChunkStatus(
                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
        theCallback.accept(updated);
    }

    /**
     * Asks the repository to move POLL_WAITING chunks of the instance to READY, passing the
     * current time as the reference date.
     *
     * @param theInstanceId id of the job instance
     * @return the number of rows reported as updated by the repository
     */
    @Override
    public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
        return myWorkChunkRepository.updateWorkChunksForPollWaiting(
                theInstanceId,
                Date.from(Instant.now()),
                Set.of(WorkChunkStatusEnum.POLL_WAITING),
                WorkChunkStatusEnum.READY);
    }

    /**
     * Fetches one page of all job instances, sorted by creation time descending (newest first).
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
                        .map(this::toInstance)
                        .collect(Collectors.toList()));
    }

    /** Converts a work chunk entity into its {@link WorkChunk} model form. */
    private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
        return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
    }

    /** Converts a job instance entity into its {@link JobInstance} model form. */
    private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
        return JobInstanceUtil.fromEntityToInstance(theEntity);
    }

    /**
     * Records an error on a chunk and escalates it to FAILED once its error count exceeds
     * {@code MAX_CHUNK_ERROR_COUNT}.
     *
     * @param theParameters the error event carrying the chunk id and error message
     * @return FAILED if the escalation update changed the chunk, otherwise ERRORED
     * @throws IllegalArgumentException if no chunk row was changed by the initial error update
     */
    @Override
    public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
        String chunkId = theParameters.getChunkId();
        String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());

        return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
            int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
                    chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
            Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);

            // Second pass: flip the chunk to FAILED only when the stored error count is over the limit.
            Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
                    + ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
                    + "where myId = :chunkId and myErrorCount > :maxCount");
            query.setParameter("chunkId", chunkId);
            query.setParameter("failed", WorkChunkStatusEnum.FAILED);
            query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
            int failChangeCount = query.executeUpdate();

            if (failChangeCount > 0) {
                return WorkChunkStatusEnum.FAILED;
            } else {
                return WorkChunkStatusEnum.ERRORED;
            }
        });
    }

    /**
     * Moves an IN_PROGRESS chunk to POLL_WAITING with the given next poll time.
     * Logs a warning when the update did not affect exactly one row.
     *
     * @param theChunkId id of the chunk
     * @param theDeadline the next poll time to store
     */
    @Override
    public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
        int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);

        if (updated != 1) {
            ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
        }
    }

    /**
     * Marks a chunk as FAILED, storing the (possibly truncated) error message.
     *
     * @param theChunkId id of the chunk
     * @param theErrorMessage error message to store; may be null
     */
    @Override
    public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
        ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
        String errorMessage = truncateErrorMessage(theErrorMessage);
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
                        theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
    }

    /**
     * Marks a chunk as COMPLETED, recording the processed record count, recovered error count
     * and recovered warning message from the event.
     *
     * @param theEvent the completion event
     */
    @Override
    public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
                        theEvent.getChunkId(),
                        new Date(),
                        theEvent.getRecordsProcessed(),
                        theEvent.getRecoveredErrorCount(),
                        WorkChunkStatusEnum.COMPLETED,
                        theEvent.getRecoveredWarningMessage()));
    }

    /**
     * Cuts an error message down to {@code ERROR_MSG_MAX_LENGTH} characters, logging a warning
     * when truncation happens.
     *
     * @param theErrorMessage message to check; may be null
     * @return the original message when null or short enough, otherwise its leading part
     */
    @Nullable
    private static String truncateErrorMessage(String theErrorMessage) {
        String errorMessage;
        if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
            ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
            errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
        } else {
            errorMessage = theErrorMessage;
        }
        return errorMessage;
    }

    /**
     * Sets the status and error message on the given chunks, in batches of 100 ids.
     * Asserts that a transaction is already active.
     *
     * @param theInstanceId id of the owning instance (used for logging only in this method)
     * @param theChunkIds ids of the chunks to update
     * @param theStatus status to set
     * @param theErrorMessage error message to store; truncated if too long
     */
    @Override
    public void markWorkChunksWithStatusAndWipeData(
            String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
        assert TransactionSynchronizationManager.isActualTransactionActive();

        ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
        String errorMessage = truncateErrorMessage(theErrorMessage);
        List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
        for (List<String> idList : listOfListOfIds) {
            myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
                    idList, new Date(), theStatus, errorMessage);
        }
    }

    /**
     * Returns the distinct chunk statuses present in a step of a job that has not ended.
     *
     * @param theInstanceId id of the job instance
     * @param theCurrentStepId id of the step to inspect
     * @return the distinct statuses, or an empty unmodifiable set when the instance is missing or has ended
     */
    @Override
    public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
            String theInstanceId, String theCurrentStepId) {
        if (getRunningJob(theInstanceId) == null) {
            return Collections.unmodifiableSet(new HashSet<>());
        }
        return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
    }

    /**
     * Loads the instance entity if it exists and its status is not an ended status.
     *
     * @return the entity, or {@code null} when it is missing or has ended
     */
    private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
        Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
        if (instance.isEmpty()) {
            return null;
        }
        if (instance.get().getStatus().isEnded()) {
            return null;
        }
        return instance.get();
    }

    /**
     * Loads one page of chunks for an instance in a new transaction and feeds each converted
     * chunk to the consumer.
     *
     * @param theInstanceId id of the job instance
     * @param theIncludeData whether to use the repository query that includes chunk data
     * @param thePageSize page size
     * @param thePageIndex zero-based page index
     * @param theConsumer receives each chunk of the page
     */
    private void fetchChunks(
            String theInstanceId,
            boolean theIncludeData,
            int thePageSize,
            int thePageIndex,
            Consumer<WorkChunk> theConsumer) {
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .withPropagation(Propagation.REQUIRES_NEW)
                .execute(() -> {
                    List<Batch2WorkChunkEntity> chunks;
                    if (theIncludeData) {
                        chunks = myWorkChunkRepository.fetchChunks(
                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
                    } else {
                        chunks = myWorkChunkRepository.fetchChunksNoData(
                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
                    }
                    for (Batch2WorkChunkEntity chunk : chunks) {
                        theConsumer.accept(toChunk(chunk));
                    }
                });
    }

    /**
     * Sets the update time of the given instance to "now".
     *
     * @param theInstanceId id of the job instance
     */
    @Override
    public void updateInstanceUpdateTime(String theInstanceId) {
        myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
    }

    /**
     * Saves the given work chunk, assigning a random UUID when it has no id yet.
     * Note that the id is set on the passed-in object.
     *
     * @param theWorkChunk the chunk to store
     * @return the chunk as converted back from the saved entity
     */
    @Override
    public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
        if (theWorkChunk.getId() == null) {
            theWorkChunk.setId(UUID.randomUUID().toString());
        }
        return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
    }

    /**
     * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
     */
    @Override
    public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
        return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
    }

    /**
     * Streams all chunks of one step of an instance, converted to {@link WorkChunk}.
     * NOTE(review): the stream comes straight from the repository; presumably the caller must
     * consume and close it inside an active transaction - confirm against callers.
     */
    @Override
    public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
        return myWorkChunkRepository
                .fetchChunksForStep(theInstanceId, theStepId)
                .map(this::toChunk);
    }

    /**
     * Fetches a page of chunk metadata for an instance, restricted to the given states.
     *
     * @param thePageable paging information
     * @param theInstanceId id of the job instance
     * @param theStates chunk states to include
     * @return the page of metadata
     */
    @Override
    public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
            Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
        Page<Batch2WorkChunkMetadataView> page =
                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);

        return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
    }

    /**
     * Loads an instance under a PESSIMISTIC_WRITE lock, lets the callback modify its model form,
     * and copies the changes back onto the entity when the callback reports a modification.
     * NOTE(review): this method has no transactional annotation of its own; the locking find
     * presumably relies on a transaction opened by the caller - confirm.
     *
     * @param theInstanceId id of the job instance
     * @param theModifier callback that mutates the instance and returns whether it changed anything
     * @return the callback's result, or {@code false} when no instance with that id exists
     */
    @Override
    public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
        Batch2JobInstanceEntity instanceEntity =
                myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
        if (null == instanceEntity) {
            ourLog.error("No instance found with Id {}", theInstanceId);
            return false;
        }
        // convert to JobInstance for public api
        JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);

        // run the modification callback
        boolean wasModified = theModifier.doUpdate(jobInstance);

        if (wasModified) {
            // copy fields back for flush.
            JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
        }

        return wasModified;
    }

    /**
     * Deletes all chunks of an instance and then the instance itself.
     *
     * @param theInstanceId id of the job instance
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void deleteInstanceAndChunks(String theInstanceId) {
        ourLog.info("Deleting instance and chunks: {}", theInstanceId);
        myWorkChunkRepository.deleteAllForInstance(theInstanceId);
        myJobInstanceRepository.deleteById(theInstanceId);
    }

    /**
     * Flags the instance as having its work chunks purged and deletes all of its chunks.
     *
     * @param theInstanceId id of the job instance
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
        ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
        int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
        int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
        ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
    }

    /**
     * Sets the instance status, but only if its current status is one of the given prior states.
     *
     * @param theInstanceId id of the job instance
     * @param theStatusEnum status to set
     * @param thePriorStates statuses from which the transition is allowed
     * @return {@code true} if at least one row was changed
     */
    @Override
    public boolean markInstanceAsStatusWhenStatusIn(
            String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
        int recordsChanged =
                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
        ourLog.debug(
                "Update job {} to status {} if in status {}: {}",
                theInstanceId,
                theStatusEnum,
                thePriorStates,
                recordsChanged > 0);
        return recordsChanged > 0;
    }

    /**
     * Flags an instance as cancelled and reports the outcome.
     *
     * @param theInstanceId id of the job instance
     * @return a success result when a row was changed; otherwise a failure result saying either
     *         that the instance was already cancelled or that it was not found
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public JobOperationResultJson cancelInstance(String theInstanceId) {
        int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
        String operationString = "Cancel job instance " + theInstanceId;

        // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
        // Replace with boolean result or ResourceNotFound exception. Build the message up at the ui.
        String messagePrefix = "Job instance <" + theInstanceId + ">";
        if (recordsChanged > 0) {
            return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
        } else {
            Optional<JobInstance> instance = fetchInstance(theInstanceId);
            if (instance.isPresent()) {
                return JobOperationResultJson.newFailure(
                        operationString, messagePrefix + " was already cancelled. Nothing to do.");
            } else {
                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
            }
        }
    }

    /**
     * Fires the STORAGE_PRESTORAGE_BATCH_JOB_CREATE pointcut, if any hook is registered for it,
     * passing the job instance and a new {@link SystemRequestDetails}.
     */
    private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
        if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
            HookParams params = new HookParams()
                    .add(JobInstance.class, theJobInstance)
                    .add(RequestDetails.class, new SystemRequestDetails());

            myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
        }
    }

    /**
     * Advances a gated instance to the next step and releases that step's waiting chunks.
     *
     * @param theJobInstanceId id of the job instance
     * @param theNextStepId id of the step to advance to
     * @param theIsReductionStep whether the next step is a reduction step; chunks then move to
     *                           REDUCTION_READY instead of READY
     * @return {@code true} if this call changed the instance's current gated step; {@code false}
     *         if the instance was not found or was already on that step
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public boolean advanceJobStepAndUpdateChunkStatus(
            String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
        boolean changed = updateInstance(theJobInstanceId, instance -> {
            // NOTE(review): this dereferences the current gated step id without a null check;
            // presumably this is only called for gated jobs - confirm.
            if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
                // someone else beat us here. No changes
                return false;
            }
            ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
            instance.setCurrentGatedStepId(theNextStepId);
            return true;
        });

        if (changed) {
            ourLog.debug(
                    "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
                    theJobInstanceId,
                    theNextStepId);
            WorkChunkStatusEnum nextStep =
                    theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
            // when we reach here, the current step id is equal to theNextStepId
            // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
            // workers.
            // In order to keep them compatible, turn QUEUED chunks into READY, too.
            // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
            int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
                    theJobInstanceId,
                    theNextStepId,
                    List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
                    nextStep);
            ourLog.debug(
                    "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
                    numChanged,
                    theJobInstanceId,
                    theNextStepId);
        }

        return changed;
    }
}