001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.bulk.export.svc; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.context.RuntimeSearchParam; 025import ca.uhn.fhir.fhirpath.IFhirPath; 026import ca.uhn.fhir.i18n.Msg; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 031import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap; 032import ca.uhn.fhir.jpa.api.svc.IIdHelperService; 033import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; 034import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters; 035import ca.uhn.fhir.jpa.dao.IResultIterator; 036import ca.uhn.fhir.jpa.dao.ISearchBuilder; 037import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 038import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; 039import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters; 042import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 043import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 044import ca.uhn.fhir.jpa.util.QueryChunker; 045import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; 046import ca.uhn.fhir.mdm.dao.IMdmLinkDao; 047import ca.uhn.fhir.mdm.model.MdmPidTuple; 048import ca.uhn.fhir.model.api.Include; 049import ca.uhn.fhir.model.primitive.IdDt; 050import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 051import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; 052import ca.uhn.fhir.rest.param.HasOrListParam; 053import ca.uhn.fhir.rest.param.HasParam; 054import ca.uhn.fhir.rest.param.ReferenceOrListParam; 055import ca.uhn.fhir.rest.param.ReferenceParam; 056import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 057import ca.uhn.fhir.util.ExtensionUtil; 058import ca.uhn.fhir.util.HapiExtensions; 059import ca.uhn.fhir.util.Logs; 060import ca.uhn.fhir.util.SearchParameterUtil; 061import jakarta.annotation.Nonnull; 062import jakarta.persistence.EntityManager; 063import org.apache.commons.lang3.StringUtils; 064import org.hl7.fhir.instance.model.api.IBaseExtension; 065import org.hl7.fhir.instance.model.api.IBaseReference; 066import org.hl7.fhir.instance.model.api.IBaseResource; 067import org.hl7.fhir.instance.model.api.IIdType; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070import org.springframework.beans.factory.annotation.Autowired; 071 072import java.io.IOException; 073import java.util.ArrayList; 074import java.util.HashMap; 075import java.util.HashSet; 076import java.util.Iterator; 077import java.util.LinkedHashSet; 078import java.util.List; 079import java.util.Map; 080import java.util.Optional; 081import java.util.Set; 082import java.util.stream.Collectors; 083 084import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS; 085import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; 086 087public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> { 088 private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class); 089 090 public static final int QUERY_CHUNK_SIZE = 100; 091 public static final List<String> PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES = 092 List.of("Practitioner", "Organization"); 093 094 @Autowired 095 private FhirContext myContext; 096 097 @Autowired 098 private BulkExportHelperService myBulkExportHelperSvc; 099 100 @Autowired 101 private JpaStorageSettings myStorageSettings; 102 103 @Autowired 104 private DaoRegistry myDaoRegistry; 105 106 @Autowired 107 protected SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 108 109 @Autowired 110 private IIdHelperService<JpaPid> myIdHelperService; 111 112 @SuppressWarnings("rawtypes") 113 @Autowired 114 protected IMdmLinkDao myMdmLinkDao; 115 116 @Autowired 117 private MdmExpansionCacheSvc myMdmExpansionCacheSvc; 118 119 @Autowired 120 private EntityManager myEntityManager; 121 122 @Autowired 123 private IHapiTransactionService myHapiTransactionService; 124 125 @Autowired 126 private ISearchParamRegistry mySearchParamRegistry; 127 128 private IFhirPath myFhirPath; 129 130 @Override 131 public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) { 132 return myHapiTransactionService 133 .withSystemRequest() 134 .withRequestPartitionId(theParams.getPartitionIdOrAllPartitions()) 135 .readOnly() 136 .execute(() -> { 137 String resourceType = theParams.getResourceType(); 138 String jobId = theParams.getInstanceId(); 139 String chunkId = theParams.getChunkId(); 140 RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType); 141 142 LinkedHashSet<JpaPid> pids; 143 if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.PATIENT) { 144 pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, chunkId, def); 145 } else if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.GROUP) { 146 pids = getPidsForGroupStyleExport(theParams, resourceType, def); 147 } else { 148 pids = getPidsForSystemStyleExport(theParams, jobId, chunkId, def); 149 } 150 151 ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size()); 152 return pids.iterator(); 153 }); 154 } 155 156 @SuppressWarnings("unchecked") 157 private LinkedHashSet<JpaPid> getPidsForPatientStyleExport( 158 ExportPIDIteratorParameters theParams, 159 String resourceType, 160 String theJobId, 161 String theChunkId, 162 RuntimeResourceDefinition def) 163 throws IOException { 164 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 165 // Patient 166 if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) { 167 String errorMessage = 168 "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export"; 169 ourLog.error(errorMessage); 170 throw new IllegalStateException(Msg.code(797) + errorMessage); 171 } 172 173 Set<String> patientSearchParams = 174 SearchParameterUtil.getPatientSearchParamsForResourceType(myContext, theParams.getResourceType()); 175 176 for (String patientSearchParam : patientSearchParams) { 177 List<SearchParameterMap> maps = 178 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams, false); 179 for (SearchParameterMap map : maps) { 180 // Ensure users did not monkey with the patient compartment search parameter. 181 validateSearchParametersForPatient(map, theParams); 182 183 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 184 185 filterBySpecificPatient(theParams, resourceType, patientSearchParam, map); 186 187 SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, theJobId); 188 189 Logs.getBatchTroubleshootingLog() 190 .debug( 191 "Executing query for bulk export job[{}] chunk[{}]: {}", 192 theJobId, 193 theChunkId, 194 map.toNormalizedQueryString(myContext)); 195 196 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 197 map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPartitions())) { 198 int pidCount = 0; 199 while (resultIterator.hasNext()) { 200 if (pidCount % 10000 == 0) { 201 Logs.getBatchTroubleshootingLog() 202 .debug( 203 "Bulk export job[{}] chunk[{}] has loaded {} pids", 204 theJobId, 205 theChunkId, 206 pidCount); 207 } 208 pidCount++; 209 pids.add(resultIterator.next()); 210 } 211 } 212 } 213 } 214 return pids; 215 } 216 217 private static void filterBySpecificPatient( 218 ExportPIDIteratorParameters theParams, 219 String resourceType, 220 String patientSearchParam, 221 SearchParameterMap map) { 222 if (resourceType.equalsIgnoreCase("Patient")) { 223 if (theParams.getPatientIds() != null) { 224 ReferenceOrListParam referenceOrListParam = getReferenceOrListParam(theParams); 225 map.add(PARAM_ID, referenceOrListParam); 226 } 227 } else { 228 if (theParams.getPatientIds() != null) { 229 ReferenceOrListParam referenceOrListParam = getReferenceOrListParam(theParams); 230 map.add(patientSearchParam, referenceOrListParam); 231 } else { 232 map.add(patientSearchParam, new ReferenceParam().setMissing(false)); 233 } 234 } 235 } 236 237 @Nonnull 238 private static ReferenceOrListParam getReferenceOrListParam(ExportPIDIteratorParameters theParams) { 239 ReferenceOrListParam referenceOrListParam = new ReferenceOrListParam(); 240 for (String patientId : theParams.getPatientIds()) { 241 referenceOrListParam.addOr(new ReferenceParam(patientId)); 242 } 243 return referenceOrListParam; 244 } 245 246 @SuppressWarnings("unchecked") 247 private LinkedHashSet<JpaPid> getPidsForSystemStyleExport( 248 ExportPIDIteratorParameters theParams, String theJobId, String theChunkId, RuntimeResourceDefinition theDef) 249 throws IOException { 250 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 251 // System 252 List<SearchParameterMap> maps = 253 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true); 254 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 255 256 for (SearchParameterMap map : maps) { 257 Logs.getBatchTroubleshootingLog() 258 .debug( 259 "Executing query for bulk export job[{}] chunk[{}]: {}", 260 theJobId, 261 theChunkId, 262 map.toNormalizedQueryString(myContext)); 263 264 // requires a transaction 265 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 266 map, new SearchRuntimeDetails(null, theJobId), null, theParams.getPartitionIdOrAllPartitions())) { 267 int pidCount = 0; 268 while (resultIterator.hasNext()) { 269 if (pidCount % 10000 == 0) { 270 Logs.getBatchTroubleshootingLog() 271 .debug( 272 "Bulk export job[{}] chunk[{}] has loaded {} pids", 273 theJobId, 274 theChunkId, 275 pidCount); 276 } 277 pidCount++; 278 pids.add(resultIterator.next()); 279 } 280 } 281 } 282 return pids; 283 } 284 285 private LinkedHashSet<JpaPid> getPidsForGroupStyleExport( 286 ExportPIDIteratorParameters theParams, String theResourceType, RuntimeResourceDefinition theDef) 287 throws IOException { 288 LinkedHashSet<JpaPid> pids; 289 290 if (theResourceType.equalsIgnoreCase("Patient")) { 291 ourLog.info("Expanding Patients of a Group Bulk Export."); 292 pids = getExpandedPatientList(theParams); 293 ourLog.info("Obtained {} PIDs", pids.size()); 294 } else if (theResourceType.equalsIgnoreCase("Group")) { 295 pids = getSingletonGroupList(theParams); 296 } else { 297 pids = getRelatedResourceTypePids(theParams, theDef); 298 } 299 return pids; 300 } 301 302 private LinkedHashSet<JpaPid> getRelatedResourceTypePids( 303 ExportPIDIteratorParameters theParams, RuntimeResourceDefinition theDef) throws IOException { 304 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 305 // Check if the patient compartment search parameter is active to enable export of this resource 306 RuntimeSearchParam activeSearchParam = 307 getActivePatientSearchParamForCurrentResourceType(theParams.getResourceType()); 308 if (activeSearchParam != null) { 309 // expand the group pid -> list of patients in that group (list of patient pids) 310 Set<JpaPid> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams); 311 assert !expandedMemberResourceIds.isEmpty(); 312 Logs.getBatchTroubleshootingLog() 313 .debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds); 314 315 // for each patient pid -> 316 // search for the target resources, with their correct patient references, chunked. 317 // The results will be jammed into myReadPids 318 QueryChunker<JpaPid> queryChunker = new QueryChunker<>(); 319 queryChunker.chunk(expandedMemberResourceIds, QUERY_CHUNK_SIZE, (idChunk) -> { 320 try { 321 queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, theDef); 322 } catch (IOException ex) { 323 // we will never see this; 324 // SearchBuilder#QueryIterator does not (nor can ever) throw 325 // an IOException... but Java requires the check, 326 // so we'll put a log here (just in the off chance) 327 ourLog.error("Couldn't close query iterator ", ex); 328 throw new RuntimeException(Msg.code(2346) + "Couldn't close query iterator", ex); 329 } 330 }); 331 } else { 332 ourLog.warn("No active patient compartment search parameter(s) for resource type " 333 + theParams.getResourceType()); 334 } 335 return pids; 336 } 337 338 private LinkedHashSet<JpaPid> getSingletonGroupList(ExportPIDIteratorParameters theParams) { 339 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 340 IBaseResource group = myDaoRegistry 341 .getResourceDao("Group") 342 .read(new IdDt(theParams.getGroupId()), new SystemRequestDetails().setRequestPartitionId(partitionId)); 343 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 344 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 345 pids.add(pidOrNull); 346 return pids; 347 } 348 349 /** 350 * Get a ISearchBuilder for the given resource type. 351 */ 352 protected ISearchBuilder<JpaPid> getSearchBuilderForResourceType(String theResourceType) { 353 IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType); 354 RuntimeResourceDefinition def = myContext.getResourceDefinition(theResourceType); 355 Class<? extends IBaseResource> typeClass = def.getImplementingClass(); 356 return mySearchBuilderFactory.newSearchBuilder(dao, theResourceType, typeClass); 357 } 358 359 protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) { 360 RuntimeSearchParam searchParam = null; 361 Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = 362 SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResourceType); 363 if (onlyPatientSearchParamForResourceType.isPresent()) { 364 searchParam = onlyPatientSearchParamForResourceType.get(); 365 } 366 return searchParam; 367 } 368 369 @Override 370 public void expandMdmResources(List<IBaseResource> theResources) { 371 for (IBaseResource resource : theResources) { 372 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(resource.fhirType())) { 373 annotateBackwardsReferences(resource); 374 } 375 } 376 } 377 378 /** 379 * For Patient 380 **/ 381 private RuntimeSearchParam validateSearchParametersForPatient( 382 SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 383 RuntimeSearchParam runtimeSearchParam = 384 getPatientSearchParamForCurrentResourceType(theParams.getResourceType()); 385 if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { 386 throw new IllegalArgumentException(Msg.code(796) 387 + String.format( 388 "Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", 389 runtimeSearchParam.getName())); 390 } 391 return runtimeSearchParam; 392 } 393 394 /** 395 * for group exports 396 **/ 397 private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap, String theResourceType) { 398 // we only validate for certain types 399 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theResourceType)) { 400 RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType(theResourceType); 401 if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { 402 throw new IllegalArgumentException(Msg.code(792) 403 + String.format( 404 "Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", 405 runtimeSearchParam.getName())); 406 } 407 } 408 } 409 410 /** 411 * In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members, 412 * possibly expanded by MDM, and don't have to go and fetch other resource DAOs. 413 */ 414 @SuppressWarnings("unchecked") 415 private LinkedHashSet<JpaPid> getExpandedPatientList(ExportPIDIteratorParameters theParameters) throws IOException { 416 List<JpaPid> members = getMembersFromGroupWithFilter(theParameters, true); 417 List<IIdType> ids = 418 members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); 419 ourLog.info("While extracting patients from a group, we found {} patients.", ids.size()); 420 ourLog.info("Found patients: {}", ids.stream().map(id -> id.getValue()).collect(Collectors.joining(", "))); 421 422 List<JpaPid> pidsOrThrowException = members; 423 LinkedHashSet<JpaPid> patientPidsToExport = new LinkedHashSet<>(pidsOrThrowException); 424 425 if (theParameters.isExpandMdm()) { 426 RequestPartitionId partitionId = theParameters.getPartitionIdOrAllPartitions(); 427 SystemRequestDetails srd = new SystemRequestDetails().setRequestPartitionId(partitionId); 428 IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParameters.getGroupId()), srd); 429 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 430 List<MdmPidTuple<JpaPid>> goldenPidSourcePidTuple = 431 myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); 432 goldenPidSourcePidTuple.forEach(tuple -> { 433 patientPidsToExport.add(tuple.getGoldenPid()); 434 patientPidsToExport.add(tuple.getSourcePid()); 435 }); 436 populateMdmResourceCache(goldenPidSourcePidTuple); 437 } 438 return patientPidsToExport; 439 } 440 441 /** 442 * Given the parameters, find all members' patient references in the group with the typeFilter applied. 443 * 444 * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] 445 */ 446 @SuppressWarnings("unchecked") 447 private List<JpaPid> getMembersFromGroupWithFilter( 448 ExportPIDIteratorParameters theParameters, boolean theConsiderSince) throws IOException { 449 RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); 450 List<JpaPid> resPids = new ArrayList<>(); 451 452 List<SearchParameterMap> maps = 453 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParameters, theConsiderSince); 454 455 maps.forEach(map -> addMembershipToGroupClause(map, theParameters.getGroupId())); 456 457 for (SearchParameterMap map : maps) { 458 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient"); 459 ourLog.debug( 460 "Searching for members of group {} with job instance {} with map {}", 461 theParameters.getGroupId(), 462 theParameters.getInstanceId(), 463 map); 464 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 465 map, 466 new SearchRuntimeDetails(null, theParameters.getInstanceId()), 467 null, 468 theParameters.getPartitionIdOrAllPartitions())) { 469 470 while (resultIterator.hasNext()) { 471 resPids.add(resultIterator.next()); 472 } 473 } 474 } 475 return resPids; 476 } 477 478 /** 479 * This method takes an {@link SearchParameterMap} and adds a clause to it that will filter the search results to only 480 * return members of the defined group. 481 * 482 * @param theMap the map to add the clause to. 483 * @param theGroupId the group ID to filter by. 484 */ 485 private void addMembershipToGroupClause(SearchParameterMap theMap, String theGroupId) { 486 HasOrListParam hasOrListParam = new HasOrListParam(); 487 hasOrListParam.addOr(new HasParam("Group", "member", "_id", theGroupId)); 488 theMap.add(PARAM_HAS, hasOrListParam); 489 } 490 491 /** 492 * @param thePidTuples 493 */ 494 @SuppressWarnings({"unchecked", "rawtypes"}) 495 private void populateMdmResourceCache(List<MdmPidTuple<JpaPid>> thePidTuples) { 496 if (myMdmExpansionCacheSvc.hasBeenPopulated()) { 497 return; 498 } 499 // First, convert this zipped set of tuples to a map of 500 // { 501 // patient/gold-1 -> [patient/1, patient/2] 502 // patient/gold-2 -> [patient/3, patient/4] 503 // } 504 Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>(); 505 extract(thePidTuples, goldenResourceToSourcePidMap); 506 507 // Next, lets convert it to an inverted index for fast lookup 508 // { 509 // patient/1 -> patient/gold-1 510 // patient/2 -> patient/gold-1 511 // patient/3 -> patient/gold-2 512 // patient/4 -> patient/gold-2 513 // } 514 Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); 515 goldenResourceToSourcePidMap.forEach((key, value) -> { 516 String goldenResourceId = 517 myIdHelperService.translatePidIdToForcedIdWithCache(key).orElse(key.toString()); 518 PersistentIdToForcedIdMap pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value); 519 520 Set<String> sourceResourceIds = pidsToForcedIds.getResolvedResourceIds(); 521 522 sourceResourceIds.forEach( 523 sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); 524 }); 525 526 // Now that we have built our cached expansion, store it. 527 myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); 528 } 529 530 private void extract( 531 List<MdmPidTuple<JpaPid>> theGoldenPidTargetPidTuples, 532 Map<JpaPid, Set<JpaPid>> theGoldenResourceToSourcePidMap) { 533 for (MdmPidTuple<JpaPid> goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) { 534 JpaPid goldenPid = goldenPidTargetPidTuple.getGoldenPid(); 535 JpaPid sourcePid = goldenPidTargetPidTuple.getSourcePid(); 536 theGoldenResourceToSourcePidMap 537 .computeIfAbsent(goldenPid, key -> new HashSet<>()) 538 .add(sourcePid); 539 } 540 } 541 542 // gets all the resources related to each patient provided in the list of thePatientPids 543 @SuppressWarnings("unchecked") 544 private void queryResourceTypeWithReferencesToPatients( 545 Set<JpaPid> theReadPids, 546 List<JpaPid> thePatientPids, 547 ExportPIDIteratorParameters theParams, 548 RuntimeResourceDefinition theDef) 549 throws IOException { 550 551 // Convert Resource Persistent IDs to actual client IDs. 552 Set<JpaPid> pidSet = new HashSet<>(thePatientPids); 553 Set<String> patientIds = myIdHelperService.translatePidsToFhirResourceIds(pidSet); 554 555 // Build SP map 556 // First, inject the _typeFilters and _since from the export job 557 List<SearchParameterMap> expandedSpMaps = 558 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true); 559 for (SearchParameterMap expandedSpMap : expandedSpMaps) { 560 561 // Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we 562 // need to manually set that. 563 validateSearchParametersForGroup(expandedSpMap, theParams.getResourceType()); 564 565 // Fetch and cache a search builder for this resource type 566 // filter by ResourceType 567 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 568 569 // Now, further filter the query with patient references defined by the chunk of IDs we have. 570 // filter by PatientIds 571 if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theParams.getResourceType())) { 572 filterSearchByHasParam(patientIds, expandedSpMap, theParams); 573 } else { 574 filterSearchByResourceIds(patientIds, expandedSpMap, theParams); 575 } 576 577 // Execute query and all found pids to our local iterator. 578 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 579 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 580 expandedSpMap, new SearchRuntimeDetails(null, theParams.getInstanceId()), null, partitionId)) { 581 while (resultIterator.hasNext()) { 582 theReadPids.add(resultIterator.next()); 583 } 584 } 585 586 // Construct our Includes filter 587 // We use this to recursively fetch resources of interest 588 // (but should only request those the user has requested/can see) 589 Set<Include> includes = new HashSet<>(); 590 for (String resourceType : theParams.getRequestedResourceTypes()) { 591 includes.add(new Include(resourceType + ":*", true)); 592 } 593 594 SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId); 595 SearchBuilderLoadIncludesParameters<JpaPid> loadIncludesParameters = 596 new SearchBuilderLoadIncludesParameters<>(); 597 loadIncludesParameters.setFhirContext(myContext); 598 loadIncludesParameters.setMatches(theReadPids); 599 loadIncludesParameters.setEntityManager(myEntityManager); 600 loadIncludesParameters.setRequestDetails(requestDetails); 601 loadIncludesParameters.setIncludeFilters(includes); 602 loadIncludesParameters.setReverseMode(false); 603 loadIncludesParameters.setLastUpdated(expandedSpMap.getLastUpdated()); 604 loadIncludesParameters.setSearchIdOrDescription(theParams.getInstanceId()); 605 loadIncludesParameters.setDesiredResourceTypes(theParams.getRequestedResourceTypes()); 606 Set<JpaPid> includeIds = searchBuilder.loadIncludes(loadIncludesParameters); 607 608 // gets rid of the Patient duplicates 609 theReadPids.addAll(includeIds.stream() 610 .filter((id) -> !id.getResourceType().equals("Patient")) 611 .collect(Collectors.toSet())); 612 } 613 } 614 615 private RuntimeSearchParam getActivePatientSearchParamForCurrentResourceType(String theResourceType) { 616 String activeSearchParamName = ""; 617 String resourceToCheck = theResourceType; 618 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theResourceType)) { 619 activeSearchParamName = 620 getPatientSearchParamForCurrentResourceType(theResourceType).getName(); 621 } else if ("Practitioner".equalsIgnoreCase(theResourceType)) { 622 resourceToCheck = "Patient"; 623 activeSearchParamName = "general-practitioner"; 624 } else if ("Organization".equalsIgnoreCase(theResourceType)) { 625 resourceToCheck = "Patient"; 626 activeSearchParamName = "organization"; 627 } 628 return mySearchParamRegistry.getActiveSearchParam(resourceToCheck, activeSearchParamName); 629 } 630 631 /** 632 * Must not be called for resources types listed in PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES 633 * 634 * @param idChunk 635 * @param expandedSpMap 636 * @param theParams 637 */ 638 private void filterSearchByResourceIds( 639 Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 640 ReferenceOrListParam orList = new ReferenceOrListParam(); 641 idChunk.forEach(id -> orList.add(new ReferenceParam(id))); 642 RuntimeSearchParam patientSearchParamForCurrentResourceType = 643 getPatientSearchParamForCurrentResourceType(theParams.getResourceType()); 644 expandedSpMap.add(patientSearchParamForCurrentResourceType.getName(), orList); 645 } 646 647 /** 648 * @param idChunk 649 * @param expandedSpMap 650 */ 651 private void filterSearchByHasParam( 652 Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 653 HasOrListParam hasOrListParam = new HasOrListParam(); 654 idChunk.stream().forEach(id -> hasOrListParam.addOr(buildHasParam(id, theParams.getResourceType()))); 655 expandedSpMap.add("_has", hasOrListParam); 656 } 657 658 private HasParam buildHasParam(String theResourceId, String theResourceType) { 659 if ("Practitioner".equalsIgnoreCase(theResourceType)) { 660 return new HasParam("Patient", "general-practitioner", "_id", theResourceId); 661 } else if ("Organization".equalsIgnoreCase(theResourceType)) { 662 return new HasParam("Patient", "organization", "_id", theResourceId); 663 } else { 664 throw new IllegalArgumentException( 665 Msg.code(2077) + " We can't handle forward references onto type " + theResourceType); 666 } 667 } 668 669 /** 670 * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients. 671 * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched 672 * patients. 673 * 674 * @return a Set of Strings representing the resource IDs of all members of a group. 675 */ 676 private Set<JpaPid> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) throws IOException { 677 Set<JpaPid> expandedIds = new HashSet<>(); 678 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 679 SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId); 680 IBaseResource group = 681 myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), requestDetails); 682 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 683 684 // Attempt to perform MDM Expansion of membership 685 if (theParams.isExpandMdm()) { 686 expandedIds.addAll(performMembershipExpansionViaMdmTable(pidOrNull)); 687 } 688 689 // Now manually add the members of the group (its possible even with mdm expansion that some members dont have 690 // MDM matches, 691 // so would be otherwise skipped 692 List<JpaPid> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams, false); 693 ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter); 694 expandedIds.addAll(membersFromGroupWithFilter); 695 696 return expandedIds; 697 } 698 699 @SuppressWarnings({"rawtypes", "unchecked"}) 700 private Set<JpaPid> performMembershipExpansionViaMdmTable(JpaPid pidOrNull) { 701 List<MdmPidTuple<JpaPid>> goldenPidTargetPidTuples = 702 myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); 703 // Now lets translate these pids into resource IDs 704 Set<JpaPid> uniquePids = new HashSet<>(); 705 goldenPidTargetPidTuples.forEach(tuple -> { 706 uniquePids.add(tuple.getGoldenPid()); 707 uniquePids.add(tuple.getSourcePid()); 708 }); 709 PersistentIdToForcedIdMap pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); 710 711 Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>(); 712 extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap); 713 populateMdmResourceCache(goldenPidTargetPidTuples); 714 715 return uniquePids; 716 } 717 718 /* Mdm Expansion */ 719 720 private RuntimeSearchParam getRuntimeSearchParam(IBaseResource theResource) { 721 Optional<RuntimeSearchParam> oPatientSearchParam = 722 SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResource.fhirType()); 723 if (!oPatientSearchParam.isPresent()) { 724 String errorMessage = String.format( 725 "[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", 726 theResource.fhirType()); 727 throw new IllegalArgumentException(Msg.code(2242) + errorMessage); 728 } else { 729 return oPatientSearchParam.get(); 730 } 731 } 732 733 private void annotateBackwardsReferences(IBaseResource iBaseResource) { 734 Optional<String> patientReference = getPatientReference(iBaseResource); 735 if (patientReference.isPresent()) { 736 addGoldenResourceExtension(iBaseResource, patientReference.get()); 737 } else { 738 ourLog.error( 739 "Failed to find the patient reference information for resource {}. This is a bug, " 740 + "as all resources which can be exported via Group Bulk Export must reference a patient.", 741 iBaseResource); 742 } 743 } 744 745 private Optional<String> getPatientReference(IBaseResource iBaseResource) { 746 String fhirPath; 747 748 RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource); 749 fhirPath = getPatientFhirPath(runtimeSearchParam); 750 751 if (iBaseResource.fhirType().equalsIgnoreCase("Patient")) { 752 return Optional.of(iBaseResource.getIdElement().getIdPart()); 753 } else { 754 Optional<IBaseReference> optionalReference = 755 getFhirParser().evaluateFirst(iBaseResource, fhirPath, IBaseReference.class); 756 if (optionalReference.isPresent()) { 757 return optionalReference.map(theIBaseReference -> 758 theIBaseReference.getReferenceElement().getIdPart()); 759 } else { 760 return Optional.empty(); 761 } 762 } 763 } 764 765 private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { 766 String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); 767 IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension( 768 iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); 769 if (!StringUtils.isBlank(goldenResourceId)) { 770 ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); 771 } 772 } 773 774 private String prefixPatient(String theResourceId) { 775 return "Patient/" + theResourceId; 776 } 777 778 private IFhirPath getFhirParser() { 779 if (myFhirPath == null) { 780 myFhirPath = myContext.newFhirPath(); 781 } 782 return myFhirPath; 783 } 784 785 private String getPatientFhirPath(RuntimeSearchParam theRuntimeParam) { 786 String path = theRuntimeParam.getPath(); 787 // GGG: Yes this is a stupid hack, but by default this runtime search param will return stuff like 788 // Observation.subject.where(resolve() is Patient) which unfortunately our FHIRpath evaluator doesn't play 789 // nicely with 790 // our FHIRPath evaluator. 791 if (path.contains(".where")) { 792 path = path.substring(0, path.indexOf(".where")); 793 } 794 return path; 795 } 796}