#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from typing import Dict, Optional, TYPE_CHECKING

from pyspark.sql.column import Column
from pyspark.sql.utils import to_scala_map

if TYPE_CHECKING:
    from pyspark.sql.dataframe import DataFrame

__all__ = ["MergeIntoWriter"]


class MergeIntoWriter:
    """
    `MergeIntoWriter` provides methods to define and execute merge actions based
    on specified conditions.

    .. versionadded:: 4.0.0
    """

    def __init__(self, df: "DataFrame", table: str, condition: Column):
        self._spark = df.sparkSession
        from pyspark.sql.classic.column import _to_java_column

        self._jwriter = df._jdf.mergeInto(table, _to_java_column(condition))
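    # Note: a `MergeIntoWriter` is normally obtained through `DataFrame.mergeInto`
    # rather than constructed directly. A minimal sketch (the table name "target"
    # and the "source" alias are illustrative assumptions, not part of this module):
    #
    #     from pyspark.sql.functions import expr
    #
    #     writer = df.alias("source").mergeInto("target", expr("source.id = target.id"))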
    def whenMatched(self, condition: Optional[Column] = None) -> "MergeIntoWriter.WhenMatched":
        """
        Initialize a `WhenMatched` action with a condition.

        This `WhenMatched` action will be executed when a source row matches a target
        table row based on the merge condition and the specified `condition` is satisfied.

        This `WhenMatched` can be followed by one of the following merge actions:
          - `updateAll`: Update all the matched target table rows with source dataset rows.
          - `update(Dict)`: Update all the matched target table rows while changing only
            a subset of columns based on the provided assignment.
          - `delete`: Delete all target rows that have a match in the source table.
        """
        return self.WhenMatched(self, condition)
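    # A hedged usage sketch: update only the `amount` column of matched rows that
    # satisfy an extra condition ("target", the "source" alias, and the column
    # names are illustrative assumptions):
    #
    #     from pyspark.sql.functions import col, expr
    #
    #     df.alias("source").mergeInto("target", expr("source.id = target.id")) \
    #         .whenMatched(col("source.amount") > 0) \
    #         .update({"amount": col("source.amount")}) \
    #         .merge()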
    def whenNotMatched(
        self, condition: Optional[Column] = None
    ) -> "MergeIntoWriter.WhenNotMatched":
        """
        Initialize a `WhenNotMatched` action with a condition.

        This `WhenNotMatched` action will be executed when a source row does not match
        any target row based on the merge condition and the specified `condition`
        is satisfied.

        This `WhenNotMatched` can be followed by one of the following merge actions:
          - `insertAll`: Insert all rows from the source that are not already in the
            target table.
          - `insert(Dict)`: Insert all rows from the source that are not already in the
            target table, with the specified columns based on the provided assignment.
        """
        return self.WhenNotMatched(self, condition)
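    # A hedged sketch: insert source rows that have no match in the target, either
    # wholesale or column-by-column (identifiers are illustrative assumptions):
    #
    #     from pyspark.sql.functions import col, expr
    #
    #     df.alias("source").mergeInto("target", expr("source.id = target.id")) \
    #         .whenNotMatched() \
    #         .insertAll() \
    #         .merge()
    #
    # or, with an explicit assignment for a subset of columns:
    #
    #     .whenNotMatched().insert({"id": col("source.id"), "name": col("source.name")})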
    def whenNotMatchedBySource(
        self, condition: Optional[Column] = None
    ) -> "MergeIntoWriter.WhenNotMatchedBySource":
        """
        Initialize a `WhenNotMatchedBySource` action with a condition.

        This `WhenNotMatchedBySource` action will be executed when a target row does not
        match any rows in the source table based on the merge condition and the specified
        `condition` is satisfied.

        This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
          - `updateAll`: Update all the not matched target table rows with source
            dataset rows.
          - `update(Dict)`: Update all the not matched target table rows while changing
            only the specified columns based on the provided assignment.
          - `delete`: Delete all target rows that have no matches in the source table.
        """
        return self.WhenNotMatchedBySource(self, condition)
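    # A hedged sketch: remove target rows that no longer exist in the source
    # (identifiers are illustrative assumptions):
    #
    #     from pyspark.sql.functions import expr
    #
    #     df.alias("source").mergeInto("target", expr("source.id = target.id")) \
    #         .whenNotMatchedBySource() \
    #         .delete() \
    #         .merge()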
    def withSchemaEvolution(self) -> "MergeIntoWriter":
        """
        Enable automatic schema evolution for this merge operation.
        """
        self._jwriter = self._jwriter.withSchemaEvolution()
        return self
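    # Schema evolution is opt-in per merge operation. A sketch, assuming the target
    # table's data source supports evolving the schema (identifiers are illustrative):
    #
    #     from pyspark.sql.functions import expr
    #
    #     df.alias("source").mergeInto("target", expr("source.id = target.id")) \
    #         .withSchemaEvolution() \
    #         .whenMatched().updateAll() \
    #         .merge()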
    def merge(self) -> None:
        """
        Execute the merge operation.
        """
        self._jwriter.merge()
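    # Each action returns the writer, so clauses chain naturally and `merge()` runs
    # the whole statement. A combined sketch (identifiers are illustrative
    # assumptions):
    #
    #     from pyspark.sql.functions import expr
    #
    #     (df.alias("source")
    #         .mergeInto("target", expr("source.id = target.id"))
    #         .whenMatched().updateAll()
    #         .whenNotMatched().insertAll()
    #         .whenNotMatchedBySource().delete()
    #         .merge())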
    class WhenMatched:
        """
        A class for defining actions to be taken when matching rows in a DataFrame
        during a merge operation.
        """

        def __init__(self, writer: "MergeIntoWriter", condition: Optional[Column]):
            self.writer = writer
            if condition is None:
                self.when_matched = writer._jwriter.whenMatched()
            else:
                from pyspark.sql.classic.column import _to_java_column

                self.when_matched = writer._jwriter.whenMatched(_to_java_column(condition))

        def updateAll(self) -> "MergeIntoWriter":
            """
            Specifies an action to update all matched rows in the DataFrame.
            """
            self.writer._jwriter = self.when_matched.updateAll()
            return self.writer

        def update(self, assignments: Dict[str, Column]) -> "MergeIntoWriter":
            """
            Specifies an action to update matched rows in the DataFrame with the
            provided column assignments.
            """
            jvm = self.writer._spark._jvm
            from pyspark.sql.classic.column import _to_java_column

            jmap = to_scala_map(jvm, {k: _to_java_column(v) for k, v in assignments.items()})
            self.writer._jwriter = self.when_matched.update(jmap)
            return self.writer

        def delete(self) -> "MergeIntoWriter":
            """
            Specifies an action to delete matched rows from the DataFrame.
            """
            self.writer._jwriter = self.when_matched.delete()
            return self.writer

    class WhenNotMatched:
        """
        A class for defining actions to be taken when no matching rows are found in a
        DataFrame during a merge operation.
        """

        def __init__(self, writer: "MergeIntoWriter", condition: Optional[Column]):
            self.writer = writer
            if condition is None:
                self.when_not_matched = writer._jwriter.whenNotMatched()
            else:
                from pyspark.sql.classic.column import _to_java_column

                self.when_not_matched = writer._jwriter.whenNotMatched(
                    _to_java_column(condition)
                )

        def insertAll(self) -> "MergeIntoWriter":
            """
            Specifies an action to insert all non-matched rows into the DataFrame.
            """
            self.writer._jwriter = self.when_not_matched.insertAll()
            return self.writer

        def insert(self, assignments: Dict[str, Column]) -> "MergeIntoWriter":
            """
            Specifies an action to insert non-matched rows into the DataFrame with the
            provided column assignments.
            """
            jvm = self.writer._spark._jvm
            from pyspark.sql.classic.column import _to_java_column

            jmap = to_scala_map(jvm, {k: _to_java_column(v) for k, v in assignments.items()})
            self.writer._jwriter = self.when_not_matched.insert(jmap)
            return self.writer

    class WhenNotMatchedBySource:
        """
        A class for defining actions to be performed when there is no match by source
        during a merge operation in a MergeIntoWriter.
        """

        def __init__(self, writer: "MergeIntoWriter", condition: Optional[Column]):
            self.writer = writer
            if condition is None:
                self.when_not_matched_by_source = writer._jwriter.whenNotMatchedBySource()
            else:
                from pyspark.sql.classic.column import _to_java_column

                self.when_not_matched_by_source = writer._jwriter.whenNotMatchedBySource(
                    _to_java_column(condition)
                )

        def updateAll(self) -> "MergeIntoWriter":
            """
            Specifies an action to update all non-matched rows in the target DataFrame
            when not matched by the source.
            """
            self.writer._jwriter = self.when_not_matched_by_source.updateAll()
            return self.writer

        def update(self, assignments: Dict[str, Column]) -> "MergeIntoWriter":
            """
            Specifies an action to update non-matched rows in the target DataFrame with
            the provided column assignments when not matched by the source.
            """
            jvm = self.writer._spark._jvm
            from pyspark.sql.classic.column import _to_java_column

            jmap = to_scala_map(jvm, {k: _to_java_column(v) for k, v in assignments.items()})
            self.writer._jwriter = self.when_not_matched_by_source.update(jmap)
            return self.writer

        def delete(self) -> "MergeIntoWriter":
            """
            Specifies an action to delete non-matched rows from the target DataFrame
            when not matched by the source.
"""self.writer._jwriter=self.when_not_matched_by_source.delete()returnself.writerdef_test()->None:importdoctestimportosimportpy4jfrompyspark.core.contextimportSparkContextfrompyspark.sqlimportSparkSessionimportpyspark.sql.mergeos.chdir(os.environ["SPARK_HOME"])globs=pyspark.sql.merge.__dict__.copy()sc=SparkContext("local[4]","PythonTest")try:spark=SparkSession._getActiveSessionOrCreate()exceptpy4j.protocol.Py4JError:spark=SparkSession(sc)globs["spark"]=spark(failure_count,test_count)=doctest.testmod(pyspark.sql.merge,globs=globs,optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE|doctest.REPORT_NDIFF,)spark.stop()iffailure_count:sys.exit(-1)if__name__=="__main__":_test()